diff --git a/examples/binance-book/main.go b/examples/binance-book/main.go index 9823bc454..c2ba4d6e7 100644 --- a/examples/binance-book/main.go +++ b/examples/binance-book/main.go @@ -62,8 +62,8 @@ var rootCmd = &cobra.Command{ bestBid, hasBid := book.BestBid() bestAsk, hasAsk := book.BestAsk() - if !book.IsValid() { - log.Warnf("order book is invalid") + if valid, err := book.IsValid(); !valid { + log.Errorf("order book is invalid, error: %v", err) return } diff --git a/pkg/exchange/binance/depthframe.go b/pkg/exchange/binance/depthframe.go index ca3ea046e..0c0934ec5 100644 --- a/pkg/exchange/binance/depthframe.go +++ b/pkg/exchange/binance/depthframe.go @@ -44,6 +44,16 @@ func (f *DepthFrame) loadDepthSnapshot() { f.mu.Lock() + if len(depth.Asks) == 0 { + log.Errorf("depth response error: empty asks") + return + } + + if len(depth.Bids) == 0 { + log.Errorf("depth response error: empty bids") + return + } + // filter the events by the event IDs var events []DepthEvent for _, e := range f.BufEvents { diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index f56becaeb..4704da4b2 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/adshao/go-binance/v2" @@ -52,6 +53,7 @@ type Stream struct { Client *binance.Client ListenKey string Conn *websocket.Conn + connLock sync.Mutex publicOnly bool @@ -92,8 +94,8 @@ func NewStream(client *binance.Client) *Stream { return } - if !snapshot.IsValid() { - log.Warnf("depth snapshot is invalid, event: %+v", e) + if valid, err := snapshot.IsValid(); !valid { + log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err) } stream.EmitBookSnapshot(snapshot) @@ -174,6 +176,7 @@ func NewStream(client *binance.Client) *Stream { // reset the previous frames for _, f := range stream.depthFrames { f.Reset() + f.loadDepthSnapshot() } var params []string @@ -273,7 +276,10 @@ func (s *Stream) connect(ctx context.Context) error { } log.Infof("websocket connected") + + s.connLock.Lock() s.Conn = conn + s.connLock.Unlock() s.EmitConnect() return nil @@ -320,10 +326,13 @@ func (s *Stream) read(ctx context.Context) { return case <-pingTicker.C: + s.connLock.Lock() if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { log.WithError(err).Error("ping error", err) } + s.connLock.Unlock() + case <-keepAliveTicker.C: if !s.publicOnly { if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { @@ -453,6 +462,9 @@ func (s *Stream) Close() error { log.Infof("user data stream closed") } + s.connLock.Lock() + defer s.connLock.Unlock() + return s.Conn.Close() } diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 66c1c05b9..441f4694d 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -5,6 +5,8 @@ import ( "sort" "sync" + "github.com/pkg/errors" + "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/sigchan" ) @@ -139,15 +141,19 @@ func (b *OrderBook) BestAsk() (PriceVolume, bool) { return b.Asks[0], true } -func (b *OrderBook) IsValid() bool { +func (b *OrderBook) IsValid() (bool, error) { bid, hasBid := b.BestBid() ask, hasAsk := b.BestAsk() - if !hasBid || !hasAsk { - return false + if !hasBid { + return false, errors.New("empty bids") } - return bid.Price < ask.Price + if !hasAsk { + return false, errors.New("empty asks") + } + + return bid.Price < ask.Price, fmt.Errorf("bid price %f > ask price %f", bid.Price.Float64(), ask.Price.Float64()) } func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {