Browse Source

Add async RPC queue and operation classes.

Add z_getoperationstatus RPC command.
Add z_sendmany RPC command (dummy implementation, does not send actual
coins).
pull/145/head
Simon 8 years ago
parent
commit
fc72c078be
  1. 6
      src/Makefile.am
  2. 147
      src/asyncrpcoperation.cpp
  3. 148
      src/asyncrpcoperation.h
  4. 147
      src/asyncrpcqueue.cpp
  5. 61
      src/asyncrpcqueue.h
  6. 2
      src/rpcclient.cpp
  7. 23
      src/rpcserver.cpp
  8. 9
      src/rpcserver.h
  9. 77
      src/wallet/asyncrpcoperation_sendmany.cpp
  10. 32
      src/wallet/asyncrpcoperation_sendmany.h
  11. 191
      src/wallet/rpcwallet.cpp

6
src/Makefile.am

@ -86,6 +86,8 @@ BITCOIN_CORE_H = \
alert.h \
amount.h \
arith_uint256.h \
asyncrpcoperation.h \
asyncrpcqueue.h \
base58.h \
bloom.h \
chain.h \
@ -162,6 +164,7 @@ BITCOIN_CORE_H = \
utiltime.h \
validationinterface.h \
version.h \
wallet/asyncrpcoperation_sendmany.h \
wallet/crypter.h \
wallet/db.h \
wallet/wallet.h \
@ -191,6 +194,8 @@ libbitcoin_server_a_SOURCES = \
sendalert.cpp \
addrman.cpp \
alert.cpp \
asyncrpcoperation.cpp \
asyncrpcqueue.cpp \
bloom.cpp \
chain.cpp \
checkpoints.cpp \
@ -224,6 +229,7 @@ libbitcoin_server_a_SOURCES = \
libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES)
libbitcoin_wallet_a_SOURCES = \
zcbenchmarks.cpp \
wallet/asyncrpcoperation_sendmany.cpp \
wallet/crypter.cpp \
wallet/db.cpp \
wallet/rpcdump.cpp \

147
src/asyncrpcoperation.cpp

@ -0,0 +1,147 @@
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "asyncrpcoperation.h"
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <string>
#include <ctime>
#include <chrono>
using namespace std;
using namespace json_spirit;
static boost::uuids::random_generator uuidgen;
std::map<OperationStatus, std::string> OperationStatusMap = {
{OperationStatus::READY, "queued"},
{OperationStatus::EXECUTING, "executing"},
{OperationStatus::CANCELLED, "cancelled"},
{OperationStatus::FAILED, "failed"},
{OperationStatus::SUCCESS, "success"}
};
AsyncRPCOperation::AsyncRPCOperation() : errorCode(0), errorMessage() {
// Set a unique reference for each operation
boost::uuids::uuid uuid = uuidgen();
std::string s = boost::uuids::to_string(uuid);
setId(s);
setState(OperationStatus::READY);
creationTime = (int64_t)time(NULL);
}
AsyncRPCOperation::AsyncRPCOperation(const AsyncRPCOperation& o) : id(o.id), creationTime(o.creationTime), state(o.state.load())
{
}
AsyncRPCOperation& AsyncRPCOperation::operator=( const AsyncRPCOperation& other ) {
this->id = other.getId();
this->creationTime = other.creationTime;
this->state.store(other.state.load());
return *this;
}
AsyncRPCOperation::~AsyncRPCOperation() {
}
void AsyncRPCOperation::cancel() {
if (isReady())
setState(OperationStatus::CANCELLED);
}
void AsyncRPCOperation::startExecutionClock() {
startTime = std::chrono::system_clock::now();
}
void AsyncRPCOperation::stopExecutionClock() {
endTime = std::chrono::system_clock::now();
}
// Implement this method in any subclass.
// This is just an example implementation.
void AsyncRPCOperation::main() {
if (isCancelled())
return;
setState(OperationStatus::EXECUTING);
//
// Do some work here...
//
startExecutionClock();
//std::this_thread::sleep_for(std::chrono::milliseconds(10000));
stopExecutionClock();
// If there was an error...
// setErrorCode(123);
// setErrorMessage("Murphy's law");
// setState(OperationStatus::FAILED);
// Otherwise
Value v("We have a result!");
setResult(v);
setState(OperationStatus::SUCCESS);
}
Value AsyncRPCOperation::getError() const {
if (!isFailed())
return Value::null;
Object error;
error.push_back(Pair("code", this->errorCode));
error.push_back(Pair("message", this->errorMessage));
return Value(error);
}
Value AsyncRPCOperation::getResult() const {
if (!isSuccess())
return Value::null;
return this->resultValue;
}
/*
* Returns a status Value object.
* If the operation has failed, it will include an error object.
* If the operation has succeeded, it will include the result value.
*/
Value AsyncRPCOperation::getStatus() const {
OperationStatus status = this->getState();
Object obj;
obj.push_back(Pair("id", this->getId()));
obj.push_back(Pair("status", OperationStatusMap[status]));
obj.push_back(Pair("creation_time", this->creationTime));
// creation, exec time, duration, exec end, etc.
Value err = this->getError();
if (!err.is_null()) {
obj.push_back(Pair("error", err.get_obj()));
}
Value result = this->getResult();
if (!result.is_null()) {
obj.push_back(Pair("result", result));
// Include execution time for successful operation
std::chrono::duration<double> elapsed_seconds = endTime - startTime;
obj.push_back(Pair("execution_secs", elapsed_seconds.count()));
}
return Value(obj);
}

148
src/asyncrpcoperation.h

@ -0,0 +1,148 @@
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ASYNCRPCOPERATION_H
#define ASYNCRPCOPERATION_H
#include <string>
#include <atomic>
#include <map>
#include <chrono>
#include "json/json_spirit_value.h"
#include "json/json_spirit_utils.h"
#include "json/json_spirit_reader_template.h"
#include "json/json_spirit_writer_template.h"
using namespace std;
using namespace json_spirit;
/**
* AsyncRPCOperations are given to the AsyncRPCQueue for processing.
*
* How to subclass:
* Implement the main() method, this is where work is performed.
* Update the operation status as work is underway and completes.
*/
typedef std::string AsyncRPCOperationId;
typedef enum class operationStateEnum {
READY = 0,
EXECUTING,
CANCELLED,
FAILED,
SUCCESS
} OperationStatus;
class AsyncRPCOperation {
public:
AsyncRPCOperation();
// Todo: keep or delete copy constructors and assignment?
AsyncRPCOperation(const AsyncRPCOperation& orig);
AsyncRPCOperation& operator=( const AsyncRPCOperation& other );
virtual ~AsyncRPCOperation();
// Implement this method in your subclass.
virtual void main();
void cancel();
// Getters and setters
OperationStatus getState() const {
return state.load();
}
AsyncRPCOperationId getId() const {
return id;
}
int64_t getCreationTime() const {
return creationTime;
}
Value getStatus() const;
Value getError() const;
Value getResult() const;
int getErrorCode() const {
return errorCode;
}
std::string getErrorMessage() const {
return errorMessage;
}
bool isCancelled() const {
return OperationStatus::CANCELLED==getState();
}
bool isExecuting() const {
return OperationStatus::EXECUTING==getState();
}
bool isReady() const {
return OperationStatus::READY==getState();
}
bool isFailed() const {
return OperationStatus::FAILED==getState();
}
bool isSuccess() const {
return OperationStatus::SUCCESS==getState();
}
protected:
Value resultValue;
int errorCode;
std::string errorMessage;
std::atomic<OperationStatus> state;
std::chrono::time_point<std::chrono::system_clock> startTime, endTime;
void startExecutionClock();
void stopExecutionClock();
void setState(OperationStatus state) {
this->state.store(state);
}
void setErrorCode(int errorCode) {
this->errorCode = errorCode;
}
void setErrorMessage(std::string errorMessage) {
this->errorMessage = errorMessage;
}
void setResult(Value v) {
this->resultValue = v;
}
private:
// Todo: Private for now. If copying an operation is possible, should it
// receive a new id and a new creation time?
void setId(AsyncRPCOperationId id) {
this->id = id;
}
// Todo: Ditto above.
void setCreationTime(int64_t creationTime) {
this->creationTime = creationTime;
}
AsyncRPCOperationId id;
int64_t creationTime;
};
#endif /* ASYNCRPCOPERATION_H */

147
src/asyncrpcqueue.cpp

@ -0,0 +1,147 @@
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "asyncrpcqueue.h"
static std::atomic<int> workerCounter(0);
AsyncRPCQueue::AsyncRPCQueue() : closed(false) {
}
/*
* Calling thread will join on all the worker threads
*/
AsyncRPCQueue::~AsyncRPCQueue() {
this->closed = true; // set this in case close() was not invoked
for (std::thread & t : this->workers) {
t.join();
}
}
/*
* A worker will execute this method on a new thread
*/
void AsyncRPCQueue::run(int workerId) {
// std::cout << "Launched queue worker " << workerId << std::endl;
while (!isClosed()) {
AsyncRPCOperationId key;
std::shared_ptr<AsyncRPCOperation> operation;
{
std::unique_lock< std::mutex > guard(cs_lock);
while (operationIdQueue.empty() && !isClosed()) {
this->cs_condition.wait(guard);
}
// Exit if the queue is closing.
if (isClosed())
break;
// Get operation id
key = operationIdQueue.front();
operationIdQueue.pop();
// Search operation map
AsyncRPCOperationMap::const_iterator iter = operationMap.find(key);
if (iter != operationMap.end()) {
operation = iter->second;
}
}
if (!operation) {
// cannot find operation in map, may have been removed
} else if (operation->isCancelled()) {
// skip cancelled operation
} else {
operation->main();
}
}
// std::cout << "Terminating queue worker " << workerId << std::endl;
}
/*
* Add shared_ptr to operation.
*
* To retain polymorphic behaviour, i.e. main() method of derived classes is invoked,
* caller should create the shared_ptr like thi:
*
* std::shared_ptr<AsyncRPCOperation> ptr(new MyCustomAsyncRPCOperation(params));
*
* Don't use std::make_shared<AsyncRPCOperation>().
*/
void AsyncRPCQueue::addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation) {
// Don't add if queue is closed
if (isClosed())
return;
AsyncRPCOperationId id = ptrOperation->getId();
{
std::lock_guard< std::mutex > guard(cs_lock);
operationMap.emplace(id, ptrOperation);
operationIdQueue.push(id);
this->cs_condition.notify_one();
}
}
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) {
std::shared_ptr<AsyncRPCOperation> ptr;
std::lock_guard< std::mutex > guard(cs_lock);
AsyncRPCOperationMap::const_iterator iter = operationMap.find(id);
if (iter != operationMap.end()) {
ptr = iter->second;
}
return ptr;
}
std::shared_ptr<AsyncRPCOperation> AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) {
std::shared_ptr<AsyncRPCOperation> ptr = getOperationForId(id);
if (ptr) {
std::lock_guard< std::mutex > guard(cs_lock);
// Note: if the id still exists in the operationIdQueue, when it gets processed by a worker
// there will no operation in the map to execute, so nothing will happen.
operationMap.erase(id);
}
return ptr;
}
bool AsyncRPCQueue::isClosed() {
return closed;
}
void AsyncRPCQueue::close() {
this->closed = true;
cancelAllOperations();
}
/*
* Call cancel() on all operations
*/
void AsyncRPCQueue::cancelAllOperations() {
std::unique_lock< std::mutex > guard(cs_lock);
for (auto key : operationMap) {
key.second->cancel();
}
this->cs_condition.notify_all();
}
int AsyncRPCQueue::getOperationCount() {
std::unique_lock< std::mutex > guard(cs_lock);
return operationIdQueue.size();
}
/*
* Spawn a worker thread
*/
void AsyncRPCQueue::addWorker() {
std::unique_lock< std::mutex > guard(cs_lock); // Todo: could just have a lock on the vector
workers.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) );
}
int AsyncRPCQueue::getNumberOfWorkers() {
return workers.size();
}

61
src/asyncrpcqueue.h

@ -0,0 +1,61 @@
// Copyright (c) 2014 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ASYNCRPCQUEUE_H
#define ASYNCRPCQUEUE_H
#include "asyncrpcoperation.h"
#include <iostream>
#include <string>
#include <chrono>
#include <queue>
#include <unordered_map>
#include <vector>
#include <future>
#include <thread>
#include <utility>
#include <memory>
typedef std::unordered_map<AsyncRPCOperationId, std::shared_ptr<AsyncRPCOperation> > AsyncRPCOperationMap;
class AsyncRPCQueue {
public:
AsyncRPCQueue();
virtual ~AsyncRPCQueue();
// We don't want queue to be copied or moved around
AsyncRPCQueue(AsyncRPCQueue const&) = delete; // Copy construct
AsyncRPCQueue(AsyncRPCQueue&&) = delete; // Move construct
AsyncRPCQueue& operator=(AsyncRPCQueue const&) = delete; // Copy assign
AsyncRPCQueue& operator=(AsyncRPCQueue &&) = delete; // Move assign
void addWorker();
int getNumberOfWorkers();
bool isClosed();
void close();
void cancelAllOperations();
int getOperationCount();
std::shared_ptr<AsyncRPCOperation> getOperationForId(AsyncRPCOperationId);
std::shared_ptr<AsyncRPCOperation> popOperationForId(AsyncRPCOperationId);
void addOperation(const std::shared_ptr<AsyncRPCOperation> &ptrOperation);
private:
// addWorker() will spawn a new thread on this method
void run(int workerId);
std::mutex cs_lock;
std::condition_variable cs_condition;
bool closed;
AsyncRPCOperationMap operationMap;
std::queue <AsyncRPCOperationId> operationIdQueue;
std::vector<std::thread> workers;
};
#endif

2
src/rpcclient.cpp

@ -98,6 +98,8 @@ static const CRPCConvertParam vRPCConvertParams[] =
{ "zcbenchmark", 1 },
{ "zcbenchmark", 2 },
{ "getblocksubsidy", 0},
{ "z_sendmany", 1},
{ "z_sendmany", 2},
{ "z_importkey", 1 }
};

23
src/rpcserver.cpp

@ -15,6 +15,9 @@
#ifdef ENABLE_WALLET
#include "wallet/wallet.h"
#endif
#include "asyncrpcqueue.h"
#include <memory>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
@ -49,6 +52,7 @@ static boost::thread_group* rpc_worker_group = NULL;
static boost::asio::io_service::work *rpc_dummy_work = NULL;
static std::vector<CSubNet> rpc_allow_subnets; //!< List of subnets to allow RPC connections from
static std::vector< boost::shared_ptr<ip::tcp::acceptor> > rpc_acceptors;
static shared_ptr<AsyncRPCQueue> async_rpc_queue;
static struct CRPCSignals
{
@ -382,6 +386,8 @@ static const CRPCCommand vRPCCommands[] =
{ "wallet", "zcrawjoinsplit", &zc_raw_joinsplit, true },
{ "wallet", "zcrawreceive", &zc_raw_receive, true },
{ "wallet", "zcsamplejoinsplit", &zc_sample_joinsplit, true },
{ "wallet", "z_sendmany", &z_sendmany, true },
{ "wallet", "z_getoperationstatus", &z_getoperationstatus, true },
{ "wallet", "z_getnewaddress", &z_getnewaddress, true },
{ "wallet", "z_listaddresses", &z_listaddresses, true },
{ "wallet", "z_exportkey", &z_exportkey, true },
@ -737,6 +743,13 @@ void StartRPCThreads()
rpc_worker_group->create_thread(boost::bind(&boost::asio::io_service::run, rpc_io_service));
fRPCRunning = true;
g_rpcSignals.Started();
// Launch at least one async rpc worker
async_rpc_queue = std::make_shared<AsyncRPCQueue>();
async_rpc_queue->addWorker();
async_rpc_queue->addWorker();
async_rpc_queue->addWorker();
}
void StartDummyRPCThread()
@ -786,6 +799,10 @@ void StopRPCThreads()
delete rpc_worker_group; rpc_worker_group = NULL;
delete rpc_ssl_context; rpc_ssl_context = NULL;
delete rpc_io_service; rpc_io_service = NULL;
// Tells async queue to cancel all operations and shutdown.
// The async queue destructor will block and join on worker threads.
async_rpc_queue->close();
}
bool IsRPCRunning()
@ -1048,3 +1065,9 @@ std::string HelpExampleRpc(string methodname, string args){
}
const CRPCTable tableRPC;
// Return async rpc queue
std::shared_ptr<AsyncRPCQueue> getAsyncRPCQueue()
{
return async_rpc_queue;
}

9
src/rpcserver.h

@ -14,11 +14,13 @@
#include <map>
#include <stdint.h>
#include <string>
#include <memory>
#include "json/json_spirit_reader_template.h"
#include "json/json_spirit_utils.h"
#include "json/json_spirit_writer_template.h"
class AsyncRPCQueue;
class CRPCCommand;
namespace RPCServer
@ -55,6 +57,10 @@ void StopRPCThreads();
/** Query whether RPC is running */
bool IsRPCRunning();
/** Get the async queue*/
std::shared_ptr<AsyncRPCQueue> getAsyncRPCQueue();
/**
* Set the RPC warmup status. When this is done, all RPC calls will error out
* immediately with RPC_IN_WARMUP.
@ -250,6 +256,9 @@ extern json_spirit::Value z_getnewaddress(const json_spirit::Array& params, bool
extern json_spirit::Value z_listaddresses(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp
extern json_spirit::Value z_exportwallet(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp
extern json_spirit::Value z_importwallet(const json_spirit::Array& params, bool fHelp); // in rpcdump.cpp
extern json_spirit::Value z_sendmany(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp
extern json_spirit::Value z_getoperationstatus(const json_spirit::Array& params, bool fHelp); // in rpcwallet.cpp
// in rest.cpp
extern bool HTTPReq_REST(AcceptedConnection *conn,

77
src/wallet/asyncrpcoperation_sendmany.cpp

@ -0,0 +1,77 @@
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "asyncrpcoperation_sendmany.h"
#include <iostream>
#include <chrono>
#include <thread>
AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany(std::string fromAddress, std::vector<SendManyRecipient> outputs, int minconf) : fromAddress(fromAddress), outputs(outputs), minconf(minconf)
{
}
AsyncRPCOperation_sendmany::AsyncRPCOperation_sendmany(const AsyncRPCOperation_sendmany& orig) {
}
AsyncRPCOperation_sendmany::~AsyncRPCOperation_sendmany() {
}
void AsyncRPCOperation_sendmany::main() {
if (isCancelled())
return;
setState(OperationStatus::EXECUTING);
startExecutionClock();
/**
* Dummy run of a sendmany operation
*/
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << std::endl << "z_sendmany: **************** DUMMY RUN *****************" << std::endl;
std::cout << "z_sendmany: source of funds: " << fromAddress << std::endl;
std::cout << "z_sendmany: minconf: " << minconf << std::endl;
for (SendManyRecipient & t : outputs) {
std::cout << "z_sendmany: send " << std::get<1>(t) << " to " << std::get<0>(t) << std::endl;
std::string memo = std::get<2>(t);
if (memo.size()>0) {
std::cout << " : memo = " << memo << std::endl;
}
}
std::cout << "z_sendmany: checking balances and selecting coins and notes..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "z_sendmany: performing a joinsplit..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
std::cout << "z_sendmany: attempting to broadcast to network..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "z_sendmany: operation complete!" << std::endl;
std::cout << "z_sendmany: ********************************************" << std::endl;
stopExecutionClock();
// dummy run will say that even number of outputs is success
bool isEven = outputs.size() % 2 == 0;
//std::cout<< "here is r: " << r << std::endl;
if (isEven) {
setState(OperationStatus::SUCCESS);
Object obj;
obj.push_back(Pair("dummy_txid", "4a1298544a1298544a1298544a1298544a129854"));
obj.push_back(Pair("dummy_fee", 0.0001)); // dummy fee
setResult(Value(obj));
} else {
setState(OperationStatus::FAILED);
errorCode = std::rand();
errorMessage = "Dummy run tests error handling by not liking an odd number number of outputs.";
}
}

32
src/wallet/asyncrpcoperation_sendmany.h

@ -0,0 +1,32 @@
// Copyright (c) 2016 The Zcash developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef ASYNCRPCOPERATION_SENDMANY_H
#define ASYNCRPCOPERATION_SENDMANY_H
#include "../asyncrpcoperation.h"
#include "amount.h"
#include <tuple>
// A recipient is a tuple of address, amount, memo (optional if zaddr)
typedef std::tuple<std::string, CAmount, std::string> SendManyRecipient;
class AsyncRPCOperation_sendmany : public AsyncRPCOperation {
public:
AsyncRPCOperation_sendmany(std::string fromAddress, std::vector<SendManyRecipient> outputs, int minconf);
AsyncRPCOperation_sendmany(const AsyncRPCOperation_sendmany& orig);
virtual ~AsyncRPCOperation_sendmany();
virtual void main();
private:
std::string fromAddress;
std::vector<SendManyRecipient> outputs;
int minconf;
};
#endif /* ASYNCRPCOPERATION_SENDMANY_H */

191
src/wallet/rpcwallet.cpp

@ -20,6 +20,10 @@
#include "zcbenchmarks.h"
#include "script/interpreter.h"
#include "utiltime.h"
#include "asyncrpcoperation.h"
#include "wallet/asyncrpcoperation_sendmany.h"
#include "sodium.h"
#include <stdint.h>
@ -28,6 +32,7 @@
#include "json/json_spirit_utils.h"
#include "json/json_spirit_value.h"
#include "asyncrpcqueue.h"
using namespace std;
using namespace json_spirit;
@ -2827,3 +2832,189 @@ Value z_listaddresses(const Array& params, bool fHelp)
return ret;
}
Value z_getoperationstatus(const Array& params, bool fHelp)
{
if (!EnsureWalletIsAvailable(fHelp))
return Value::null;
if (fHelp || params.size() != 1)
throw runtime_error(
"z_getoperationstatus \"operationid\"\n"
"\nGet operation status and any associated result or error data."
+ HelpRequiringPassphrase() + "\n"
"\nArguments:\n"
"1. \"operationid\" (string, required) The operation id returned by an async operation call.\n"
"\nResult:\n"
"\" object\" (string) FIXME: ASYNC operation object \n"
" with some key value pairs.\n"
);
LOCK2(cs_main, pwalletMain->cs_wallet);
AsyncRPCOperationId id = params[0].get_str();
std::shared_ptr<AsyncRPCQueue> q = getAsyncRPCQueue();
std::shared_ptr<AsyncRPCOperation> operation = q->getOperationForId(id);
if (!operation) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "No operation exists for that id.");
}
Value status = operation->getStatus();
// Remove operation from memory when it has finished and the caller has retrieved the result and reason for finishing.
if (operation->isSuccess() || operation->isFailed() || operation->isCancelled()) {
q->popOperationForId(id);
}
return status;
}
Value z_sendmany(const Array& params, bool fHelp)
{
if (!EnsureWalletIsAvailable(fHelp))
return Value::null;
if (fHelp || params.size() < 2 || params.size() > 3)
throw runtime_error(
"z_sendmany \"fromaddress\" [{\"address\":... ,\"amount\":...},...] ( minconf )\n"
"\n*** THIS COMMAND HAS NOT BEEN IMPLEMENTED YET ***"
"\nSend multiple times. Amounts are double-precision floating point numbers."
+ HelpRequiringPassphrase() + "\n"
"\nArguments:\n"
"1. \"fromaddress\" (string, required) The taddr or zaddr to send the funds from.\n"
"2. \"amounts\" (array, required) An array of json objects representing the amounts to send.\n"
" [{\n"
" \"address\":address (string, required) The address is a taddr or zaddr\n"
" \"amount\":amount (numeric, required) The numeric amount in ZEC is the value\n"
" \"memo\":memo (string, optional) If the address is a zaddr, raw data represented in hexadecimal string format\n"
" }, ... ]\n"
"3. minconf (numeric, optional, default=1) Only use funds confirmed at least this many times.\n"
"\nResult:\n"
"\"operationid\" (string) An operationid to pass to z_getoperationstatus to get the result of the operation.\n"
);
LOCK2(cs_main, pwalletMain->cs_wallet);
// Check that the from address is valid.
auto fromaddress = params[0].get_str();
bool fromTaddr = false;
CBitcoinAddress taddr(fromaddress);
fromTaddr = taddr.IsValid();
libzcash::PaymentAddress zaddr;
if (!fromTaddr) {
CZCPaymentAddress address(fromaddress);
try {
zaddr = address.Get();
} catch (std::runtime_error) {
// invalid
throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "Invalid from address, should be a taddr or zaddr.");
}
}
// Check that we have the spending key?
if (!fromTaddr) {
if (!pwalletMain->HaveSpendingKey(zaddr)) {
throw JSONRPCError(RPC_INVALID_ADDRESS_OR_KEY, "From address does not belong to this node, zaddr spending key not found.");
}
}
Array outputs = params[1].get_array();
if (outputs.size()==0)
throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, amounts array is empty.");
// Keep track of addresses to spot duplicates
set<std::string> setAddress;
// Recipients
std::vector<SendManyRecipient> recipients;
BOOST_FOREACH(Value& output, outputs)
{
if (output.type() != obj_type)
throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, expected object");
const Object& o = output.get_obj();
RPCTypeCheck(o, boost::assign::map_list_of("address", str_type)("amount", real_type));
// sanity check, report error if unknown key-value pairs
// for (auto& p : o) {
for (const Pair& p : o) {
std::string s = p.name_;
if (s != "address" && s != "amount" && s!="memo")
throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, unknown key: ")+s);
}
string address = find_value(o, "address").get_str();
bool isZaddr = false;
CBitcoinAddress taddr(address);
if (!taddr.IsValid()) {
try {
CZCPaymentAddress zaddr(address);
zaddr.Get();
isZaddr = true;
} catch (std::runtime_error) {
throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, unknown address format: ")+address );
}
}
if (setAddress.count(address))
throw JSONRPCError(RPC_INVALID_PARAMETER, string("Invalid parameter, duplicated address: ")+address);
setAddress.insert(address);
Value memoValue = find_value(o, "memo");
string memo;
if (!memoValue.is_null()) {
memo = memoValue.get_str();
if (!isZaddr) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Memo can not be used with a taddr. It can only be used with a zaddr.");
} else if (!IsHex(memo)) {
throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, expected memo data in hexadecimal format.");
}
}
//int nOutput = find_value(o, "amount").get_real(); // int();
Value av = find_value(o, "amount");
CAmount nAmount = AmountFromValue( av );
if (nAmount < 0)
throw JSONRPCError(RPC_INVALID_PARAMETER, "Invalid parameter, amount must be positive");
recipients.push_back( SendManyRecipient(address, nAmount, memo) );
}
// Minimum confirmations
int nMinDepth = 1;
if (params.size() > 2)
nMinDepth = params[2].get_int();
// std::vector<string>
// GetPaymentAddresses(addresses);
// for (auto addr : addresses ) {
// ret.push_back(CZCPaymentAddress(addr).ToString());
// }
std::shared_ptr<AsyncRPCQueue> q = getAsyncRPCQueue();
std::shared_ptr<AsyncRPCOperation> operation( new AsyncRPCOperation_sendmany(fromaddress, recipients, nMinDepth) );
// operation->
//std::shared_ptr<AsyncRPCOperation> operation = make_shared<AsyncRPCOperation>(AsyncRPCOperation_sendmany());
q->addOperation(operation);
AsyncRPCOperationId operationId = operation->getId();
return operationId;
// return Value::null;
// Array ret;
// std::set<libzcash::PaymentAddress> addresses;
// pwalletMain->GetPaymentAddresses(addresses);
// for (auto addr : addresses ) {
// ret.push_back(CZCPaymentAddress(addr).ToString());
// }
// return ret;
}

Loading…
Cancel
Save