Browse Source

Remove 0mq logic, add getblock RPC wrapper, add RPC polling logic, and remove old packages

remove_0mq
mdr0id 5 years ago
parent
commit
1587db121c
  1. 156
      cmd/ingest/main.go

156
cmd/ingest/main.go

@ -1,18 +1,23 @@
package main package main
import ( import (
"context"
"database/sql" "database/sql"
"encoding/binary"
"encoding/hex" "encoding/hex"
"flag" "flag"
"fmt" "fmt"
"os" "os"
"time"
"encoding/json"
"github.com/pkg/errors"
"strconv"
"strings"
"github.com/btcsuite/btcd/rpcclient"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
_ "github.com/mattn/go-sqlite3"
zmq "github.com/pebbe/zmq4"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/zcash-hackworks/lightwalletd/frontend"
"github.com/zcash-hackworks/lightwalletd/parser" "github.com/zcash-hackworks/lightwalletd/parser"
"github.com/zcash-hackworks/lightwalletd/storage" "github.com/zcash-hackworks/lightwalletd/storage"
) )
@ -22,20 +27,18 @@ var logger = logrus.New()
var db *sql.DB var db *sql.DB
type Options struct { type Options struct {
zmqAddr string
zmqTopic string
dbPath string dbPath string
logLevel uint64 logLevel uint64
logPath string logPath string
zcashConfPath string
} }
func main() { func main() {
opts := &Options{} opts := &Options{}
flag.StringVar(&opts.zmqAddr, "zmq-addr", "127.0.0.1:28332", "the address of the 0MQ publisher")
flag.StringVar(&opts.zmqTopic, "zmq-topic", "checkedblock", "the stream to listen to")
flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file") flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file")
flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)") flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)")
flag.StringVar(&opts.logPath, "log-file", "", "log file to write to") flag.StringVar(&opts.logPath, "log-file", "", "log file to write to")
flag.StringVar(&opts.zcashConfPath, "conf-file", "", "conf file to pull RPC creds from")
// TODO prod metrics // TODO prod metrics
// TODO support config from file and env vars // TODO support config from file and env vars
flag.Parse() flag.Parse()
@ -90,107 +93,107 @@ func main() {
}).Fatal("couldn't create SQL tables") }).Fatal("couldn't create SQL tables")
} }
// Initialize ZMQ //Initialize RPC connection with full node zcashd
ctx, err := zmq.NewContext() rpcClient, err := frontend.NewZRPCFromConf(opts.zcashConfPath)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"error": err, "error": err,
}).Fatal("couldn't create zmq context") }).Warn("zcash.conf failed, will try empty credentials for rpc")
}
defer ctx.Term()
// WARNING: The Socket is not thread safe. This means that you cannot //Default to testnet, but user MUST specify rpcuser and rpcpassword in zcash.conf; no default
// access the same Socket from different goroutines without using something rpcClient, err = frontend.NewZRPCFromCreds("127.0.0.1:18232", " ", " ")
// like a mutex.
sock, err := ctx.NewSocket(zmq.SUB) if err != nil {
if err != nil { log.WithFields(logrus.Fields{
log.WithFields(logrus.Fields{ "error": err,
"error": err, }).Fatal("couldn't start rpc connection")
}).Fatal("couldn't create zmq context socket") }
} }
err = sock.SetSubscribe(opts.zmqTopic) ctx := context.Background()
height, err := storage.GetCurrentHeight(ctx, db)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"error": err, "error": err,
"stream": opts.zmqTopic, }).Warn("unable to get current height from local db storage")
}).Fatal("couldn't subscribe to stream")
} }
connString := fmt.Sprintf("tcp://%s", opts.zmqAddr) if height < 0 {
height = 0
err = sock.Connect(connString)
if err != nil {
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"error": err, "error": err,
"connection": connString, }).Warn("invalid current height read from local db storage")
}).Fatal("couldn't connect to socket")
} }
defer sock.Close()
log.Printf("Listening to 0mq on %s", opts.zmqAddr)
// Start listening for new blocks // Start listening for new blocks
for { for {
msg, err := sock.RecvMessageBytes(0) block, err := getBlock(rpcClient, height)
if err != nil { if err != nil{
log.WithFields(logrus.Fields{ log.WithFields(logrus.Fields{
"error": err, "height": height,
}).Error("error on msg recv") "error": err,
continue }).Fatal("error with getblock")
} }
if block != nil{
if len(msg) < 3 { handleBlock(db , block)
log.WithFields(logrus.Fields{ height++
"msg": msg, //TODO store block current/prev hash for formal reorg
}).Warn("got unknown message type") }else{
continue //TODO implement blocknotify to minimize polling on corner cases
time.Sleep(60 * time.Second)
} }
}
}
topic, body := msg[0], msg[1] func getBlock(rpcClient *rpcclient.Client, height int) (*parser.Block, error) {
params := make([]json.RawMessage, 2)
var sequence int params[0] = json.RawMessage("\"" + strconv.Itoa(height) + "\"")
if len(msg[2]) == 4 { params[1] = json.RawMessage("0")
sequence = int(binary.LittleEndian.Uint32(msg[len(msg)-1])) result, rpcErr := rpcClient.RawRequest("getblock", params)
var err error
var errCode int64
// For some reason, the error responses are not JSON
if rpcErr != nil {
errParts := strings.SplitN(rpcErr.Error(), ":", 2)
errCode, err = strconv.ParseInt(errParts[0], 10, 32)
if err == nil && errCode == -8 {
return nil, nil
} }
return nil, errors.Wrap(rpcErr, "error requesting block")
}
switch string(topic) { var blockDataHex string
err = json.Unmarshal(result, &blockDataHex)
case opts.zmqTopic: if err != nil{
// there's an implicit mutex here return nil, errors.Wrap(err, "error reading JSON response")
go handleBlock(db, sequence, body) }
default: blockData, err := hex.DecodeString(blockDataHex)
log.WithFields(logrus.Fields{ if err != nil {
"seqnum": sequence, return nil, errors.Wrap(err, "error decoding getblock output")
"topic": topic,
}).Warn("got message with unknown topic")
}
} }
}
func handleBlock(db *sql.DB, sequence int, blockData []byte) {
block := parser.NewBlock() block := parser.NewBlock()
rest, err := block.ParseFromSlice(blockData) rest, err := block.ParseFromSlice(blockData)
if err != nil { if err != nil {
log.WithFields(logrus.Fields{ return nil, errors.Wrap(err, "error parsing block")
"seqnum": sequence,
"error": err,
}).Error("error parsing block")
return
} }
if len(rest) != 0 { if len(rest) != 0 {
log.WithFields(logrus.Fields{ return nil, errors.New("received overlong message")
"seqnum": sequence,
"length": len(rest),
}).Warn("received overlong message")
return
} }
return block, nil
}
func handleBlock(db *sql.DB, block *parser.Block) {
blockHash := hex.EncodeToString(block.GetEncodableHash()) blockHash := hex.EncodeToString(block.GetEncodableHash())
marshaledBlock, _ := proto.Marshal(block.ToCompact()) marshaledBlock, _ := proto.Marshal(block.ToCompact())
err = storage.StoreBlock( err := storage.StoreBlock(
db, db,
block.GetHeight(), block.GetHeight(),
blockHash, blockHash,
@ -199,7 +202,6 @@ func handleBlock(db *sql.DB, sequence int, blockData []byte) {
) )
entry := log.WithFields(logrus.Fields{ entry := log.WithFields(logrus.Fields{
"seqnum": sequence,
"block_height": block.GetHeight(), "block_height": block.GetHeight(),
"block_hash": hex.EncodeToString(block.GetDisplayHash()), "block_hash": hex.EncodeToString(block.GetDisplayHash()),
"block_version": block.GetVersion(), "block_version": block.GetVersion(),

Loading…
Cancel
Save