binance: pull out dispatchEvent

This commit is contained in:
c9s 2021-12-30 16:30:02 +08:00
parent bae7df806f
commit ff87fb007e

View File

@ -388,7 +388,6 @@ func (s *Stream) connect(ctx context.Context) error {
if s.PublicOnly {
log.Infof("stream is set to public only mode")
} else {
listenKey, err = s.fetchListenKey(ctx)
if err != nil {
return err
@ -454,51 +453,6 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, interval time.D
}
}
// From Binance
// Keepalive a user data stream to prevent a time out. User data streams will close after 60 minutes.
// It's recommended to send a ping about every 30 minutes.
func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
keepAliveTicker := time.NewTicker(listenKeyKeepAliveInterval)
defer keepAliveTicker.Stop()
// if we exit, we should invalidate the existing listen key
defer func() {
log.Debugf("keepalive worker stopped")
if err := s.invalidateListenKey(context.Background(), listenKey); err != nil {
log.WithError(err).Errorf("invalidate listen key error: %v key: %s", err, util.MaskKey(listenKey))
}
}()
for {
select {
case <-ctx.Done():
return
case <-keepAliveTicker.C:
for i := 0; i < 5; i++ {
err := s.keepaliveListenKey(ctx, listenKey)
if err == nil {
break
} else {
switch err.(type) {
case net.Error:
log.WithError(err).Errorf("listen key keep-alive network error: %v key: %s", err, util.MaskKey(listenKey))
time.Sleep(5 * time.Second)
continue
default:
log.WithError(err).Errorf("listen key keep-alive unexpected error: %v key: %s", err, util.MaskKey(listenKey))
s.Reconnect()
return
}
}
}
}
}
}
func (s *Stream) read(ctx context.Context, conn *websocket.Conn) {
defer func() {
@ -570,42 +524,47 @@ func (s *Stream) read(ctx context.Context, conn *websocket.Conn) {
continue
}
switch e := e.(type) {
case *OutboundAccountPositionEvent:
s.EmitOutboundAccountPositionEvent(e)
case *OutboundAccountInfoEvent:
s.EmitOutboundAccountInfoEvent(e)
case *BalanceUpdateEvent:
s.EmitBalanceUpdateEvent(e)
case *KLineEvent:
s.EmitKLineEvent(e)
case *BookTickerEvent:
s.EmitBookTickerEvent(e)
case *DepthEvent:
s.EmitDepthEvent(e)
case *ExecutionReportEvent:
s.EmitExecutionReportEvent(e)
case *MarkPriceUpdateEvent:
s.EmitMarkPriceUpdateEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)
case *OrderTradeUpdateEvent:
s.EmitOrderTradeUpdateEvent(e)
}
s.dispatchEvent(e)
}
}
}
func (s *Stream) dispatchEvent(e interface{}) {
switch e := e.(type) {
case *OutboundAccountPositionEvent:
s.EmitOutboundAccountPositionEvent(e)
case *OutboundAccountInfoEvent:
s.EmitOutboundAccountInfoEvent(e)
case *BalanceUpdateEvent:
s.EmitBalanceUpdateEvent(e)
case *KLineEvent:
s.EmitKLineEvent(e)
case *BookTickerEvent:
s.EmitBookTickerEvent(e)
case *DepthEvent:
s.EmitDepthEvent(e)
case *ExecutionReportEvent:
s.EmitExecutionReportEvent(e)
case *MarkPriceUpdateEvent:
s.EmitMarkPriceUpdateEvent(e)
case *ContinuousKLineEvent:
s.EmitContinuousKLineEvent(e)
case *OrderTradeUpdateEvent:
s.EmitOrderTradeUpdateEvent(e)
}
}
func (s *Stream) Close() error {
log.Infof("closing stream...")
@ -615,11 +574,12 @@ func (s *Stream) Close() error {
// get the connection object before call the context cancel function
s.ConnLock.Lock()
conn := s.Conn
connCancel := s.connCancel
s.ConnLock.Unlock()
// cancel the context so that the ticker loop and listen key updater will be stopped.
if s.connCancel != nil {
s.connCancel()
if connCancel != nil {
connCancel()
}
// gracefully write the close message to the connection
@ -628,7 +588,7 @@ func (s *Stream) Close() error {
return errors.Wrap(err, "websocket write close message error")
}
err = s.Conn.Close()
err = conn.Close()
return errors.Wrap(err, "websocket connection close error")
}
@ -700,3 +660,53 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
return nil
}
// listenKeyKeepAlive
// From Binance
// Keepalive a user data stream to prevent a time out. User data streams will close after 60 minutes.
// It's recommended to send a ping about every 30 minutes.
func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
keepAliveTicker := time.NewTicker(listenKeyKeepAliveInterval)
defer keepAliveTicker.Stop()
// if we exit, we should invalidate the existing listen key
defer func() {
log.Debugf("keepalive worker stopped")
if err := s.invalidateListenKey(context.Background(), listenKey); err != nil {
log.WithError(err).Errorf("invalidate listen key error: %v key: %s", err, util.MaskKey(listenKey))
}
}()
for {
select {
case <-s.CloseC:
return
case <-ctx.Done():
return
case <-keepAliveTicker.C:
for i := 0; i < 5; i++ {
err := s.keepaliveListenKey(ctx, listenKey)
if err == nil {
break
} else {
switch err.(type) {
case net.Error:
log.WithError(err).Errorf("listen key keep-alive network error: %v key: %s", err, util.MaskKey(listenKey))
time.Sleep(5 * time.Second)
continue
default:
log.WithError(err).Errorf("listen key keep-alive unexpected error: %v key: %s", err, util.MaskKey(listenKey))
s.Reconnect()
return
}
}
}
}
}
}