bbgo_origin/pkg/exchange/ftx/stream.go

123 lines
2.7 KiB
Go
Raw Normal View History

package ftx
import (
"context"
2021-03-28 07:07:46 +00:00
"fmt"
"sync/atomic"
2021-03-27 10:07:35 +00:00
"time"
2021-03-28 07:07:46 +00:00
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
2021-03-28 07:07:46 +00:00
// endpoint is the websocket URL handed to service.NewWebsocketClientBase in NewStream.
const endpoint = "wss://ftx.com/ws/"
type Stream struct {
2021-03-02 14:18:41 +00:00
*types.StandardStream
2021-03-28 07:07:46 +00:00
ws *service.WebsocketClientBase
2021-03-28 07:07:46 +00:00
// publicOnly can only be configured before connecting
publicOnly int32
2021-03-28 07:07:46 +00:00
key string
secret string
2021-03-28 08:29:36 +00:00
// subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them
2021-03-28 07:07:46 +00:00
subscriptions []websocketRequest
}
func NewStream(key, secret string) *Stream {
s := &Stream{
2021-03-28 07:07:46 +00:00
key: key,
secret: secret,
2021-03-02 14:18:41 +00:00
StandardStream: &types.StandardStream{},
2021-03-28 07:07:46 +00:00
ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
}
2021-02-27 10:41:46 +00:00
2021-03-28 07:07:46 +00:00
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
s.ws.OnConnected(func(conn *websocket.Conn) {
2021-03-28 08:29:36 +00:00
subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now())}
subs = append(subs, s.subscriptions...)
for _, sub := range subs {
2021-03-28 07:07:46 +00:00
if err := conn.WriteJSON(sub); err != nil {
s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub))
}
}
})
2021-03-28 08:29:36 +00:00
return s
}
func (s *Stream) Connect(ctx context.Context) error {
2021-03-27 08:58:51 +00:00
// If it's not public only, let's do the authentication.
if atomic.LoadInt32(&s.publicOnly) == 0 {
2021-03-27 11:02:19 +00:00
s.subscribePrivateEvents()
2021-03-27 10:07:35 +00:00
}
2021-03-28 07:07:46 +00:00
if err := s.ws.Connect(ctx); err != nil {
2021-03-27 10:07:35 +00:00
return err
2021-03-27 08:58:51 +00:00
}
2021-03-28 08:29:36 +00:00
go func() {
// https://docs.ftx.com/?javascript#request-process
tk := time.NewTicker(15 * time.Second)
defer tk.Stop()
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.WithError(err).Errorf("websocket ping goroutine is terminated")
}
case <-tk.C:
if err := s.ws.Conn().WriteJSON(websocketRequest{
Operation: ping,
}); err != nil {
logger.WithError(err).Warnf("failed to ping, try in next tick")
}
}
}
}()
2021-03-27 08:58:51 +00:00
return nil
}
2021-03-27 11:02:19 +00:00
func (s *Stream) subscribePrivateEvents() {
2021-03-28 07:07:46 +00:00
s.addSubscription(websocketRequest{
2021-03-27 11:02:19 +00:00
Operation: subscribe,
Channel: privateOrdersChannel,
})
2021-03-28 07:07:46 +00:00
s.addSubscription(websocketRequest{
2021-03-28 06:42:54 +00:00
Operation: subscribe,
Channel: privateTradesChannel,
})
2021-03-27 11:02:19 +00:00
}
2021-03-28 07:07:46 +00:00
// addSubscription appends request to the list of requests that are written
// to the websocket when the connection is established.
func (s *Stream) addSubscription(request websocketRequest) {
	updated := append(s.subscriptions, request)
	s.subscriptions = updated
}
// SetPublicOnly atomically marks the stream as public-only, which makes
// Connect skip the private-channel subscriptions.
func (s *Stream) SetPublicOnly() {
	const enabled int32 = 1
	atomic.StoreInt32(&s.publicOnly, enabled)
}
2021-02-27 10:41:46 +00:00
func (s *Stream) Subscribe(channel types.Channel, symbol string, _ types.SubscribeOptions) {
2021-03-27 10:07:35 +00:00
if channel != types.BookChannel {
panic("only support book channel now")
2021-02-27 10:41:46 +00:00
}
2021-03-28 07:07:46 +00:00
s.addSubscription(websocketRequest{
2021-03-27 10:07:35 +00:00
Operation: subscribe,
Channel: orderBookChannel,
2021-03-27 10:07:35 +00:00
Market: TrimUpperString(symbol),
})
}
2021-03-27 10:07:35 +00:00
2021-02-27 11:27:44 +00:00
func (s *Stream) Close() error {
2021-03-28 07:07:46 +00:00
s.subscriptions = nil
if s.ws != nil {
return s.ws.Conn().Close()
2021-03-27 01:54:12 +00:00
}
return nil
}