bbgo_origin/pkg/exchange/binance/depthframe.go
2021-05-26 00:57:35 +00:00

224 lines
5.4 KiB
Go

package binance
import (
"context"
"fmt"
"sync"
"time"
"github.com/adshao/go-binance/v2"
)
//go:generate callbackgen -type DepthFrame
type DepthFrame struct {
Symbol string
client *binance.Client
context context.Context
snapshotMutex sync.Mutex
snapshotDepth *DepthEvent
bufMutex sync.Mutex
bufEvents []DepthEvent
resetC chan struct{}
once sync.Once
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent)
}
func (f *DepthFrame) reset() {
if debugBinanceDepth {
log.Infof("resetting %s depth frame", f.Symbol)
}
f.bufMutex.Lock()
f.bufEvents = nil
f.bufMutex.Unlock()
f.snapshotMutex.Lock()
f.snapshotDepth = nil
f.once = sync.Once{}
f.snapshotMutex.Unlock()
}
func (f *DepthFrame) emitReset() {
select {
case f.resetC <- struct{}{}:
default:
}
}
func (f *DepthFrame) bufferEvent(e DepthEvent) {
if debugBinanceDepth {
log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID)
}
f.bufMutex.Lock()
f.bufEvents = append(f.bufEvents, e)
f.bufMutex.Unlock()
}
func (f *DepthFrame) loadDepthSnapshot() error {
if debugBinanceDepth {
log.Infof("buffering %s depth events...", f.Symbol)
}
time.Sleep(3 * time.Second)
depth, err := f.fetch(f.context)
if err != nil {
return err
}
if len(depth.Asks) == 0 {
return fmt.Errorf("%s depth response error: empty asks", f.Symbol)
}
if len(depth.Bids) == 0 {
return fmt.Errorf("%s depth response error: empty bids", f.Symbol)
}
if debugBinanceDepth {
log.Infof("fetched %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID)
}
// filter the events by the event IDs
f.bufMutex.Lock()
bufEvents := f.bufEvents
f.bufEvents = nil
f.bufMutex.Unlock()
var events []DepthEvent
for _, e := range bufEvents {
if e.FinalUpdateID < depth.FinalUpdateID {
if debugBinanceDepth {
log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)",
f.Symbol,
depth.FinalUpdateID-e.FinalUpdateID,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
continue
}
if debugBinanceDepth {
log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
events = append(events, e)
}
// since we're buffering the update events, ideally the some of the head events
// should be older than the received depth snapshot.
// if the head event is newer than the depth we got,
// then there are something missed, we need to restart the process.
if len(events) > 0 {
// The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1.
firstEvent := events[0]
// valid
nextID := depth.FinalUpdateID + 1
if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID {
return fmt.Errorf("mismatch %s final update id for order book, resetting depth", f.Symbol)
}
if debugBinanceDepth {
log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID)
}
}
if debugBinanceDepth {
log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events))
}
f.snapshotMutex.Lock()
f.snapshotDepth = depth
f.snapshotMutex.Unlock()
f.EmitReady(*depth, events)
return nil
}
func (f *DepthFrame) PushEvent(e DepthEvent) {
select {
case <-f.resetC:
f.reset()
default:
}
f.snapshotMutex.Lock()
snapshot := f.snapshotDepth
f.snapshotMutex.Unlock()
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if snapshot == nil {
// buffer the events until we loaded the snapshot
f.bufferEvent(e)
go f.once.Do(func() {
if err := f.loadDepthSnapshot(); err != nil {
log.WithError(err).Errorf("%s depth snapshot load failed, resetting..", f.Symbol)
f.emitReset()
}
})
return
}
// drop old events
if e.FinalUpdateID <= snapshot.FinalUpdateID {
log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
return
}
if e.FirstUpdateID > snapshot.FinalUpdateID+1 {
log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
f.emitReset()
return
}
f.snapshotMutex.Lock()
f.snapshotDepth.FinalUpdateID = e.FinalUpdateID
f.snapshotMutex.Unlock()
f.EmitPush(e)
}
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type
func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
if debugBinanceDepth {
log.Infof("fetching %s depth snapshot", f.Symbol)
}
response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx)
if err != nil {
return nil, err
}
event := DepthEvent{
Symbol: f.Symbol,
FirstUpdateID: 0,
FinalUpdateID: response.LastUpdateID,
}
for _, entry := range response.Bids {
event.Bids = append(event.Bids, DepthEntry{PriceLevel: entry.Price, Quantity: entry.Quantity})
}
for _, entry := range response.Asks {
event.Asks = append(event.Asks, DepthEntry{PriceLevel: entry.Price, Quantity: entry.Quantity})
}
return &event, nil
}