From 3bcd5a8e837d90da7140635c1e58209877cceb3d Mon Sep 17 00:00:00 2001 From: ycdesu Date: Sat, 27 Mar 2021 09:54:12 +0800 Subject: [PATCH 1/5] ftx: null guard in close --- pkg/exchange/ftx/stream.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index dd4fb8d2d..8da44f6d9 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -41,5 +41,8 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.Subscri } } func (s *Stream) Close() error { - return s.wsService.Close() + if s.wsService != nil { + return s.wsService.Close() + } + return nil } From 691251169d12502be4cb775d4f649d8ac0480961 Mon Sep 17 00:00:00 2001 From: ycdesu Date: Sat, 27 Mar 2021 16:58:51 +0800 Subject: [PATCH 2/5] ftx: define ws login request --- pkg/exchange/ftx/stream.go | 9 ++++- pkg/exchange/ftx/websocket_messages.go | 44 ++++++++++++++++++++- pkg/exchange/ftx/websocket_messages_test.go | 12 ++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index 8da44f6d9..c40bfea0f 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -28,7 +28,14 @@ func NewStream(key, secret string) *Stream { } func (s *Stream) Connect(ctx context.Context) error { - return s.wsService.Connect(ctx) + if err := s.wsService.Connect(ctx); err != nil { + return err + } + // If it's not public only, let's do the authentication. + if atomic.LoadInt32(&s.publicOnly) == 0 { + } + + return nil } func (s *Stream) SetPublicOnly() { diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index 2422ca672..47c8bb94c 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -15,6 +15,7 @@ import ( type operation string +const login operation = "login" const subscribe operation = "subscribe" const unsubscribe operation = "unsubscribe" @@ -24,7 +25,48 @@ const orderbook channel = "orderbook" const trades channel = "trades" const ticker channel = "ticker" -// {'op': 'subscribe', 'channel': 'trades', 'market': 'BTC-PERP'} +type websocketRequest struct { + Operation operation `json:"op"` + + // {'op': 'subscribe', 'channel': 'trades', 'market': 'BTC-PERP'} + Channel channel `json:"channel,omitempty"` + Market string `json:"market,omitempty"` + + Login loginArgs `json:"args,omitempty"` +} + +/* +{ + "args": { + "key": "", + "sign": "", + "time": + }, + "op": "login" +} +*/ +type loginArgs struct { + Key string `json:"key"` + Signature string `json:"sign"` + Time int64 `json:"time"` +} + +func newLoginRequest(key, secret string, t time.Time) websocketRequest { + millis := t.UnixNano() / int64(time.Millisecond) + return websocketRequest{ + Operation: login, + Login: loginArgs{ + Key: key, + Signature: sign(secret, loginBody(millis)), + Time: millis, + }, + } +} + +func loginBody(millis int64) string { + return fmt.Sprintf("%dwebsocket_login", millis) +} + type SubscribeRequest struct { Operation operation `json:"op"` Channel channel `json:"channel"` diff --git a/pkg/exchange/ftx/websocket_messages_test.go b/pkg/exchange/ftx/websocket_messages_test.go index 1196385f3..97122331c 100644 --- a/pkg/exchange/ftx/websocket_messages_test.go +++ b/pkg/exchange/ftx/websocket_messages_test.go @@ -3,7 +3,9 @@ package ftx import ( "encoding/json" "io/ioutil" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" @@ -170,3 +172,13 @@ func Test_insertAt(t *testing.T) { r = insertAt([][]json.Number{{"1.2", "2"}, {"1.4", "2"}}, 2, []json.Number{"1.5", "2"}) assert.Equal(t, [][]json.Number{{"1.2", "2"}, {"1.4", "2"}, {"1.5", "2"}}, r) } + +func Test_newLoginRequest(t *testing.T) { + // From API doc: https://docs.ftx.com/?javascript#authentication-2 + r := newLoginRequest("", "Y2QTHI23f23f23jfjas23f23To0RfUwX3H42fvN-", time.Unix(0, 1557246346499*int64(time.Millisecond))) + expectedSignature := "d10b5a67a1a941ae9463a60b285ae845cdeac1b11edc7da9977bef0228b96de9" + assert.Equal(t, expectedSignature, r.Login.Signature) + jsonStr, err := json.Marshal(r) + assert.NoError(t, err) + assert.True(t, strings.Contains(string(jsonStr), expectedSignature)) +} From 34548f185c1ffc6bd3ed04672345b63b2e3c4fd6 Mon Sep 17 00:00:00 2001 From: ycdesu Date: Sat, 27 Mar 2021 17:00:07 +0800 Subject: [PATCH 3/5] ftx: add missing ftx case --- pkg/types/exchange.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 4e3121765..db80845d6 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -7,8 +7,6 @@ import ( "fmt" "strings" "time" - - "github.com/pkg/errors" ) const DateFormat = "2006-01-02" @@ -51,9 +49,11 @@ func ValidExchangeName(a string) (ExchangeName, error) { return ExchangeMax, nil case "binance", "bn": return ExchangeBinance, nil + case "ftx": + return ExchangeFTX, nil } - return "", errors.New("invalid exchange name") + return "", fmt.Errorf("invalid exchange name: %s", a) } type Exchange interface { From 24254a869d68698c992a78bb0ff030f93d8f408c Mon Sep 17 00:00:00 2001 From: ycdesu Date: Sat, 27 Mar 2021 17:37:16 +0800 Subject: [PATCH 4/5] ftx: invoke SetPublicOnly in orderbook command --- pkg/cmd/orderbook.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index e0a0fb09d..37cfe7afc 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -44,6 +44,7 @@ var orderbookCmd = &cobra.Command{ } s := ex.NewStream() + s.SetPublicOnly() s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.OnBookSnapshot(func(book types.OrderBook) { log.Infof("orderbook snapshot: %s", book.String()) From f60f1ef52efe9ec336cb9fc572cb6c1a168c4d50 Mon Sep 17 00:00:00 2001 From: ycdesu Date: Sat, 27 Mar 2021 18:07:35 +0800 Subject: [PATCH 5/5] ftx: authenticate websocket --- pkg/cmd/orderbook.go | 70 ++++++++++++++++++++++ pkg/exchange/ftx/stream.go | 22 +++++-- pkg/exchange/ftx/stream_message_handler.go | 1 + pkg/exchange/ftx/websocket.go | 26 ++++---- pkg/exchange/ftx/websocket_messages.go | 6 -- 5 files changed, 98 insertions(+), 27 deletions(-) diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 37cfe7afc..8496db631 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -3,11 +3,14 @@ package cmd import ( "context" "fmt" + "os" "syscall" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/types" ) @@ -63,9 +66,76 @@ var orderbookCmd = &cobra.Command{ }, } +// go run ./cmd/bbgo orderupdate +var orderUpdateCmd = &cobra.Command{ + Use: "orderupdate", + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + configFile, err := cmd.Flags().GetString("config") + if err != nil { + return err + } + + if len(configFile) == 0 { + return errors.New("--config option is required") + } + + // if config file exists, use the config loaded from the config file. + // otherwise, use a empty config object + var userConfig *bbgo.Config + if _, err := os.Stat(configFile); err == nil { + // load successfully + userConfig, err = bbgo.Load(configFile, false) + if err != nil { + return err + } + } else if os.IsNotExist(err) { + // config file doesn't exist + userConfig = &bbgo.Config{} + } else { + // other error + return err + } + + environ := bbgo.NewEnvironment() + + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + sessionName, err := cmd.Flags().GetString("session") + if err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + s := session.Exchange.NewStream() + s.OnOrderUpdate(func(order types.Order) { + log.Infof("order update: %+v", order) + }) + + log.Infof("connecting...") + if err := s.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to %s", sessionName) + } + log.Infof("connected") + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + return nil + }, +} + func init() { // since the public data does not require trading authentication, we use --exchange option here. orderbookCmd.Flags().String("exchange", "", "the exchange name for sync") orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") + + orderUpdateCmd.Flags().String("session", "", "session name") RootCmd.AddCommand(orderbookCmd) + RootCmd.AddCommand(orderUpdateCmd) } diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index c40bfea0f..1efcdb670 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -3,6 +3,7 @@ package ftx import ( "context" "sync/atomic" + "time" "github.com/c9s/bbgo/pkg/types" ) @@ -28,11 +29,16 @@ func NewStream(key, secret string) *Stream { } func (s *Stream) Connect(ctx context.Context) error { - if err := s.wsService.Connect(ctx); err != nil { - return err - } // If it's not public only, let's do the authentication. if atomic.LoadInt32(&s.publicOnly) == 0 { + logger.Infof("subscribe private events") + s.wsService.Subscribe( + newLoginRequest(s.wsService.key, s.wsService.secret, time.Now()), + ) + } + + if err := s.wsService.Connect(ctx); err != nil { + return err } return nil @@ -43,10 +49,16 @@ func (s *Stream) SetPublicOnly() { } func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) { - if err := s.wsService.Subscribe(channel, symbol); err != nil { - logger.WithError(err).Errorf("subscribe failed, should never happen") + if channel != types.BookChannel { + // TODO: return err } + s.wsService.Subscribe(websocketRequest{ + Operation: subscribe, + Channel: orderbook, + Market: TrimUpperString(symbol), + }) } + func (s *Stream) Close() error { if s.wsService != nil { return s.wsService.Close() diff --git a/pkg/exchange/ftx/stream_message_handler.go b/pkg/exchange/ftx/stream_message_handler.go index 9c6cf19f2..b60678062 100644 --- a/pkg/exchange/ftx/stream_message_handler.go +++ b/pkg/exchange/ftx/stream_message_handler.go @@ -13,6 +13,7 @@ type messageHandler struct { } func (h *messageHandler) handleMessage(message []byte) { + log.Infof("raw: %s", string(message)) var r rawResponse if err := json.Unmarshal(message, &r); err != nil { logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message)) diff --git a/pkg/exchange/ftx/websocket.go b/pkg/exchange/ftx/websocket.go index 5a448ea26..086b6e3b7 100644 --- a/pkg/exchange/ftx/websocket.go +++ b/pkg/exchange/ftx/websocket.go @@ -2,13 +2,11 @@ package ftx import ( "fmt" - "strings" "time" "github.com/gorilla/websocket" "github.com/c9s/bbgo/pkg/service" - "github.com/c9s/bbgo/pkg/types" ) type WebsocketService struct { @@ -17,7 +15,7 @@ type WebsocketService struct { key string secret string - subscriptions []SubscribeRequest + subscriptions []websocketRequest } const endpoint = "wss://ftx.com/ws/" @@ -36,18 +34,8 @@ func NewWebsocketService(key string, secret string) *WebsocketService { return s } -func (w *WebsocketService) Subscribe(channel types.Channel, symbol string) error { - r := SubscribeRequest{ - Operation: subscribe, - } - if channel != types.BookChannel { - return fmt.Errorf("unsupported channel %+v", channel) - } - r.Channel = orderbook - r.Market = strings.ToUpper(strings.TrimSpace(symbol)) - - w.subscriptions = append(w.subscriptions, r) - return nil +func (w *WebsocketService) Subscribe(request websocketRequest) { + w.subscriptions = append(w.subscriptions, request) } var errSubscriptionFailed = fmt.Errorf("failed to subscribe") @@ -55,6 +43,7 @@ var errSubscriptionFailed = fmt.Errorf("failed to subscribe") func (w *WebsocketService) sendSubscriptions() error { conn := w.Conn() for _, s := range w.subscriptions { + logger.Infof("s: %+v", s) if err := conn.WriteJSON(s); err != nil { return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed) } @@ -62,6 +51,11 @@ func (w *WebsocketService) sendSubscriptions() error { return nil } +// After closing the websocket connection, you have to subscribe all events again func (w *WebsocketService) Close() error { - return w.Conn().Close() + w.subscriptions = nil + if conn := w.Conn(); conn != nil { + return conn.Close() + } + return nil } diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index 47c8bb94c..3ea1f0f10 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -67,12 +67,6 @@ func loginBody(millis int64) string { return fmt.Sprintf("%dwebsocket_login", millis) } -type SubscribeRequest struct { - Operation operation `json:"op"` - Channel channel `json:"channel"` - Market string `json:"market"` -} - type respType string const errRespType respType = "error"