diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 5dd3465..d0b8c33 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -146,8 +146,8 @@ pub fn start_interactive(command_tx: Sender<(String, Vec)>, resp_rx: Rec } }; - let info = &send_command("info".to_string(), vec![]); - let chain_name = json::parse(info).unwrap()["chain_name"].as_str().unwrap().to_string(); + let info = send_command("info".to_string(), vec![]); + let chain_name = json::parse(&info).unwrap()["chain_name"].as_str().unwrap().to_string(); loop { // Read the height first diff --git a/lib/src/grpcconnector.rs b/lib/src/grpcconnector.rs index 942b2e0..8e5f674 100644 --- a/lib/src/grpcconnector.rs +++ b/lib/src/grpcconnector.rs @@ -95,9 +95,9 @@ pub fn get_coinsupply(uri: http::Uri, no_cert: bool) -> Result(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, mut c: F) +async fn get_block_range(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), Box> -where F : FnMut(&[u8], u64) { +where F : Fn(&[u8], u64) { let mut client = get_client(uri, no_cert).await?; let bs = BlockId{ height: start_height, hash: vec!()}; @@ -118,19 +118,28 @@ where F : FnMut(&[u8], u64) { Ok(()) } -pub fn fetch_blocks(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, c: F) - where F : FnMut(&[u8], u64) { +pub fn fetch_blocks(uri: &http::Uri, start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String> + where F : Fn(&[u8], u64) { let mut rt = match tokio::runtime::Runtime::new() { Ok(r) => r, Err(e) => { - error!("Error fetching blocks {}", e.to_string()); + let es = format!("Error creating runtime {:?}", e); + error!("{}", es); eprintln!("{}", e); - return; + return Err(es); } }; - rt.block_on(get_block_range(uri, start_height, end_height, no_cert, c)).unwrap(); + match rt.block_on(get_block_range(uri, start_height, end_height, no_cert, c)) { + Ok(o) => Ok(o), + Err(e) => { + let e = format!("Error fetching blocks {:?}", e); + error!("{}", e); + eprintln!("{}", e); + Err(e) + } + } } @@ -157,19 +166,28 @@ async fn get_address_txids(uri: &http::Uri, add pub fn fetch_transparent_txids(uri: &http::Uri, address: String, - start_height: u64, end_height: u64, no_cert: bool, c: F) + start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String> where F : Fn(&[u8], u64) { let mut rt = match tokio::runtime::Runtime::new() { Ok(r) => r, Err(e) => { - error!("Error creating runtime {}", e.to_string()); + let e = format!("Error creating runtime {:?}", e); + error!("{}", e); eprintln!("{}", e); - return; + return Err(e); } }; - rt.block_on(get_address_txids(uri, address, start_height, end_height, no_cert, c)).unwrap(); + match rt.block_on(get_address_txids(uri, address.clone(), start_height, end_height, no_cert, c)) { + Ok(o) => Ok(o), + Err(e) => { + let e = format!("Error with get_address_txids runtime {:?}", e); + error!("{}", e); + eprintln!("{}", e); + Err(e) + } + } } diff --git a/lib/src/lightclient.rs b/lib/src/lightclient.rs index 644c0c5..8177b0a 100644 --- a/lib/src/lightclient.rs +++ b/lib/src/lightclient.rs @@ -960,6 +960,25 @@ impl LightClient { } pub fn do_sync(&self, print_updates: bool) -> Result { + + let mut retry_count = 0; + loop { + match self.do_sync_internal(print_updates, retry_count) { + Ok(j) => return Ok(j), + Err(e) => { + retry_count += 1; + if retry_count > 5 { + return Err(e); + } + // Sleep exponentially backing off + std::thread::sleep(std::time::Duration::from_secs((2 as u64).pow(retry_count))); + println!("Sync error {}\nRetry count {}", e, retry_count); + } + } + } + } + + fn do_sync_internal(&self, print_updates: bool, retry_count: u32) -> Result { // We can only do one sync at a time because we sync blocks in serial order // If we allow multiple syncs, they'll all get jumbled up. let _lock = self.sync_lock.lock().unwrap(); @@ -990,7 +1009,8 @@ impl LightClient { info!("Latest block is {}", latest_block); // Get the end height to scan to. - let mut end_height = std::cmp::min(last_scanned_height + 1000, latest_block); + let scan_batch_size = 1000; + let mut end_height = std::cmp::min(last_scanned_height + scan_batch_size, latest_block); // If there's nothing to scan, just return if last_scanned_height == latest_block { @@ -1016,7 +1036,9 @@ impl LightClient { let all_new_txs = Arc::new(RwLock::new(vec![])); // Fetch CompactBlocks in increments + let mut pass = 0; loop { + pass +=1 ; // Collect all block times, because we'll need to update transparent tx // datetime via the block height timestamp let block_times = Arc::new(RwLock::new(HashMap::new())); @@ -1078,7 +1100,7 @@ impl LightClient { }; local_bytes_downloaded.fetch_add(encoded_block.len(), Ordering::SeqCst); - }); + })?; // Check if there was any invalid block, which means we might have to do a reorg let invalid_height = last_invalid_height.load(Ordering::SeqCst); @@ -1118,15 +1140,23 @@ impl LightClient { let wallet = self.wallet.clone(); let block_times_inner = block_times.clone(); - fetch_transparent_txids(&self.get_server_uri(), address, start_height, end_height, self.config.no_cert_verification, - move |tx_bytes: &[u8], height: u64| { + // If this is the first pass after a retry, fetch older t address txids too, becuse + // they might have been missed last time. + let transparent_start_height = if pass == 1 && retry_count > 0 { + start_height - scan_batch_size + } else { + start_height + }; + + fetch_transparent_txids(&self.get_server_uri(), address, transparent_start_height, end_height, self.config.no_cert_verification, + move |tx_bytes: &[u8], height: u64| { let tx = Transaction::read(tx_bytes).unwrap(); // Scan this Tx for transparent inputs and outputs let datetime = block_times_inner.read().unwrap().get(&height).map(|v| *v).unwrap_or(0); wallet.read().unwrap().scan_full_tx(&tx, height as i32, datetime as u64); } - ); + )?; } }