fix websocket ping/pong issue

This commit is contained in:
c9s 2021-05-28 23:34:21 +08:00
parent 002b28f75a
commit f49490f986
4 changed files with 25 additions and 14 deletions

View File

@ -346,6 +346,12 @@ func (s *Stream) connect(ctx context.Context) error {
log.Infof("websocket connected") log.Infof("websocket connected")
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
return nil
})
s.Conn = conn s.Conn = conn
s.ConnLock.Unlock() s.ConnLock.Unlock()
@ -357,7 +363,7 @@ func (s *Stream) connect(ctx context.Context) error {
} }
func (s *Stream) ping(ctx context.Context) { func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(15 * time.Second) pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop() defer pingTicker.Stop()
for { for {
@ -368,12 +374,10 @@ func (s *Stream) ping(ctx context.Context) {
return return
case <-pingTicker.C: case <-pingTicker.C:
s.ConnLock.Lock() if err := s.Conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
s.Reconnect() s.Reconnect()
} }
s.ConnLock.Unlock()
} }
} }
} }
@ -385,7 +389,7 @@ func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
// if we exit, we should invalidate the existing listen key // if we exit, we should invalidate the existing listen key
defer func() { defer func() {
log.Info("keepalive worker stopped") log.Info("keepalive worker stopped")
if err := s.invalidateListenKey(ctx, listenKey); err != nil { if err := s.invalidateListenKey(context.Background(), listenKey); err != nil {
log.WithError(err).Error("invalidate listen key error") log.WithError(err).Error("invalidate listen key error")
} }
}() }()
@ -422,13 +426,11 @@ func (s *Stream) read(ctx context.Context) {
return return
default: default:
s.ConnLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := s.Conn.ReadMessage()
s.ConnLock.Unlock()
if err != nil { if err != nil {
// if it's a network timeout error, we should re-connect // if it's a network timeout error, we should re-connect
@ -466,23 +468,19 @@ func (s *Stream) read(ctx context.Context) {
e, err := ParseEvent(string(message)) e, err := ParseEvent(string(message))
if err != nil { if err != nil {
log.WithError(err).Errorf("[binance] event parse error") log.WithError(err).Errorf("websocket event parse error")
continue continue
} }
// log.NotifyTo("[binance] event: %+v", e)
switch e := e.(type) { switch e := e.(type) {
case *OutboundAccountPositionEvent: case *OutboundAccountPositionEvent:
log.Info(e.Event, " ", e.Balances)
s.EmitOutboundAccountPositionEvent(e) s.EmitOutboundAccountPositionEvent(e)
case *OutboundAccountInfoEvent: case *OutboundAccountInfoEvent:
log.Info(e.Event, " ", e.Balances)
s.EmitOutboundAccountInfoEvent(e) s.EmitOutboundAccountInfoEvent(e)
case *BalanceUpdateEvent: case *BalanceUpdateEvent:
log.Info(e.Event, " ", e.Asset, " ", e.Delta)
s.EmitBalanceUpdateEvent(e) s.EmitBalanceUpdateEvent(e)
case *KLineEvent: case *KLineEvent:
@ -492,7 +490,6 @@ func (s *Stream) read(ctx context.Context) {
s.EmitDepthEvent(e) s.EmitDepthEvent(e)
case *ExecutionReportEvent: case *ExecutionReportEvent:
log.Info(e.Event, " ", e)
s.EmitExecutionReportEvent(e) s.EmitExecutionReportEvent(e)
} }
} }

View File

@ -119,6 +119,12 @@ func (s *WebSocketService) connect(ctx context.Context) error {
return err return err
} }
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
return nil
})
s.mu.Lock() s.mu.Lock()
s.conn = conn s.conn = conn
s.mu.Unlock() s.mu.Unlock()

View File

@ -240,6 +240,10 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
} }
func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error { func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
if len(orders) == 0 {
return nil
}
var reqs []*okexapi.CancelOrderRequest var reqs []*okexapi.CancelOrderRequest
for _, order := range orders { for _, order := range orders {
if len(order.Symbol) == 0 { if len(order.Symbol) == 0 {
@ -249,7 +253,6 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro
req := e.client.TradeService.NewCancelOrderRequest() req := e.client.TradeService.NewCancelOrderRequest()
req.InstrumentID(toLocalSymbol(order.Symbol)) req.InstrumentID(toLocalSymbol(order.Symbol))
req.OrderID(strconv.FormatUint(order.OrderID, 10)) req.OrderID(strconv.FormatUint(order.OrderID, 10))
if len(order.ClientOrderID) > 0 { if len(order.ClientOrderID) > 0 {
req.ClientOrderID(order.ClientOrderID) req.ClientOrderID(order.ClientOrderID)
} }

View File

@ -264,6 +264,11 @@ func (s *Stream) connect(ctx context.Context) error {
} }
log.Infof("websocket connected") log.Infof("websocket connected")
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(15 * time.Second))
return nil
})
s.Conn = conn s.Conn = conn
s.connLock.Unlock() s.connLock.Unlock()