Browse Source

Merge branch 'dev' into master

pull/55/head
jahway603 11 months ago
parent
commit
428e72dd8c
  1. 2
      README.md
  2. 257
      cmd/server/main.old
  3. 32
      cmd/server/main_test.old
  4. 73
      common/common.go
  5. 16
      frontend/service.go

2
README.md

@ -174,6 +174,8 @@ protoc --go_out=paths=source_relative:. foo.proto
Or do `make protobuf`
To update the version of lightwalletd, update the value of the `Version` variable in common/common.go .
## Support
For support or other questions, join us on [Telegram](https://hush.is/telegram), or tweet at [@HushIsPrivacy](https://twitter.com/HushIsPrivacy), or toot at our [Mastodon](https://fosstodon.org/@myhushteam) or join [Telegram Support](https://hush.is/telegram_support).

257
cmd/server/main.old

@ -1,257 +0,0 @@
package main
import (
"context"
"flag"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"time"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/reflection"
"git.hush.is/hush/lightwalletd/common"
"git.hush.is/hush/lightwalletd/frontend"
"git.hush.is/hush/lightwalletd/walletrpc"
)
var log *logrus.Entry
var logger = logrus.New()
func init() {
logger.SetFormatter(&logrus.TextFormatter{
//DisableColors: true,
FullTimestamp: true,
DisableLevelTruncation: true,
})
log = logger.WithFields(logrus.Fields{
"app": "frontend-grpc",
})
}
// TODO stream logging
func LoggingInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(logInterceptor)
}
func logInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
reqLog := loggerFromContext(ctx)
start := time.Now()
resp, err := handler(ctx, req)
entry := reqLog.WithFields(logrus.Fields{
"method": info.FullMethod,
"duration": time.Since(start),
"error": err,
})
if err != nil {
entry.Error("call failed")
} else {
entry.Info("method called")
}
return resp, err
}
func loggerFromContext(ctx context.Context) *logrus.Entry {
// TODO: anonymize the addresses. cryptopan?
if peerInfo, ok := peer.FromContext(ctx); ok {
return log.WithFields(logrus.Fields{"peer_addr": peerInfo.Addr})
}
return log.WithFields(logrus.Fields{"peer_addr": "unknown"})
}
type Options struct {
bindAddr string `json:"bind_address,omitempty"`
tlsCertPath string `json:"tls_cert_path,omitempty"`
tlsKeyPath string `json:"tls_cert_key,omitempty"`
noTLS bool `json:no_tls,omitempty`
logLevel uint64 `json:"log_level,omitempty"`
logPath string `json:"log_file,omitempty"`
hush3ConfPath string `json:"hush3_conf,omitempty"`
cacheSize int `json:"hush3_conf,omitempty"`
}
func main() {
var version = "0.1.1" // set version number
opts := &Options{}
flag.StringVar(&opts.bindAddr, "bind-addr", "127.0.0.1:9067", "the address to listen on")
flag.StringVar(&opts.tlsCertPath, "tls-cert", "", "the path to a TLS certificate (optional)")
flag.StringVar(&opts.tlsKeyPath, "tls-key", "", "the path to a TLS key file (optional)")
flag.BoolVar(&opts.noTLS, "no-tls", false, "Disable TLS, serve un-encrypted traffic.")
flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)")
flag.StringVar(&opts.logPath, "log-file", "", "log file to write to")
flag.StringVar(&opts.hush3ConfPath, "conf-file", "", "conf file to pull RPC creds from")
flag.IntVar(&opts.cacheSize, "cache-size", 40000, "number of blocks to hold in the cache")
// creating --version as a requirement of help2man
if len(os.Args) > 1 && (os.Args[1] == "--version" || os.Args[1] == "-v") {
fmt.Printf("Hush lightwalletd version " + version + "\n")
os.Exit(0)
}
// TODO prod metrics
// TODO support config from file and env vars
flag.Parse()
if opts.hush3ConfPath == "" {
flag.Usage()
os.Exit(1)
}
if !opts.noTLS && (opts.tlsCertPath == "" || opts.tlsKeyPath == "") {
println("Please specify a TLS certificate/key to use. You can use a self-signed certificate.")
println("See https://git.hush.is/hush/lightwalletd/src/branch/master/README.md#running-your-own-sdl-lightwalletd")
os.Exit(1)
}
if opts.logPath != "" {
// instead write parsable logs for logstash/splunk/etc
output, err := os.OpenFile(opts.logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
"path": opts.logPath,
}).Fatal("couldn't open log file")
}
defer output.Close()
logger.SetOutput(output)
logger.SetFormatter(&logrus.JSONFormatter{})
}
logger.SetLevel(logrus.Level(opts.logLevel))
// gRPC initialization
var server *grpc.Server
if !opts.noTLS && (opts.tlsCertPath != "" && opts.tlsKeyPath != "") {
transportCreds, err := credentials.NewServerTLSFromFile(opts.tlsCertPath, opts.tlsKeyPath)
if err != nil {
log.WithFields(logrus.Fields{
"cert_file": opts.tlsCertPath,
"key_path": opts.tlsKeyPath,
"error": err,
}).Fatal("couldn't load TLS credentials")
}
server = grpc.NewServer(grpc.Creds(transportCreds), LoggingInterceptor())
} else {
server = grpc.NewServer(LoggingInterceptor())
}
// Enable reflection for debugging
if opts.logLevel >= uint64(logrus.WarnLevel) {
reflection.Register(server)
}
// Initialize Hush RPC client. Right now (Jan 2018) this is only for
// sending transactions, but in the future it could back a different type
// of block streamer.
rpcClient, err := frontend.NewZRPCFromConf(opts.hush3ConfPath)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warn("HUSH3.conf failed, will try empty credentials for rpc")
rpcClient, err = frontend.NewZRPCFromCreds("127.0.0.1:18031", "", "")
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warn("couldn't start rpc conn. won't be able to send transactions")
}
}
// Get the sapling activation height from the RPC
saplingHeight, blockHeight, chainName, branchID, difficulty, longestchain, notarized, err := common.GetSaplingInfo(rpcClient)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warn("Unable to get sapling activation height")
}
log.Info("Got sapling height ", saplingHeight, " chain ", chainName, " branchID ", branchID, " difficulty ", difficulty, longestchain, " longestchain ", notarized, " notarized ")
// Get the Coinsupply from the RPC
result, coin, height, supply, zfunds, total, err := common.GetCoinsupply(rpcClient)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Warn("Unable to get coinsupply")
}
log.Info(" result ", result, " coin ", coin, " height", height, "supply", supply, "zfunds", zfunds, "total", total)
// Initialize the cache
cache := common.NewBlockCache(opts.cacheSize)
stopChan := make(chan bool, 1)
// Start the block cache importer at latestblock - 100k(cache size)
cacheStart := blockHeight - opts.cacheSize
if cacheStart < saplingHeight {
cacheStart = saplingHeight
}
go common.BlockIngestor(rpcClient, cache, log, stopChan, cacheStart)
// Compact transaction service initialization
service, err := frontend.NewSQLiteStreamer(rpcClient, cache, log)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Fatal("couldn't create SQL backend")
}
defer service.(*frontend.SqlStreamer).GracefulStop()
// Register service
walletrpc.RegisterCompactTxStreamerServer(server, service)
// Start listening
listener, err := net.Listen("tcp", opts.bindAddr)
if err != nil {
log.WithFields(logrus.Fields{
"bind_addr": opts.bindAddr,
"error": err,
}).Fatal("couldn't create listener")
}
// Signal handler for graceful stops
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
s := <-signals
log.WithFields(logrus.Fields{
"signal": s.String(),
}).Info("caught signal, stopping gRPC server")
// Stop the server
server.GracefulStop()
// Stop the block ingestor
stopChan <- true
}()
log.Infof("Starting gRPC server on %s", opts.bindAddr)
err = server.Serve(listener)
if err != nil {
log.WithFields(logrus.Fields{
"error": err,
}).Fatal("gRPC server exited")
}
}

32
cmd/server/main_test.old

@ -1,32 +0,0 @@
// Copyright 2021 The Hush developers
// Released under the GPLv3
package main
import (
"os"
"testing"
)
// TestFileExists checks whether or not the file exists
func TestFileExists(t *testing.T) {
if fileExists("nonexistent-file") {
t.Fatal("fileExists unexpected success")
}
// If the path exists but is a directory, should return false
if fileExists(".") {
t.Fatal("fileExists unexpected success")
}
// The following file should exist, it's what's being tested
if !fileExists("main.go") {
t.Fatal("fileExists failed")
}
}
// fileExists checks if file exists and is not directory to prevent further errors
func fileExists(filename string) bool {
info, err := os.Stat(filename)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}

73
common/common.go

@ -4,21 +4,21 @@
package common
import (
"encoding/hex"
"encoding/json"
"strconv"
"strings"
"time"
"git.hush.is/hush/lightwalletd/parser"
"git.hush.is/hush/lightwalletd/walletrpc"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"fmt"
"encoding/hex"
"encoding/json"
"strconv"
"strings"
"time"
"git.hush.is/hush/lightwalletd/parser"
"git.hush.is/hush/lightwalletd/walletrpc"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// TODO: 'make build' will overwrite this string with the output of git-describe (tag)
var (
Version = "v0.1.2"
Version = "v0.1.3"
GitCommit = ""
Branch = ""
BuildDate = ""
@ -159,7 +159,7 @@ type (
//TODO: this function is not currently used, but some of it's code
// needs to be implemented elsewhere
func GetSaplingInfo() (int, int, string, string, int, int, int, error) {
result, rpcErr := RawRequest("getblockchaininfo", []json.RawMessage{})
result, rpcErr := CallRpcWithRetries("getblockchaininfo", []json.RawMessage{})
var err error
var errCode int64
@ -199,7 +199,8 @@ func GetSaplingInfo() (int, int, string, string, int, int, int, error) {
}
func GetLightdInfo() (*walletrpc.LightdInfo, error) {
result, rpcErr := RawRequest("getinfo", []json.RawMessage{})
params := []json.RawMessage{}
result, rpcErr := CallRpcWithRetries("getinfo", params)
if rpcErr != nil {
return nil, rpcErr
}
@ -209,7 +210,8 @@ func GetLightdInfo() (*walletrpc.LightdInfo, error) {
return nil, rpcErr
}
result, rpcErr = RawRequest("getblockchaininfo", []json.RawMessage{})
params = []json.RawMessage{}
result, rpcErr = CallRpcWithRetries("getblockchaininfo", params)
if rpcErr != nil {
return nil, rpcErr
}
@ -260,7 +262,7 @@ func FirstRPC() {
break
}
retryCount++
if retryCount > 10 {
if retryCount > 20 {
Log.WithFields(logrus.Fields{
"timeouts": retryCount,
}).Fatal("unable to issue getblockchaininfo RPC call to hushd node")
@ -273,8 +275,36 @@ func FirstRPC() {
}
}
func CallRpcWithRetries(method string, params []json.RawMessage) (json.RawMessage, error) {
retryCount := 0
maxRetries := 50
for {
// params := []json.RawMessage{}
result, err := RawRequest(method, params)
if err == nil {
if retryCount > 0 {
Log.Warn(fmt.Sprintf("%s RPC successful"), method)
}
return result, err
break
}
retryCount++
if retryCount > maxRetries {
Log.WithFields(logrus.Fields{
"timeouts": retryCount,
}).Fatal(fmt.Sprintf("unable to issue %s RPC call to hushd node"), method)
}
Log.WithFields(logrus.Fields{
"error": err.Error(),
"retry": retryCount,
}).Warn(fmt.Sprintf("error with %s rpc, retrying..."), method)
Time.Sleep(time.Duration(10+retryCount*5) * time.Second) // backoff
}
return nil, nil
}
func GetBlockChainInfo() (*HushdRpcReplyGetblockchaininfo, error) {
// we don't use CallRpcWithRetries here because the calling code does it already
result, rpcErr := RawRequest("getblockchaininfo", []json.RawMessage{})
if rpcErr != nil {
return nil, rpcErr
@ -288,7 +318,8 @@ func GetBlockChainInfo() (*HushdRpcReplyGetblockchaininfo, error) {
}
func GetCoinsupply() (string, string, int, int, int, int, error) {
result1, rpcErr := RawRequest("coinsupply", []json.RawMessage{})
params := []json.RawMessage{}
result1, rpcErr := CallRpcWithRetries("coinsupply", params)
var err error
var errCode int64
@ -337,13 +368,12 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {
}
params[0] = heightJSON
// Fetch the block using the verbose option ("1") because it provides
// both the list of txids, which we're not yet able to compute for
// Orchard (V5) transactions, and the block hash (block ID), which
// both the list of txids and the block hash (block ID), which
// we need to fetch the raw data format of the same block. Don't fetch
// by height in case a reorg occurs between the two getblock calls;
// using block hash ensures that we're fetching the same block.
params[1] = json.RawMessage("1")
result, rpcErr := RawRequest("getblock", params)
result, rpcErr := CallRpcWithRetries("getblock", params)
if rpcErr != nil {
// Check to see if we are requesting a height the hushd doesn't have yet
if (strings.Split(rpcErr.Error(), ":"))[0] == "-8" {
@ -362,7 +392,7 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {
}
params[0] = blockHash
params[1] = json.RawMessage("0") // non-verbose (raw hex)
result, rpcErr = RawRequest("getblock", params)
result, rpcErr = CallRpcWithRetries("getblock", params)
// For some reason, the error responses are not JSON
if rpcErr != nil {
@ -435,7 +465,8 @@ func BlockIngestor(c *BlockCache, rep int) {
default:
}
result, err := RawRequest("getbestblockhash", []json.RawMessage{})
params := []json.RawMessage{}
result, err := CallRpcWithRetries("getbestblockhash", params)
if err != nil {
Log.WithFields(logrus.Fields{
"error": err,

16
frontend/service.go

@ -95,7 +95,7 @@ func (s *lwdStreamer) GetTaddressTxids(addressBlockFilter *walletrpc.Transparent
return err
}
params[0] = param
result, rpcErr := common.RawRequest("getaddresstxids", params)
result, rpcErr := common.CallRpcWithRetries("getaddresstxids", params)
// For some reason, the error responses are not JSON
if rpcErr != nil {
@ -200,7 +200,7 @@ func (s *lwdStreamer) GetTransaction(ctx context.Context, txf *walletrpc.TxFilte
leHashStringJSON,
json.RawMessage("1"),
}
result, rpcErr := common.RawRequest("getrawtransaction", params)
result, rpcErr := common.CallRpcWithRetries("getrawtransaction", params)
// For some reason, the error responses are not JSON
if rpcErr != nil {
@ -279,7 +279,7 @@ func (s *lwdStreamer) SendTransaction(ctx context.Context, rawtx *walletrpc.RawT
return &walletrpc.SendResponse{}, err
}
params[0] = txJSON
result, rpcErr := common.RawRequest("sendrawtransaction", params)
result, rpcErr := common.CallRpcWithRetries("sendrawtransaction", params)
var errCode int64
var errMsg string
@ -325,7 +325,7 @@ func getTaddressBalanceHushdRpc(addressList []string) (*walletrpc.Balance, error
}
params[0] = param
result, rpcErr := common.RawRequest("getaddressbalance", params)
result, rpcErr := common.CallRpcWithRetries("getaddressbalance", params)
if rpcErr != nil {
return &walletrpc.Balance{}, rpcErr
}
@ -353,7 +353,7 @@ func getAddressUtxos(arg *walletrpc.GetAddressUtxosArg, f func(*walletrpc.GetAdd
return err
}
params[0] = param
result, rpcErr := common.RawRequest("getaddressutxos", params)
result, rpcErr := common.CallRpcWithRetries("getaddressutxos", params)
if rpcErr != nil {
return rpcErr
}
@ -431,7 +431,7 @@ func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.C
var mempoolMap *map[string]*walletrpc.CompactTx
var mempoolList []string
// Last time we pulled a copy of the mempool from zcashd.
// Last time we pulled a copy of the mempool from hushd
var lastMempool time.Time
func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.CompactTxStreamer_GetMempoolTxServer) error {
@ -442,7 +442,7 @@ func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.Co
lastMempool = time.Now()
// Refresh our copy of the mempool.
params := make([]json.RawMessage, 0)
result, rpcErr := common.RawRequest("getrawmempool", params)
result, rpcErr := common.CallRpcWithRetries("getrawmempool", params)
if rpcErr != nil {
return rpcErr
}
@ -467,7 +467,7 @@ func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.Co
// The "0" is because we only need the raw hex, which is returned as
// just a hex string, and not even a json string (with quotes).
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
result, rpcErr := common.RawRequest("getrawtransaction", params)
result, rpcErr := common.CallRpcWithRetries("getrawtransaction", params)
if rpcErr != nil {
// Not an error; mempool transactions can disappear
continue

Loading…
Cancel
Save