fix(ftx): iterate subscription arraywhile polling klines

This commit is contained in:
Jui-Nan Lin 2021-05-25 18:37:45 +08:00
parent a7a141c3ea
commit 1318f221b2
2 changed files with 37 additions and 30 deletions

View File

@ -100,7 +100,7 @@ var rootCmd = &cobra.Command{
})
stream.OnBalanceSnapshot(func(balances types.BalanceMap) {
log.Infof("balances: %+v",balances)
log.Infof("balances: %+v", balances)
})
streambook := types.NewStreamBook(symbol)

View File

@ -18,7 +18,6 @@ type Stream struct {
*types.StandardStream
ws *service.WebsocketClientBase
km chan klineMessage
exchange *Exchange
// publicOnly can only be configured before connecting
@ -29,10 +28,11 @@ type Stream struct {
subAccount string
// subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them
subscriptions []websocketRequest
subscriptions []websocketRequest
klineSubscriptions []klineSubscription
}
type klineMessage struct {
type klineSubscription struct {
symbol string
interval types.Interval
}
@ -41,7 +41,6 @@ func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
s := &Stream{
exchange: e,
key: key,
km: make(chan klineMessage),
secret: secret,
subAccount: subAccount,
StandardStream: &types.StandardStream{},
@ -74,7 +73,7 @@ func (s *Stream) Connect(ctx context.Context) error {
return err
}
s.EmitStart()
go s.handleChannelKlineMessage(ctx)
go s.pollKLines(ctx)
go func() {
// https://docs.ftx.com/?javascript#request-process
@ -127,43 +126,51 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
} else if channel == types.KLineChannel {
// FTX does not support kline channel, do polling
go s.pollKLines(symbol, option)
interval := types.Interval(option.Interval)
ks := klineSubscription{symbol: symbol, interval: interval}
s.klineSubscriptions = append(s.klineSubscriptions, ks)
} else {
panic("only support book/kline channel now")
}
}
func (s *Stream) handleChannelKlineMessage(ctx context.Context) {
for {
select {
case km := <-s.km:
// FTX only returns closed kline
klines := getLastKLine(s.exchange, ctx, km.symbol, km.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
func (s *Stream) pollKLines(ctx context.Context) {
// get current kline candle
for _, sub := range s.klineSubscriptions {
klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
}
}
func (s *Stream) pollKLines(symbol string, option types.SubscribeOptions) {
interval := types.Interval(option.Interval)
if !isIntervalSupportedInKLine(interval) {
logger.Errorf("not supported kline interval %s", option.Interval)
return
}
// get current kline candle
s.km <- klineMessage{symbol: symbol, interval: interval}
ticker := time.NewTicker(interval.Duration())
// the highest resolution of kline is 1min
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.WithError(err).Errorf("pollKLines goroutine is terminated")
}
return
case <-ticker.C:
s.km <- klineMessage{symbol: symbol, interval: interval}
now := time.Now().Truncate(time.Minute)
for _, sub := range s.klineSubscriptions {
subTime := now.Truncate(sub.interval.Duration())
if now != subTime {
// not in the checking time slot, check next subscription
continue
}
klines := getLastKLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
}
}
}
}