From c38febd60ef9de207206e0f32292b62b9ee28d97 Mon Sep 17 00:00:00 2001 From: Aditya Kulkarni Date: Fri, 27 Sep 2019 17:34:11 -0700 Subject: [PATCH] Add TLS support --- Cargo.toml | 9 +- src/lightclient.rs | 459 ++++++++++++++++++--------------------------- 2 files changed, 187 insertions(+), 281 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f9404fe..4e76748 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ http = "0.1" prost = "0.5" tokio = "0.1" tower-request-modifier = { git = "https://github.com/tower-rs/tower-http" } -tower-hyper = "0.1" hyper = "0.12" tower-util = "0.1" hex = "0.3" @@ -32,6 +31,14 @@ sha2 = "0.8.0" ripemd160 = "0.8.0" ring = "0.14.0" lazy_static = "1.2.0" +tower-service = "0.2" +tokio-rustls = "0.10.0-alpha.3" +webpki = "0.19.1" +webpki-roots = "0.16.0" +tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } +openssl = "*" +openssl-probe = "*" + [dependencies.bellman] git = "https://github.com/adityapk00/librustzcash.git" diff --git a/src/lightclient.rs b/src/lightclient.rs index 2bf96ca..0714e40 100644 --- a/src/lightclient.rs +++ b/src/lightclient.rs @@ -2,40 +2,148 @@ use crate::lightwallet::LightWallet; use log::{info, warn, error}; +use std::sync::{Arc}; +use std::net::ToSocketAddrs; use std::path::Path; use std::fs::File; use std::io; use std::io::prelude::*; use std::io::{BufReader, BufWriter, Error, ErrorKind}; -use std::sync::Arc; use std::sync::atomic::{AtomicU64, AtomicI32, AtomicUsize, Ordering}; use json::{object, JsonValue}; +use futures::{Future}; +use futures::stream::Stream; + +use tower_util::MakeService; +use tower_grpc::Request; + +use tokio_rustls::client::TlsStream; +use tokio_rustls::{rustls::ClientConfig, TlsConnector}; +use std::net::SocketAddr; + +use tokio::executor::DefaultExecutor; +use tokio::net::tcp::TcpStream; +use tower_h2; + use zcash_primitives::transaction::{TxId, Transaction}; use zcash_client_backend::{ constants::testnet, constants::mainnet, constants::regtest, encoding::encode_payment_address, }; -use futures::Future; -use hyper::client::connect::{Destination, HttpConnector}; -use tower_grpc::Request; -use tower_hyper::{client, util}; -use tower_util::MakeService; -use futures::stream::Stream; use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction, TransparentAddressBlockFilter, TxFilter, Empty, LightdInfo}; use crate::grpc_client::client::CompactTxStreamer; // Used below to return the grpc "Client" type to calling methods -type Client = crate::grpc_client::client::CompactTxStreamer, tower_grpc::BoxBody>>; pub const DEFAULT_SERVER: &str = "http://3.15.168.203:9067"; pub const WALLET_NAME: &str = "zeclite.wallet.dat"; pub const LOGFILE_NAME: &str = "zeclite.debug.log"; + + +struct Dst(SocketAddr); + +impl tower_service::Service<()> for Dst { + type Response = TlsStream; + type Error = ::std::io::Error; + type Future = Box, Error = ::std::io::Error> + Send>; + + fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { + Ok(().into()) + } + + fn call(&mut self, _: ()) -> Self::Future { + let mut config = ClientConfig::new(); + + config.alpn_protocols.push(b"h2".to_vec()); + config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let config = Arc::new(config); + let tls_connector = TlsConnector::from(config); + + let addr_string_local = "lightd-test.zecwallet.co"; + + let domain = webpki::DNSNameRef::try_from_ascii_str(addr_string_local).unwrap(); + let domain_local = domain.to_owned(); + + let stream = TcpStream::connect(&self.0).and_then(move |sock| { + sock.set_nodelay(true).unwrap(); + tls_connector.connect(domain_local.as_ref(), sock) + }) + .map(move |tcp| tcp); + + Box::new(stream) + } +} + +// Same implementation but without TLS. Should make it straightforward to run without TLS +// when testing on local machine +// +// impl tower_service::Service<()> for Dst { +// type Response = TcpStream; +// type Error = ::std::io::Error; +// type Future = Box + Send>; +// +// fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { +// Ok(().into()) +// } +// +// fn call(&mut self, _: ()) -> Self::Future { +// let mut config = ClientConfig::new(); +// config.alpn_protocols.push(b"h2".to_vec()); +// config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); +// +// let addr_string_local = "mydomain.com".to_string(); +// let addr = addr_string_local.as_str(); +// +// let stream = TcpStream::connect(&self.0) +// .and_then(move |sock| { +// sock.set_nodelay(true).unwrap(); +// Ok(sock) +// }); +// Box::new(stream) +// } +// } + + +macro_rules! make_grpc_client { + () => {{ + let uri: http::Uri = "https://lightd-test.zecwallet.co".parse().unwrap(); + + //let addr = "3.15.168.203:9067" + let addr = "lightd-test.zecwallet.co:443" + .to_socket_addrs() + .unwrap() + .next() + .unwrap(); + + let h2_settings = Default::default(); + let mut make_client = tower_h2::client::Connect::new(Dst {0: addr}, h2_settings, DefaultExecutor::current()); + + make_client + .make_service(()) + .map_err(|e| { + eprintln!("HTTP/2 connection failed; err={:?}", e); + }) + .and_then(move |conn| { + let conn = tower_request_modifier::Builder::new() + .set_origin(uri) + .build(conn) + .unwrap(); + + CompactTxStreamer::new(conn) + // Wait until the client is ready... + .ready() + .map_err(|e| eprintln!("client closed: {:?}", e)) + }) + }}; +} + + #[derive(Clone, Debug)] pub struct LightClientConfig { pub server : String, @@ -345,20 +453,23 @@ impl LightClient { let info = Arc::new(RefCell::::default()); let info_inner = info.clone(); - let say_hello = LightClient::make_grpc_client(uri).unwrap() + let runner = make_grpc_client!() .and_then(move |mut client| { client.get_lightd_info(Request::new(Empty{})) - }) - .and_then(move |response| { - info_inner.replace(response.into_inner()); + .map_err(|e| { + println!("ERR = {:?}", e); + }) + .and_then(move |response| { + info_inner.replace(response.into_inner()); - Ok(()) - }) - .map_err(|e| { - println!("ERR = {:?}", e); + Ok(()) + }) + .map_err(|e| { + println!("ERR = {:?}", e); + }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); let ans = info.borrow().clone(); ans @@ -736,31 +847,14 @@ impl LightClient { } } - pub fn fetch_blocks(&self, start_height: u64, end_height: u64, c: F) - where F : Fn(&[u8]) { - // Fetch blocks - let uri: http::Uri = self.get_server_uri(); + // ============== + // GRPC code + // ============== - let dst = Destination::try_from_uri(uri.clone()).unwrap(); - let connector = util::Connector::new(HttpConnector::new(4)); - let settings = client::Builder::new().http2_only(true).clone(); - let mut make_client = client::Connect::with_builder(connector, settings); - - let say_hello = make_client - .make_service(dst) - .map_err(|e| panic!("connect error: {:?}", e)) - .and_then(move |conn| { - - let conn = tower_request_modifier::Builder::new() - .set_origin(uri) - .build(conn) - .unwrap(); - // Wait until the client is ready... - CompactTxStreamer::new(conn) - .ready() - .map_err(|e| eprintln!("streaming error {:?}", e)) - }) + pub fn fetch_blocks(&self, start_height: u64, end_height: u64, c: F) + where F : Fn(&[u8]) { + let runner = make_grpc_client!() .and_then(move |mut client| { let bs = BlockId{ height: start_height, hash: vec!()}; let be = BlockId{ height: end_height, hash: vec!()}; @@ -786,34 +880,13 @@ impl LightClient { }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); } pub fn fetch_transparent_txids(&self, address: String, start_height: u64, end_height: u64,c: F) where F : Fn(&[u8], u64) { - let uri: http::Uri = self.get_server_uri(); - - let dst = Destination::try_from_uri(uri.clone()).unwrap(); - let connector = util::Connector::new(HttpConnector::new(4)); - let settings = client::Builder::new().http2_only(true).clone(); - let mut make_client = client::Connect::with_builder(connector, settings); - - let say_hello = make_client - .make_service(dst) - .map_err(|e| panic!("connect error: {:?}", e)) - .and_then(move |conn| { - - let conn = tower_request_modifier::Builder::new() - .set_origin(uri) - .build(conn) - .unwrap(); - - // Wait until the client is ready... - CompactTxStreamer::new(conn) - .ready() - .map_err(|e| eprintln!("streaming error {:?}", e)) - }) + let runner = make_grpc_client!() .and_then(move |mut client| { let start = Some(BlockId{ height: start_height, hash: vec!()}); let end = Some(BlockId{ height: end_height, hash: vec!()}); @@ -837,57 +910,59 @@ impl LightClient { }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); } pub fn fetch_full_tx(&self, txid: TxId, c: F) where F : Fn(&[u8]) { - let uri: http::Uri = self.get_server_uri(); - - let say_hello = LightClient::make_grpc_client(uri).unwrap() + let runner = make_grpc_client!() .and_then(move |mut client| { let txfilter = TxFilter { block: None, index: 0, hash: txid.0.to_vec() }; client.get_transaction(Request::new(txfilter)) - }) - .and_then(move |response| { - //let tx = Transaction::read(&response.into_inner().data[..]).unwrap(); - c(&response.into_inner().data); + .map_err(|e| { + eprintln!("RouteChat request failed; err={:?}", e); + }) + .and_then(move |response| { + //let tx = Transaction::read(&response.into_inner().data[..]).unwrap(); + c(&response.into_inner().data); - Ok(()) - }) - .map_err(|e| { - println!("ERR = {:?}", e); + Ok(()) + }) + .map_err(|e| { + println!("ERR = {:?}", e); + }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); } pub fn broadcast_raw_tx(&self, tx_bytes: Box<[u8]>) -> String { use std::cell::RefCell; - let uri: http::Uri = self.get_server_uri(); - let infostr = Arc::new(RefCell::::default()); let infostrinner = infostr.clone(); - let say_hello = LightClient::make_grpc_client(uri).unwrap() + let runner = make_grpc_client!() .and_then(move |mut client| { client.send_transaction(Request::new(RawTransaction {data: tx_bytes.to_vec(), height: 0})) - }) - .and_then(move |response| { - let sendresponse = response.into_inner(); - if sendresponse.error_code == 0 { - infostrinner.replace(format!("Successfully broadcast Tx: {}", sendresponse.error_message)); - } else { - infostrinner.replace(format!("Error: {:?}", sendresponse)); - } - Ok(()) - }) - .map_err(|e| { - println!("ERR = {:?}", e); + .map_err(|e| { + println!("ERR = {:?}", e); + }) + .and_then(move |response| { + let sendresponse = response.into_inner(); + if sendresponse.error_code == 0 { + infostrinner.replace(format!("Successfully broadcast Tx: {}", sendresponse.error_message)); + } else { + infostrinner.replace(format!("Error: {:?}", sendresponse)); + } + Ok(()) + }) + .map_err(|e| { + println!("ERR = {:?}", e); + }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); let ans = infostr.borrow().clone(); ans @@ -895,198 +970,22 @@ impl LightClient { pub fn fetch_latest_block(&self, mut c : F) where F : FnMut(BlockId) { - let uri: http::Uri = self.get_server_uri(); - - let say_hello = LightClient::make_grpc_client(uri).unwrap() + let runner = make_grpc_client!() .and_then(|mut client| { client.get_latest_block(Request::new(ChainSpec {})) - }) - .and_then(move |response| { - c(response.into_inner()); - Ok(()) - }) - .map_err(|e| { - println!("ERR = {:?}", e); + .map_err(|e| { + println!("ERR = {:?}", e); + }) + .and_then(move |response| { + c(response.into_inner()); + Ok(()) + }) + .map_err(|e| { + println!("ERR = {:?}", e); + }) }); - tokio::runtime::current_thread::Runtime::new().unwrap().block_on(say_hello).unwrap(); + tokio::runtime::current_thread::Runtime::new().unwrap().block_on(runner).unwrap(); } - fn make_grpc_client(uri: http::Uri) -> Result + Send>, Box> { - let dst = Destination::try_from_uri(uri.clone())?; - let connector = util::Connector::new(HttpConnector::new(4)); - let settings = client::Builder::new().http2_only(true).clone(); - let mut make_client = client::Connect::with_builder(connector, settings); - - let say_hello = make_client - .make_service(dst) - .map_err(|e| panic!("connect error: {:?}", e)) - .and_then(move |conn| { - - let conn = tower_request_modifier::Builder::new() - .set_origin(uri) - .build(conn) - .unwrap(); - - // Wait until the client is ready... - CompactTxStreamer::new(conn).ready() - }); - Ok(Box::new(say_hello)) - } -} - - - - - -/* - TLS Example https://gist.github.com/kiratp/dfcbcf0aa713a277d5d53b06d9db9308 - -// [dependencies] -// futures = "0.1.27" -// http = "0.1.17" -// tokio = "0.1.21" -// tower-request-modifier = { git = "https://github.com/tower-rs/tower-http" } -// tower-grpc = { version = "0.1.0", features = ["tower-hyper"] } -// tower-service = "0.2" -// tower-util = "0.1" -// tokio-rustls = "0.10.0-alpha.3" -// webpki = "0.19.1" -// webpki-roots = "0.16.0" -// tower-h2 = { git = "https://github.com/tower-rs/tower-h2" } -// openssl = "*" -// openssl-probe = "*" - -use std::thread; -use std::sync::{Arc}; -use futures::{future, Future}; -use tower_util::MakeService; - -use tokio_rustls::client::TlsStream; -use tokio_rustls::{rustls::ClientConfig, TlsConnector}; -use std::net::SocketAddr; - -use tokio::executor::DefaultExecutor; -use tokio::net::tcp::TcpStream; -use tower_h2; - -use std::net::ToSocketAddrs; - - - -struct Dst(SocketAddr); - - -impl tower_service::Service<()> for Dst { - type Response = TlsStream; - type Error = ::std::io::Error; - type Future = Box, Error = ::std::io::Error> + Send>; - - fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { - Ok(().into()) - } - - fn call(&mut self, _: ()) -> Self::Future { - println!("{:?}", self.0); - let mut config = ClientConfig::new(); - - config.alpn_protocols.push(b"h2".to_vec()); - config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - let config = Arc::new(config); - let tls_connector = TlsConnector::from(config); - - let addr_string_local = "mydomain.com"; - - let domain = webpki::DNSNameRef::try_from_ascii_str(addr_string_local).unwrap(); - let domain_local = domain.to_owned(); - - let stream = TcpStream::connect(&self.0).and_then(move |sock| { - sock.set_nodelay(true).unwrap(); - tls_connector.connect(domain_local.as_ref(), sock) - }) - .map(move |tcp| tcp); - - Box::new(stream) - } } - -// Same implementation but without TLS. Should make it straightforward to run without TLS -// when testing on local machine - -// impl tower_service::Service<()> for Dst { -// type Response = TcpStream; -// type Error = ::std::io::Error; -// type Future = Box + Send>; - -// fn poll_ready(&mut self) -> futures::Poll<(), Self::Error> { -// Ok(().into()) -// } - -// fn call(&mut self, _: ()) -> Self::Future { -// let mut config = ClientConfig::new(); -// config.alpn_protocols.push(b"h2".to_vec()); -// config.root_store.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - -// let addr_string_local = "mydomain.com".to_string(); -// let addr = addr_string_local.as_str(); - -// let stream = TcpStream::connect(&self.0) -// .and_then(move |sock| { -// sock.set_nodelay(true).unwrap(); -// Ok(sock) -// }); -// Box::new(stream) -// } -// } - - -fn connect() { - let keepalive = future::loop_fn((), move |_| { - let uri: http::Uri = "https://mydomain.com".parse().unwrap(); - println!("Connecting to network at: {:?}", uri); - - let addr = "https://mydomain.com:443" - .to_socket_addrs() - .unwrap() - .next() - .unwrap(); - - let h2_settings = Default::default(); - let mut make_client = tower_h2::client::Connect::new(Dst {0: addr}, h2_settings, DefaultExecutor::current()); - - make_client - .make_service(()) - .map_err(|e| { - eprintln!("HTTP/2 connection failed; err={:?}", e); - }) - .and_then(move |conn| { - let conn = tower_request_modifier::Builder::new() - .set_origin(uri) - .build(conn) - .unwrap(); - - MyGrpcService::new(conn) - // Wait until the client is ready... - .ready() - .map_err(|e| eprintln!("client closed: {:?}", e)) - }) - .and_then(move |mut client| { - // do stuff - }) - .then(|e| { - eprintln!("Reopening client connection to network: {:?}", e); - let retry_sleep = std::time::Duration::from_secs(1); - - thread::sleep(retry_sleep); - Ok(future::Loop::Continue(())) - }) - }); - - thread::spawn(move || tokio::run(keepalive)); -} - -pub fn main() { - connect(); -} - - */ \ No newline at end of file