ftx: remove redundant abstraction

This commit is contained in:
ycdesu 2021-03-28 15:07:46 +08:00
parent e152aa1036
commit 53c9b0a606
2 changed files with 39 additions and 74 deletions

View File

@ -2,29 +2,48 @@ package ftx
import ( import (
"context" "context"
"fmt"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
const endpoint = "wss://ftx.com/ws/"
type Stream struct { type Stream struct {
*types.StandardStream *types.StandardStream
wsService *WebsocketService ws *service.WebsocketClientBase
// publicOnly must be accessed atomically // publicOnly can only be configured before connecting
publicOnly int32 publicOnly int32
key string
secret string
subscriptions []websocketRequest
} }
func NewStream(key, secret string) *Stream { func NewStream(key, secret string) *Stream {
wss := NewWebsocketService(key, secret)
s := &Stream{ s := &Stream{
key: key,
secret: secret,
StandardStream: &types.StandardStream{}, StandardStream: &types.StandardStream{},
wsService: wss, ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
} }
wss.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
s.ws.OnConnected(func(conn *websocket.Conn) {
for _, sub := range s.subscriptions {
if err := conn.WriteJSON(sub); err != nil {
s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub))
}
}
})
return s return s
} }
@ -35,7 +54,7 @@ func (s *Stream) Connect(ctx context.Context) error {
s.subscribePrivateEvents() s.subscribePrivateEvents()
} }
if err := s.wsService.Connect(ctx); err != nil { if err := s.ws.Connect(ctx); err != nil {
return err return err
} }
@ -43,28 +62,33 @@ func (s *Stream) Connect(ctx context.Context) error {
} }
func (s *Stream) subscribePrivateEvents() { func (s *Stream) subscribePrivateEvents() {
s.wsService.Subscribe( s.addSubscription(
newLoginRequest(s.wsService.key, s.wsService.secret, time.Now()), newLoginRequest(s.key, s.secret, time.Now()),
) )
s.wsService.Subscribe(websocketRequest{ s.addSubscription(websocketRequest{
Operation: subscribe, Operation: subscribe,
Channel: privateOrdersChannel, Channel: privateOrdersChannel,
}) })
s.wsService.Subscribe(websocketRequest{ s.addSubscription(websocketRequest{
Operation: subscribe, Operation: subscribe,
Channel: privateTradesChannel, Channel: privateTradesChannel,
}) })
} }
func (s *Stream) addSubscription(request websocketRequest) {
s.subscriptions = append(s.subscriptions, request)
}
func (s *Stream) SetPublicOnly() { func (s *Stream) SetPublicOnly() {
atomic.StoreInt32(&s.publicOnly, 1) atomic.StoreInt32(&s.publicOnly, 1)
} }
func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) { func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) {
if channel != types.BookChannel { if channel != types.BookChannel {
// TODO: return err logger.Errorf("only support orderbook channel")
return
} }
s.wsService.Subscribe(websocketRequest{ s.addSubscription(websocketRequest{
Operation: subscribe, Operation: subscribe,
Channel: orderBookChannel, Channel: orderBookChannel,
Market: TrimUpperString(symbol), Market: TrimUpperString(symbol),
@ -72,8 +96,9 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.Subscri
} }
func (s *Stream) Close() error { func (s *Stream) Close() error {
if s.wsService != nil { s.subscriptions = nil
return s.wsService.Close() if s.ws != nil {
return s.ws.Conn().Close()
} }
return nil return nil
} }

View File

@ -1,60 +0,0 @@
package ftx
import (
"fmt"
"time"
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/service"
)
type WebsocketService struct {
*service.WebsocketClientBase
key string
secret string
subscriptions []websocketRequest
}
const endpoint = "wss://ftx.com/ws/"
func NewWebsocketService(key string, secret string) *WebsocketService {
s := &WebsocketService{
WebsocketClientBase: service.NewWebsocketClientBase(endpoint, 3*time.Second),
key: key,
secret: secret,
}
s.OnConnected(func(_ *websocket.Conn) {
if err := s.sendSubscriptions(); err != nil {
s.EmitError(err)
}
})
return s
}
func (w *WebsocketService) Subscribe(request websocketRequest) {
w.subscriptions = append(w.subscriptions, request)
}
var errSubscriptionFailed = fmt.Errorf("failed to subscribe")
func (w *WebsocketService) sendSubscriptions() error {
conn := w.Conn()
for _, s := range w.subscriptions {
if err := conn.WriteJSON(s); err != nil {
return fmt.Errorf("can't send subscription request %+v: %w", s, errSubscriptionFailed)
}
}
return nil
}
// After closing the websocket connection, you have to subscribe all events again
func (w *WebsocketService) Close() error {
w.subscriptions = nil
if conn := w.Conn(); conn != nil {
return conn.Close()
}
return nil
}