Merge pull request #255 from c9s/fix/binance-depth-stream

fix: binance depth stream buffering
This commit is contained in:
Yo-An Lin 2021-05-25 21:45:15 +08:00 committed by GitHub
commit 7188f021e9
7 changed files with 370 additions and 223 deletions

View File

@ -48,16 +48,8 @@ var rootCmd = &cobra.Command{
stream.SetPublicOnly()
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
stream.OnBookSnapshot(func(book types.SliceOrderBook) {
// log.Infof("book snapshot: %+v", book)
})
stream.OnBookUpdate(func(book types.SliceOrderBook) {
// log.Infof("book update: %+v", book)
})
streambook := types.NewStreamBook(symbol)
streambook.BindStream(stream)
streamBook := types.NewStreamBook(symbol)
streamBook.BindStream(stream)
go func() {
for {
@ -66,12 +58,12 @@ var rootCmd = &cobra.Command{
case <-ctx.Done():
return
case <-streambook.C:
book := streambook.Copy()
case <-streamBook.C:
book := streamBook.Copy()
if valid, err := book.IsValid(); !valid {
log.Errorf("order book is invalid, error: %v", err)
return
continue
}
bestBid, hasBid := book.BestBid()

View File

@ -3,6 +3,7 @@ package binance
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/adshao/go-binance/v2"
@ -283,3 +284,19 @@ func ConvertTrades(remoteTrades []*binance.TradeV3) (trades []types.Trade, err e
return trades, err
}
func convertSubscription(s types.Subscription) string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}

View File

@ -2,7 +2,6 @@ package binance
import (
"context"
"math/rand"
"sync"
"time"
@ -11,28 +10,51 @@ import (
//go:generate callbackgen -type DepthFrame
type DepthFrame struct {
Symbol string
client *binance.Client
context context.Context
mu sync.Mutex
snapshotMutex sync.Mutex
snapshotDepth *DepthEvent
bufMutex sync.Mutex
bufEvents []DepthEvent
once sync.Once
SnapshotDepth *DepthEvent
Symbol string
BufEvents []DepthEvent
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent)
}
func (f *DepthFrame) reset() {
f.mu.Lock()
f.SnapshotDepth = nil
f.BufEvents = nil
f.mu.Unlock()
if debugBinanceDepth {
log.Infof("resetting %s depth frame", f.Symbol)
}
f.bufMutex.Lock()
f.bufEvents = nil
f.bufMutex.Unlock()
f.snapshotMutex.Lock()
f.snapshotDepth = nil
f.once = sync.Once{}
f.snapshotMutex.Unlock()
}
func (f *DepthFrame) bufferEvent(e DepthEvent) {
if debugBinanceDepth {
log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID)
}
f.bufMutex.Lock()
f.bufEvents = append(f.bufEvents, e)
f.bufMutex.Unlock()
}
func (f *DepthFrame) loadDepthSnapshot() {
f.mu.Lock()
log.Infof("buffering %s depth events for 3 seconds...", f.Symbol)
time.Sleep(3 * time.Second)
if debugBinanceDepth {
log.Infof("loading %s depth from the restful api", f.Symbol)
@ -41,29 +63,52 @@ func (f *DepthFrame) loadDepthSnapshot() {
depth, err := f.fetch(f.context)
if err != nil {
log.WithError(err).Errorf("depth api error")
f.mu.Unlock()
return
}
if len(depth.Asks) == 0 {
log.Errorf("depth response error: empty asks")
f.mu.Unlock()
return
}
if len(depth.Bids) == 0 {
log.Errorf("depth response error: empty bids")
f.mu.Unlock()
return
}
if debugBinanceDepth {
log.Infof("loaded %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID)
}
f.snapshotMutex.Lock()
f.snapshotDepth = depth
f.snapshotMutex.Unlock()
// filter the events by the event IDs
f.bufMutex.Lock()
bufEvents := f.bufEvents
f.bufEvents = nil
f.bufMutex.Unlock()
var events []DepthEvent
for _, e := range f.BufEvents {
if e.FirstUpdateID <= depth.FinalUpdateID || e.FinalUpdateID <= depth.FinalUpdateID {
for _, e := range bufEvents {
if e.FinalUpdateID < depth.FinalUpdateID {
if debugBinanceDepth {
log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)",
f.Symbol,
depth.FinalUpdateID-e.FinalUpdateID,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
continue
}
if debugBinanceDepth {
log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
events = append(events, e)
}
@ -72,85 +117,68 @@ func (f *DepthFrame) loadDepthSnapshot() {
// if the head event is newer than the depth we got,
// then there are something missed, we need to restart the process.
if len(events) > 0 {
// The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1.
firstEvent := events[0]
if firstEvent.FirstUpdateID > depth.FinalUpdateID+1 {
// valid
nextID := depth.FinalUpdateID + 1
if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID {
log.Warn("miss matched final update id for order book, resetting depth...")
f.SnapshotDepth = nil
f.BufEvents = nil
f.mu.Unlock()
f.reset()
return
}
if debugBinanceDepth {
log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID)
}
}
f.SnapshotDepth = depth
f.BufEvents = nil
f.mu.Unlock()
if debugBinanceDepth {
log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events))
}
f.EmitReady(*depth, events)
}
func (f *DepthFrame) PushEvent(e DepthEvent) {
f.mu.Lock()
f.snapshotMutex.Lock()
snapshot := f.snapshotDepth
f.snapshotMutex.Unlock()
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if f.SnapshotDepth == nil {
if snapshot == nil {
// buffer the events until we loaded the snapshot
f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock()
f.bufferEvent(e)
f.loadDepthSnapshot()
// start a worker to update the snapshot periodically.
go f.once.Do(func() {
if debugBinanceDepth {
log.Infof("starting depth snapshot updater for %s market", f.Symbol)
}
ticker := time.NewTicker(30*time.Minute + time.Duration(rand.Intn(10))*time.Millisecond)
defer ticker.Stop()
for {
select {
case <-f.context.Done():
return
case <-ticker.C:
f.loadDepthSnapshot()
}
}
})
} else {
// if we have the snapshot, we could use that final update ID filter the events
// too old: drop any update ID < the final update ID
if e.FinalUpdateID < f.SnapshotDepth.FinalUpdateID {
if debugBinanceDepth {
log.Warnf("event final update id %d < depth final update id %d, skip", e.FinalUpdateID, f.SnapshotDepth.FinalUpdateID)
}
f.mu.Unlock()
return
}
// too new: if the first update ID > final update ID + 1, it means something is missing, we need to reload.
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 {
if debugBinanceDepth {
log.Warnf("event first update id %d > final update id + 1 (%d), resetting snapshot", e.FirstUpdateID, f.SnapshotDepth.FirstUpdateID+1)
}
f.SnapshotDepth = nil
// save the new event for later
f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock()
// drop old events
if e.FinalUpdateID <= snapshot.FinalUpdateID {
log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
return
}
// update the final update ID, so that we can check the next event
f.SnapshotDepth.FinalUpdateID = e.FinalUpdateID
f.mu.Unlock()
if e.FirstUpdateID > snapshot.FinalUpdateID+1 {
log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
f.reset()
return
}
f.snapshotMutex.Lock()
f.snapshotDepth.FinalUpdateID = e.FinalUpdateID
f.snapshotMutex.Unlock()
f.EmitPush(e)
}
}
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type
@ -165,6 +193,7 @@ func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
}
event := DepthEvent{
Symbol: f.Symbol,
FirstUpdateID: 0,
FinalUpdateID: response.LastUpdateID,
}

View File

@ -319,17 +319,40 @@ type DepthEvent struct {
Asks []DepthEntry
}
func (e *DepthEvent) String() (o string) {
o += fmt.Sprintf("Depth %s bid/ask = ", e.Symbol)
if len(e.Bids) == 0 {
o += "empty"
} else {
o += e.Bids[0].PriceLevel
}
o += "/"
if len(e.Asks) == 0 {
o += "empty"
} else {
o += e.Asks[0].PriceLevel
}
o += fmt.Sprintf(" %d ~ %d", e.FirstUpdateID, e.FinalUpdateID)
return o
}
func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
book.Symbol = e.Symbol
for _, entry := range e.Bids {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity)
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel)
continue
}
@ -344,11 +367,13 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
for _, entry := range e.Asks {
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
log.WithError(err).Errorf("depth quantity parse error: %s", entry.Quantity)
continue
}
price, err := fixedpoint.NewFromString(entry.PriceLevel)
if err != nil {
log.WithError(err).Errorf("depth price parse error: %s", entry.PriceLevel)
continue
}
@ -360,7 +385,7 @@ func (e *DepthEvent) OrderBook() (book types.SliceOrderBook, err error) {
book.Asks = book.Asks.Upsert(pv, false)
}
return
return book, nil
}
func parseDepthEntry(val *fastjson.Value) (*DepthEntry, error) {

View File

@ -2,8 +2,8 @@ package binance
import (
"context"
"fmt"
"math/rand"
"net"
"os"
"strconv"
"strings"
@ -46,6 +46,10 @@ type Stream struct {
ListenKey string
Conn *websocket.Conn
connLock sync.Mutex
reconnectC chan struct{}
connCtx context.Context
connCancel context.CancelFunc
publicOnly bool
@ -66,9 +70,14 @@ func NewStream(client *binance.Client) *Stream {
stream := &Stream{
Client: client,
depthFrames: make(map[string]*DepthFrame),
reconnectC: make(chan struct{}, 1),
}
stream.OnDepthEvent(func(e *DepthEvent) {
if debugBinanceDepth {
log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
f, ok := stream.depthFrames[e.Symbol]
if !ok {
f = &DepthFrame{
@ -79,27 +88,29 @@ func NewStream(client *binance.Client) *Stream {
stream.depthFrames[e.Symbol] = f
f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) {
snapshot, err := e.OrderBook()
f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) {
log.Infof("depth snapshot: %s", snapshotDepth.String())
snapshot, err := snapshotDepth.OrderBook()
if err != nil {
log.WithError(err).Error("book snapshot convert error")
return
}
if valid, err := snapshot.IsValid(); !valid {
log.Warnf("depth snapshot is invalid, event: %+v, error: %v", e, err)
log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err)
}
stream.EmitBookSnapshot(snapshot)
for _, e := range bufEvents {
book, err := e.OrderBook()
bookUpdate, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(book)
stream.EmitBookUpdate(bookUpdate)
}
})
@ -175,13 +186,14 @@ func NewStream(client *binance.Client) *Stream {
}
})
stream.OnConnect(func() {
// reset the previous frames
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames {
f.reset()
f.loadDepthSnapshot()
}
})
stream.OnConnect(func() {
var params []string
for _, subscription := range stream.Subscriptions {
params = append(params, convertSubscription(subscription))
@ -261,49 +273,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
}
func (s *Stream) connect(ctx context.Context) error {
if s.publicOnly {
log.Infof("stream is set to public only mode")
} else {
log.Infof("request listen key for creating user data stream...")
listenKey, err := s.fetchListenKey(ctx)
if err != nil {
return err
func (s *Stream) emitReconnect() {
select {
case s.reconnectC <- struct{}{}:
default:
}
s.ListenKey = listenKey
log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey))
}
conn, err := s.dial(s.ListenKey)
if err != nil {
return err
}
log.Infof("websocket connected")
s.connLock.Lock()
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
return nil
}
func convertSubscription(s types.Subscription) string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}
func (s *Stream) Connect(ctx context.Context) error {
@ -312,43 +286,112 @@ func (s *Stream) Connect(ctx context.Context) error {
return err
}
go s.read(ctx)
// start one re-connector goroutine with the base context
go s.reconnector(ctx)
s.EmitStart()
return nil
}
func (s *Stream) read(ctx context.Context) {
func (s *Stream) reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
pingTicker := time.NewTicker(10 * time.Second)
case <-s.reconnectC:
// ensure the previous context is cancelled
if s.connCancel != nil {
s.connCancel()
}
log.Warnf("received reconnect signal, reconnecting...")
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...")
s.emitReconnect()
}
}
}
}
func (s *Stream) connect(ctx context.Context) error {
// should only start one connection one time, so we lock the mutex
s.connLock.Lock()
// create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx)
if s.publicOnly {
log.Infof("stream is set to public only mode")
} else {
log.Infof("request listen key for creating user data stream...")
listenKey, err := s.fetchListenKey(ctx)
if err != nil {
s.connCancel()
s.connLock.Unlock()
return err
}
s.ListenKey = listenKey
log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey))
go s.listenKeyKeepAlive(s.connCtx, listenKey)
}
// when in public mode, the listen key is an empty string
conn, err := s.dial(s.ListenKey)
if err != nil {
s.connCancel()
s.connLock.Unlock()
return err
}
log.Infof("websocket connected")
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
go s.read(s.connCtx)
go s.ping(s.connCtx)
return nil
}
func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(15 * time.Second)
defer pingTicker.Stop()
keepAliveTicker := time.NewTicker(5 * time.Minute)
defer keepAliveTicker.Stop()
go func() {
for {
select {
case <-ctx.Done():
log.Info("ping worker stopped")
return
case <-pingTicker.C:
s.connLock.Lock()
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
s.emitReconnect()
}
s.connLock.Unlock()
}
}
}
case <-keepAliveTicker.C:
if !s.publicOnly {
if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil {
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey))
}
}
func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
keepAliveTicker := time.NewTicker(5 * time.Minute)
defer keepAliveTicker.Stop()
}
// if we exit, we should invalidate the existing listen key
defer func() {
log.Info("keepalive worker stopped")
if err := s.invalidateListenKey(ctx, listenKey); err != nil {
log.WithError(err).Error("invalidate listen key error")
}
}()
@ -358,40 +401,65 @@ func (s *Stream) read(ctx context.Context) {
case <-ctx.Done():
return
default:
if err := s.Conn.SetReadDeadline(time.Now().Add(3 * time.Minute)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
case <-keepAliveTicker.C:
if err := s.keepaliveListenKey(ctx, listenKey); err != nil {
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(listenKey))
s.emitReconnect()
return
}
mt, message, err := s.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) {
log.WithError(err).Errorf("read error: %s", err.Error())
} else {
log.Info("websocket connection closed, going away")
}
}
}
func (s *Stream) read(ctx context.Context) {
defer func() {
if s.connCancel != nil {
s.connCancel()
}
s.EmitDisconnect()
}()
// reconnect
for err != nil {
for {
select {
case <-ctx.Done():
return
default:
if !s.publicOnly {
if err := s.invalidateListenKey(ctx, s.ListenKey); err != nil {
log.WithError(err).Error("invalidate listen key error")
}
s.connLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
err = s.connect(ctx)
time.Sleep(5 * time.Second)
}
mt, message, err := s.Conn.ReadMessage()
s.connLock.Unlock()
if err != nil {
// if it's a network timeout error, we should re-connect
switch err := err.(type) {
// if it's a websocket related error
case *websocket.CloseError:
if err.Code == websocket.CloseNormalClosure {
return
}
continue
// for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.emitReconnect()
return
case net.Error:
log.WithError(err).Error("network error")
s.emitReconnect()
return
default:
log.WithError(err).Error("unexpected connection error")
s.emitReconnect()
return
}
}
// skip non-text messages
@ -465,17 +533,14 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
func (s *Stream) Close() error {
log.Infof("closing user data stream...")
if !s.publicOnly {
if err := s.invalidateListenKey(context.Background(), s.ListenKey); err != nil {
log.WithError(err).Error("invalidate listen key error")
}
log.Infof("user data stream closed")
if s.connCancel != nil {
s.connCancel()
}
s.connLock.Lock()
defer s.connLock.Unlock()
return s.Conn.Close()
err := s.Conn.Close()
s.connLock.Unlock()
return err
}
func maskListenKey(listenKey string) string {

View File

@ -52,6 +52,27 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook {
}
}
func (b *MutexOrderBook) IsValid() (ok bool, err error) {
b.Lock()
ok, err = b.OrderBook.IsValid()
b.Unlock()
return ok, err
}
func (b *MutexOrderBook) BestBid() (pv PriceVolume, ok bool) {
b.Lock()
pv, ok = b.OrderBook.BestBid()
b.Unlock()
return pv, ok
}
func (b *MutexOrderBook) BestAsk() (pv PriceVolume, ok bool) {
b.Lock()
pv, ok = b.OrderBook.BestAsk()
b.Unlock()
return pv, ok
}
func (b *MutexOrderBook) Load(book SliceOrderBook) {
b.Lock()
b.OrderBook.Load(book)
@ -66,14 +87,16 @@ func (b *MutexOrderBook) Reset() {
func (b *MutexOrderBook) CopyDepth(depth int) OrderBook {
b.Lock()
defer b.Unlock()
return b.OrderBook.CopyDepth(depth)
book := b.OrderBook.CopyDepth(depth)
b.Unlock()
return book
}
func (b *MutexOrderBook) Copy() OrderBook {
b.Lock()
defer b.Unlock()
return b.OrderBook.Copy()
book := b.OrderBook.Copy()
b.Unlock()
return book
}
func (b *MutexOrderBook) Update(update SliceOrderBook) {

View File

@ -93,10 +93,10 @@ func (b *SliceOrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {
switch side {
case SideTypeBuy:
return b.Bids
return b.Bids.Copy()
case SideTypeSell:
return b.Asks
return b.Asks.Copy()
}
return nil
@ -122,11 +122,6 @@ func (b *SliceOrderBook) updateBids(pvs PriceVolumeSlice) {
}
}
func (b *SliceOrderBook) load(book SliceOrderBook) {
b.Reset()
b.update(book)
}
func (b *SliceOrderBook) update(book SliceOrderBook) {
b.updateBids(book.Bids)
b.updateAsks(book.Asks)
@ -138,7 +133,8 @@ func (b *SliceOrderBook) Reset() {
}
func (b *SliceOrderBook) Load(book SliceOrderBook) {
b.load(book)
b.Reset()
b.update(book)
b.EmitLoad(b)
}