replace binance.depthFrame with the extracted depth.Buffer

This commit is contained in:
c9s 2021-12-25 01:57:05 +08:00
parent b217a0dec8
commit 7e7115b18f
7 changed files with 105 additions and 363 deletions

View File

@ -3,8 +3,8 @@ package cmd
import ( import (
"context" "context"
"fmt" "fmt"
"os"
"syscall" "syscall"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -15,24 +15,14 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTCUSDT // go run ./cmd/bbgo orderbook --session=ftx --symbol=BTCUSDT
var orderbookCmd = &cobra.Command{ var orderbookCmd = &cobra.Command{
Use: "orderbook", Use: "orderbook",
Short: "connect to the order book market data streaming service of an exchange", Short: "connect to the order book market data streaming service of an exchange",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
exName, err := cmd.Flags().GetString("exchange") sessionName, err := cmd.Flags().GetString("session")
if err != nil {
return fmt.Errorf("can not get exchange from flags: %w", err)
}
exchangeName, err := types.ValidExchangeName(exName)
if err != nil {
return err
}
ex, err := cmdutil.NewExchange(exchangeName)
if err != nil { if err != nil {
return err return err
} }
@ -46,7 +36,21 @@ var orderbookCmd = &cobra.Command{
return fmt.Errorf("--symbol option is required") return fmt.Errorf("--symbol option is required")
} }
s := ex.NewStream() if userConfig == nil {
return errors.New("--config option or config file is missing")
}
environ := bbgo.NewEnvironment()
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
s := session.Exchange.NewStream()
s.SetPublicOnly() s.SetPublicOnly()
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
s.OnBookSnapshot(func(book types.SliceOrderBook) { s.OnBookSnapshot(func(book types.SliceOrderBook) {
@ -58,9 +62,18 @@ var orderbookCmd = &cobra.Command{
log.Infof("connecting...") log.Infof("connecting...")
if err := s.Connect(ctx); err != nil { if err := s.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect to %s", exchangeName) return fmt.Errorf("failed to connect to %s", sessionName)
} }
log.Infof("connected")
defer func() {
log.Infof("closing connection...")
if err := s.Close(); err != nil {
log.WithError(err).Errorf("connection close error")
}
time.Sleep(1 * time.Second)
}()
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil return nil
}, },
@ -72,34 +85,11 @@ var orderUpdateCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
configFile, err := cmd.Flags().GetString("config") if userConfig == nil {
if err != nil { return errors.New("--config option or config file is missing")
return err
}
if len(configFile) == 0 {
return errors.New("--config option is required")
}
// if config file exists, use the config loaded from the config file.
// otherwise, use a empty config object
var userConfig *bbgo.Config
if _, err := os.Stat(configFile); err == nil {
// load successfully
userConfig, err = bbgo.Load(configFile, false)
if err != nil {
return err
}
} else if os.IsNotExist(err) {
// config file doesn't exist
userConfig = &bbgo.Config{}
} else {
// other error
return err
} }
environ := bbgo.NewEnvironment() environ := bbgo.NewEnvironment()
if err := environ.ConfigureExchangeSessions(userConfig); err != nil { if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err return err
} }
@ -131,8 +121,7 @@ var orderUpdateCmd = &cobra.Command{
} }
func init() { func init() {
// since the public data does not require trading authentication, we use --exchange option here. orderbookCmd.Flags().String("session", "", "session name")
orderbookCmd.Flags().String("exchange", "", "the exchange name for sync")
orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...")
orderUpdateCmd.Flags().String("session", "", "session name") orderUpdateCmd.Flags().String("session", "", "session name")

View File

@ -24,13 +24,13 @@ var userDataStreamCmd = &cobra.Command{
return errors.New("--config option or config file is missing") return errors.New("--config option or config file is missing")
} }
environ := bbgo.NewEnvironment() sessionName, err := cmd.Flags().GetString("session")
if err := environ.ConfigureExchangeSessions(userConfig); err != nil { if err != nil {
return err return err
} }
sessionName, err := cmd.Flags().GetString("session") environ := bbgo.NewEnvironment()
if err != nil { if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err return err
} }

View File

@ -20,7 +20,6 @@ type Update struct {
//go:generate callbackgen -type Buffer //go:generate callbackgen -type Buffer
type Buffer struct { type Buffer struct {
Symbol string
buffer []Update buffer []Update
finalUpdateID int64 finalUpdateID int64
@ -36,9 +35,8 @@ type Buffer struct {
once util.Reonce once util.Reonce
} }
func NewDepthBuffer(symbol string, fetcher SnapshotFetcher) *Buffer { func NewBuffer(fetcher SnapshotFetcher) *Buffer {
return &Buffer{ return &Buffer{
Symbol: symbol,
fetcher: fetcher, fetcher: fetcher,
resetC: make(chan struct{}, 1), resetC: make(chan struct{}, 1),
} }
@ -57,6 +55,13 @@ func (b *Buffer) emitReset() {
} }
} }
func (b *Buffer) Reset() {
b.mu.Lock()
b.resetSnapshot()
b.emitReset()
b.mu.Unlock()
}
// AddUpdate adds the update to the buffer or push the update to the subscriber // AddUpdate adds the update to the buffer or push the update to the subscriber
func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID finalUpdateID := firstUpdateID
@ -95,9 +100,8 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg
// if there is a missing update, we should reset the snapshot and re-fetch the snapshot // if there is a missing update, we should reset the snapshot and re-fetch the snapshot
if u.FirstUpdateID > b.finalUpdateID+1 { if u.FirstUpdateID > b.finalUpdateID+1 {
// emitReset will reset the once outside the mutex lock section // emitReset will reset the once outside the mutex lock section
b.buffer = []Update{u}
b.resetSnapshot() b.resetSnapshot()
b.buffer = nil
b.buffer = append(b.buffer, u)
b.emitReset() b.emitReset()
b.mu.Unlock() b.mu.Unlock()
return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1) return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1)

View File

@ -11,7 +11,7 @@ import (
) )
func TestDepthBuffer_ReadyState(t *testing.T) { func TestDepthBuffer_ReadyState(t *testing.T) {
buf := NewDepthBuffer("", func() (book types.SliceOrderBook, finalID int64, err error) { buf := NewBuffer(func() (book types.SliceOrderBook, finalID int64, err error) {
return types.SliceOrderBook{ return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{ Bids: types.PriceVolumeSlice{
{Price: 100, Volume: 1}, {Price: 100, Volume: 1},
@ -48,7 +48,7 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
// snapshot starts from 30, // snapshot starts from 30,
// the first ready event should have a snapshot(30) and updates (31~50) // the first ready event should have a snapshot(30) and updates (31~50)
var snapshotFinalID int64 = 0 var snapshotFinalID int64 = 0
buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { buf := NewBuffer(func() (types.SliceOrderBook, int64, error) {
snapshotFinalID += 30 snapshotFinalID += 30
return types.SliceOrderBook{ return types.SliceOrderBook{
Bids: types.PriceVolumeSlice{ Bids: types.PriceVolumeSlice{
@ -87,7 +87,7 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
func TestDepthBuffer_ConcurrentRun(t *testing.T) { func TestDepthBuffer_ConcurrentRun(t *testing.T) {
var snapshotFinalID int64 = 0 var snapshotFinalID int64 = 0
buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { buf := NewBuffer(func() (types.SliceOrderBook, int64, error) {
snapshotFinalID += 30 snapshotFinalID += 30
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
return types.SliceOrderBook{ return types.SliceOrderBook{

View File

@ -1,246 +0,0 @@
package binance
import (
"context"
"fmt"
"sync"
"time"
"github.com/adshao/go-binance/v2"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
//go:generate callbackgen -type DepthFrame
type DepthFrame struct {
Symbol string
client *binance.Client
context context.Context
snapshotMutex sync.Mutex
snapshotDepth *DepthEvent
bufMutex sync.Mutex
bufEvents []DepthEvent
resetC chan struct{}
once sync.Once
readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent)
pushCallbacks []func(e DepthEvent)
}
func (f *DepthFrame) reset() {
if debugBinanceDepth {
log.Infof("resetting %s depth frame", f.Symbol)
}
f.bufMutex.Lock()
f.bufEvents = nil
f.bufMutex.Unlock()
f.snapshotMutex.Lock()
f.snapshotDepth = nil
f.once = sync.Once{}
f.snapshotMutex.Unlock()
}
func (f *DepthFrame) emitReset() {
select {
case f.resetC <- struct{}{}:
default:
}
}
func (f *DepthFrame) bufferEvent(e DepthEvent) {
if debugBinanceDepth {
log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID)
}
f.bufMutex.Lock()
f.bufEvents = append(f.bufEvents, e)
f.bufMutex.Unlock()
}
func (f *DepthFrame) loadDepthSnapshot() error {
if debugBinanceDepth {
log.Infof("buffering %s depth events...", f.Symbol)
}
time.Sleep(3 * time.Second)
depth, err := f.fetch(f.context)
if err != nil {
return err
}
if len(depth.Asks) == 0 {
return fmt.Errorf("%s depth response error: empty asks", f.Symbol)
}
if len(depth.Bids) == 0 {
return fmt.Errorf("%s depth response error: empty bids", f.Symbol)
}
if debugBinanceDepth {
log.Infof("fetched %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID)
}
// filter the events by the event IDs
f.bufMutex.Lock()
bufEvents := f.bufEvents
f.bufEvents = nil
f.bufMutex.Unlock()
var events []DepthEvent
for _, e := range bufEvents {
if e.FinalUpdateID < depth.FinalUpdateID {
if debugBinanceDepth {
log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)",
f.Symbol,
depth.FinalUpdateID-e.FinalUpdateID,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
continue
}
if debugBinanceDepth {
log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
events = append(events, e)
}
// since we're buffering the update events, ideally the some of the head events
// should be older than the received depth snapshot.
// if the head event is newer than the depth we got,
// then there are something missed, we need to restart the process.
if len(events) > 0 {
// The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1.
firstEvent := events[0]
// valid
nextID := depth.FinalUpdateID + 1
if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID {
return fmt.Errorf("mismatch %s final update id for order book, resetting depth", f.Symbol)
}
if debugBinanceDepth {
log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)",
f.Symbol,
firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID)
}
}
if debugBinanceDepth {
log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events))
}
f.snapshotMutex.Lock()
f.snapshotDepth = depth
f.snapshotMutex.Unlock()
f.EmitReady(*depth, events)
return nil
}
func (f *DepthFrame) PushEvent(e DepthEvent) {
select {
case <-f.resetC:
f.reset()
default:
}
f.snapshotMutex.Lock()
snapshot := f.snapshotDepth
f.snapshotMutex.Unlock()
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if snapshot == nil {
// buffer the events until we loaded the snapshot
f.bufferEvent(e)
go f.once.Do(func() {
if err := f.loadDepthSnapshot(); err != nil {
log.WithError(err).Errorf("%s depth snapshot load failed, resetting..", f.Symbol)
f.emitReset()
}
})
return
}
// drop old events
if e.FinalUpdateID <= snapshot.FinalUpdateID {
log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
return
}
if e.FirstUpdateID > snapshot.FinalUpdateID+1 {
log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)",
f.Symbol,
e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
f.emitReset()
return
}
f.snapshotMutex.Lock()
f.snapshotDepth.FinalUpdateID = e.FinalUpdateID
f.snapshotMutex.Unlock()
f.EmitPush(e)
}
// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type
func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) {
if debugBinanceDepth {
log.Infof("fetching %s depth snapshot", f.Symbol)
}
response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx)
if err != nil {
return nil, err
}
event := DepthEvent{
Symbol: f.Symbol,
FirstUpdateID: 0,
FinalUpdateID: response.LastUpdateID,
}
for _, entry := range response.Bids {
// entry.Price, Quantity: entry.Quantity
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return nil, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return nil, err
}
event.Bids = append(event.Bids, types.PriceVolume{Price: price, Volume: quantity})
}
for _, entry := range response.Asks {
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return nil, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return nil, err
}
event.Asks = append(event.Asks, types.PriceVolume{Price: price, Volume : quantity})
}
return &event, nil
}

View File

@ -1,25 +0,0 @@
// Code generated by "callbackgen -type DepthFrame"; DO NOT EDIT.
package binance
import ()
func (f *DepthFrame) OnReady(cb func(snapshotDepth DepthEvent, bufEvents []DepthEvent)) {
f.readyCallbacks = append(f.readyCallbacks, cb)
}
func (f *DepthFrame) EmitReady(snapshotDepth DepthEvent, bufEvents []DepthEvent) {
for _, cb := range f.readyCallbacks {
cb(snapshotDepth, bufEvents)
}
}
func (f *DepthFrame) OnPush(cb func(e DepthEvent)) {
f.pushCallbacks = append(f.pushCallbacks, cb)
}
func (f *DepthFrame) EmitPush(e DepthEvent) {
for _, cb := range f.pushCallbacks {
cb(e)
}
}

View File

@ -2,7 +2,6 @@ package binance
import ( import (
"context" "context"
"github.com/c9s/bbgo/pkg/util"
"math/rand" "math/rand"
"net" "net"
"net/http" "net/http"
@ -11,6 +10,10 @@ import (
"sync" "sync"
"time" "time"
"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/util"
"github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2"
"github.com/adshao/go-binance/v2/futures" "github.com/adshao/go-binance/v2/futures"
@ -90,7 +93,7 @@ type Stream struct {
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent) orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
depthFrames map[string]*DepthFrame depthFrames map[string]*depth.Buffer
} }
func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
@ -100,7 +103,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
}, },
Client: client, Client: client,
futuresClient: futuresClient, futuresClient: futuresClient,
depthFrames: make(map[string]*DepthFrame), depthFrames: make(map[string]*depth.Buffer),
} }
stream.OnDepthEvent(func(e *DepthEvent) { stream.OnDepthEvent(func(e *DepthEvent) {
@ -110,52 +113,69 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
f, ok := stream.depthFrames[e.Symbol] f, ok := stream.depthFrames[e.Symbol]
if !ok { if !ok {
f = &DepthFrame{ f = depth.NewBuffer(func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
client: client, response, err := client.NewDepthService().Symbol(e.Symbol).Do(context.Background())
context: context.Background(), if err != nil {
Symbol: e.Symbol, return snapshot, finalUpdateID, err
resetC: make(chan struct{}, 1), }
}
snapshot.Symbol = e.Symbol
finalUpdateID = response.LastUpdateID
for _, entry := range response.Bids {
// entry.Price, Quantity: entry.Quantity
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity})
}
for _, entry := range response.Asks {
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity})
}
return snapshot, finalUpdateID, nil
})
stream.depthFrames[e.Symbol] = f stream.depthFrames[e.Symbol] = f
f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) { f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
log.Infof("depth snapshot ready: %s", snapshotDepth.String())
snapshot, err := snapshotDepth.OrderBook()
if err != nil {
log.WithError(err).Error("book snapshot convert error")
return
}
if valid, err := snapshot.IsValid(); !valid { if valid, err := snapshot.IsValid(); !valid {
log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err) log.Errorf("depth snapshot is invalid, error: %v", err)
return
} }
stream.EmitBookSnapshot(snapshot) stream.EmitBookSnapshot(snapshot)
for _, e := range bufEvents { for _, u := range updates {
bookUpdate, err := e.OrderBook() stream.EmitBookUpdate(u.Object)
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(bookUpdate)
} }
}) })
f.OnPush(func(update depth.Update) {
f.OnPush(func(e DepthEvent) { stream.EmitBookUpdate(update.Object)
book, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
return
}
stream.EmitBookUpdate(book)
}) })
} else { } else {
f.PushEvent(*e) f.AddUpdate(types.SliceOrderBook{
Symbol: e.Symbol,
Bids: e.Bids,
Asks: e.Asks,
}, e.FirstUpdateID, e.FinalUpdateID)
} }
}) })
@ -270,7 +290,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
stream.OnDisconnect(func() { stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...") log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames { for _, f := range stream.depthFrames {
f.emitReset() f.Reset()
} }
}) })