binance: refactor binance stream handlers

This commit is contained in:
c9s 2021-12-30 14:02:36 +08:00
parent 1ff0a45f30
commit 3b9a191c95

View File

@ -141,75 +141,11 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
} }
}) })
stream.OnOutboundAccountPositionEvent(func(e *OutboundAccountPositionEvent) { stream.OnOutboundAccountPositionEvent(stream.handleOutboundAccountPositionEvent)
snapshot := types.BalanceMap{} stream.OnKLineEvent(stream.handleKLineEvent)
for _, balance := range e.Balances { stream.OnBookTickerEvent(stream.handleBookTickerEvent)
snapshot[balance.Asset] = types.Balance{ stream.OnExecutionReportEvent(stream.handleExecutionReportEvent)
Currency: balance.Asset, stream.OnContinuousKLineEvent(stream.handleContinuousKLineEvent)
Available: balance.Free,
Locked: balance.Locked,
}
}
stream.EmitBalanceSnapshot(snapshot)
})
stream.OnKLineEvent(func(e *KLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
stream.EmitKLineClosedEvent(e)
stream.EmitKLineClosed(kline)
} else {
stream.EmitKLine(kline)
}
})
stream.OnBookTickerEvent(func(e *BookTickerEvent) {
stream.EmitBookTickerUpdate(e.BookTicker())
})
stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) {
switch e.CurrentExecutionType {
case "NEW", "CANCELED", "REJECTED", "EXPIRED", "REPLACED":
order, err := e.Order()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
stream.EmitOrderUpdate(*order)
case "TRADE":
trade, err := e.Trade()
if err != nil {
log.WithError(err).Error("trade convert error")
return
}
stream.EmitTradeUpdate(*trade)
order, err := e.Order()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
// Update Order with FILLED event
if order.Status == types.OrderStatusFilled {
stream.EmitOrderUpdate(*order)
}
}
})
stream.OnContinuousKLineEvent(func(e *ContinuousKLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
stream.EmitContinuousKLineClosedEvent(e)
stream.EmitKLineClosed(kline)
} else {
stream.EmitKLine(kline)
}
})
stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) { stream.OnOrderTradeUpdateEvent(func(e *OrderTradeUpdateEvent) {
switch e.OrderTrade.CurrentExecutionType { switch e.OrderTrade.CurrentExecutionType {
@ -281,6 +217,76 @@ func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Clie
return stream return stream
} }
func (s *Stream) handleContinuousKLineEvent(e *ContinuousKLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
s.EmitContinuousKLineClosedEvent(e)
s.EmitKLineClosed(kline)
} else {
s.EmitKLine(kline)
}
}
func (s *Stream) handleExecutionReportEvent(e *ExecutionReportEvent) {
switch e.CurrentExecutionType {
case "NEW", "CANCELED", "REJECTED", "EXPIRED", "REPLACED":
order, err := e.Order()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
s.EmitOrderUpdate(*order)
case "TRADE":
trade, err := e.Trade()
if err != nil {
log.WithError(err).Error("trade convert error")
return
}
s.EmitTradeUpdate(*trade)
order, err := e.Order()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
// Update Order with FILLED event
if order.Status == types.OrderStatusFilled {
s.EmitOrderUpdate(*order)
}
}
}
func (s *Stream) handleBookTickerEvent(e *BookTickerEvent) {
s.EmitBookTickerUpdate(e.BookTicker())
}
func (s *Stream) handleKLineEvent(e *KLineEvent) {
kline := e.KLine.KLine()
if e.KLine.Closed {
s.EmitKLineClosedEvent(e)
s.EmitKLineClosed(kline)
} else {
s.EmitKLine(kline)
}
}
func (s *Stream) handleOutboundAccountPositionEvent(e *OutboundAccountPositionEvent) {
snapshot := types.BalanceMap{}
for _, balance := range e.Balances {
snapshot[balance.Asset] = types.Balance{
Currency: balance.Asset,
Available: balance.Free,
Locked: balance.Locked,
}
}
s.EmitBalanceSnapshot(snapshot)
}
func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
var url string var url string
if s.PublicOnly { if s.PublicOnly {
@ -319,7 +325,6 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
return conn, nil return conn, nil
} }
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx) err := s.connect(ctx)
if err != nil { if err != nil {
@ -573,7 +578,6 @@ func (s *Stream) read(ctx context.Context) {
} }
} }
func (s *Stream) Close() error { func (s *Stream) Close() error {
log.Infof("closing stream...") log.Infof("closing stream...")
@ -587,7 +591,6 @@ func (s *Stream) Close() error {
return err return err
} }
func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
if s.IsMargin { if s.IsMargin {
if s.IsIsolatedMargin { if s.IsIsolatedMargin {
@ -628,7 +631,6 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
} }
func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) { func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) {
// should use background context to invalidate the user stream // should use background context to invalidate the user stream
log.Debugf("closing listen key: %s", util.MaskKey(listenKey)) log.Debugf("closing listen key: %s", util.MaskKey(listenKey))