fix binance depth stream buffering

This commit is contained in:
c9s 2021-05-25 19:13:10 +08:00
parent bf684c0a5e
commit d3f06bc9d7
4 changed files with 282 additions and 186 deletions

View File

@ -71,7 +71,7 @@ var rootCmd = &cobra.Command{
if valid, err := book.IsValid(); !valid { if valid, err := book.IsValid(); !valid {
log.Errorf("order book is invalid, error: %v", err) log.Errorf("order book is invalid, error: %v", err)
return continue
} }
bestBid, hasBid := book.BestBid() bestBid, hasBid := book.BestBid()

View File

@ -3,6 +3,7 @@ package binance
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2"
@ -283,3 +284,19 @@ func ConvertTrades(remoteTrades []*binance.TradeV3) (trades []types.Trade, err e
return trades, err return trades, err
} }
func convertSubscription(s types.Subscription) string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}

View File

@ -2,7 +2,6 @@ package binance
import ( import (
"context" "context"
"math/rand"
"sync" "sync"
"time" "time"
@ -11,28 +10,51 @@ import (
//go:generate callbackgen -type DepthFrame //go:generate callbackgen -type DepthFrame
type DepthFrame struct { type DepthFrame struct {
Symbol string
client *binance.Client client *binance.Client
context context.Context context context.Context
mu sync.Mutex snapshotMutex sync.Mutex
once sync.Once snapshotDepth *DepthEvent
SnapshotDepth *DepthEvent
Symbol string bufMutex sync.Mutex
BufEvents []DepthEvent bufEvents []DepthEvent
once sync.Once
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent) readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent) pushCallbacks []func(e DepthEvent)
} }
func (f *DepthFrame) reset() { func (f *DepthFrame) reset() {
f.mu.Lock() if debugBinanceDepth {
f.SnapshotDepth = nil log.Infof("resetting %s depth frame", f.Symbol)
f.BufEvents = nil }
f.mu.Unlock()
f.bufMutex.Lock()
f.bufEvents = nil
f.bufMutex.Unlock()
f.snapshotMutex.Lock()
f.snapshotDepth = nil
f.once = sync.Once{}
f.snapshotMutex.Unlock()
}
func (f *DepthFrame) bufferEvent(e DepthEvent) {
if debugBinanceDepth {
log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID)
}
f.bufMutex.Lock()
f.bufEvents = append(f.bufEvents, e)
f.bufMutex.Unlock()
} }
func (f *DepthFrame) loadDepthSnapshot() { func (f *DepthFrame) loadDepthSnapshot() {
f.mu.Lock() log.Infof("buffering %s depth events for 3 seconds...", f.Symbol)
time.Sleep(3 * time.Second)
if debugBinanceDepth { if debugBinanceDepth {
log.Infof("loading %s depth from the restful api", f.Symbol) log.Infof("loading %s depth from the restful api", f.Symbol)
@ -41,29 +63,52 @@ func (f *DepthFrame) loadDepthSnapshot() {
depth, err := f.fetch(f.context) depth, err := f.fetch(f.context)
if err != nil { if err != nil {
log.WithError(err).Errorf("depth api error") log.WithError(err).Errorf("depth api error")
f.mu.Unlock()
return return
} }
if len(depth.Asks) == 0 { if len(depth.Asks) == 0 {
log.Errorf("depth response error: empty asks") log.Errorf("depth response error: empty asks")
f.mu.Unlock()
return return
} }
if len(depth.Bids) == 0 { if len(depth.Bids) == 0 {
log.Errorf("depth response error: empty bids") log.Errorf("depth response error: empty bids")
f.mu.Unlock()
return return
} }
if debugBinanceDepth {
log.Infof("loaded %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID)
}
f.snapshotMutex.Lock()
f.snapshotDepth = depth
f.snapshotMutex.Unlock()
// filter the events by the event IDs // filter the events by the event IDs
f.bufMutex.Lock()
bufEvents := f.bufEvents
f.bufEvents = nil
f.bufMutex.Unlock()
var events []DepthEvent var events []DepthEvent
for _, e := range f.BufEvents { for _, e := range bufEvents {
if e.FirstUpdateID <= depth.FinalUpdateID || e.FinalUpdateID <= depth.FinalUpdateID { if e.FinalUpdateID < depth.FinalUpdateID {
if debugBinanceDepth {
log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)",
f.Symbol,
depth.FinalUpdateID-e.FinalUpdateID,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
continue continue
} }
if debugBinanceDepth {
log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
events = append(events, e) events = append(events, e)
} }
@ -72,85 +117,56 @@ func (f *DepthFrame) loadDepthSnapshot() {
// if the head event is newer than the depth we got, // if the head event is newer than the depth we got,
// then there are something missed, we need to restart the process. // then there are something missed, we need to restart the process.
if len(events) > 0 { if len(events) > 0 {
// The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1.
firstEvent := events[0] firstEvent := events[0]
if firstEvent.FirstUpdateID > depth.FinalUpdateID+1 {
// valid
nextID := depth.FinalUpdateID + 1
if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID {
log.Warn("miss matched final update id for order book, resetting depth...") log.Warn("miss matched final update id for order book, resetting depth...")
f.SnapshotDepth = nil f.reset()
f.BufEvents = nil
f.mu.Unlock()
return return
} }
if debugBinanceDepth {
log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID)
}
} }
f.SnapshotDepth = depth if debugBinanceDepth {
f.BufEvents = nil log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events))
f.mu.Unlock() }
f.EmitReady(*depth, events) f.EmitReady(*depth, events)
} }
func (f *DepthFrame) PushEvent(e DepthEvent) { func (f *DepthFrame) PushEvent(e DepthEvent) {
f.mu.Lock() f.snapshotMutex.Lock()
snapshot := f.snapshotDepth
f.snapshotMutex.Unlock()
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot. // before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if f.SnapshotDepth == nil { if snapshot == nil {
// buffer the events until we loaded the snapshot // buffer the events until we loaded the snapshot
f.BufEvents = append(f.BufEvents, e) f.bufferEvent(e)
f.mu.Unlock()
f.loadDepthSnapshot()
// start a worker to update the snapshot periodically.
go f.once.Do(func() { go f.once.Do(func() {
if debugBinanceDepth { f.loadDepthSnapshot()
log.Infof("starting depth snapshot updater for %s market", f.Symbol)
}
ticker := time.NewTicker(30*time.Minute + time.Duration(rand.Intn(10))*time.Millisecond)
defer ticker.Stop()
for {
select {
case <-f.context.Done():
return
case <-ticker.C:
f.loadDepthSnapshot()
}
}
}) })
} else { return
// if we have the snapshot, we could use that final update ID filter the events
// too old: drop any update ID < the final update ID
if e.FinalUpdateID < f.SnapshotDepth.FinalUpdateID {
if debugBinanceDepth {
log.Warnf("event final update id %d < depth final update id %d, skip", e.FinalUpdateID, f.SnapshotDepth.FinalUpdateID)
}
f.mu.Unlock()
return
}
// too new: if the first update ID > final update ID + 1, it means something is missing, we need to reload.
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 {
if debugBinanceDepth {
log.Warnf("event first update id %d > final update id + 1 (%d), resetting snapshot", e.FirstUpdateID, f.SnapshotDepth.FirstUpdateID+1)
}
f.SnapshotDepth = nil
// save the new event for later
f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock()
return
}
// update the final update ID, so that we can check the next event
f.SnapshotDepth.FinalUpdateID = e.FinalUpdateID
f.mu.Unlock()
f.EmitPush(e)
} }
// drop old events
if e.FinalUpdateID <= snapshot.FinalUpdateID {
return
}
f.snapshotMutex.Lock()
f.snapshotDepth.FinalUpdateID = e.FinalUpdateID
f.snapshotMutex.Unlock()
f.EmitPush(e)
} }
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type // fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type

View File

@ -2,8 +2,8 @@ package binance
import ( import (
"context" "context"
"fmt"
"math/rand" "math/rand"
"net"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -42,10 +42,14 @@ type Stream struct {
types.StandardStream types.StandardStream
Client *binance.Client Client *binance.Client
ListenKey string ListenKey string
Conn *websocket.Conn Conn *websocket.Conn
connLock sync.Mutex connLock sync.Mutex
reconnectC chan struct{}
connCtx context.Context
connCancel context.CancelFunc
publicOnly bool publicOnly bool
@ -66,9 +70,14 @@ func NewStream(client *binance.Client) *Stream {
stream := &Stream{ stream := &Stream{
Client: client, Client: client,
depthFrames: make(map[string]*DepthFrame), depthFrames: make(map[string]*DepthFrame),
reconnectC: make(chan struct{}, 1),
} }
stream.OnDepthEvent(func(e *DepthEvent) { stream.OnDepthEvent(func(e *DepthEvent) {
if debugBinanceDepth {
log.Infof("received %s depth event updateID %d~%d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
f, ok := stream.depthFrames[e.Symbol] f, ok := stream.depthFrames[e.Symbol]
if !ok { if !ok {
f = &DepthFrame{ f = &DepthFrame{
@ -175,13 +184,14 @@ func NewStream(client *binance.Client) *Stream {
} }
}) })
stream.OnConnect(func() { stream.OnDisconnect(func() {
// reset the previous frames log.Infof("resetting depth snapshot...")
for _, f := range stream.depthFrames { for _, f := range stream.depthFrames {
f.reset() f.reset()
f.loadDepthSnapshot()
} }
})
stream.OnConnect(func() {
var params []string var params []string
for _, subscription := range stream.Subscriptions { for _, subscription := range stream.Subscriptions {
params = append(params, convertSubscription(subscription)) params = append(params, convertSubscription(subscription))
@ -261,49 +271,11 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
} }
func (s *Stream) connect(ctx context.Context) error { func (s *Stream) emitReconnect() {
if s.publicOnly { select {
log.Infof("stream is set to public only mode") case s.reconnectC <- struct{}{}:
} else { default:
log.Infof("request listen key for creating user data stream...")
listenKey, err := s.fetchListenKey(ctx)
if err != nil {
return err
}
s.ListenKey = listenKey
log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey))
} }
conn, err := s.dial(s.ListenKey)
if err != nil {
return err
}
log.Infof("websocket connected")
s.connLock.Lock()
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
return nil
}
func convertSubscription(s types.Subscription) string {
// binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case types.KLineChannel:
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case types.BookChannel:
return fmt.Sprintf("%s@depth", strings.ToLower(s.Symbol))
}
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
} }
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
@ -312,46 +284,140 @@ func (s *Stream) Connect(ctx context.Context) error {
return err return err
} }
go s.read(ctx) // start one re-connector goroutine with the base context
go s.reconnector(ctx)
s.EmitStart() s.EmitStart()
return nil return nil
} }
func (s *Stream) read(ctx context.Context) { func (s *Stream) reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
pingTicker := time.NewTicker(10 * time.Second) case <-s.reconnectC:
// ensure the previous context is cancelled
if s.connCancel != nil {
s.connCancel()
}
log.Warnf("received reconnect signal, reconnecting...")
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...")
s.emitReconnect()
}
}
}
}
func (s *Stream) connect(ctx context.Context) error {
// should only start one connection one time, so we lock the mutex
s.connLock.Lock()
// create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx)
if s.publicOnly {
log.Infof("stream is set to public only mode")
} else {
log.Infof("request listen key for creating user data stream...")
listenKey, err := s.fetchListenKey(ctx)
if err != nil {
s.connCancel()
s.connLock.Unlock()
return err
}
s.ListenKey = listenKey
log.Infof("user data stream created. listenKey: %s", maskListenKey(s.ListenKey))
go s.listenKeyKeepAlive(s.connCtx, listenKey)
}
// when in public mode, the listen key is an empty string
conn, err := s.dial(s.ListenKey)
if err != nil {
s.connCancel()
s.connLock.Unlock()
return err
}
log.Infof("websocket connected")
s.Conn = conn
s.connLock.Unlock()
s.EmitConnect()
go s.read(s.connCtx)
go s.ping(s.connCtx)
return nil
}
func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(15 * time.Second)
defer pingTicker.Stop() defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
log.Info("ping worker stopped")
return
case <-pingTicker.C:
s.connLock.Lock()
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
s.emitReconnect()
}
s.connLock.Unlock()
}
}
}
func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
keepAliveTicker := time.NewTicker(5 * time.Minute) keepAliveTicker := time.NewTicker(5 * time.Minute)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
go func() { // if we exit, we should invalidate the existing listen key
for { defer func() {
select { log.Info("keepalive worker stopped")
if err := s.invalidateListenKey(ctx, listenKey); err != nil {
case <-ctx.Done(): log.WithError(err).Error("invalidate listen key error")
return
case <-pingTicker.C:
s.connLock.Lock()
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
}
s.connLock.Unlock()
case <-keepAliveTicker.C:
if !s.publicOnly {
if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil {
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey))
}
}
}
} }
}() }()
for {
select {
case <-ctx.Done():
return
case <-keepAliveTicker.C:
if err := s.keepaliveListenKey(ctx, listenKey); err != nil {
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(listenKey))
s.emitReconnect()
return
}
}
}
}
func (s *Stream) read(ctx context.Context) {
defer func() {
if s.connCancel != nil {
s.connCancel()
}
s.EmitDisconnect()
}()
for { for {
select { select {
@ -359,39 +425,39 @@ func (s *Stream) read(ctx context.Context) {
return return
default: default:
if err := s.Conn.SetReadDeadline(time.Now().Add(3 * time.Minute)); err != nil { s.connLock.Lock()
if err := s.Conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := s.Conn.ReadMessage()
s.connLock.Unlock()
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway) { // if it's a network timeout error, we should re-connect
log.WithError(err).Errorf("read error: %s", err.Error()) switch err := err.(type) {
} else {
log.Info("websocket connection closed, going away")
}
s.EmitDisconnect() // if it's a websocket related error
case *websocket.CloseError:
// reconnect if err.Code == websocket.CloseNormalClosure {
for err != nil {
select {
case <-ctx.Done():
return return
default:
if !s.publicOnly {
if err := s.invalidateListenKey(ctx, s.ListenKey); err != nil {
log.WithError(err).Error("invalidate listen key error")
}
}
err = s.connect(ctx)
time.Sleep(5 * time.Second)
} }
}
continue // for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.emitReconnect()
return
case net.Error:
log.WithError(err).Error("network error")
s.emitReconnect()
return
default:
log.WithError(err).Error("unexpected connection error")
s.emitReconnect()
return
}
} }
// skip non-text messages // skip non-text messages
@ -465,17 +531,14 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
func (s *Stream) Close() error { func (s *Stream) Close() error {
log.Infof("closing user data stream...") log.Infof("closing user data stream...")
if !s.publicOnly { if s.connCancel != nil {
if err := s.invalidateListenKey(context.Background(), s.ListenKey); err != nil { s.connCancel()
log.WithError(err).Error("invalidate listen key error")
}
log.Infof("user data stream closed")
} }
s.connLock.Lock() s.connLock.Lock()
defer s.connLock.Unlock() err := s.Conn.Close()
s.connLock.Unlock()
return s.Conn.Close() return err
} }
func maskListenKey(listenKey string) string { func maskListenKey(listenKey string) string {