Backend server for SDL
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

182 lines
4.2 KiB

package common
import (
"encoding/hex"
"encoding/json"
"sync"
"sync/atomic"
"time"
log "github.com/sirupsen/logrus"
// Go is too dumb to import from branches so use upstream for now
//"github.com/zcash/lightwalletd/walletrpc"
//"git.hush.is/hush/lightwalletd/walletrpc"
"git.hush.is/duke/lightwalletd/walletrpc"
)
// Shared state of the mempool monitor.
var (
	// txns holds the mempool transactions collected so far.
	txns = make(map[string]*walletrpc.RawTransaction)

	// clients lists the channels of all clients waiting to receive mempool txns.
	clients []chan<- *walletrpc.RawTransaction

	// lastHeight is the last observed block height. When it changes, all
	// clients are closed and the mempool is flushed.
	lastHeight int

	// blockcache points to the block cache.
	blockcache *BlockCache

	// lock is the mutex protecting the shared state declared above.
	lock sync.Mutex

	// refreshing is a flag used in place of a "try_lock" on the mutex:
	// it is non-zero while a mempool refresh is running.
	refreshing int32
)
// AddNewClient adds a new client to the list of clients to notify for
// mempool txns. Every transaction currently held in txns is sent to the
// client first, then the client is appended to clients. The whole operation
// runs with lock held. A nil client is ignored.
func AddNewClient(client chan<- *walletrpc.RawTransaction) {
	lock.Lock()
	defer lock.Unlock()

	// A nil channel gets nothing replayed and is never registered.
	if client == nil {
		return
	}

	// Replay all pending mempool txns before subscribing the client.
	for _, pending := range txns {
		client <- pending
	}
	clients = append(clients, client)
}
// refreshMempoolTxns gets all new mempool txns and sends any new ones to
// waiting clients.
//
// Only one refresh runs at a time: if the refreshing flag is already set the
// call logs a warning and returns nil without doing anything. Otherwise it
// takes lock for the remainder of the call.
//
// If the block cache reports a height greater than lastHeight, every client
// channel is closed, clients and txns are reset, and lastHeight is updated.
// It then asks the node for "getrawmempool" and, for each txid not yet in
// txns, fetches the raw transaction, sends it to every registered client and
// records it in txns.
//
// It returns the error from the "getrawmempool" request or from any
// JSON/hex decoding step. A failing "getrawtransaction" request is not
// treated as an error; that txid is skipped.
func refreshMempoolTxns() error {
	log.Infoln("Refreshing mempool")

	// First check if another refresh is running; if it is, just return.
	if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
		log.Warnln("Another refresh in progress, returning")
		return nil
	}
	// Clear the flag when we exit. The flag is claimed with an atomic
	// compare-and-swap, so it is released with an atomic store as well; a
	// plain assignment would race with the CAS in a concurrent call.
	defer atomic.StoreInt32(&refreshing, 0)

	lock.Lock()
	defer lock.Unlock()

	// Check if the blockchain has advanced, and if it has, clear everything.
	// The height is read once so the value compared is the value stored.
	if latest := blockcache.GetLatestHeight(); lastHeight < latest {
		log.Infoln("Block height changed, clearing everything")

		// Flush all the clients.
		for _, client := range clients {
			if client != nil {
				close(client)
			}
		}
		clients = make([]chan<- *walletrpc.RawTransaction, 0)

		// Clear txns.
		txns = make(map[string]*walletrpc.RawTransaction)

		lastHeight = latest
	}

	var mempoolList []string
	params := make([]json.RawMessage, 0)
	result, rpcErr := RawRequest("getrawmempool", params)
	if rpcErr != nil {
		return rpcErr
	}
	err := json.Unmarshal(result, &mempoolList)
	if err != nil {
		return err
	}

	// Fetch every mempool txn we have not seen yet and publish it.
	for _, txidstr := range mempoolList {
		if _, ok := txns[txidstr]; ok {
			// Already known and already sent to clients.
			continue
		}

		txidJSON, err := json.Marshal(txidstr)
		if err != nil {
			return err
		}
		// The "0" is because we only need the raw hex, which is returned as
		// just a hex string, and not even a json string (with quotes).
		params := []json.RawMessage{txidJSON, json.RawMessage("0")}
		result, rpcErr := RawRequest("getrawtransaction", params)
		if rpcErr != nil {
			// Not an error; mempool transactions can disappear.
			continue
		}

		// Strip the quotes.
		var txStr string
		err = json.Unmarshal(result, &txStr)
		if err != nil {
			return err
		}

		// Convert to binary.
		txBytes, err := hex.DecodeString(txStr)
		if err != nil {
			return err
		}

		newRtx := &walletrpc.RawTransaction{
			Data:   txBytes,
			Height: uint64(lastHeight),
		}

		// Notify waiting clients.
		for _, client := range clients {
			if client != nil {
				client <- newRtx
			}
		}

		log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients")
		txns[txidstr] = newRtx
	}

	return nil
}
// StartMempoolMonitor starts monitoring the mempool.
//
// It returns immediately after launching a background goroutine. That
// goroutine stores cache in blockcache, seeds lastHeight from it, and then on
// every tick of a 2-second ticker runs refreshMempoolTxns in its own
// goroutine, logging any error it returns.
//
// When a receive on done succeeds (a value is sent or the channel is closed),
// the monitor closes every registered client channel, empties the client
// list, stops the ticker and exits.
func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
	go func() {
		ticker := time.NewTicker(2 * time.Second)
		// Release the ticker's resources once the monitor exits.
		defer ticker.Stop()

		blockcache = cache
		lastHeight = blockcache.GetLatestHeight()

		for {
			select {
			case <-ticker.C:
				// Run the refresh off this loop so a slow refresh does not
				// delay handling of done.
				go func() {
					if err := refreshMempoolTxns(); err != nil {
						log.Errorln("Mempool refresh error:", err.Error())
					}
				}()
			case <-done:
				// clients is shared with AddNewClient and refreshMempoolTxns,
				// so it is only touched with lock held. Resetting the slice
				// after closing keeps a later refresh from sending on, or
				// closing again, an already closed channel.
				lock.Lock()
				for _, client := range clients {
					if client != nil {
						close(client)
					}
				}
				clients = nil
				lock.Unlock()
				return
			}
		}
	}()
}