Browse Source

Add debug messages and smsg polling mode.

smsg_debug
tecnovert 6 months ago
parent
commit
5c4c5089cb
No known key found for this signature in database GPG Key ID: 8ED6D8750C4E3F93
  1. 49
      basicswap/basicswap.py

49
basicswap/basicswap.py

@ -244,6 +244,10 @@ class BasicSwap(BaseApp):
self._possibly_revoked_offers = collections.deque([], maxlen=48) # TODO: improve
self._updating_wallets_info = {}
self._last_updated_wallets_info = 0
self._zmq_queue_enabled = self.settings.get('zmq_queue_enabled', False)
self._poll_smsg = self.settings.get('poll_smsg', True)
self.check_smsg_seconds = self.settings.get('check_smsg_seconds', 10)
self._last_checked_smsg = 0
self._notifications_enabled = self.settings.get('notifications_enabled', True)
self._disabled_notification_types = self.settings.get('disabled_notification_types', [])
@ -336,11 +340,12 @@ class BasicSwap(BaseApp):
session.close()
session.remove()
self.zmqContext = zmq.Context()
self.zmqSubscriber = self.zmqContext.socket(zmq.SUB)
if self._zmq_queue_enabled:
self.zmqContext = zmq.Context()
self.zmqSubscriber = self.zmqContext.socket(zmq.SUB)
self.zmqSubscriber.connect(self.settings['zmqhost'] + ':' + str(self.settings['zmqport']))
self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg')
self.zmqSubscriber.connect(self.settings['zmqhost'] + ':' + str(self.settings['zmqport']))
self.zmqSubscriber.setsockopt_string(zmq.SUBSCRIBE, 'smsg')
for c in Coins:
if c in chainparams:
@ -388,7 +393,8 @@ class BasicSwap(BaseApp):
else:
self.thread_pool.shutdown()
self.zmqContext.destroy()
if self._zmq_queue_enabled:
self.zmqContext.destroy()
self.swaps_in_progress.clear()
close_all_sessions()
@ -4236,7 +4242,7 @@ class BasicSwap(BaseApp):
self.log.error(traceback.format_exc())
now: int = self.getTime()
options = {'encoding': 'none'}
options = {'encoding': 'none', 'setread': False}
inbox_messages = ci_part.json_request(rpc_conn, 'smsginbox', ['all', '', options])['messages']
for msg in inbox_messages:
remove_if_expired(msg)
@ -6055,15 +6061,28 @@ class BasicSwap(BaseApp):
self.processMsg(msg)
def update(self) -> None:
try:
if self._read_zmq_queue:
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK)
if message == b'smsg':
self.processZmqSmsg()
except zmq.Again as ex:
pass
except Exception as ex:
self.logException(f'smsg zmq {ex}')
if self._zmq_queue_enabled:
try:
if self._read_zmq_queue:
message = self.zmqSubscriber.recv(flags=zmq.NOBLOCK)
self.log.debug('[rm] zmq msg {}'.format(message))
if message == b'smsg':
self.processZmqSmsg()
except zmq.Again as ex:
pass
except Exception as ex:
self.logException(f'smsg zmq {ex}')
if self._poll_smsg:
now: int = self.getTime()
if now - self._last_checked_smsg >= self.check_smsg_seconds:
self.log.debug('[rm] self._read_zmq_queue {}'.format(self._read_zmq_queue))
self._last_checked_smsg = now
options = {'encoding': 'hex', 'setread': True}
msgs = self.callrpc('smsginbox', ['unread', '', options])
self.log.debug('[rm] len msgs {}'.format(len(msgs['messages'])))
for msg in msgs['messages']:
self.processMsg(msg)
self.mxDB.acquire()
try:

Loading…
Cancel
Save