diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index f5792912a..e1bfb7c31 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -2,29 +2,48 @@ package ftx import ( "context" + "fmt" "sync/atomic" "time" + "github.com/gorilla/websocket" + + "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) +const endpoint = "wss://ftx.com/ws/" + type Stream struct { *types.StandardStream - wsService *WebsocketService + ws *service.WebsocketClientBase - // publicOnly must be accessed atomically + // publicOnly can only be configured before connecting publicOnly int32 + + key string + secret string + + subscriptions []websocketRequest } func NewStream(key, secret string) *Stream { - wss := NewWebsocketService(key, secret) s := &Stream{ + key: key, + secret: secret, StandardStream: &types.StandardStream{}, - wsService: wss, + ws: service.NewWebsocketClientBase(endpoint, 3*time.Second), } - wss.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) + s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) + s.ws.OnConnected(func(conn *websocket.Conn) { + for _, sub := range s.subscriptions { + if err := conn.WriteJSON(sub); err != nil { + s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub)) + } + } + }) return s } @@ -35,7 +54,7 @@ func (s *Stream) Connect(ctx context.Context) error { s.subscribePrivateEvents() } - if err := s.wsService.Connect(ctx); err != nil { + if err := s.ws.Connect(ctx); err != nil { return err } @@ -43,28 +62,33 @@ func (s *Stream) Connect(ctx context.Context) error { } func (s *Stream) subscribePrivateEvents() { - s.wsService.Subscribe( - newLoginRequest(s.wsService.key, s.wsService.secret, time.Now()), + s.addSubscription( + newLoginRequest(s.key, s.secret, time.Now()), ) - s.wsService.Subscribe(websocketRequest{ + s.addSubscription(websocketRequest{ Operation: subscribe, Channel: privateOrdersChannel, }) - s.wsService.Subscribe(websocketRequest{ + s.addSubscription(websocketRequest{ Operation: subscribe, Channel: privateTradesChannel, }) } +func (s *Stream) addSubscription(request websocketRequest) { + s.subscriptions = append(s.subscriptions, request) +} + func (s *Stream) SetPublicOnly() { atomic.StoreInt32(&s.publicOnly, 1) } func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) { if channel != types.BookChannel { - // TODO: return err + logger.Errorf("only support orderbook channel") + return } - s.wsService.Subscribe(websocketRequest{ + s.addSubscription(websocketRequest{ Operation: subscribe, Channel: orderBookChannel, Market: TrimUpperString(symbol), @@ -72,8 +96,9 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.Subscri } func (s *Stream) Close() error { - if s.wsService != nil { - return s.wsService.Close() + s.subscriptions = nil + if s.ws != nil { + return s.ws.Conn().Close() } return nil } diff --git a/pkg/exchange/ftx/websocket.go b/pkg/exchange/ftx/websocket.go deleted file mode 100644 index ec3f446fe..000000000 --- a/pkg/exchange/ftx/websocket.go +++ /dev/null @@ -1,60 +0,0 @@ -package ftx - -import ( - "fmt" - "time" - - "github.com/gorilla/websocket" - - "github.com/c9s/bbgo/pkg/service" -) - -type WebsocketService struct { - *service.WebsocketClientBase - - key string - secret string - - subscriptions []websocketRequest -} - -const endpoint = "wss://ftx.com/ws/" - -func NewWebsocketService(key string, secret string) *WebsocketService { - s := &WebsocketService{ - WebsocketClientBase: service.NewWebsocketClientBase(endpoint, 3*time.Second), - key: key, - secret: secret, - } - s.OnConnected(func(_ *websocket.Conn) { - if err := s.sendSubscriptions(); err != nil { - s.EmitError(err) - } - }) - return s -} - -func (w *WebsocketService) Subscribe(request websocketRequest) { - w.subscriptions = append(w.subscriptions, request) -} - -var errSubscriptionFailed = fmt.Errorf("failed to subscribe") - -func (w *WebsocketService) sendSubscriptions() error { - conn := w.Conn() - for _, s := range w.subscriptions { - if err := conn.WriteJSON(s); err != nil { - return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed) - } - } - return nil -} - -// After closing the websocket connection, you have to subscribe all events again -func (w *WebsocketService) Close() error { - w.subscriptions = nil - if conn := w.Conn(); conn != nil { - return conn.Close() - } - return nil -}