improve orderbook validation error

This commit is contained in:
c9s 2021-01-25 13:53:11 +08:00
parent 7310700540
commit 1aefbbfddc
4 changed files with 36 additions and 8 deletions

View File

@ -62,8 +62,8 @@ var rootCmd = &cobra.Command{
bestBid, hasBid := book.BestBid() bestBid, hasBid := book.BestBid()
bestAsk, hasAsk := book.BestAsk() bestAsk, hasAsk := book.BestAsk()
if !book.IsValid() { if valid, err := book.IsValid(); !valid {
log.Warnf("order book is invalid") log.Errorf("order book is invalid, error: %v", err)
return return
} }

View File

@ -44,6 +44,16 @@ func (f *DepthFrame) loadDepthSnapshot() {
f.mu.Lock() f.mu.Lock()
if len(depth.Asks) == 0 {
log.Errorf("depth response error: empty asks")
return
}
if len(depth.Bids) == 0 {
log.Errorf("depth response error: empty bids")
return
}
// filter the events by the event IDs // filter the events by the event IDs
var events []DepthEvent var events []DepthEvent
for _, e := range f.BufEvents { for _, e := range f.BufEvents {

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2"
@ -52,6 +53,7 @@ type Stream struct {
Client *binance.Client Client *binance.Client
ListenKey string ListenKey string
Conn *websocket.Conn Conn *websocket.Conn
connLock sync.Mutex
publicOnly bool publicOnly bool
@ -92,8 +94,8 @@ func NewStream(client *binance.Client) *Stream {
return return
} }
if !snapshot.IsValid() { if valid, err := snapshot.IsValid(); !valid {
log.Warnf("depth snapshot is invalid, event: %+v", e) log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err)
} }
stream.EmitBookSnapshot(snapshot) stream.EmitBookSnapshot(snapshot)
@ -174,6 +176,7 @@ func NewStream(client *binance.Client) *Stream {
// reset the previous frames // reset the previous frames
for _, f := range stream.depthFrames { for _, f := range stream.depthFrames {
f.Reset() f.Reset()
f.loadDepthSnapshot()
} }
var params []string var params []string
@ -273,7 +276,10 @@ func (s *Stream) connect(ctx context.Context) error {
} }
log.Infof("websocket connected") log.Infof("websocket connected")
s.connLock.Lock()
s.Conn = conn s.Conn = conn
s.connLock.Unlock()
s.EmitConnect() s.EmitConnect()
return nil return nil
@ -320,10 +326,13 @@ func (s *Stream) read(ctx context.Context) {
return return
case <-pingTicker.C: case <-pingTicker.C:
s.connLock.Lock()
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
} }
s.connLock.Unlock()
case <-keepAliveTicker.C: case <-keepAliveTicker.C:
if !s.publicOnly { if !s.publicOnly {
if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil {
@ -453,6 +462,9 @@ func (s *Stream) Close() error {
log.Infof("user data stream closed") log.Infof("user data stream closed")
} }
s.connLock.Lock()
defer s.connLock.Unlock()
return s.Conn.Close() return s.Conn.Close()
} }

View File

@ -5,6 +5,8 @@ import (
"sort" "sort"
"sync" "sync"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan" "github.com/c9s/bbgo/pkg/sigchan"
) )
@ -139,15 +141,19 @@ func (b *OrderBook) BestAsk() (PriceVolume, bool) {
return b.Asks[0], true return b.Asks[0], true
} }
func (b *OrderBook) IsValid() bool { func (b *OrderBook) IsValid() (bool, error) {
bid, hasBid := b.BestBid() bid, hasBid := b.BestBid()
ask, hasAsk := b.BestAsk() ask, hasAsk := b.BestAsk()
if !hasBid || !hasAsk { if !hasBid {
return false return false, errors.New("empty bids")
} }
return bid.Price < ask.Price if !hasAsk {
return false, errors.New("empty asks")
}
return bid.Price < ask.Price, fmt.Errorf("bid price %f > ask price %f", bid.Price.Float64(), ask.Price.Float64())
} }
func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice { func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {