bbgo_origin/pkg/exchange/ftx/stream.go

236 lines
6.1 KiB
Go
Raw Permalink Normal View History

package ftx
import (
"context"
2021-03-28 07:07:46 +00:00
"fmt"
2021-03-27 10:07:35 +00:00
"time"
2021-03-28 07:07:46 +00:00
"github.com/gorilla/websocket"
2022-02-18 07:35:58 +00:00
"github.com/pkg/errors"
2021-03-28 07:07:46 +00:00
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
2021-03-28 07:07:46 +00:00
// endpoint is the FTX websocket URL passed to the websocket client created in NewStream.
const endpoint = "wss://ftx.com/ws/"
type Stream struct {
2021-03-02 14:18:41 +00:00
*types.StandardStream
ws *service.WebsocketClientBase
exchange *Exchange
key string
secret string
subAccount string
2021-03-28 07:07:46 +00:00
2021-03-28 08:29:36 +00:00
// subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them
subscriptions []websocketRequest
klineSubscriptions []klineSubscription
}
// klineSubscription records one kline request made through Subscribe:
// the symbol and the candle interval to poll for it.
type klineSubscription struct {
	symbol   string
	interval types.Interval
}
func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
s := &Stream{
exchange: e,
2021-03-28 07:07:46 +00:00
key: key,
secret: secret,
subAccount: subAccount,
2021-03-02 14:18:41 +00:00
StandardStream: &types.StandardStream{},
2021-03-28 07:07:46 +00:00
ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
}
2021-02-27 10:41:46 +00:00
2021-03-28 07:07:46 +00:00
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
s.ws.OnConnected(func(conn *websocket.Conn) {
subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now(), s.subAccount)}
2021-03-28 08:29:36 +00:00
subs = append(subs, s.subscriptions...)
for _, sub := range subs {
2021-03-28 07:07:46 +00:00
if err := conn.WriteJSON(sub); err != nil {
s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub))
}
}
s.EmitConnect()
2021-03-28 07:07:46 +00:00
})
2021-03-28 08:29:36 +00:00
return s
}
func (s *Stream) Connect(ctx context.Context) error {
2021-03-27 08:58:51 +00:00
// If it's not public only, let's do the authentication.
2021-12-23 06:14:48 +00:00
if !s.PublicOnly {
2021-03-27 11:02:19 +00:00
s.subscribePrivateEvents()
2021-03-27 10:07:35 +00:00
}
2021-03-28 07:07:46 +00:00
if err := s.ws.Connect(ctx); err != nil {
2021-03-27 10:07:35 +00:00
return err
2021-03-27 08:58:51 +00:00
}
s.EmitStart()
go s.pollKLines(ctx)
2021-03-27 08:58:51 +00:00
2021-03-28 08:29:36 +00:00
go func() {
// https://docs.ftx.com/?javascript#request-process
tk := time.NewTicker(15 * time.Second)
defer tk.Stop()
for {
select {
case <-ctx.Done():
2022-02-18 07:35:58 +00:00
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
logger.WithError(err).Errorf("context returned error")
2021-03-28 08:29:36 +00:00
}
2022-02-18 07:35:58 +00:00
2021-03-28 08:29:36 +00:00
case <-tk.C:
if err := s.ws.Conn().WriteJSON(websocketRequest{
Operation: ping,
}); err != nil {
logger.WithError(err).Warnf("failed to ping, try in next tick")
}
}
}
}()
2021-03-27 08:58:51 +00:00
return nil
}
2021-03-27 11:02:19 +00:00
func (s *Stream) subscribePrivateEvents() {
2021-03-28 07:07:46 +00:00
s.addSubscription(websocketRequest{
2021-03-27 11:02:19 +00:00
Operation: subscribe,
Channel: privateOrdersChannel,
})
2021-03-28 07:07:46 +00:00
s.addSubscription(websocketRequest{
2021-03-28 06:42:54 +00:00
Operation: subscribe,
Channel: privateTradesChannel,
})
2021-03-27 11:02:19 +00:00
}
2021-03-28 07:07:46 +00:00
// addSubscription appends request to the list of subscriptions that is
// written to the websocket connection after login.
func (s *Stream) addSubscription(request websocketRequest) {
	s.subscriptions = append(s.subscriptions, request)
}
func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.SubscribeOptions) {
switch channel {
case types.BookChannel:
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: orderBookChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
return
case types.BookTickerChannel:
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: bookTickerChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
return
case types.KLineChannel:
// FTX does not support kline channel, do polling
interval := types.Interval(option.Interval)
ks := klineSubscription{symbol: symbol, interval: interval}
s.klineSubscriptions = append(s.klineSubscriptions, ks)
return
case types.MarketTradeChannel:
s.addSubscription(websocketRequest{
Operation: subscribe,
Channel: marketTradeChannel,
Market: toLocalSymbol(TrimUpperString(symbol)),
})
return
default:
panic("only support book/kline/trade channel now")
2021-02-27 10:41:46 +00:00
}
}
func (s *Stream) pollKLines(ctx context.Context) {
2022-04-19 13:29:45 +00:00
lastClosed := make(map[string]map[types.Interval]time.Time, 0)
// get current kline candle
for _, sub := range s.klineSubscriptions {
klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval)
2022-04-19 13:29:45 +00:00
lastClosed[sub.symbol] = make(map[types.Interval]time.Time, 0)
2021-05-31 14:56:26 +00:00
if len(klines) > 0 {
// handle mutiple klines, get the latest one
2022-04-19 13:29:45 +00:00
if lastClosed[sub.symbol][sub.interval].Unix() < klines[0].StartTime.Unix() {
2022-01-01 18:34:29 +00:00
s.EmitKLine(klines[0])
s.EmitKLineClosed(klines[0])
2022-04-19 13:29:45 +00:00
lastClosed[sub.symbol][sub.interval] = klines[0].StartTime.Time()
2022-01-01 18:34:29 +00:00
}
if len(klines) > 1 {
s.EmitKLine(klines[1])
}
}
}
// the highest resolution of kline is 1min
ticker := time.NewTicker(time.Second * 30)
2021-05-24 01:45:33 +00:00
defer ticker.Stop()
2021-05-24 01:45:33 +00:00
for {
select {
case <-ctx.Done():
2022-02-18 10:21:51 +00:00
if err := ctx.Err(); err != nil && !errors.Is(err, context.Canceled) {
logger.WithError(err).Errorf("context returned error")
}
return
2021-05-24 01:45:33 +00:00
case <-ticker.C:
now := time.Now().Truncate(time.Minute)
for _, sub := range s.klineSubscriptions {
subTime := now.Truncate(sub.interval.Duration())
if now != subTime {
// not in the checking time slot, check next subscription
continue
}
klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval)
2021-09-03 07:36:50 +00:00
if len(klines) > 0 {
// handle mutiple klines, get the latest one
2022-04-19 13:29:45 +00:00
if lastClosed[sub.symbol][sub.interval].Unix() < klines[0].StartTime.Unix() {
s.EmitKLine(klines[0])
s.EmitKLineClosed(klines[0])
2022-04-19 13:29:45 +00:00
lastClosed[sub.symbol][sub.interval] = klines[0].StartTime.Time()
}
2021-12-21 12:46:40 +00:00
if len(klines) > 1 {
s.EmitKLine(klines[1])
}
}
}
}
2021-05-24 01:45:33 +00:00
}
}
func getLast2KLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
2021-05-24 01:45:33 +00:00
// set since to more 30s ago to avoid getting no kline candle
since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute)
2021-05-24 01:45:33 +00:00
klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &since,
Limit: 2,
2021-05-24 01:45:33 +00:00
})
if err != nil {
logger.WithError(err).Errorf("failed to get kline data")
return klines
}
2021-05-24 01:45:33 +00:00
return klines
}
func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
// set since to more 30s ago to avoid getting no kline candle
klines := getLast2KLine(e, ctx, symbol, interval)
2021-12-21 12:46:40 +00:00
if len(klines) == 0 {
return []types.KLine{}
}
return []types.KLine{klines[0]}
}
2021-03-27 10:07:35 +00:00
2021-02-27 11:27:44 +00:00
func (s *Stream) Close() error {
2021-03-28 07:07:46 +00:00
s.subscriptions = nil
if s.ws != nil {
return s.ws.Conn().Close()
2021-03-27 01:54:12 +00:00
}
return nil
}