From fc72c078be1337642a13db654a4abc03d0beafd4 Mon Sep 17 00:00:00 2001 From: Simon Date: Tue, 16 Aug 2016 10:33:04 -0700 Subject: [PATCH] Add async RPC queue and operation classes. Add z_getoperationstatus RPC command. Add z_sendmany RPC command (dummy implementation, does not send actual coins). --- src/Makefile.am | 6 + src/asyncrpcoperation.cpp | 147 +++++++++++++++++ src/asyncrpcoperation.h | 148 +++++++++++++++++ src/asyncrpcqueue.cpp | 147 +++++++++++++++++ src/asyncrpcqueue.h | 61 +++++++ src/rpcclient.cpp | 2 + src/rpcserver.cpp | 23 +++ src/rpcserver.h | 9 + src/wallet/asyncrpcoperation_sendmany.cpp | 77 +++++++++ src/wallet/asyncrpcoperation_sendmany.h | 32 ++++ src/wallet/rpcwallet.cpp | 191 ++++++++++++++++++++++ 11 files changed, 843 insertions(+) create mode 100644 src/asyncrpcoperation.cpp create mode 100644 src/asyncrpcoperation.h create mode 100644 src/asyncrpcqueue.cpp create mode 100644 src/asyncrpcqueue.h create mode 100644 src/wallet/asyncrpcoperation_sendmany.cpp create mode 100644 src/wallet/asyncrpcoperation_sendmany.h diff --git a/src/Makefile.am b/src/Makefile.am index 8cc9710df..28b73431d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -86,6 +86,8 @@ BITCOIN_CORE_H = \ alert.h \ amount.h \ arith_uint256.h \ + asyncrpcoperation.h \ + asyncrpcqueue.h \ base58.h \ bloom.h \ chain.h \ @@ -162,6 +164,7 @@ BITCOIN_CORE_H = \ utiltime.h \ validationinterface.h \ version.h \ + wallet/asyncrpcoperation_sendmany.h \ wallet/crypter.h \ wallet/db.h \ wallet/wallet.h \ @@ -191,6 +194,8 @@ libbitcoin_server_a_SOURCES = \ sendalert.cpp \ addrman.cpp \ alert.cpp \ + asyncrpcoperation.cpp \ + asyncrpcqueue.cpp \ bloom.cpp \ chain.cpp \ checkpoints.cpp \ @@ -224,6 +229,7 @@ libbitcoin_server_a_SOURCES = \ libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES) libbitcoin_wallet_a_SOURCES = \ zcbenchmarks.cpp \ + wallet/asyncrpcoperation_sendmany.cpp \ wallet/crypter.cpp \ wallet/db.cpp \ wallet/rpcdump.cpp \ diff --git a/src/asyncrpcoperation.cpp b/src/asyncrpcoperation.cpp new file mode 100644 index 000000000..debe2adc9 --- /dev/null +++ b/src/asyncrpcoperation.cpp @@ -0,0 +1,147 @@ +// Copyright (c) 2016 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "asyncrpcoperation.h" + +#include +#include +#include + +#include +#include +#include + +using namespace std; +using namespace json_spirit; + +static boost::uuids::random_generator uuidgen; + +std::map OperationStatusMap = { + {OperationStatus::READY, "queued"}, + {OperationStatus::EXECUTING, "executing"}, + {OperationStatus::CANCELLED, "cancelled"}, + {OperationStatus::FAILED, "failed"}, + {OperationStatus::SUCCESS, "success"} +}; + +AsyncRPCOperation::AsyncRPCOperation() : errorCode(0), errorMessage() { + // Set a unique reference for each operation + boost::uuids::uuid uuid = uuidgen(); + std::string s = boost::uuids::to_string(uuid); + setId(s); + + setState(OperationStatus::READY); + creationTime = (int64_t)time(NULL); +} + +AsyncRPCOperation::AsyncRPCOperation(const AsyncRPCOperation& o) : id(o.id), creationTime(o.creationTime), state(o.state.load()) +{ +} + + +AsyncRPCOperation& AsyncRPCOperation::operator=( const AsyncRPCOperation& other ) { + this->id = other.getId(); + this->creationTime = other.creationTime; + this->state.store(other.state.load()); + return *this; +} + + +AsyncRPCOperation::~AsyncRPCOperation() { +} + +void AsyncRPCOperation::cancel() { + if (isReady()) + setState(OperationStatus::CANCELLED); +} + + +void AsyncRPCOperation::startExecutionClock() { + startTime = std::chrono::system_clock::now(); +} + +void AsyncRPCOperation::stopExecutionClock() { + endTime = std::chrono::system_clock::now(); +} + + +// Implement this method in any subclass. +// This is just an example implementation. + + +void AsyncRPCOperation::main() { + if (isCancelled()) + return; + + setState(OperationStatus::EXECUTING); + + // + // Do some work here... + // + + startExecutionClock(); + + //std::this_thread::sleep_for(std::chrono::milliseconds(10000)); + + stopExecutionClock(); + + + // If there was an error... +// setErrorCode(123); +// setErrorMessage("Murphy's law"); +// setState(OperationStatus::FAILED); + + + // Otherwise + Value v("We have a result!"); + setResult(v); + setState(OperationStatus::SUCCESS); +} + +Value AsyncRPCOperation::getError() const { + if (!isFailed()) + return Value::null; + + Object error; + error.push_back(Pair("code", this->errorCode)); + error.push_back(Pair("message", this->errorMessage)); + return Value(error); +} + +Value AsyncRPCOperation::getResult() const { + if (!isSuccess()) + return Value::null; + + return this->resultValue; +} + + +/* + * Returns a status Value object. + * If the operation has failed, it will include an error object. + * If the operation has succeeded, it will include the result value. + */ +Value AsyncRPCOperation::getStatus() const { + OperationStatus status = this->getState(); + Object obj; + obj.push_back(Pair("id", this->getId())); + obj.push_back(Pair("status", OperationStatusMap[status])); + obj.push_back(Pair("creation_time", this->creationTime)); + // creation, exec time, duration, exec end, etc. + Value err = this->getError(); + if (!err.is_null()) { + obj.push_back(Pair("error", err.get_obj())); + } + Value result = this->getResult(); + if (!result.is_null()) { + obj.push_back(Pair("result", result)); + + // Include execution time for successful operation + std::chrono::duration elapsed_seconds = endTime - startTime; + obj.push_back(Pair("execution_secs", elapsed_seconds.count())); + + } + return Value(obj); +} + diff --git a/src/asyncrpcoperation.h b/src/asyncrpcoperation.h new file mode 100644 index 000000000..f30c174d6 --- /dev/null +++ b/src/asyncrpcoperation.h @@ -0,0 +1,148 @@ +// Copyright (c) 2016 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + + +#ifndef ASYNCRPCOPERATION_H +#define ASYNCRPCOPERATION_H + +#include +#include +#include +#include + +#include "json/json_spirit_value.h" +#include "json/json_spirit_utils.h" +#include "json/json_spirit_reader_template.h" +#include "json/json_spirit_writer_template.h" + +using namespace std; +using namespace json_spirit; + +/** + * AsyncRPCOperations are given to the AsyncRPCQueue for processing. + * + * How to subclass: + * Implement the main() method, this is where work is performed. + * Update the operation status as work is underway and completes. + */ + +typedef std::string AsyncRPCOperationId; + +typedef enum class operationStateEnum { + READY = 0, + EXECUTING, + CANCELLED, + FAILED, + SUCCESS +} OperationStatus; + +class AsyncRPCOperation { +public: + AsyncRPCOperation(); + + // Todo: keep or delete copy constructors and assignment? + AsyncRPCOperation(const AsyncRPCOperation& orig); + AsyncRPCOperation& operator=( const AsyncRPCOperation& other ); + + virtual ~AsyncRPCOperation(); + + // Implement this method in your subclass. + virtual void main(); + + void cancel(); + + // Getters and setters + + OperationStatus getState() const { + return state.load(); + } + + AsyncRPCOperationId getId() const { + return id; + } + + int64_t getCreationTime() const { + return creationTime; + } + + Value getStatus() const; + + Value getError() const; + + Value getResult() const; + + int getErrorCode() const { + return errorCode; + } + + std::string getErrorMessage() const { + return errorMessage; + } + + bool isCancelled() const { + return OperationStatus::CANCELLED==getState(); + } + + bool isExecuting() const { + return OperationStatus::EXECUTING==getState(); + } + + bool isReady() const { + return OperationStatus::READY==getState(); + } + + bool isFailed() const { + return OperationStatus::FAILED==getState(); + } + + bool isSuccess() const { + return OperationStatus::SUCCESS==getState(); + } + +protected: + + Value resultValue; + int errorCode; + std::string errorMessage; + std::atomic state; + std::chrono::time_point startTime, endTime; + + void startExecutionClock(); + void stopExecutionClock(); + + void setState(OperationStatus state) { + this->state.store(state); + } + + void setErrorCode(int errorCode) { + this->errorCode = errorCode; + } + + void setErrorMessage(std::string errorMessage) { + this->errorMessage = errorMessage; + } + + void setResult(Value v) { + this->resultValue = v; + } + +private: + + // Todo: Private for now. If copying an operation is possible, should it + // receive a new id and a new creation time? + void setId(AsyncRPCOperationId id) { + this->id = id; + } + + // Todo: Ditto above. + void setCreationTime(int64_t creationTime) { + this->creationTime = creationTime; + } + + AsyncRPCOperationId id; + int64_t creationTime; +}; + +#endif /* ASYNCRPCOPERATION_H */ + diff --git a/src/asyncrpcqueue.cpp b/src/asyncrpcqueue.cpp new file mode 100644 index 000000000..679f861c5 --- /dev/null +++ b/src/asyncrpcqueue.cpp @@ -0,0 +1,147 @@ +// Copyright (c) 2016 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "asyncrpcqueue.h" + +static std::atomic workerCounter(0); + +AsyncRPCQueue::AsyncRPCQueue() : closed(false) { +} + +/* + * Calling thread will join on all the worker threads + */ +AsyncRPCQueue::~AsyncRPCQueue() { + this->closed = true; // set this in case close() was not invoked + for (std::thread & t : this->workers) { + t.join(); + } +} + +/* + * A worker will execute this method on a new thread + */ +void AsyncRPCQueue::run(int workerId) { +// std::cout << "Launched queue worker " << workerId << std::endl; + + while (!isClosed()) { + AsyncRPCOperationId key; + std::shared_ptr operation; + { + std::unique_lock< std::mutex > guard(cs_lock); + while (operationIdQueue.empty() && !isClosed()) { + this->cs_condition.wait(guard); + } + + // Exit if the queue is closing. + if (isClosed()) + break; + + // Get operation id + key = operationIdQueue.front(); + operationIdQueue.pop(); + + // Search operation map + AsyncRPCOperationMap::const_iterator iter = operationMap.find(key); + if (iter != operationMap.end()) { + operation = iter->second; + } + } + + if (!operation) { + // cannot find operation in map, may have been removed + } else if (operation->isCancelled()) { + // skip cancelled operation + } else { + operation->main(); + } + } +// std::cout << "Terminating queue worker " << workerId << std::endl; +} + + +/* + * Add shared_ptr to operation. + * + * To retain polymorphic behaviour, i.e. main() method of derived classes is invoked, + * caller should create the shared_ptr like thi: + * + * std::shared_ptr ptr(new MyCustomAsyncRPCOperation(params)); + * + * Don't use std::make_shared(). + */ +void AsyncRPCQueue::addOperation(const std::shared_ptr &ptrOperation) { + + // Don't add if queue is closed + if (isClosed()) + return; + + AsyncRPCOperationId id = ptrOperation->getId(); + { + std::lock_guard< std::mutex > guard(cs_lock); + operationMap.emplace(id, ptrOperation); + operationIdQueue.push(id); + this->cs_condition.notify_one(); + } +} + + +std::shared_ptr AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) { + std::shared_ptr ptr; + + std::lock_guard< std::mutex > guard(cs_lock); + AsyncRPCOperationMap::const_iterator iter = operationMap.find(id); + if (iter != operationMap.end()) { + ptr = iter->second; + } + return ptr; +} + +std::shared_ptr AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) { + std::shared_ptr ptr = getOperationForId(id); + if (ptr) { + std::lock_guard< std::mutex > guard(cs_lock); + // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker + // there will no operation in the map to execute, so nothing will happen. + operationMap.erase(id); + } + return ptr; +} + +bool AsyncRPCQueue::isClosed() { + return closed; +} + +void AsyncRPCQueue::close() { + this->closed = true; + cancelAllOperations(); +} + +/* + * Call cancel() on all operations + */ +void AsyncRPCQueue::cancelAllOperations() { + std::unique_lock< std::mutex > guard(cs_lock); + for (auto key : operationMap) { + key.second->cancel(); + } + this->cs_condition.notify_all(); +} + +int AsyncRPCQueue::getOperationCount() { + std::unique_lock< std::mutex > guard(cs_lock); + return operationIdQueue.size(); +} + +/* + * Spawn a worker thread + */ +void AsyncRPCQueue::addWorker() { + std::unique_lock< std::mutex > guard(cs_lock); // Todo: could just have a lock on the vector + workers.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) ); +} + +int AsyncRPCQueue::getNumberOfWorkers() { + return workers.size(); +} diff --git a/src/asyncrpcqueue.h b/src/asyncrpcqueue.h new file mode 100644 index 000000000..99ba45f91 --- /dev/null +++ b/src/asyncrpcqueue.h @@ -0,0 +1,61 @@ +// Copyright (c) 2014 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ASYNCRPCQUEUE_H +#define ASYNCRPCQUEUE_H + +#include "asyncrpcoperation.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + + +typedef std::unordered_map > AsyncRPCOperationMap; + + +class AsyncRPCQueue { +public: + AsyncRPCQueue(); + virtual ~AsyncRPCQueue(); + + // We don't want queue to be copied or moved around + AsyncRPCQueue(AsyncRPCQueue const&) = delete; // Copy construct + AsyncRPCQueue(AsyncRPCQueue&&) = delete; // Move construct + AsyncRPCQueue& operator=(AsyncRPCQueue const&) = delete; // Copy assign + AsyncRPCQueue& operator=(AsyncRPCQueue &&) = delete; // Move assign + + void addWorker(); + int getNumberOfWorkers(); + bool isClosed(); + void close(); + void cancelAllOperations(); + int getOperationCount(); + std::shared_ptr getOperationForId(AsyncRPCOperationId); + std::shared_ptr popOperationForId(AsyncRPCOperationId); + void addOperation(const std::shared_ptr &ptrOperation); + +private: + // addWorker() will spawn a new thread on this method + void run(int workerId); + + std::mutex cs_lock; + std::condition_variable cs_condition; + bool closed; + AsyncRPCOperationMap operationMap; + std::queue operationIdQueue; + std::vector workers; +}; + +#endif + + diff --git a/src/rpcclient.cpp b/src/rpcclient.cpp index 78ae7d930..f3d1a5454 100644 --- a/src/rpcclient.cpp +++ b/src/rpcclient.cpp @@ -98,6 +98,8 @@ static const CRPCConvertParam vRPCConvertParams[] = { "zcbenchmark", 1 }, { "zcbenchmark", 2 }, { "getblocksubsidy", 0}, + { "z_sendmany", 1}, + { "z_sendmany", 2}, { "z_importkey", 1 } }; diff --git a/src/rpcserver.cpp b/src/rpcserver.cpp index 3ad88b2e0..75f542aaa 100644 --- a/src/rpcserver.cpp +++ b/src/rpcserver.cpp @@ -15,6 +15,9 @@ #ifdef ENABLE_WALLET #include "wallet/wallet.h" #endif +#include "asyncrpcqueue.h" + +#include #include #include @@ -49,6 +52,7 @@ static boost::thread_group* rpc_worker_group = NULL; static boost::asio::io_service::work *rpc_dummy_work = NULL; static std::vector rpc_allow_subnets; //!< List of subnets to allow RPC connections from static std::vector< boost::shared_ptr > rpc_acceptors; +static shared_ptr async_rpc_queue; static struct CRPCSignals { @@ -382,6 +386,8 @@ static const CRPCCommand vRPCCommands[] = { "wallet", "zcrawjoinsplit", &zc_raw_joinsplit, true }, { "wallet", "zcrawreceive", &zc_raw_receive, true }, { "wallet", "zcsamplejoinsplit", &zc_sample_joinsplit, true }, + { "wallet", "z_sendmany", &z_sendmany, true }, + { "wallet", "z_getoperationstatus", &z_getoperationstatus, true }, { "wallet", "z_getnewaddress", &z_getnewaddress, true }, { "wallet", "z_listaddresses", &z_listaddresses, true }, { "wallet", "z_exportkey", &z_exportkey, true }, @@ -737,6 +743,13 @@ void StartRPCThreads() rpc_worker_group->create_thread(boost::bind(&boost::asio::io_service::run, rpc_io_service)); fRPCRunning = true; g_rpcSignals.Started(); + + // Launch at least one async rpc worker + async_rpc_queue = std::make_shared(); + async_rpc_queue->addWorker(); + async_rpc_queue->addWorker(); + async_rpc_queue->addWorker(); + } void StartDummyRPCThread() @@ -786,6 +799,10 @@ void StopRPCThreads() delete rpc_worker_group; rpc_worker_group = NULL; delete rpc_ssl_context; rpc_ssl_context = NULL; delete rpc_io_service; rpc_io_service = NULL; + + // Tells async queue to cancel all operations and shutdown. + // The async queue destructor will block and join on worker threads. + async_rpc_queue->close(); } bool IsRPCRunning() @@ -1048,3 +1065,9 @@ std::string HelpExampleRpc(string methodname, string args){ } const CRPCTable tableRPC; + +// Return async rpc queue +std::shared_ptr getAsyncRPCQueue() +{ + return async_rpc_queue; +} diff --git a/src/rpcserver.h b/src/rpcserver.h index 5828dd489..1d9712711 100644 --- a/src/rpcserver.h +++ b/src/rpcserver.h @@ -14,11 +14,13 @@ #include #include #include +#include #include "json/json_spirit_reader_template.h" #include "json/json_spirit_utils.h" #include "json/json_spirit_writer_template.h" +class AsyncRPCQueue; class CRPCCommand; namespace RPCServer @@ -55,6 +57,10 @@ void StopRPCThreads(); /** Query whether RPC is running */ bool IsRPCRunning(); +/** Get the async queue*/ +std::shared_ptr getAsyncRPCQueue(); + + /** * Set the RPC warmup status. When this is done, all RPC calls will error out * immediately with RPC_IN_WARMUP. @@ -250,6 +256,9 @@ extern json_spirit::Value z_getnewaddress(const json_spirit::Array& params, bool extern json_spirit::Value z_listaddresses(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp extern json_spirit::Value z_exportwallet(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp extern json_spirit::Value z_importwallet(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp +extern json_spirit::Value z_sendmany(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp +extern json_spirit::Value z_getoperationstatus(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp + // in rest.cpp extern bool HTTPReq_REST(AcceptedConnection *conn, diff --git a/src/wallet/asyncrpcoperation_sendmany.cpp b/src/wallet/asyncrpcoperation_sendmany.cpp new file mode 100644 index 000000000..d379a010c --- /dev/null +++ b/src/wallet/asyncrpcoperation_sendmany.cpp @@ -0,0 +1,77 @@ +// Copyright (c) 2016 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + + +#include "asyncrpcoperation_sendmany.h" + +#include +#include +#include + +AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany(std::string fromAddress, std::vector outputs, int minconf) : fromAddress(fromAddress), outputs(outputs), minconf(minconf) +{ +} + +AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany(const AsyncRPCOperation_sendmany& orig) { +} + +AsyncRPCOperation_sendmany::~AsyncRPCOperation_sendmany() { +} + +void AsyncRPCOperation_sendmany::main() { + if (isCancelled()) + return; + + setState(OperationStatus::EXECUTING); + startExecutionClock(); + + /** + * Dummy run of a sendmany operation + */ + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + + std::cout << std::endl << "z_sendmany: **************** DUMMY RUN *****************" << std::endl; + std::cout << "z_sendmany: source of funds: " << fromAddress << std::endl; + std::cout << "z_sendmany: minconf: " << minconf << std::endl; + + for (SendManyRecipient & t : outputs) { + std::cout << "z_sendmany: send " << std::get<1>(t) << " to " << std::get<0>(t) << std::endl; + std::string memo = std::get<2>(t); + if (memo.size()>0) { + std::cout << " : memo = " << memo << std::endl; + } + } + + std::cout << "z_sendmany: checking balances and selecting coins and notes..." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + std::cout << "z_sendmany: performing a joinsplit..." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(3000)); + + std::cout << "z_sendmany: attempting to broadcast to network..." << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + std::cout << "z_sendmany: operation complete!" << std::endl; + std::cout << "z_sendmany: ********************************************" << std::endl; + + stopExecutionClock(); + + + // dummy run will say that even number of outputs is success + bool isEven = outputs.size() % 2 == 0; + //std::cout<< "here is r: " << r << std::endl; + if (isEven) { + setState(OperationStatus::SUCCESS); + Object obj; + obj.push_back(Pair("dummy_txid", "4a1298544a1298544a1298544a1298544a129854")); + obj.push_back(Pair("dummy_fee", 0.0001)); // dummy fee + setResult(Value(obj)); + } else { + setState(OperationStatus::FAILED); + errorCode = std::rand(); + errorMessage = "Dummy run tests error handling by not liking an odd number number of outputs."; + } + +} + diff --git a/src/wallet/asyncrpcoperation_sendmany.h b/src/wallet/asyncrpcoperation_sendmany.h new file mode 100644 index 000000000..81496c798 --- /dev/null +++ b/src/wallet/asyncrpcoperation_sendmany.h @@ -0,0 +1,32 @@ +// Copyright (c) 2016 The Zcash developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef ASYNCRPCOPERATION_SENDMANY_H +#define ASYNCRPCOPERATION_SENDMANY_H + +#include "../asyncrpcoperation.h" + +#include "amount.h" + +#include + +// A recipient is a tuple of address, amount, memo (optional if zaddr) +typedef std::tuple SendManyRecipient; + +class AsyncRPCOperation_sendmany : public AsyncRPCOperation { +public: + AsyncRPCOperation_sendmany(std::string fromAddress, std::vector outputs, int minconf); + AsyncRPCOperation_sendmany(const AsyncRPCOperation_sendmany& orig); + virtual ~AsyncRPCOperation_sendmany(); + + virtual void main(); + +private: + std::string fromAddress; + std::vector outputs; + int minconf; +}; + +#endif /* ASYNCRPCOPERATION_SENDMANY_H */ + diff --git a/src/wallet/rpcwallet.cpp b/src/wallet/rpcwallet.cpp index 44ac2c77c..de7a9f8ff 100644 --- a/src/wallet/rpcwallet.cpp +++ b/src/wallet/rpcwallet.cpp @@ -20,6 +20,10 @@ #include "zcbenchmarks.h" #include "script/interpreter.h" +#include "utiltime.h" +#include "asyncrpcoperation.h" +#include "wallet/asyncrpcoperation_sendmany.h" + #include "sodium.h" #include @@ -28,6 +32,7 @@ #include "json/json_spirit_utils.h" #include "json/json_spirit_value.h" +#include "asyncrpcqueue.h" using namespace std; using namespace json_spirit; @@ -2827,3 +2832,189 @@ Value z_listaddresses(const Array& params, bool fHelp) return ret; } + +Value z_getoperationstatus(const Array& params, bool fHelp) +{ + if (!EnsureWalletIsAvailable(fHelp)) + return Value::null; + + if (fHelp || params.size() != 1) + throw runtime_error( + "z_getoperationstatus \"operationid\"\n" + "\nGet operation status and any associated result or error data." + + HelpRequiringPassphrase() + "\n" + "\nArguments:\n" + "1. \"operationid\" (string, required) The operation id returned by an async operation call.\n" + "\nResult:\n" + "\" object\" (string) FIXME: ASYNC operation object \n" + " with some key value pairs.\n" + ); + + LOCK2(cs_main, pwalletMain->cs_wallet); + + AsyncRPCOperationId id = params[0].get_str(); + + std::shared_ptr q = getAsyncRPCQueue(); + std::shared_ptr operation = q->getOperationForId(id); + if (!operation) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "No operation exists for that id."); + } + + Value status = operation->getStatus(); + + // Remove operation from memory when it has finished and the caller has retrieved the result and reason for finishing. + if (operation->isSuccess() || operation->isFailed() || operation->isCancelled()) { + q->popOperationForId(id); + } + + return status; +} + + +Value z_sendmany(const Array& params, bool fHelp) +{ + if (!EnsureWalletIsAvailable(fHelp)) + return Value::null; + + if (fHelp || params.size() < 2 || params.size() > 3) + throw runtime_error( + "z_sendmany \"fromaddress\" [{\"address\":... ,\"amount\":...},...] ( minconf )\n" + "\n*** THIS COMMAND HAS NOT BEEN IMPLEMENTED YET ***" + "\nSend multiple times. Amounts are double-precision floating point numbers." + + HelpRequiringPassphrase() + "\n" + "\nArguments:\n" + "1. \"fromaddress\" (string, required) The taddr or zaddr to send the funds from.\n" + "2. \"amounts\" (array, required) An array of json objects representing the amounts to send.\n" + " [{\n" + " \"address\":address (string, required) The address is a taddr or zaddr\n" + " \"amount\":amount (numeric, required) The numeric amount in ZEC is the value\n" + " \"memo\":memo (string, optional) If the address is a zaddr, raw data represented in hexadecimal string format\n" + " }, ... ]\n" + "3. minconf (numeric, optional, default=1) Only use funds confirmed at least this many times.\n" + "\nResult:\n" + "\"operationid\" (string) An operationid to pass to z_getoperationstatus to get the result of the operation.\n" + ); + + LOCK2(cs_main, pwalletMain->cs_wallet); + + + // Check that the from address is valid. + auto fromaddress = params[0].get_str(); + bool fromTaddr = false; + CBitcoinAddress taddr(fromaddress); + fromTaddr = taddr.IsValid(); + libzcash::PaymentAddress zaddr; + if (!fromTaddr) { + CZCPaymentAddress address(fromaddress); + try { + zaddr = address.Get(); + } catch (std::runtime_error) { + // invalid + throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid from address, should be a taddr or zaddr."); + } + } + + // Check that we have the spending key? + if (!fromTaddr) { + if (!pwalletMain->HaveSpendingKey(zaddr)) { + throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "From address does not belong to this node, zaddr spending key not found."); + } + } + + + Array outputs = params[1].get_array(); + + if (outputs.size()==0) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, amounts array is empty."); + + // Keep track of addresses to spot duplicates + set setAddress; + + // Recipients + std::vector recipients; + + BOOST_FOREACH(Value& output, outputs) + { + if (output.type() != obj_type) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, expected object"); + const Object& o = output.get_obj(); + + RPCTypeCheck(o, boost::assign::map_list_of("address", str_type)("amount", real_type)); + + // sanity check, report error if unknown key-value pairs +// for (auto& p : o) { + for (const Pair& p : o) { + std::string s = p.name_; + if (s != "address" && s != "amount" && s!="memo") + throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, unknown key: ")+s); + } + + string address = find_value(o, "address").get_str(); + bool isZaddr = false; + CBitcoinAddress taddr(address); + if (!taddr.IsValid()) { + try { + CZCPaymentAddress zaddr(address); + zaddr.Get(); + isZaddr = true; + } catch (std::runtime_error) { + throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, unknown address format: ")+address ); + } + } + + if (setAddress.count(address)) + throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, duplicated address: ")+address); + setAddress.insert(address); + + Value memoValue = find_value(o, "memo"); + string memo; + if (!memoValue.is_null()) { + memo = memoValue.get_str(); + if (!isZaddr) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Memo can not be used with a taddr. It can only be used with a zaddr."); + } else if (!IsHex(memo)) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, expected memo data in hexadecimal format."); + } + } + + //int nOutput = find_value(o, "amount").get_real(); // int(); + Value av = find_value(o, "amount"); + CAmount nAmount = AmountFromValue( av ); + if (nAmount < 0) + throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, amount must be positive"); + + recipients.push_back( SendManyRecipient(address, nAmount, memo) ); + } + + + + // Minimum confirmations + int nMinDepth = 1; + if (params.size() > 2) + nMinDepth = params[2].get_int(); + + +// std::vector +// GetPaymentAddresses(addresses); +// for (auto addr : addresses ) { +// ret.push_back(CZCPaymentAddress(addr).ToString()); +// } + + + std::shared_ptr q = getAsyncRPCQueue(); + std::shared_ptr operation( new AsyncRPCOperation_sendmany(fromaddress, recipients, nMinDepth) ); + // operation-> + //std::shared_ptr operation = make_shared(AsyncRPCOperation_sendmany()); + q->addOperation(operation); + AsyncRPCOperationId operationId = operation->getId(); + return operationId; +// return Value::null; + +// Array ret; +// std::set addresses; +// pwalletMain->GetPaymentAddresses(addresses); +// for (auto addr : addresses ) { +// ret.push_back(CZCPaymentAddress(addr).ToString()); +// } +// return ret; +}