// Copyright (c) 2016-2024 The Hush developers // Copyright (c) 2016 The Zcash developers // Distributed under the GPLv3 software license, see the accompanying // file COPYING or https://www.gnu.org/licenses/gpl-3.0.en.html /****************************************************************************** * Copyright © 2014-2019 The SuperNET Developers. * * * * See the AUTHORS, DEVELOPER-AGREEMENT and LICENSE files at * * the top-level directory of this distribution for the individual copyright * * holder information and the developer policies on copyright and licensing. * * * * Unless otherwise agreed in a custom licensing agreement, no part of the * * SuperNET software, including this file may be copied, modified, propagated * * or distributed except according to the terms contained in the LICENSE file * * * * Removal or modification of this copyright notice is prohibited. * * * ******************************************************************************/ #include "asyncrpcqueue.h" static std::atomic workerCounter(0); /** * Static method to return the shared/default queue. */ shared_ptr AsyncRPCQueue::sharedInstance() { // Thread-safe in C+11 and gcc 4.3 static shared_ptr q = std::make_shared(); return q; } AsyncRPCQueue::AsyncRPCQueue() : closed_(false), finish_(false) { } AsyncRPCQueue::~AsyncRPCQueue() { closeAndWait(); // join on all worker threads } /** * A worker will execute this method on a new thread */ void AsyncRPCQueue::run(size_t workerId) { while (true) { AsyncRPCOperationId key; std::shared_ptr operation; { std::unique_lock guard(lock_); while (operation_id_queue_.empty() && !isClosed() && !isFinishing()) { this->condition_.wait(guard); } // Exit if the queue is empty and we are finishing up if (isFinishing() && operation_id_queue_.empty()) { break; } // Exit if the queue is closing. if (isClosed()) { while (!operation_id_queue_.empty()) { operation_id_queue_.pop(); } break; } // Get operation id key = operation_id_queue_.front(); operation_id_queue_.pop(); // Search operation map AsyncRPCOperationMap::const_iterator iter = operation_map_.find(key); if (iter != operation_map_.end()) { operation = iter->second; } } if (!operation) { // cannot find operation in map, may have been removed } else if (operation->isCancelled()) { // skip cancelled operation } else { operation->main(); } } } /** * Add shared_ptr to operation. * * To retain polymorphic behaviour, i.e. main() method of derived classes is invoked, * caller should create the shared_ptr like this: * * std::shared_ptr ptr(new MyCustomAsyncRPCOperation(params)); * * Don't use std::make_shared(). */ void AsyncRPCQueue::addOperation(const std::shared_ptr &ptrOperation) { std::lock_guard guard(lock_); // Don't add if queue is closed or finishing if (isClosed() || isFinishing()) { return; } AsyncRPCOperationId id = ptrOperation->getId(); operation_map_.emplace(id, ptrOperation); operation_id_queue_.push(id); this->condition_.notify_one(); } /** * Return the operation for a given operation id. */ std::shared_ptr AsyncRPCQueue::getOperationForId(AsyncRPCOperationId id) const { std::shared_ptr ptr; std::lock_guard guard(lock_); AsyncRPCOperationMap::const_iterator iter = operation_map_.find(id); if (iter != operation_map_.end()) { ptr = iter->second; } return ptr; } /** * Return the operation for a given operation id and then remove the operation from internal storage. */ std::shared_ptr AsyncRPCQueue::popOperationForId(AsyncRPCOperationId id) { std::shared_ptr ptr = getOperationForId(id); if (ptr) { std::lock_guard guard(lock_); // Note: if the id still exists in the operationIdQueue, when it gets processed by a worker // there will no operation in the map to execute, so nothing will happen. operation_map_.erase(id); } return ptr; } /** * Return true if the queue is closed to new operations. */ bool AsyncRPCQueue::isClosed() const { return closed_.load(); } /** * Close the queue and cancel all existing operations */ void AsyncRPCQueue::close() { closed_.store(true); cancelAllOperations(); } /** * Return true if the queue is finishing up */ bool AsyncRPCQueue::isFinishing() const { return finish_.load(); } /** * Close the queue but finish existing operations. Do not accept new operations. */ void AsyncRPCQueue::finish() { finish_.store(true); } /** * Call cancel() on all operations */ void AsyncRPCQueue::cancelAllOperations() { std::lock_guard guard(lock_); for (auto key : operation_map_) { key.second->cancel(); } this->condition_.notify_all(); } /** * Return the number of operations in the queue */ size_t AsyncRPCQueue::getOperationCount() const { std::lock_guard guard(lock_); return operation_id_queue_.size(); } /** * Spawn a worker thread */ void AsyncRPCQueue::addWorker() { std::lock_guard guard(lock_); workers_.emplace_back( std::thread(&AsyncRPCQueue::run, this, ++workerCounter) ); } /** * Return the number of worker threads spawned by the queue */ size_t AsyncRPCQueue::getNumberOfWorkers() const { std::lock_guard guard(lock_); return workers_.size(); } /** * Return a list of all known operation ids found in internal storage. */ std::vector AsyncRPCQueue::getAllOperationIds() const { std::lock_guard guard(lock_); std::vector v; for(auto & entry: operation_map_) { v.push_back(entry.first); } return v; } /** * Calling thread will close and wait for worker threads to join. */ void AsyncRPCQueue::closeAndWait() { close(); wait_for_worker_threads(); } /** * Block current thread until all workers have finished their tasks. */ void AsyncRPCQueue::finishAndWait() { finish(); wait_for_worker_threads(); } /** * Block current thread until all operations are finished or the queue has closed. */ void AsyncRPCQueue::wait_for_worker_threads() { // Notify any workers who are waiting, so they see the updated queue state { std::lock_guard guard(lock_); this->condition_.notify_all(); } for (std::thread & t : this->workers_) { if (t.joinable()) { t.join(); } } }