Compare commits

...

4 Commits

  1. 95
      lib/src/grpcconnector.rs
  2. 39
      lib/src/lightwallet.rs

95
lib/src/grpcconnector.rs

@ -1,7 +1,7 @@
// Copyright The Hush Developers 2019-2022
// Released under the GPLv3
use log::{info,error};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use zcash_primitives::transaction::{TxId};
use crate::grpc_client::{ChainSpec, BlockId, BlockRange, RawTransaction, CompactBlock,
@ -33,6 +33,10 @@ mod danger {
}
}
pub struct State {
last_processed_height: u64,
}
async fn get_client(uri: &http::Uri, no_cert: bool) -> Result<CompactTxStreamerClient<Channel>, Box<dyn std::error::Error>> {
let channel = if uri.scheme_str() == Some("http") {
//println!("http");
@ -78,11 +82,30 @@ async fn get_lightd_info(uri: &http::Uri, no_cert: bool) -> Result<LightdInfo, B
}
pub fn get_info(uri: &http::Uri, no_cert: bool) -> Result<LightdInfo, String> {
let mut rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
let mut rt = match tokio::runtime::Runtime::new() {
Ok(r) => r,
Err(e) => return Err(format!("Error while creating Tokio Runtime: {}", e)),
};
rt.block_on(get_lightd_info(uri, no_cert)).map_err( |e| e.to_string())
}
let mut attempts = 0;
let max_attempts = 10;
while attempts < max_attempts {
match rt.block_on(get_lightd_info(uri, no_cert)) {
Ok(result) => return Ok(result),
Err(e) => {
attempts += 1;
eprintln!("Try {} of {}: Error for get_info: {}", attempts, max_attempts, e);
if attempts >= max_attempts {
return Err(format!("Reached max retries for get_info: {}", e));
}
std::thread::sleep(std::time::Duration::from_secs(2 * attempts as u64));
}
}
}
Err("Unknown error while get_lightd_info ".to_string())
}
async fn get_coinsupply_info(uri: &http::Uri, no_cert: bool) -> Result<Coinsupply, Box<dyn std::error::Error>> {
let mut client = get_client(uri, no_cert).await?;
@ -167,12 +190,12 @@ pub fn fetch_blocks<F : 'static + std::marker::Send>(uri: &http::Uri, start_heig
}
}
// get_address_txids GRPC call
async fn get_address_txids<F : 'static + std::marker::Send>(uri: &http::Uri, address: String,
start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), Box<dyn std::error::Error>>
where F : Fn(&[u8], u64) {
async fn get_address_txids<F>(uri: &http::Uri, address: String,
start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), Box<dyn std::error::Error>>
where
F: Fn(&[u8], u64) + Send + 'static
{
let mut client = get_client(uri, no_cert).await?;
let start = Some(BlockId{ height: start_height, hash: vec!()});
let end = Some(BlockId{ height: end_height, hash: vec!()});
@ -183,6 +206,7 @@ async fn get_address_txids<F : 'static + std::marker::Send>(uri: &http::Uri, add
let mut response = maybe_response.into_inner();
while let Some(tx) = response.message().await? {
c(&tx.data, tx.height);
}
@ -223,31 +247,46 @@ where
Ok(())
}
pub fn fetch_transparent_txids<F : 'static + std::marker::Send>(uri: &http::Uri, address: String,
start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String>
where F : Fn(&[u8], u64) {
pub fn fetch_transparent_txids<F>(uri: &http::Uri, address: String,
start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String>
where
F: Fn(&[u8], u64) + Clone + Send + 'static
{
let mut rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
let state = Arc::new(Mutex::new(State { last_processed_height: start_height }));
let mut rt = match tokio::runtime::Runtime::new() {
Ok(r) => r,
Err(e) => {
let e = format!("Error creating runtime {:?}", e);
error!("{}", e);
eprintln!("{}", e);
return Err(e);
let mut attempts = 0;
let max_attempts = 10;
loop {
let state_clone = state.clone();
let c_clone = c.clone();
{
let current_state = state_clone.lock().unwrap();
if current_state.last_processed_height > end_height {
break;
}
}
};
match rt.block_on(get_address_txids(uri, address.clone(), start_height, end_height, no_cert, c)) {
Ok(o) => Ok(o),
Err(e) => {
let e = format!("Error with get_address_txids runtime {:?}", e);
error!("{}", e);
eprintln!("{}", e);
Err(e)
match rt.block_on(get_address_txids(uri, address.clone(), start_height, end_height, no_cert, move |data, height| {
c_clone(data, height);
let mut state = state_clone.lock().unwrap();
state.last_processed_height = height;
})) {
Ok(_) => break,
Err(_) => {
attempts += 1;
if attempts >= max_attempts {
return Err("Reached maximum number of attempts".to_string());
}
std::thread::sleep(std::time::Duration::from_secs(2 * attempts as u64));
}
}
}
}
Ok(())
}
// get_transaction GRPC call
async fn get_transaction(uri: &http::Uri, txid: TxId, no_cert: bool)

39
lib/src/lightwallet.rs

@ -1469,22 +1469,29 @@ pub fn scan_full_tx(&self, tx: &Transaction, height: i32, datetime: u64) {
// Update the WalletTx
// Do it in a short scope because of the write lock.
{
info!("A sapling output was sent in {}", tx.txid());
let mut txs = self.txs.write().unwrap();
if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter()
.find(|om| om.address == address && om.value == note.value && om.memo == memo)
.is_some() {
warn!("Duplicate outgoing metadata");
continue;
}
// Write the outgoing metadata
txs.get_mut(&tx.txid()).unwrap()
.outgoing_metadata
.push(OutgoingTxMetadata{
address, value: note.value, memo,
});
}
match self.txs.write() {
Ok(txs) => {
match txs.get(&tx.txid()) {
Some(wtx) => {
if wtx.outgoing_metadata.iter()
.any(|om| om.address == address && om.value == note.value && om.memo == memo)
{
warn!("Duplicate outgoing metadata");
continue;
}
},
None => {
//in some cases (e.g. when restoring a wallet) not all addresses are found, as only a number of addresses are restored.
// So not all txids will be visible at this point. Generate more zaddr will fix this. No need to poision our lock
info!("No entry with txid : {}", tx.txid());
}
}
},
Err(poisoned) => {
warn!("Lock is poisoned: {}", poisoned);
}
}
}
},
None => {}
};

Loading…
Cancel
Save