Browse Source

mempool integration

mempool-new
Deniod 6 months ago
parent
commit
eff5dd7b6d
  1. 2058
      Cargo.lock
  2. 9
      cli/src/lib.rs
  3. 2
      cli/src/version.rs
  4. 8
      lib/proto/service.proto
  5. 21
      lib/src/commands.rs
  6. 35
      lib/src/grpcconnector.rs
  7. 73
      lib/src/lightclient.rs
  8. 631
      lib/src/lightwallet.rs
  9. 50
      lib/src/lightwallet/data.rs

2058
Cargo.lock

File diff suppressed because it is too large

9
cli/src/lib.rs

@ -201,13 +201,20 @@ pub fn start_interactive(command_tx: Sender<(String, Vec<String>)>, resp_rx: Rec
} }
} }
pub fn command_loop(lightclient: Arc<LightClient>) -> (Sender<(String, Vec<String>)>, Receiver<String>) { pub fn command_loop(lightclient: Arc<LightClient>) -> (Sender<(String, Vec<String>)>, Receiver<String>) {
let (command_tx, command_rx) = channel::<(String, Vec<String>)>(); let (command_tx, command_rx) = channel::<(String, Vec<String>)>();
let (resp_tx, resp_rx) = channel::<String>(); let (resp_tx, resp_rx) = channel::<String>();
let lc = lightclient.clone(); let lc = lightclient.clone();
std::thread::spawn(move || { std::thread::spawn(move || {
//start mempool_monitor
match LightClient::start_mempool_monitor(lc.clone()) {
Ok(_) => {},
Err(e) => {
error!("Error starting mempool: {:?}", e);
}
}
loop { loop {
match command_rx.recv_timeout(std::time::Duration::from_secs(5 * 60)) { match command_rx.recv_timeout(std::time::Duration::from_secs(5 * 60)) {
Ok((cmd, args)) => { Ok((cmd, args)) => {

2
cli/src/version.rs

@ -1 +1 @@
pub const VERSION:&str = "1.1.1"; pub const VERSION:&str = "1.1.2";

8
lib/proto/service.proto

@ -75,6 +75,10 @@ message TransparentAddressBlockFilter {
BlockRange range = 2; BlockRange range = 2;
} }
message Exclude {
repeated bytes txid = 1;
}
service CompactTxStreamer { service CompactTxStreamer {
// Compact Blocks // Compact Blocks
rpc GetLatestBlock(ChainSpec) returns (BlockID) {} rpc GetLatestBlock(ChainSpec) returns (BlockID) {}
@ -91,4 +95,8 @@ service CompactTxStreamer {
// Misc // Misc
rpc GetLightdInfo(Empty) returns (LightdInfo) {} rpc GetLightdInfo(Empty) returns (LightdInfo) {}
rpc GetCoinsupply(Empty) returns (Coinsupply) {} rpc GetCoinsupply(Empty) returns (Coinsupply) {}
//Mempool
rpc GetMempoolTx(Exclude) returns (stream CompactTx) {}
rpc GetMempoolStream(Empty) returns (stream RawTransaction) {}
} }

21
lib/src/commands.rs

@ -32,10 +32,11 @@ impl Command for SyncCommand {
fn exec(&self, _args: &[&str], lightclient: &LightClient) -> String { fn exec(&self, _args: &[&str], lightclient: &LightClient) -> String {
match lightclient.do_sync(true) { match lightclient.do_sync(true) {
Ok(j) => j.pretty(2), Ok(j) => j.pretty(2),
Err(e) => e Err(e) => e.to_string()
} }
} }
} }
struct EncryptionStatusCommand {} struct EncryptionStatusCommand {}
@ -112,7 +113,6 @@ impl Command for RescanCommand {
} }
} }
struct ClearCommand {} struct ClearCommand {}
impl Command for ClearCommand { impl Command for ClearCommand {
fn help(&self) -> String { fn help(&self) -> String {
@ -250,7 +250,6 @@ impl Command for BalanceCommand {
} }
} }
struct AddressCommand {} struct AddressCommand {}
impl Command for AddressCommand { impl Command for AddressCommand {
fn help(&self) -> String { fn help(&self) -> String {
@ -385,7 +384,6 @@ impl Command for DecryptCommand {
} }
} }
struct UnlockCommand {} struct UnlockCommand {}
impl Command for UnlockCommand { impl Command for UnlockCommand {
fn help(&self) -> String { fn help(&self) -> String {
@ -425,7 +423,6 @@ impl Command for UnlockCommand {
} }
} }
struct LockCommand {} struct LockCommand {}
impl Command for LockCommand { impl Command for LockCommand {
fn help(&self) -> String { fn help(&self) -> String {
@ -466,7 +463,6 @@ impl Command for LockCommand {
} }
} }
struct SendCommand {} struct SendCommand {}
impl Command for SendCommand { impl Command for SendCommand {
fn help(&self) -> String { fn help(&self) -> String {
@ -770,19 +766,17 @@ impl Command for HeightCommand {
} }
} }
struct NewAddressCommand {} struct NewAddressCommand {}
impl Command for NewAddressCommand { impl Command for NewAddressCommand {
fn help(&self) -> String { fn help(&self) -> String {
let mut h = vec![]; let mut h = vec![];
h.push("Create a new address in this wallet"); h.push("Create a new address in this wallet");
h.push("Usage:"); h.push("Usage:");
h.push("new [z | t]"); h.push("new [z | r]");
h.push(""); h.push("");
h.push("Example:"); h.push("Example:");
h.push("To create a new z address:"); h.push("To create a new zs address:");
h.push("new z"); h.push("new zs");
h.join("\n") h.join("\n")
} }
@ -939,9 +933,6 @@ pub fn do_user_command(cmd: &str, args: &Vec<&str>, lightclient: &LightClient) -
} }
} }
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use lazy_static::lazy_static; use lazy_static::lazy_static;

35
lib/src/grpcconnector.rs

@ -1,6 +1,6 @@
// Copyright The Hush Developers 2019-2022 // Copyright The Hush Developers 2019-2022
// Released under the GPLv3 // Released under the GPLv3
use log::{error}; use log::{info,error};
use std::sync::Arc; use std::sync::Arc;
use zcash_primitives::transaction::{TxId}; use zcash_primitives::transaction::{TxId};
@ -189,6 +189,39 @@ async fn get_address_txids<F : 'static + std::marker::Send>(uri: &http::Uri, add
Ok(()) Ok(())
} }
// function to monitor mempool transactions
pub async fn monitor_mempool<F: 'static + std::marker::Send>(
uri: &http::Uri,
no_cert: bool,
mut c: F
) -> Result<(), Box<dyn std::error::Error>>
where
F: FnMut(RawTransaction) -> Result<(), Box<dyn std::error::Error>>,
{
let mut client = get_client(uri, no_cert)
.await
.map_err(|e| format!("Error getting client: {:?}", e))?;
let request = Request::new(Empty {});
let mut response = client
.get_mempool_stream(request)
.await
.map_err(|e| format!("{}", e))?
.into_inner();
while let Ok(Some(rtx)) = response.message().await {
if let Err(e) = c(rtx) {
info!("Error processing RawTransaction: {:?}", e);
}
}
Ok(())
}
pub fn fetch_transparent_txids<F : 'static + std::marker::Send>(uri: &http::Uri, address: String, pub fn fetch_transparent_txids<F : 'static + std::marker::Send>(uri: &http::Uri, address: String,
start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String> start_height: u64, end_height: u64, no_cert: bool, c: F) -> Result<(), String>

73
lib/src/lightclient.rs

@ -11,6 +11,10 @@ use std::cmp::{max, min};
use std::io; use std::io;
use std::io::prelude::*; use std::io::prelude::*;
use std::io::{BufReader, BufWriter, Error, ErrorKind}; use std::io::{BufReader, BufWriter, Error, ErrorKind};
use tokio::runtime::Runtime;
use tokio::time::{Duration};
use crate::grpc_client::RawTransaction;
use protobuf::parse_from_bytes; use protobuf::parse_from_bytes;
@ -629,6 +633,7 @@ impl LightClient {
object!{ object!{
"address" => zaddress.clone(), "address" => zaddress.clone(),
"zbalance" => wallet.zbalance(Some(zaddress.clone())), "zbalance" => wallet.zbalance(Some(zaddress.clone())),
"unconfirmed" => wallet.unconfirmed_zbalance(Some(zaddress.clone())),
"verified_zbalance" => wallet.verified_zbalance(Some(zaddress.clone())), "verified_zbalance" => wallet.verified_zbalance(Some(zaddress.clone())),
"spendable_zbalance" => wallet.spendable_zbalance(Some(zaddress.clone())) "spendable_zbalance" => wallet.spendable_zbalance(Some(zaddress.clone()))
} }
@ -647,6 +652,7 @@ impl LightClient {
object!{ object!{
"zbalance" => wallet.zbalance(None), "zbalance" => wallet.zbalance(None),
"unconfirmed" => wallet.unconfirmed_zbalance(None),
"verified_zbalance" => wallet.verified_zbalance(None), "verified_zbalance" => wallet.verified_zbalance(None),
"spendable_zbalance" => wallet.spendable_zbalance(None), "spendable_zbalance" => wallet.spendable_zbalance(None),
"tbalance" => wallet.tbalance(None), "tbalance" => wallet.tbalance(None),
@ -940,7 +946,30 @@ impl LightClient {
txns txns
}) })
.collect::<Vec<JsonValue>>(); .collect::<Vec<JsonValue>>();
// Add the incoming Mempool - incoming_mempool flag is atm useless, but we can use that in future maybe
tx_list.extend(
wallet.incoming_mempool_txs.read().unwrap().iter().flat_map(|(_, wtxs)| {
wtxs.iter().flat_map(|wtx| {
wtx.incoming_metadata.iter()
.enumerate()
.map(move |(_i, om)|
object! {
"block_height" => wtx.block.clone(),
"datetime" => wtx.datetime.clone(),
"position" => om.position,
"txid" => format!("{}", wtx.txid),
"amount" => om.value as i64,
"address" => om.address.clone(),
"memo" => LightWallet::memo_str(&Some(om.memo.clone())),
"unconfirmed" => true,
"incoming_mempool" => true,
}
)
})
})
);
// Add in all mempool txns // Add in all mempool txns
tx_list.extend(wallet.mempool_txs.read().unwrap().iter().map( |(_, wtx)| { tx_list.extend(wallet.mempool_txs.read().unwrap().iter().map( |(_, wtx)| {
use zcash_primitives::transaction::components::amount::DEFAULT_FEE; use zcash_primitives::transaction::components::amount::DEFAULT_FEE;
@ -956,6 +985,7 @@ impl LightClient {
"address" => om.address.clone(), "address" => om.address.clone(),
"value" => om.value, "value" => om.value,
"memo" => LightWallet::memo_str(&Some(om.memo.clone())), "memo" => LightWallet::memo_str(&Some(om.memo.clone())),
}).collect::<Vec<JsonValue>>(); }).collect::<Vec<JsonValue>>();
object! { object! {
@ -1037,6 +1067,47 @@ impl LightClient {
Ok(array![new_address]) Ok(array![new_address])
} }
// Start Mempool-Monitor
pub fn start_mempool_monitor(lc: Arc<LightClient>) -> Result<(), String> {
let config = lc.config.clone();
let uri = config.server.clone();
let (incoming_mempool_tx, incoming_mempool_rx) = std::sync::mpsc::channel::<RawTransaction>();
// Thread for reveive transactions
std::thread::spawn(move || {
while let Ok(rtx) = incoming_mempool_rx.recv() {
if let Ok(tx) = Transaction::read(
&rtx.data[..])
{
let light_wallet_clone = lc.wallet.clone();
light_wallet_clone.read().unwrap().scan_full_mempool_tx(&tx, rtx.height as i32, 0, true);
}
}
});
// Thread mempool monitor
std::thread::spawn(move || {
let mut rt = Runtime::new().unwrap();
rt.block_on(async {
loop {
let incoming_mempool_tx_clone = incoming_mempool_tx.clone();
let send_closure = move |rtx: RawTransaction| {
incoming_mempool_tx_clone.send(rtx).map_err(|e| Box::new(e) as Box<dyn std::error::Error>)
};
match grpcconnector::monitor_mempool(&uri.clone(), true, send_closure).await {
Ok(_) => info!("Mempool monitor loop successful"),
Err(e) => warn!("Mempool monitor returned {:?}, will restart listening", e),
}
std::thread::sleep(Duration::from_secs(10));
}
});
});
Ok(())
}
/// Convinence function to determine what type of key this is and import it /// Convinence function to determine what type of key this is and import it
pub fn do_import_key(&self, key: String, birthday: u64) -> Result<JsonValue, String> { pub fn do_import_key(&self, key: String, birthday: u64) -> Result<JsonValue, String> {
if key.starts_with(self.config.hrp_sapling_private_key()) { if key.starts_with(self.config.hrp_sapling_private_key()) {

631
lib/src/lightwallet.rs

@ -53,9 +53,6 @@ use zcash_primitives::{
}; };
use crate::lightclient::{LightClientConfig}; use crate::lightclient::{LightClientConfig};
mod data; mod data;
@ -65,7 +62,7 @@ mod address;
mod prover; mod prover;
mod walletzkey; mod walletzkey;
use data::{BlockData, WalletTx, Utxo, SaplingNoteData, SpendableNote, OutgoingTxMetadata}; use data::{BlockData, WalletTx, Utxo, SaplingNoteData, SpendableNote, OutgoingTxMetadata, IncomingTxMetadata};
use extended_key::{KeyIndex, ExtendedPrivKey}; use extended_key::{KeyIndex, ExtendedPrivKey};
use walletzkey::{WalletZKey, WalletTKey, WalletZKeyType}; use walletzkey::{WalletZKey, WalletTKey, WalletZKeyType};
@ -102,7 +99,7 @@ impl ToBase58Check for [u8] {
payload.extend_from_slice(self); payload.extend_from_slice(self);
payload.extend_from_slice(suffix); payload.extend_from_slice(suffix);
let mut checksum = double_sha256(&payload); let checksum = double_sha256(&payload);
payload.append(&mut checksum[..4].to_vec()); payload.append(&mut checksum[..4].to_vec());
payload.to_base58() payload.to_base58()
} }
@ -135,6 +132,7 @@ pub struct LightWallet {
// Transactions that are only in the mempool, but haven't been confirmed yet. // Transactions that are only in the mempool, but haven't been confirmed yet.
// This is not stored to disk. // This is not stored to disk.
pub mempool_txs: Arc<RwLock<HashMap<TxId, WalletTx>>>, pub mempool_txs: Arc<RwLock<HashMap<TxId, WalletTx>>>,
pub incoming_mempool_txs: Arc<RwLock<HashMap<TxId, Vec<WalletTx>>>>,
// The block at which this wallet was born. Rescans // The block at which this wallet was born. Rescans
// will start from here. // will start from here.
@ -199,7 +197,7 @@ impl LightWallet {
let zdustextfvk = ExtendedFullViewingKey::from(&zdustextsk); let zdustextfvk = ExtendedFullViewingKey::from(&zdustextsk);
let zdustaddress = zdustextfvk.default_address().unwrap().1; let zdustaddress = zdustextfvk.default_address().unwrap().1;
(zdustaddress) zdustaddress
} }
pub fn is_shielded_address(addr: &String, config: &LightClientConfig) -> bool { pub fn is_shielded_address(addr: &String, config: &LightClientConfig) -> bool {
@ -258,6 +256,7 @@ impl LightWallet {
blocks: Arc::new(RwLock::new(vec![])), blocks: Arc::new(RwLock::new(vec![])),
txs: Arc::new(RwLock::new(HashMap::new())), txs: Arc::new(RwLock::new(HashMap::new())),
mempool_txs: Arc::new(RwLock::new(HashMap::new())), mempool_txs: Arc::new(RwLock::new(HashMap::new())),
incoming_mempool_txs: Arc::new(RwLock::new(HashMap::new())),
config: config.clone(), config: config.clone(),
birthday: latest_block, birthday: latest_block,
total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)])), total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)])),
@ -427,6 +426,7 @@ impl LightWallet {
blocks: Arc::new(RwLock::new(blocks)), blocks: Arc::new(RwLock::new(blocks)),
txs: Arc::new(RwLock::new(txs)), txs: Arc::new(RwLock::new(txs)),
mempool_txs: Arc::new(RwLock::new(HashMap::new())), mempool_txs: Arc::new(RwLock::new(HashMap::new())),
incoming_mempool_txs: Arc::new(RwLock::new(HashMap::new())),
config: config.clone(), config: config.clone(),
birthday, birthday,
total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)])), total_scan_duration: Arc::new(RwLock::new(vec![Duration::new(0, 0)])),
@ -749,6 +749,7 @@ impl LightWallet {
self.blocks.write().unwrap().clear(); self.blocks.write().unwrap().clear();
self.txs.write().unwrap().clear(); self.txs.write().unwrap().clear();
self.mempool_txs.write().unwrap().clear(); self.mempool_txs.write().unwrap().clear();
self.incoming_mempool_txs.write().unwrap().clear();
} }
pub fn set_initial_block(&self, height: i32, hash: &str, sapling_tree: &str) -> bool { pub fn set_initial_block(&self, height: i32, hash: &str, sapling_tree: &str) -> bool {
@ -1048,27 +1049,36 @@ impl LightWallet {
} }
pub fn zbalance(&self, addr: Option<String>) -> u64 { pub fn zbalance(&self, addr: Option<String>) -> u64 {
self.txs.read().unwrap() let unconfirmed_balance = self.unconfirmed_zbalance(addr.clone());
let confirmed_balance = self.txs.read().unwrap()
.values() .values()
.map (|tx| { .map(|tx| {
tx.notes.iter() tx.notes.iter()
.filter(|nd| { // TODO, this whole section is shared with verified_balance. Refactor it. .filter(|nd| {
match addr.clone() { match addr.as_ref() {
Some(a) => a == encode_payment_address( Some(a) => *a == encode_payment_address(
self.config.hrp_sapling_address(), self.config.hrp_sapling_address(),
&nd.extfvk.fvk.vk &nd.extfvk.fvk.vk
.into_payment_address(nd.diversifier, &JUBJUB).unwrap() .into_payment_address(nd.diversifier, &JUBJUB).unwrap()
), ),
None => true None => true
}
})
.map(|nd| {
if nd.spent.is_none() && nd.unconfirmed_spent.is_none() {
nd.note.value
} else {
0
} }
}) })
.map(|nd| if nd.spent.is_none() { nd.note.value } else { 0 })
.sum::<u64>() .sum::<u64>()
}) })
.sum::<u64>() as u64 .sum::<u64>();
confirmed_balance + unconfirmed_balance
} }
// Get all (unspent) utxos. Unconfirmed spent utxos are included // Get all (unspent) utxos. Unconfirmed spent utxos are included
pub fn get_utxos(&self) -> Vec<Utxo> { pub fn get_utxos(&self) -> Vec<Utxo> {
let txs = self.txs.read().unwrap(); let txs = self.txs.read().unwrap();
@ -1109,8 +1119,8 @@ impl LightWallet {
.iter() .iter()
.filter(|nd| nd.spent.is_none() && nd.unconfirmed_spent.is_none()) .filter(|nd| nd.spent.is_none() && nd.unconfirmed_spent.is_none())
.filter(|nd| { // TODO, this whole section is shared with verified_balance. Refactor it. .filter(|nd| { // TODO, this whole section is shared with verified_balance. Refactor it.
match addr.clone() { match addr.as_ref() {
Some(a) => a == encode_payment_address( Some(a) => *a == encode_payment_address(
self.config.hrp_sapling_address(), self.config.hrp_sapling_address(),
&nd.extfvk.fvk.vk &nd.extfvk.fvk.vk
.into_payment_address(nd.diversifier, &JUBJUB).unwrap() .into_payment_address(nd.diversifier, &JUBJUB).unwrap()
@ -1144,8 +1154,8 @@ impl LightWallet {
self.have_spendingkey_for_extfvk(&nd.extfvk) self.have_spendingkey_for_extfvk(&nd.extfvk)
}) })
.filter(|nd| { // TODO, this whole section is shared with verified_balance. Refactor it. .filter(|nd| { // TODO, this whole section is shared with verified_balance. Refactor it.
match addr.clone() { match addr.as_ref() {
Some(a) => a == encode_payment_address( Some(a) => *a == encode_payment_address(
self.config.hrp_sapling_address(), self.config.hrp_sapling_address(),
&nd.extfvk.fvk.vk &nd.extfvk.fvk.vk
.into_payment_address(nd.diversifier, &JUBJUB).unwrap() .into_payment_address(nd.diversifier, &JUBJUB).unwrap()
@ -1162,6 +1172,30 @@ impl LightWallet {
.sum::<u64>() as u64 .sum::<u64>() as u64
} }
pub fn unconfirmed_zbalance(&self, addr: Option<String>) -> u64 {
self.incoming_mempool_txs.read().unwrap()
.values()
.flat_map(|txs| txs.iter())
.map(|tx| {
tx.incoming_metadata.iter()
.filter(|meta| {
match addr.as_ref() {
Some(a) => {
a == &meta.address
},
None => true
}
})
.map(|meta| {
meta.value
})
.sum::<u64>()
})
.sum::<u64>()
}
pub fn have_spendingkey_for_extfvk(&self, extfvk: &ExtendedFullViewingKey) -> bool { pub fn have_spendingkey_for_extfvk(&self, extfvk: &ExtendedFullViewingKey) -> bool {
match self.zkeys.read().unwrap().iter().find(|zk| zk.extfvk == *extfvk) { match self.zkeys.read().unwrap().iter().find(|zk| zk.extfvk == *extfvk) {
None => false, None => false,
@ -1275,224 +1309,365 @@ impl LightWallet {
} }
} }
// Scan the full Tx and update memos for incoming shielded transactions. // Scan the full Tx and update memos for incoming shielded transactions.
pub fn scan_full_tx(&self, tx: &Transaction, height: i32, datetime: u64) { pub fn scan_full_tx(&self, tx: &Transaction, height: i32, datetime: u64) {
let mut total_transparent_spend: u64 = 0; let mut total_transparent_spend: u64 = 0;
// Scan all the inputs to see if we spent any transparent funds in this tx
// Scan all the inputs to see if we spent any transparent funds in this tx for vin in tx.vin.iter() {
for vin in tx.vin.iter() { // Find the txid in the list of utxos that we have.
// Find the txid in the list of utxos that we have. let txid = TxId {0: vin.prevout.hash};
let txid = TxId {0: vin.prevout.hash}; match self.txs.write().unwrap().get_mut(&txid) {
match self.txs.write().unwrap().get_mut(&txid) { Some(wtx) => {
Some(wtx) => { //println!("Looking for {}, {}", txid, vin.prevout.n);
//println!("Looking for {}, {}", txid, vin.prevout.n); // One of the tx outputs is a match
let spent_utxo = wtx.utxos.iter_mut()
// One of the tx outputs is a match .find(|u| u.txid == txid && u.output_index == (vin.prevout.n as u64));
let spent_utxo = wtx.utxos.iter_mut() match spent_utxo {
.find(|u| u.txid == txid && u.output_index == (vin.prevout.n as u64)); Some(su) => {
info!("Spent utxo from {} was spent in {}", txid, tx.txid());
match spent_utxo { su.spent = Some(tx.txid().clone());
Some(su) => { su.unconfirmed_spent = None;
info!("Spent utxo from {} was spent in {}", txid, tx.txid()); total_transparent_spend += su.value;
su.spent = Some(tx.txid().clone()); },
su.unconfirmed_spent = None; _ => {}
}
total_transparent_spend += su.value; },
}, _ => {}
_ => {} };
}
if total_transparent_spend > 0 {
// Update the WalletTx. Do it in a short scope because of the write lock.
let mut txs = self.txs.write().unwrap();
if !txs.contains_key(&tx.txid()) {
let tx_entry = WalletTx::new(height, datetime, &tx.txid());
txs.insert(tx.txid().clone(), tx_entry);
}
txs.get_mut(&tx.txid()).unwrap()
.total_transparent_value_spent = total_transparent_spend;
}
// Scan for t outputs
let all_taddresses = self.tkeys.read().unwrap().iter()
.map(|wtx| wtx.address.clone())
.map(|a| a.clone())
.collect::<Vec<_>>();
for address in all_taddresses {
for (n, vout) in tx.vout.iter().enumerate() {
match vout.script_pubkey.address() {
Some(TransparentAddress::PublicKey(hash)) => {
if address == hash.to_base58check(&self.config.base58_pubkey_address(), &[]) {
// This is our address. Add this as an output to the txid
self.add_toutput_to_wtx(height, datetime, &tx.txid(), &vout, n as u64);
// Ensure that we add any new HD addresses
self.ensure_hd_taddresses(&address);
} }
}, },
_ => {} _ => {}
}; }
} }
}
if total_transparent_spend > 0 { {
// Update the WalletTx. Do it in a short scope because of the write lock. let total_shielded_value_spent = self.txs.read().unwrap().get(&tx.txid()).map_or(0, |wtx| wtx.total_shielded_value_spent);
let mut txs = self.txs.write().unwrap(); if total_transparent_spend + total_shielded_value_spent > 0 {
// We spent money in this Tx, so grab all the transparent outputs (except ours) and add them to the
if !txs.contains_key(&tx.txid()) { // outgoing metadata
let tx_entry = WalletTx::new(height, datetime, &tx.txid()); // Collect our t-addresses
txs.insert(tx.txid().clone(), tx_entry); let wallet_taddrs = self.tkeys.read().unwrap().iter()
.map(|wtx| wtx.address.clone())
.map(|a| a.clone())
.collect::<HashSet<String>>();
for vout in tx.vout.iter() {
let taddr = self.address_from_pubkeyhash(vout.script_pubkey.address());
if taddr.is_some() && !wallet_taddrs.contains(&taddr.clone().unwrap()) {
let taddr = taddr.unwrap();
// Add it to outgoing metadata
let mut txs = self.txs.write().unwrap();
if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter()
.find(|om|
om.address == taddr && Amount::from_u64(om.value).unwrap() == vout.value)
.is_some() {
warn!("Duplicate outgoing metadata");
continue;
}
// Write the outgoing metadata
txs.get_mut(&tx.txid()).unwrap()
.outgoing_metadata
.push(OutgoingTxMetadata{
address: taddr,
value: vout.value.into(),
memo: Memo::default(),
});
}
} }
}
txs.get_mut(&tx.txid()).unwrap() }
.total_transparent_value_spent = total_transparent_spend; // Scan shielded sapling outputs to see if anyone of them is us, and if it is, extract the memo
} for output in tx.shielded_outputs.iter() {
let ivks: Vec<_> = self.zkeys.read().unwrap().iter()
// Scan for t outputs .map(|zk| zk.extfvk.fvk.vk.ivk()
let all_taddresses = self.tkeys.read().unwrap().iter() ).collect();
.map(|wtx| wtx.address.clone()) let cmu = output.cmu;
.map(|a| a.clone()) let ct = output.enc_ciphertext;
.collect::<Vec<_>>(); // Search all of our keys
for address in all_taddresses { for ivk in ivks {
for (n, vout) in tx.vout.iter().enumerate() { let epk_prime = output.ephemeral_key.as_prime_order(&JUBJUB).unwrap();
match vout.script_pubkey.address() { let (note, _to, memo) = match try_sapling_note_decryption(&ivk, &epk_prime, &cmu, &ct) {
Some(TransparentAddress::PublicKey(hash)) => { Some(ret) => ret,
if address == hash.to_base58check(&self.config.base58_pubkey_address(), &[]) { None => continue,
// This is our address. Add this as an output to the txid };
self.add_toutput_to_wtx(height, datetime, &tx.txid(), &vout, n as u64); if memo.to_utf8().is_some() {
// info!("A sapling note was sent to wallet in {} that had a memo", tx.txid());
// Ensure that we add any new HD addresses // Do it in a short scope because of the write lock.
self.ensure_hd_taddresses(&address); let mut txs = self.txs.write().unwrap();
} // Update memo if we have this Tx.
match txs.get_mut(&tx.txid())
.and_then(|t| {
t.notes.iter_mut().find(|nd| nd.note == note)
}) {
None => {
info!("No txid matched for incoming sapling funds while updating memo");
()
}, },
_ => {} Some(nd) => {
} nd.memo = Some(memo)
}
}
} }
} }
// Also scan the output to see if it can be decoded with our OutgoingViewKey
{ // If it can, then we sent this transaction, so we should be able to get
let total_shielded_value_spent = self.txs.read().unwrap().get(&tx.txid()).map_or(0, |wtx| wtx.total_shielded_value_spent); // the memo and value for our records
if total_transparent_spend + total_shielded_value_spent > 0 { // First, collect all our z addresses, to check for change
// We spent money in this Tx, so grab all the transparent outputs (except ours) and add them to the // Collect z addresses
// outgoing metadata let z_addresses = self.zkeys.read().unwrap().iter().map( |zk| {
encode_payment_address(self.config.hrp_sapling_address(), &zk.zaddress)
// Collect our t-addresses }).collect::<HashSet<String>>();
let wallet_taddrs = self.tkeys.read().unwrap().iter() // Search all ovks that we have
.map(|wtx| wtx.address.clone()) let ovks: Vec<_> = self.zkeys.read().unwrap().iter()
.map(|a| a.clone()) .map(|zk| zk.extfvk.fvk.ovk.clone())
.collect::<HashSet<String>>(); .collect();
for ovk in ovks {
for vout in tx.vout.iter() { match try_sapling_output_recovery(
let taddr = self.address_from_pubkeyhash(vout.script_pubkey.address()); &ovk,
&output.cv,
if taddr.is_some() && !wallet_taddrs.contains(&taddr.clone().unwrap()) { &output.cmu,
let taddr = taddr.unwrap(); &output.ephemeral_key.as_prime_order(&JUBJUB).unwrap(),
&output.enc_ciphertext,
// Add it to outgoing metadata &output.out_ciphertext) {
let mut txs = self.txs.write().unwrap(); Some((note, payment_address, memo)) => {
if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter() let address = encode_payment_address(self.config.hrp_sapling_address(),
.find(|om| &payment_address);
om.address == taddr && Amount::from_u64(om.value).unwrap() == vout.value) // Check if this is change, and if it also doesn't have a memo, don't add
.is_some() { // to the outgoing metadata.
warn!("Duplicate outgoing metadata"); // If this is change (i.e., funds sent to ourself) AND has a memo, then
// presumably the users is writing a memo to themself, so we will add it to
// the outgoing metadata, even though it might be confusing in the UI, but hopefully
// the user can make sense of it.
if z_addresses.contains(&address) && memo.to_utf8().is_none() {
continue; continue;
} }
// Update the WalletTx
// Write the outgoing metadata // Do it in a short scope because of the write lock.
txs.get_mut(&tx.txid()).unwrap() {
.outgoing_metadata info!("A sapling output was sent in {}", tx.txid());
.push(OutgoingTxMetadata{ let mut txs = self.txs.write().unwrap();
address: taddr, if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter()
value: vout.value.into(), .find(|om| om.address == address && om.value == note.value && om.memo == memo)
memo: Memo::default(), .is_some() {
}); warn!("Duplicate outgoing metadata");
} continue;
} }
}
// Write the outgoing metadata
txs.get_mut(&tx.txid()).unwrap()
.outgoing_metadata
.push(OutgoingTxMetadata{
address, value: note.value, memo,
});
}
},
None => {}
};
} }
}
// Mark this Tx as scanned
{
let mut txs = self.txs.write().unwrap();
match txs.get_mut(&tx.txid()) {
Some(wtx) => wtx.full_tx_scanned = true,
None => {},
};
}
}
// Scan shielded sapling outputs to see if anyone of them is us, and if it is, extract the memo pub fn scan_full_mempool_tx(&self, tx: &Transaction, height: i32, _datetime: u64, mempool_transaction: bool) {
for output in tx.shielded_outputs.iter() { if tx.shielded_outputs.is_empty() {
let ivks: Vec<_> = self.zkeys.read().unwrap().iter() error!("Something went wrong, there are no shielded outputs");
.map(|zk| zk.extfvk.fvk.vk.ivk() return;
).collect(); }
let cmu = output.cmu;
let ct = output.enc_ciphertext;
// Search all of our keys for output in tx.shielded_outputs.iter() {
for ivk in ivks { let ivks: Vec<_> = self.zkeys.read().unwrap().iter()
let epk_prime = output.ephemeral_key.as_prime_order(&JUBJUB).unwrap(); .map(|zk| zk.extfvk.fvk.vk.ivk())
.collect();
let cmu = output.cmu;
let ct = output.enc_ciphertext;
// Search all of our keys
for ivk in ivks {
let epk_prime = output.ephemeral_key.as_prime_order(&JUBJUB).unwrap();
let (note, _to, memo) = match try_sapling_note_decryption(&ivk, &epk_prime, &cmu, &ct) {
Some(ret) => ret,
None => continue,
};
let (note, _to, memo) = match try_sapling_note_decryption(&ivk, &epk_prime, &cmu, &ct) { if mempool_transaction {
Some(ret) => ret, let mut incoming_mempool_txs = match self.incoming_mempool_txs.write() {
None => continue, Ok(txs) => txs,
Err(e) => {
error!("Error acquiring write lock: {}", e);
return;
}
}; };
if memo.to_utf8().is_some() { let addr = encode_payment_address(self.config.hrp_sapling_address(), &_to);
// info!("A sapling note was sent to wallet in {} that had a memo", tx.txid()); let amt = note.value;
let mut wtx = WalletTx::new(height, now() as u64, &tx.txid());
let formatted_memo = LightWallet::memo_str(&Some(memo.clone()));
let existing_txs = incoming_mempool_txs.entry(tx.txid())
.or_insert_with(Vec::new);
if formatted_memo.as_ref().map_or(false, |m| !m.is_empty()) {
// Check if a transaction with the exact same memo already exists
if existing_txs.iter().any(|tx| tx.incoming_metadata.iter().any(|meta| LightWallet::memo_str(&Some(meta.memo.clone())) == formatted_memo.as_ref().cloned())) {
// Transaction with this memo already exists, do nothing
return;
}
// Do it in a short scope because of the write lock. let position = if formatted_memo.as_ref().map_or(false, |m| m.starts_with('{')) {
let mut txs = self.txs.write().unwrap(); 1
// Update memo if we have this Tx. } else {
match txs.get_mut(&tx.txid()) existing_txs.iter()
.and_then(|t| { .filter(|tx| !LightWallet::memo_str(&Some(tx.incoming_metadata.iter().last().unwrap().memo.clone())).as_ref().map_or(false, |m| m.starts_with('{')))
t.notes.iter_mut().find(|nd| nd.note == note) .count() as u64 + 2
}) { };
None => {
info!("No txid matched for incoming sapling funds while updating memo"); let incoming_metadata = IncomingTxMetadata {
() address: addr.clone(),
}, value: amt,
Some(nd) => { memo: memo.clone(),
nd.memo = Some(memo) incoming_mempool: true,
} position: position,
} };
}
} wtx.incoming_metadata.push(incoming_metadata);
existing_txs.push(wtx);
let mut txs = match self.txs.write() {
Ok(t) => t,
Err(e) => {
error!("Error acquiring write lock: {}", e);
return;
}
};
if let Some(wtx) = txs.get_mut(&tx.txid()) {
wtx.incoming_metadata.push(IncomingTxMetadata {
address: addr.clone(),
value: amt,
memo: memo.clone(),
incoming_mempool: true,
position: position,
});
} else {
let mut new_wtx = WalletTx::new(height, now() as u64, &tx.txid());
new_wtx.incoming_metadata.push(IncomingTxMetadata {
address: addr.clone(),
value: amt,
memo: memo.clone(),
incoming_mempool: true,
position: position,
});
txs.insert(tx.txid(), new_wtx);
}
// Also scan the output to see if it can be decoded with our OutgoingViewKey info!("Successfully added txid with memo");
// If it can, then we sent this transaction, so we should be able to get } else {
// the memo and value for our records let position = 0;
// Check if txid already exists in the hashmap
let txid_exists = match self.txs.read() {
Ok(t) => t.contains_key(&tx.txid()),
Err(e) => {
error!("Error acquiring read lock: {}", e);
return;
}
};
// First, collect all our z addresses, to check for change if txid_exists {
// Collect z addresses // If txid already exists, do not process further
let z_addresses = self.zkeys.read().unwrap().iter().map( |zk| { info!("Txid already exists, not adding");
encode_payment_address(self.config.hrp_sapling_address(), &zk.zaddress) return;
}).collect::<HashSet<String>>(); }
// Search all ovks that we have let incoming_metadata = IncomingTxMetadata {
let ovks: Vec<_> = self.zkeys.read().unwrap().iter() address: addr.clone(),
.map(|zk| zk.extfvk.fvk.ovk.clone()) value: amt,
.collect(); memo: memo.clone(),
incoming_mempool: true,
position: position,
};
wtx.incoming_metadata.push(incoming_metadata);
existing_txs.push(wtx);
let mut txs = match self.txs.write() {
Ok(t) => t,
Err(e) => {
error!("Error acquiring write lock: {}", e);
return;
}
};
if let Some(wtx) = txs.get_mut(&tx.txid()) {
wtx.incoming_metadata.push(IncomingTxMetadata {
address: addr.clone(),
value: amt,
memo: memo.clone(),
incoming_mempool: true,
position: position,
});
} else {
let mut new_wtx = WalletTx::new(height, now() as u64, &tx.txid());
new_wtx.incoming_metadata.push(IncomingTxMetadata {
address: addr.clone(),
value: amt,
memo: memo.clone(),
incoming_mempool: true,
position: position,
});
txs.insert(tx.txid(), new_wtx);
}
for ovk in ovks { info!("Successfully added txid");
match try_sapling_output_recovery( }
&ovk, } else {
&output.cv, info!("Not a mempool transaction");
&output.cmu, }
&output.ephemeral_key.as_prime_order(&JUBJUB).unwrap(),
&output.enc_ciphertext,
&output.out_ciphertext) {
Some((note, payment_address, memo)) => {
let address = encode_payment_address(self.config.hrp_sapling_address(),
&payment_address);
// Check if this is change, and if it also doesn't have a memo, don't add
// to the outgoing metadata.
// If this is change (i.e., funds sent to ourself) AND has a memo, then
// presumably the users is writing a memo to themself, so we will add it to
// the outgoing metadata, even though it might be confusing in the UI, but hopefully
// the user can make sense of it.
if z_addresses.contains(&address) && memo.to_utf8().is_none() {
continue;
}
// Update the WalletTx // Mark this Tx as scanned
// Do it in a short scope because of the write lock. {
{ let mut txs = self.txs.write().unwrap();
info!("A sapling output was sent in {}", tx.txid()); match txs.get_mut(&tx.txid()) {
Some(wtx) => wtx.full_tx_scanned = true,
let mut txs = self.txs.write().unwrap(); None => {},
if txs.get(&tx.txid()).unwrap().outgoing_metadata.iter()
.find(|om| om.address == address && om.value == note.value && om.memo == memo)
.is_some() {
warn!("Duplicate outgoing metadata");
continue;
}
// Write the outgoing metadata
txs.get_mut(&tx.txid()).unwrap()
.outgoing_metadata
.push(OutgoingTxMetadata{
address, value: note.value, memo,
});
}
},
None => {}
}; };
} }
} }
// Mark this Tx as scanned
{
let mut txs = self.txs.write().unwrap();
match txs.get_mut(&tx.txid()) {
Some(wtx) => wtx.full_tx_scanned = true,
None => {},
};
}
} }
}
// Invalidate all blocks including and after "at_height". // Invalidate all blocks including and after "at_height".
// Returns the number of blocks invalidated // Returns the number of blocks invalidated
@ -1981,6 +2156,7 @@ impl LightWallet {
{ {
// Cleanup mempool tx after adding a block, to remove all txns that got mined // Cleanup mempool tx after adding a block, to remove all txns that got mined
self.cleanup_mempool(); self.cleanup_mempool();
self.cleanup_incoming_mempool();
} }
// Print info about the block every 10,000 blocks // Print info about the block every 10,000 blocks
@ -1999,7 +2175,7 @@ impl LightWallet {
consensus_branch_id: u32, consensus_branch_id: u32,
spend_params: &[u8], spend_params: &[u8],
output_params: &[u8], output_params: &[u8],
transparent_only: bool, _transparent_only: bool,
tos: Vec<(&str, u64, Option<String>)>, tos: Vec<(&str, u64, Option<String>)>,
broadcast_fn: F broadcast_fn: F
) -> Result<(String, Vec<u8>), String> ) -> Result<(String, Vec<u8>), String>
@ -2311,6 +2487,27 @@ impl LightWallet {
}); });
} }
} }
pub fn cleanup_incoming_mempool(&self) {
const DEFAULT_TX_EXPIRY_DELTA: i32 = 20;
let current_height = self.blocks.read().unwrap().last().map(|b| b.height).unwrap_or(0);
{
// Remove all expired Txns
self.incoming_mempool_txs.write().unwrap().retain(|_, wtxs| {
wtxs.retain(|wtx| current_height < (wtx.block + DEFAULT_TX_EXPIRY_DELTA));
!wtxs.is_empty() // Behalte den Eintrag nur, wenn nicht alle Transaktionen abgelaufen sind
});
}
{
// Remove all txns where the txid is added to the wallet directly
self.incoming_mempool_txs.write().unwrap().retain(|txid, _| {
self.txs.read().unwrap().get(txid).is_none()
});
}
}
} }
#[cfg(test)] #[cfg(test)]

50
lib/src/lightwallet/data.rs

@ -349,6 +349,49 @@ impl OutgoingTxMetadata {
memo, memo,
}) })
} }
pub fn write<W: Write>(&self, mut writer: W) -> io::Result<()> {
// Strings are written as len + utf8
writer.write_u64::<LittleEndian>(self.address.as_bytes().len() as u64)?;
writer.write_all(self.address.as_bytes())?;
writer.write_u64::<LittleEndian>(self.value)?;
writer.write_all(self.memo.as_bytes())
}
}
#[derive(Debug)]
pub struct IncomingTxMetadata {
pub address: String,
pub value : u64,
pub memo : Memo,
pub incoming_mempool: bool,
pub position: u64,
}
impl IncomingTxMetadata {
pub fn read<R: Read>(mut reader: R) -> io::Result<Self> {
let address_len = reader.read_u64::<LittleEndian>()?;
let mut address_bytes = vec![0; address_len as usize];
reader.read_exact(&mut address_bytes)?;
let address = String::from_utf8(address_bytes).unwrap();
let value = reader.read_u64::<LittleEndian>()?;
let incoming_mempool = true;
let position = 0;
let mut memo_bytes = [0u8; 512];
reader.read_exact(&mut memo_bytes)?;
let memo = Memo::from_bytes(&memo_bytes).unwrap();
Ok(IncomingTxMetadata{
address,
value,
memo,
incoming_mempool,
position,
})
}
pub fn write<W: Write>(&self, mut writer: W) -> io::Result<()> { pub fn write<W: Write>(&self, mut writer: W) -> io::Result<()> {
// Strings are written as len + utf8 // Strings are written as len + utf8
@ -389,6 +432,8 @@ pub struct WalletTx {
// All outgoing sapling sends to addresses outside this wallet // All outgoing sapling sends to addresses outside this wallet
pub outgoing_metadata: Vec<OutgoingTxMetadata>, pub outgoing_metadata: Vec<OutgoingTxMetadata>,
pub incoming_metadata: Vec<IncomingTxMetadata>,
// Whether this TxID was downloaded from the server and scanned for Memos // Whether this TxID was downloaded from the server and scanned for Memos
pub full_tx_scanned: bool, pub full_tx_scanned: bool,
} }
@ -408,6 +453,7 @@ impl WalletTx {
total_shielded_value_spent: 0, total_shielded_value_spent: 0,
total_transparent_value_spent: 0, total_transparent_value_spent: 0,
outgoing_metadata: vec![], outgoing_metadata: vec![],
incoming_metadata: vec![],
full_tx_scanned: false, full_tx_scanned: false,
} }
} }
@ -438,6 +484,8 @@ impl WalletTx {
// Outgoing metadata was only added in version 2 // Outgoing metadata was only added in version 2
let outgoing_metadata = Vector::read(&mut reader, |r| OutgoingTxMetadata::read(r))?; let outgoing_metadata = Vector::read(&mut reader, |r| OutgoingTxMetadata::read(r))?;
let incoming_metadata = Vector::read(&mut reader, |r| IncomingTxMetadata::read(r))?;
let full_tx_scanned = reader.read_u8()? > 0; let full_tx_scanned = reader.read_u8()? > 0;
Ok(WalletTx{ Ok(WalletTx{
@ -449,6 +497,7 @@ impl WalletTx {
total_shielded_value_spent, total_shielded_value_spent,
total_transparent_value_spent, total_transparent_value_spent,
outgoing_metadata, outgoing_metadata,
incoming_metadata,
full_tx_scanned full_tx_scanned
}) })
} }
@ -470,6 +519,7 @@ impl WalletTx {
// Write the outgoing metadata // Write the outgoing metadata
Vector::write(&mut writer, &self.outgoing_metadata, |w, om| om.write(w))?; Vector::write(&mut writer, &self.outgoing_metadata, |w, om| om.write(w))?;
Vector::write(&mut writer, &self.incoming_metadata, |w, om| om.write(w))?;
writer.write_u8(if self.full_tx_scanned {1} else {0})?; writer.write_u8(if self.full_tx_scanned {1} else {0})?;

Loading…
Cancel
Save