add userdatastream cmd for testing private stream

This commit is contained in:
c9s 2021-05-28 00:47:34 +08:00
parent 545d0f18e3
commit 777701c0cb
5 changed files with 155 additions and 27 deletions

69
pkg/cmd/userdatastream.go Normal file
View File

@ -0,0 +1,69 @@
package cmd
import (
"context"
"fmt"
"syscall"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
// go run ./cmd/bbgo userdatastream --session=ftx
var userDataStreamCmd = &cobra.Command{
Use: "userdatastream",
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
if userConfig == nil {
return errors.New("--config option or config file is missing")
}
environ := bbgo.NewEnvironment()
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
s := session.Exchange.NewStream()
s.OnOrderUpdate(func(order types.Order) {
log.Infof("order update: %+v", order)
})
s.OnTradeUpdate(func(trade types.Trade) {
log.Infof("trade update: %+v", trade)
})
s.OnBalanceUpdate(func(trade types.BalanceMap) {
log.Infof("balance update: %+v", trade)
})
s.OnBalanceSnapshot(func(trade types.BalanceMap) {
log.Infof("balance snapshot: %+v", trade)
})
log.Infof("connecting...")
if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", sessionName)
}
log.Infof("connected")
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil
},
}
func init() {
userDataStreamCmd.Flags().String("session", "", "session name")
RootCmd.AddCommand(userDataStreamCmd)
}

View File

@ -52,7 +52,8 @@ func toGlobalBalance(balanceSummaries []okexapi.BalanceSummary) types.BalanceMap
type WebsocketSubscription struct { type WebsocketSubscription struct {
Channel string `json:"channel"` Channel string `json:"channel"`
InstrumentID string `json:"instId"` InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
} }
var CandleChannels = []string{ var CandleChannels = []string{

View File

@ -177,7 +177,7 @@ func (c *RestClient) newAuthenticatedRequest(method, refURL string, params url.V
} }
signKey := timestamp + strings.ToUpper(method) + path + string(body) signKey := timestamp + strings.ToUpper(method) + path + string(body)
signature := sign(signKey, c.Secret) signature := Sign(signKey, c.Secret)
req, err := http.NewRequest(method, pathURL.String(), bytes.NewReader(body)) req, err := http.NewRequest(method, pathURL.String(), bytes.NewReader(body))
if err != nil { if err != nil {
@ -388,7 +388,7 @@ func (c *RestClient) MarketTickers(instType InstrumentType) ([]MarketTicker, err
return tickerResponse.Data, nil return tickerResponse.Data, nil
} }
func sign(payload string, secret string) string { func Sign(payload string, secret string) string {
var sig = hmac.New(sha256.New, []byte(secret)) var sig = hmac.New(sha256.New, []byte(secret))
_, err := sig.Write([]byte(payload)) _, err := sig.Write([]byte(payload))
if err != nil { if err != nil {

View File

@ -30,20 +30,23 @@ func Parse(str string) (interface{}, error) {
} }
type WebSocketEvent struct { type WebSocketEvent struct {
Event string Event string `json:"event"`
Code string Code string `json:"code,omitempty"`
Message string Message string `json:"msg,omitempty"`
Arg interface{} `json:"arg,omitempty"`
} }
func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) {
// event could be "subscribe", "unsubscribe" or "error" // event could be "subscribe", "unsubscribe" or "error"
event := string(v.GetStringBytes("event")) event := string(v.GetStringBytes("event"))
code := string(v.GetStringBytes("code")) code := string(v.GetStringBytes("code"))
message := string(v.GetStringBytes("message")) message := string(v.GetStringBytes("msg"))
arg := v.GetObject("arg")
return &WebSocketEvent{ return &WebSocketEvent{
Event: event, Event: event,
Code: code, Code: code,
Message: message, Message: message,
Arg: arg,
}, nil }, nil
} }

View File

@ -3,6 +3,7 @@ package okex
import ( import (
"context" "context"
"net" "net"
"strconv"
"sync" "sync"
"time" "time"
@ -16,6 +17,13 @@ type WebsocketOp struct {
Args interface{} `json:"args"` Args interface{} `json:"args"`
} }
type WebsocketLogin struct {
Key string `json:"apiKey"`
Passphrase string `json:"passphrase"`
Timestamp string `json:"timestamp"`
Sign string `json:"sign"`
}
//go:generate callbackgen -type Stream -interface //go:generate callbackgen -type Stream -interface
type Stream struct { type Stream struct {
types.StandardStream types.StandardStream
@ -76,30 +84,78 @@ func NewStream(client *okexapi.RestClient) *Stream {
} }
}) })
stream.OnEvent(func(event WebSocketEvent) {
log.Infof("event: %+v", event)
switch event.Event {
case "login":
if event.Code == "0" {
var subs = []WebsocketSubscription{
{Channel: "account"},
{Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)},
}
log.Infof("subscribing private channels: %+v", subs)
err := stream.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
if err != nil {
log.WithError(err).Error("private channel subscribe error")
}
}
}
})
stream.OnConnect(func() { stream.OnConnect(func() {
var subs []WebsocketSubscription if stream.publicOnly {
for _, subscription := range stream.Subscriptions { var subs []WebsocketSubscription
sub, err := convertSubscription(subscription) for _, subscription := range stream.Subscriptions {
if err != nil { sub, err := convertSubscription(subscription)
log.WithError(err).Errorf("subscription convert error") if err != nil {
continue log.WithError(err).Errorf("subscription convert error")
continue
}
subs = append(subs, sub)
}
if len(subs) == 0 {
return
} }
subs = append(subs, sub) log.Infof("subscribing channels: %+v", subs)
} err := stream.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: subs,
})
if len(subs) == 0 { if err != nil {
return log.WithError(err).Error("subscribe error")
} }
} else {
// login as private channel
// sign example:
// sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey))
msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64)
payload := msTimestamp + "GET" + "/users/self/verify"
sign := okexapi.Sign(payload, stream.Client.Secret)
op := WebsocketOp{
Op: "login",
Args: []WebsocketLogin{
{
Key: stream.Client.Key,
Passphrase: stream.Client.Passphrase,
Timestamp: msTimestamp,
Sign: sign,
},
},
}
log.Infof("subscribing channels: %+v", subs) log.Infof("sending login request: %+v", op)
err := stream.Conn.WriteJSON(WebsocketOp{ err := stream.Conn.WriteJSON(op)
Op: "subscribe", if err != nil {
Args: subs, log.WithError(err).Errorf("can not send login message")
}) }
if err != nil {
log.WithError(err).Error("subscribe error")
} }
}) })
@ -210,7 +266,6 @@ func (s *Stream) read(ctx context.Context) {
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := s.Conn.ReadMessage()
if err != nil { if err != nil {
// if it's a network timeout error, we should re-connect // if it's a network timeout error, we should re-connect
switch err := err.(type) { switch err := err.(type) {