@ -31,10 +31,14 @@
# include "base/io/json/JsonRequest.h"
# include "base/io/log/Log.h"
# include "base/kernel/interfaces/IClientListener.h"
# include "base/net/dns/Dns.h"
# include "base/net/dns/DnsRecords.h"
# include "base/net/http/Fetch.h"
# include "base/net/http/HttpData.h"
# include "base/net/http/HttpListener.h"
# include "base/net/stratum/SubmitResult.h"
# include "base/net/tools/NetBuffer.h"
# include "base/tools/bswap_64.h"
# include "base/tools/Cvt.h"
# include "base/tools/Timer.h"
# include "base/tools/cryptonote/Signatures.h"
@ -48,7 +52,11 @@
namespace xmrig {
static const char * kBlocktemplateBlob = " blocktemplate_blob " ;
Storage < DaemonClient > DaemonClient : : m_storage ;
static const char * kBlocktemplateBlob = " blocktemplate_blob " ;
static const char * kGetHeight = " /getheight " ;
static const char * kGetInfo = " /getinfo " ;
static const char * kHash = " hash " ;
@ -57,6 +65,12 @@ static const char *kJsonRPC = "/json_rpc";
static constexpr size_t kBlobReserveSize = 8 ;
static const char kZMQGreeting [ 64 ] = { - 1 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , 127 , 3 , 0 , ' N ' , ' U ' , ' L ' , ' L ' } ;
static constexpr size_t kZMQGreetingSize1 = 11 ;
static const char kZMQHandshake [ ] = " \4 \x19 \5 READY \ xbSocket-Type \0 \0 \0 \3 SUB " ;
static const char kZMQSubscribe [ ] = " \0 \x18 \1 json-minimal-chain_main " ;
}
@ -65,12 +79,25 @@ xmrig::DaemonClient::DaemonClient(int id, IClientListener *listener) :
{
m_httpListener = std : : make_shared < HttpListener > ( this ) ;
m_timer = new Timer ( this ) ;
m_key = m_storage . add ( this ) ;
}
xmrig : : DaemonClient : : ~ DaemonClient ( )
{
delete m_timer ;
delete m_ZMQSocket ;
}
void xmrig : : DaemonClient : : deleteLater ( )
{
if ( m_pool . zmq_port ( ) > = 0 ) {
ZMQClose ( true ) ;
}
else {
delete this ;
}
}
@ -159,7 +186,13 @@ void xmrig::DaemonClient::connect()
}
setState ( ConnectingState ) ;
getBlockTemplate ( ) ;
if ( m_pool . zmq_port ( ) > = 0 ) {
m_dns = Dns : : resolve ( m_pool . host ( ) , this ) ;
}
else {
getBlockTemplate ( ) ;
}
}
@ -238,7 +271,7 @@ void xmrig::DaemonClient::onHttpData(const HttpData &data)
void xmrig : : DaemonClient : : onTimer ( const Timer * )
{
if ( m_state = = ConnectingState ) {
getBlockTemplate ( ) ;
connect ( ) ;
}
else if ( m_state = = ConnectedState ) {
if ( m_apiVersion = = API_DERO ) {
@ -251,6 +284,43 @@ void xmrig::DaemonClient::onTimer(const Timer *)
}
void xmrig : : DaemonClient : : onResolved ( const DnsRecords & records , int status , const char * error )
{
m_dns . reset ( ) ;
if ( status < 0 & & records . isEmpty ( ) ) {
if ( ! isQuiet ( ) ) {
LOG_ERR ( " %s " RED ( " DNS error: " ) RED_BOLD ( " \" %s \" " ) , tag ( ) , error ) ;
}
retry ( ) ;
return ;
}
if ( m_ZMQSocket ) {
delete m_ZMQSocket ;
}
const auto & record = records . get ( ) ;
m_ip = record . ip ( ) ;
uv_connect_t * req = new uv_connect_t ;
req - > data = m_storage . ptr ( m_key ) ;
m_ZMQSocket = new uv_tcp_t ;
m_ZMQSocket - > data = m_storage . ptr ( m_key ) ;
uv_tcp_init ( uv_default_loop ( ) , m_ZMQSocket ) ;
uv_tcp_nodelay ( m_ZMQSocket , 1 ) ;
# ifndef WIN32
uv_tcp_keepalive ( m_ZMQSocket , 1 , 60 ) ;
# endif
uv_tcp_connect ( req , m_ZMQSocket , record . addr ( m_pool . zmq_port ( ) ) , onZMQConnect ) ;
}
bool xmrig : : DaemonClient : : isOutdated ( uint64_t height , const char * hash ) const
{
return m_job . height ( ) ! = height | | m_prevHash ! = hash ;
@ -452,7 +522,9 @@ bool xmrig::DaemonClient::parseResponse(int64_t id, const rapidjson::Value &resu
}
if ( handleSubmitResponse ( id , error_msg ) ) {
getBlockTemplate ( ) ;
if ( error_msg | | ( m_pool . zmq_port ( ) < 0 ) ) {
getBlockTemplate ( ) ;
}
return true ;
}
@ -504,6 +576,10 @@ void xmrig::DaemonClient::retry()
setState ( ConnectingState ) ;
}
if ( ( m_ZMQConnectionState ! = ZMQ_NOT_CONNECTED ) & & ( m_ZMQConnectionState ! = ZMQ_DISCONNECTING ) ) {
uv_close ( reinterpret_cast < uv_handle_t * > ( m_ZMQSocket ) , onZMQClose ) ;
}
m_timer - > stop ( ) ;
m_timer - > start ( m_retryPause , 0 ) ;
}
@ -531,8 +607,10 @@ void xmrig::DaemonClient::setState(SocketState state)
m_failures = 0 ;
m_listener - > onLoginSuccess ( this ) ;
const uint64_t interval = std : : max < uint64_t > ( 20 , m_pool . pollInterval ( ) ) ;
m_timer - > start ( interval , interval ) ;
if ( m_pool . zmq_port ( ) < 0 ) {
const uint64_t interval = std : : max < uint64_t > ( 20 , m_pool . pollInterval ( ) ) ;
m_timer - > start ( interval , interval ) ;
}
}
break ;
@ -545,3 +623,282 @@ void xmrig::DaemonClient::setState(SocketState state)
break ;
}
}
void xmrig : : DaemonClient : : onZMQConnect ( uv_connect_t * req , int status )
{
DaemonClient * client = getClient ( req - > data ) ;
delete req ;
if ( ! client ) {
return ;
}
if ( status < 0 ) {
LOG_ERR ( " %s " RED ( " ZMQ connect error: " ) RED_BOLD ( " \" %s \" " ) , client - > tag ( ) , uv_strerror ( status ) ) ;
client - > retry ( ) ;
return ;
}
client - > ZMQConnected ( ) ;
}
void xmrig : : DaemonClient : : onZMQRead ( uv_stream_t * stream , ssize_t nread , const uv_buf_t * buf )
{
DaemonClient * client = getClient ( stream - > data ) ;
if ( client ) {
client - > ZMQRead ( nread , buf ) ;
}
NetBuffer : : release ( buf ) ;
}
void xmrig : : DaemonClient : : onZMQClose ( uv_handle_t * handle )
{
DaemonClient * client = getClient ( handle - > data ) ;
if ( client ) {
# ifdef APP_DEBUG
LOG_DEBUG ( CYAN ( " tcp-zmq://%s:%u " ) BLACK_BOLD ( " disconnected " ) , client - > m_pool . host ( ) . data ( ) , client - > m_pool . zmq_port ( ) ) ;
# endif
client - > m_ZMQConnectionState = ZMQ_NOT_CONNECTED ;
}
}
void xmrig : : DaemonClient : : onZMQShutdown ( uv_handle_t * handle )
{
DaemonClient * client = getClient ( handle - > data ) ;
if ( client ) {
# ifdef APP_DEBUG
LOG_DEBUG ( CYAN ( " tcp-zmq://%s:%u " ) BLACK_BOLD ( " shutdown " ) , client - > m_pool . host ( ) . data ( ) , client - > m_pool . zmq_port ( ) ) ;
# endif
client - > m_ZMQConnectionState = ZMQ_NOT_CONNECTED ;
m_storage . remove ( client - > m_key ) ;
}
}
void xmrig : : DaemonClient : : ZMQConnected ( )
{
# ifdef APP_DEBUG
LOG_DEBUG ( CYAN ( " tcp-zmq://%s:%u " ) BLACK_BOLD ( " connected " ) , m_pool . host ( ) . data ( ) , m_pool . zmq_port ( ) ) ;
# endif
m_ZMQConnectionState = ZMQ_GREETING_1 ;
m_ZMQSendBuf . reserve ( 256 ) ;
m_ZMQRecvBuf . reserve ( 256 ) ;
if ( ZMQWrite ( kZMQGreeting , kZMQGreetingSize1 ) ) {
uv_read_start ( reinterpret_cast < uv_stream_t * > ( m_ZMQSocket ) , NetBuffer : : onAlloc , onZMQRead ) ;
}
}
bool xmrig : : DaemonClient : : ZMQWrite ( const char * data , size_t size )
{
m_ZMQSendBuf . assign ( data , data + size ) ;
uv_buf_t buf ;
buf . base = m_ZMQSendBuf . data ( ) ;
buf . len = static_cast < uint32_t > ( m_ZMQSendBuf . size ( ) ) ;
const int rc = uv_try_write ( reinterpret_cast < uv_stream_t * > ( m_ZMQSocket ) , & buf , 1 ) ;
if ( static_cast < size_t > ( rc ) = = buf . len ) {
return true ;
}
LOG_ERR ( " %s " RED ( " ZMQ write failed, rc = %d " ) , tag ( ) , rc ) ;
ZMQClose ( ) ;
return false ;
}
void xmrig : : DaemonClient : : ZMQRead ( ssize_t nread , const uv_buf_t * buf )
{
if ( nread < = 0 ) {
LOG_ERR ( " %s " RED ( " ZMQ read failed, nread = % " PRId64 ) , tag ( ) , nread ) ;
ZMQClose ( ) ;
return ;
}
m_ZMQRecvBuf . insert ( m_ZMQRecvBuf . end ( ) , buf - > base , buf - > base + nread ) ;
do {
switch ( m_ZMQConnectionState ) {
case ZMQ_GREETING_1 :
if ( m_ZMQRecvBuf . size ( ) > = kZMQGreetingSize1 ) {
if ( ( m_ZMQRecvBuf [ 0 ] = = - 1 ) & & ( m_ZMQRecvBuf [ 9 ] = = 127 ) & & ( m_ZMQRecvBuf [ 10 ] = = 3 ) ) {
ZMQWrite ( kZMQGreeting + kZMQGreetingSize1 , sizeof ( kZMQGreeting ) - kZMQGreetingSize1 ) ;
m_ZMQConnectionState = ZMQ_GREETING_2 ;
break ;
}
else {
LOG_ERR ( " %s " RED ( " ZMQ handshake failed: invalid greeting format " ) , tag ( ) ) ;
ZMQClose ( ) ;
}
}
return ;
case ZMQ_GREETING_2 :
if ( m_ZMQRecvBuf . size ( ) > = sizeof ( kZMQGreeting ) ) {
if ( memcmp ( m_ZMQRecvBuf . data ( ) + 12 , kZMQGreeting + 12 , 20 ) = = 0 ) {
m_ZMQConnectionState = ZMQ_HANDSHAKE ;
m_ZMQRecvBuf . erase ( m_ZMQRecvBuf . begin ( ) , m_ZMQRecvBuf . begin ( ) + sizeof ( kZMQGreeting ) ) ;
ZMQWrite ( kZMQHandshake , sizeof ( kZMQHandshake ) - 1 ) ;
break ;
}
else {
LOG_ERR ( " %s " RED ( " ZMQ handshake failed: invalid greeting format 2 " ) , tag ( ) ) ;
ZMQClose ( ) ;
}
}
return ;
case ZMQ_HANDSHAKE :
if ( m_ZMQRecvBuf . size ( ) > = 2 ) {
if ( m_ZMQRecvBuf [ 0 ] ! = 4 ) {
LOG_ERR ( " %s " RED ( " ZMQ handshake failed: invalid handshake format " ) , tag ( ) ) ;
ZMQClose ( ) ;
return ;
}
const size_t size = static_cast < unsigned char > ( m_ZMQRecvBuf [ 1 ] ) ;
if ( size < 18 ) {
LOG_ERR ( " %s " RED ( " ZMQ handshake failed: invalid handshake size " ) , tag ( ) ) ;
ZMQClose ( ) ;
return ;
}
if ( m_ZMQRecvBuf . size ( ) < size + 2 ) {
return ;
}
if ( memcmp ( m_ZMQRecvBuf . data ( ) + 2 , kZMQHandshake + 2 , 18 ) ! = 0 ) {
LOG_ERR ( " %s " RED ( " ZMQ handshake failed: invalid handshake data " ) , tag ( ) ) ;
ZMQClose ( ) ;
return ;
}
ZMQWrite ( kZMQSubscribe , sizeof ( kZMQSubscribe ) - 1 ) ;
m_ZMQConnectionState = ZMQ_CONNECTED ;
m_ZMQRecvBuf . erase ( m_ZMQRecvBuf . begin ( ) , m_ZMQRecvBuf . begin ( ) + size + 2 ) ;
getBlockTemplate ( ) ;
break ;
}
return ;
case ZMQ_CONNECTED :
ZMQParse ( ) ;
return ;
default :
return ;
}
} while ( true ) ;
}
void xmrig : : DaemonClient : : ZMQParse ( )
{
# ifdef APP_DEBUG
std : : vector < char > msg ;
# endif
size_t msg_size = 0 ;
char * data = m_ZMQRecvBuf . data ( ) ;
size_t avail = m_ZMQRecvBuf . size ( ) ;
bool more ;
do {
if ( avail < 1 ) {
return ;
}
more = ( data [ 0 ] & 1 ) ! = 0 ;
const bool long_size = ( data [ 0 ] & 2 ) ! = 0 ;
const bool command = ( data [ 0 ] & 4 ) ! = 0 ;
+ + data ;
- - avail ;
uint64_t size = 0 ;
if ( long_size )
{
if ( avail < sizeof ( uint64_t ) ) {
return ;
}
size = bswap_64 ( * ( ( uint64_t * ) data ) ) ;
data + = sizeof ( uint64_t ) ;
avail - = sizeof ( uint64_t ) ;
}
else
{
if ( avail < sizeof ( uint8_t ) ) {
return ;
}
size = static_cast < uint8_t > ( * data ) ;
+ + data ;
- - avail ;
}
if ( size > 1024U - msg_size )
{
LOG_ERR ( " %s " RED ( " ZMQ message is too large, size = % " PRIu64 " bytes " ) , tag ( ) , size ) ;
ZMQClose ( ) ;
return ;
}
if ( avail < size ) {
return ;
}
if ( ! command ) {
# ifdef APP_DEBUG
msg . insert ( msg . end ( ) , data , data + size ) ;
# endif
msg_size + = size ;
}
data + = size ;
avail - = size ;
} while ( more ) ;
m_ZMQRecvBuf . erase ( m_ZMQRecvBuf . begin ( ) , m_ZMQRecvBuf . begin ( ) + ( data - m_ZMQRecvBuf . data ( ) ) ) ;
# ifdef APP_DEBUG
LOG_DEBUG ( CYAN ( " tcp-zmq://%s:%u " ) BLACK_BOLD ( " read " ) CYAN_BOLD ( " %zu " ) BLACK_BOLD ( " bytes " ) " %s " , m_pool . host ( ) . data ( ) , m_pool . zmq_port ( ) , msg . size ( ) , msg . data ( ) ) ;
# endif
getBlockTemplate ( ) ;
}
bool xmrig : : DaemonClient : : ZMQClose ( bool shutdown )
{
if ( ( m_ZMQConnectionState = = ZMQ_NOT_CONNECTED ) | | ( m_ZMQConnectionState = = ZMQ_DISCONNECTING ) ) {
if ( shutdown ) {
m_storage . remove ( m_key ) ;
}
return false ;
}
m_ZMQConnectionState = ZMQ_DISCONNECTING ;
if ( uv_is_closing ( reinterpret_cast < uv_handle_t * > ( m_ZMQSocket ) ) = = 0 ) {
uv_close ( reinterpret_cast < uv_handle_t * > ( m_ZMQSocket ) , shutdown ? onZMQShutdown : onZMQClose ) ;
if ( ! shutdown ) {
retry ( ) ;
}
return true ;
}
return false ;
}