diff --git a/pkg/cmd/userdatastream.go b/pkg/cmd/userdatastream.go new file mode 100644 index 000000000..9bf2db401 --- /dev/null +++ b/pkg/cmd/userdatastream.go @@ -0,0 +1,69 @@ +package cmd + +import ( + "context" + "fmt" + "syscall" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/cmd/cmdutil" + "github.com/c9s/bbgo/pkg/types" + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + "github.com/spf13/cobra" +) + +// go run ./cmd/bbgo userdatastream --session=ftx +var userDataStreamCmd = &cobra.Command{ + Use: "userdatastream", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + if userConfig == nil { + return errors.New("--config option or config file is missing") + } + + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + sessionName, err := cmd.Flags().GetString("session") + if err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + s := session.Exchange.NewStream() + s.OnOrderUpdate(func(order types.Order) { + log.Infof("order update: %+v", order) + }) + s.OnTradeUpdate(func(trade types.Trade) { + log.Infof("trade update: %+v", trade) + }) + s.OnBalanceUpdate(func(trade types.BalanceMap) { + log.Infof("balance update: %+v", trade) + }) + s.OnBalanceSnapshot(func(trade types.BalanceMap) { + log.Infof("balance snapshot: %+v", trade) + }) + + log.Infof("connecting...") + if err := s.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to %s", sessionName) + } + log.Infof("connected") + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + return nil + }, +} + +func init() { + userDataStreamCmd.Flags().String("session", "", "session name") + RootCmd.AddCommand(userDataStreamCmd) +} diff --git a/pkg/exchange/okex/convert.go b/pkg/exchange/okex/convert.go index 64e69daff..59014b9f1 100644 --- a/pkg/exchange/okex/convert.go +++ b/pkg/exchange/okex/convert.go @@ -52,7 +52,8 @@ func toGlobalBalance(balanceSummaries []okexapi.BalanceSummary) types.BalanceMap type WebsocketSubscription struct { Channel string `json:"channel"` - InstrumentID string `json:"instId"` + InstrumentID string `json:"instId,omitempty"` + InstrumentType string `json:"instType,omitempty"` } var CandleChannels = []string{ diff --git a/pkg/exchange/okex/okexapi/client.go b/pkg/exchange/okex/okexapi/client.go index 8007fda88..c7600e906 100644 --- a/pkg/exchange/okex/okexapi/client.go +++ b/pkg/exchange/okex/okexapi/client.go @@ -177,7 +177,7 @@ func (c *RestClient) newAuthenticatedRequest(method, refURL string, params url.V } signKey := timestamp + strings.ToUpper(method) + path + string(body) - signature := sign(signKey, c.Secret) + signature := Sign(signKey, c.Secret) req, err := http.NewRequest(method, pathURL.String(), bytes.NewReader(body)) if err != nil { @@ -388,7 +388,7 @@ func (c *RestClient) MarketTickers(instType InstrumentType) ([]MarketTicker, err return tickerResponse.Data, nil } -func sign(payload string, secret string) string { +func Sign(payload string, secret string) string { var sig = hmac.New(sha256.New, []byte(secret)) _, err := sig.Write([]byte(payload)) if err != nil { diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 69fae792e..675bb61fa 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -30,20 +30,23 @@ func Parse(str string) (interface{}, error) { } type WebSocketEvent struct { - Event string - Code string - Message string + Event string `json:"event"` + Code string `json:"code,omitempty"` + Message string `json:"msg,omitempty"` + Arg interface{} `json:"arg,omitempty"` } func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { // event could be "subscribe", "unsubscribe" or "error" event := string(v.GetStringBytes("event")) code := string(v.GetStringBytes("code")) - message := string(v.GetStringBytes("message")) + message := string(v.GetStringBytes("msg")) + arg := v.GetObject("arg") return &WebSocketEvent{ Event: event, Code: code, Message: message, + Arg: arg, }, nil } diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index c3f8f0694..cf9aa7521 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -3,6 +3,7 @@ package okex import ( "context" "net" + "strconv" "sync" "time" @@ -16,6 +17,13 @@ type WebsocketOp struct { Args interface{} `json:"args"` } +type WebsocketLogin struct { + Key string `json:"apiKey"` + Passphrase string `json:"passphrase"` + Timestamp string `json:"timestamp"` + Sign string `json:"sign"` +} + //go:generate callbackgen -type Stream -interface type Stream struct { types.StandardStream @@ -76,30 +84,78 @@ func NewStream(client *okexapi.RestClient) *Stream { } }) + stream.OnEvent(func(event WebSocketEvent) { + log.Infof("event: %+v", event) + switch event.Event { + case "login": + if event.Code == "0" { + var subs = []WebsocketSubscription{ + {Channel: "account"}, + {Channel: "orders", InstrumentType: string(okexapi.InstrumentTypeSpot)}, + } + + log.Infof("subscribing private channels: %+v", subs) + err := stream.Conn.WriteJSON(WebsocketOp{ + Op: "subscribe", + Args: subs, + }) + + if err != nil { + log.WithError(err).Error("private channel subscribe error") + } + } + } + }) + stream.OnConnect(func() { - var subs []WebsocketSubscription - for _, subscription := range stream.Subscriptions { - sub, err := convertSubscription(subscription) - if err != nil { - log.WithError(err).Errorf("subscription convert error") - continue + if stream.publicOnly { + var subs []WebsocketSubscription + for _, subscription := range stream.Subscriptions { + sub, err := convertSubscription(subscription) + if err != nil { + log.WithError(err).Errorf("subscription convert error") + continue + } + + subs = append(subs, sub) + } + if len(subs) == 0 { + return } - subs = append(subs, sub) - } + log.Infof("subscribing channels: %+v", subs) + err := stream.Conn.WriteJSON(WebsocketOp{ + Op: "subscribe", + Args: subs, + }) - if len(subs) == 0 { - return - } + if err != nil { + log.WithError(err).Error("subscribe error") + } + } else { + // login as private channel + // sign example: + // sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey)) + msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64) + payload := msTimestamp + "GET" + "/users/self/verify" + sign := okexapi.Sign(payload, stream.Client.Secret) + op := WebsocketOp{ + Op: "login", + Args: []WebsocketLogin{ + { + Key: stream.Client.Key, + Passphrase: stream.Client.Passphrase, + Timestamp: msTimestamp, + Sign: sign, + }, + }, + } - log.Infof("subscribing channels: %+v", subs) - err := stream.Conn.WriteJSON(WebsocketOp{ - Op: "subscribe", - Args: subs, - }) - - if err != nil { - log.WithError(err).Error("subscribe error") + log.Infof("sending login request: %+v", op) + err := stream.Conn.WriteJSON(op) + if err != nil { + log.WithError(err).Errorf("can not send login message") + } } }) @@ -210,7 +266,6 @@ func (s *Stream) read(ctx context.Context) { } mt, message, err := s.Conn.ReadMessage() - if err != nil { // if it's a network timeout error, we should re-connect switch err := err.(type) {