From 3b9a191c95532db8efb8c6246b9970751830e815 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 30 Dec 2021 14:02:36 +0800 Subject: [PATCH] binance: refactor binance stream handlers --- pkg/exchange/binance/stream.go | 148 +++++++++++++++++---------------- 1 file changed, 75 insertions(+), 73 deletions(-) diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 0cb0d19ff..2a033b7f6 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -141,75 +141,11 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie } }) - stream.OnOutboundAccountPositionEvent(func(e *OutboundAccountPositionEvent) { - snapshot := types.BalanceMap{} - for _, balance := range e.Balances { - snapshot[balance.Asset] = types.Balance{ - Currency: balance.Asset, - Available: balance.Free, - Locked: balance.Locked, - } - } - stream.EmitBalanceSnapshot(snapshot) - }) - - stream.OnKLineEvent(func(e *KLineEvent) { - kline := e.KLine.KLine() - if e.KLine.Closed { - stream.EmitKLineClosedEvent(e) - stream.EmitKLineClosed(kline) - } else { - stream.EmitKLine(kline) - } - }) - - stream.OnBookTickerEvent(func(e *BookTickerEvent) { - stream.EmitBookTickerUpdate(e.BookTicker()) - }) - - stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) { - switch e.CurrentExecutionType { - - case "NEW", "CANCELED", "REJECTED", "EXPIRED", "REPLACED": - order, err := e.Order() - if err != nil { - log.WithError(err).Error("order convert error") - return - } - - stream.EmitOrderUpdate(*order) - - case "TRADE": - trade, err := e.Trade() - if err != nil { - log.WithError(err).Error("trade convert error") - return - } - - stream.EmitTradeUpdate(*trade) - - order, err := e.Order() - if err != nil { - log.WithError(err).Error("order convert error") - return - } - - // Update Order with FILLED event - if order.Status == types.OrderStatusFilled { - stream.EmitOrderUpdate(*order) - } - } - }) - - stream.OnContinuousKLineEvent(func(e *ContinuousKLineEvent) { - kline := e.KLine.KLine() - if e.KLine.Closed { - stream.EmitContinuousKLineClosedEvent(e) - stream.EmitKLineClosed(kline) - } else { - stream.EmitKLine(kline) - } - }) + stream.OnOutboundAccountPositionEvent(stream.handleOutboundAccountPositionEvent) + stream.OnKLineEvent(stream.handleKLineEvent) + stream.OnBookTickerEvent(stream.handleBookTickerEvent) + stream.OnExecutionReportEvent(stream.handleExecutionReportEvent) + stream.OnContinuousKLineEvent(stream.handleContinuousKLineEvent) stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) { switch e.OrderTrade.CurrentExecutionType { @@ -281,6 +217,76 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie return stream } +func (s *Stream) handleContinuousKLineEvent(e *ContinuousKLineEvent) { + kline := e.KLine.KLine() + if e.KLine.Closed { + s.EmitContinuousKLineClosedEvent(e) + s.EmitKLineClosed(kline) + } else { + s.EmitKLine(kline) + } +} + +func (s *Stream) handleExecutionReportEvent(e *ExecutionReportEvent) { + switch e.CurrentExecutionType { + + case "NEW", "CANCELED", "REJECTED", "EXPIRED", "REPLACED": + order, err := e.Order() + if err != nil { + log.WithError(err).Error("order convert error") + return + } + + s.EmitOrderUpdate(*order) + + case "TRADE": + trade, err := e.Trade() + if err != nil { + log.WithError(err).Error("trade convert error") + return + } + + s.EmitTradeUpdate(*trade) + + order, err := e.Order() + if err != nil { + log.WithError(err).Error("order convert error") + return + } + + // Update Order with FILLED event + if order.Status == types.OrderStatusFilled { + s.EmitOrderUpdate(*order) + } + } +} + +func (s *Stream) handleBookTickerEvent(e *BookTickerEvent) { + s.EmitBookTickerUpdate(e.BookTicker()) +} + +func (s *Stream) handleKLineEvent(e *KLineEvent) { + kline := e.KLine.KLine() + if e.KLine.Closed { + s.EmitKLineClosedEvent(e) + s.EmitKLineClosed(kline) + } else { + s.EmitKLine(kline) + } +} + +func (s *Stream) handleOutboundAccountPositionEvent(e *OutboundAccountPositionEvent) { + snapshot := types.BalanceMap{} + for _, balance := range e.Balances { + snapshot[balance.Asset] = types.Balance{ + Currency: balance.Asset, + Available: balance.Free, + Locked: balance.Locked, + } + } + s.EmitBalanceSnapshot(snapshot) +} + func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { var url string if s.PublicOnly { @@ -319,7 +325,6 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { return conn, nil } - func (s *Stream) Connect(ctx context.Context) error { err := s.connect(ctx) if err != nil { @@ -573,7 +578,6 @@ func (s *Stream) read(ctx context.Context) { } } - func (s *Stream) Close() error { log.Infof("closing stream...") @@ -587,7 +591,6 @@ func (s *Stream) Close() error { return err } - func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { if s.IsMargin { if s.IsIsolatedMargin { @@ -628,7 +631,6 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) } - func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) { // should use background context to invalidate the user stream log.Debugf("closing listen key: %s", util.MaskKey(listenKey))