diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 63780e185..24f12576b 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -346,6 +346,12 @@ func (s *Stream) connect(ctx context.Context) error { log.Infof("websocket connected") + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + return nil + }) + s.Conn = conn s.ConnLock.Unlock() @@ -357,7 +363,7 @@ func (s *Stream) connect(ctx context.Context) error { } func (s *Stream) ping(ctx context.Context) { - pingTicker := time.NewTicker(15 * time.Second) + pingTicker := time.NewTicker(5 * time.Second) defer pingTicker.Stop() for { @@ -368,12 +374,10 @@ func (s *Stream) ping(ctx context.Context) { return case <-pingTicker.C: - s.ConnLock.Lock() - if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { + if err := s.Conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { log.WithError(err).Error("ping error", err) s.Reconnect() } - s.ConnLock.Unlock() } } } @@ -385,7 +389,7 @@ func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) { // if we exit, we should invalidate the existing listen key defer func() { log.Info("keepalive worker stopped") - if err := s.invalidateListenKey(ctx, listenKey); err != nil { + if err := s.invalidateListenKey(context.Background(), listenKey); err != nil { log.WithError(err).Error("invalidate listen key error") } }() @@ -422,13 +426,11 @@ func (s *Stream) read(ctx context.Context) { return default: - s.ConnLock.Lock() if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { log.WithError(err).Errorf("set read deadline error: %s", err.Error()) } mt, message, err := s.Conn.ReadMessage() - s.ConnLock.Unlock() if err != nil { // if it's a network timeout error, we should re-connect @@ -466,23 +468,19 @@ func (s *Stream) read(ctx context.Context) { e, err := ParseEvent(string(message)) if err != nil { - log.WithError(err).Errorf("[binance] event parse error") + log.WithError(err).Errorf("websocket event parse error") continue } - // log.NotifyTo("[binance] event: %+v", e) switch e := e.(type) { case *OutboundAccountPositionEvent: - log.Info(e.Event, " ", e.Balances) s.EmitOutboundAccountPositionEvent(e) case *OutboundAccountInfoEvent: - log.Info(e.Event, " ", e.Balances) s.EmitOutboundAccountInfoEvent(e) case *BalanceUpdateEvent: - log.Info(e.Event, " ", e.Asset, " ", e.Delta) s.EmitBalanceUpdateEvent(e) case *KLineEvent: @@ -492,7 +490,6 @@ func (s *Stream) read(ctx context.Context) { s.EmitDepthEvent(e) case *ExecutionReportEvent: - log.Info(e.Event, " ", e) s.EmitExecutionReportEvent(e) } } diff --git a/pkg/exchange/max/maxapi/websocket.go b/pkg/exchange/max/maxapi/websocket.go index 7bdee08b7..6af6400b6 100644 --- a/pkg/exchange/max/maxapi/websocket.go +++ b/pkg/exchange/max/maxapi/websocket.go @@ -119,6 +119,12 @@ func (s *WebSocketService) connect(ctx context.Context) error { return err } + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + return nil + }) + s.mu.Lock() s.conn = conn s.mu.Unlock() diff --git a/pkg/exchange/okex/exchange.go b/pkg/exchange/okex/exchange.go index 607b0b344..cfa49056a 100644 --- a/pkg/exchange/okex/exchange.go +++ b/pkg/exchange/okex/exchange.go @@ -240,6 +240,10 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [ } func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error { + if len(orders) == 0 { + return nil + } + var reqs []*okexapi.CancelOrderRequest for _, order := range orders { if len(order.Symbol) == 0 { @@ -249,7 +253,6 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro req := e.client.TradeService.NewCancelOrderRequest() req.InstrumentID(toLocalSymbol(order.Symbol)) req.OrderID(strconv.FormatUint(order.OrderID, 10)) - if len(order.ClientOrderID) > 0 { req.ClientOrderID(order.ClientOrderID) } diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 7180c2b54..a510a02a5 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -264,6 +264,11 @@ func (s *Stream) connect(ctx context.Context) error { } log.Infof("websocket connected") + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + conn.SetPongHandler(func(string) error { + conn.SetReadDeadline(time.Now().Add(15 * time.Second)) + return nil + }) s.Conn = conn s.connLock.Unlock()