diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 7cc59fd04..eb2926bef 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -3,8 +3,8 @@ package cmd import ( "context" "fmt" - "os" "syscall" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -15,24 +15,14 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -// go run ./cmd/bbgo orderbook --exchange=ftx --symbol=BTCUSDT +// go run ./cmd/bbgo orderbook --session=ftx --symbol=BTCUSDT var orderbookCmd = &cobra.Command{ Use: "orderbook", Short: "connect to the order book market data streaming service of an exchange", RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - exName, err := cmd.Flags().GetString("exchange") - if err != nil { - return fmt.Errorf("can not get exchange from flags: %w", err) - } - - exchangeName, err := types.ValidExchangeName(exName) - if err != nil { - return err - } - - ex, err := cmdutil.NewExchange(exchangeName) + sessionName, err := cmd.Flags().GetString("session") if err != nil { return err } @@ -46,7 +36,21 @@ var orderbookCmd = &cobra.Command{ return fmt.Errorf("--symbol option is required") } - s := ex.NewStream() + if userConfig == nil { + return errors.New("--config option or config file is missing") + } + + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + s := session.Exchange.NewStream() s.SetPublicOnly() s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.OnBookSnapshot(func(book types.SliceOrderBook) { @@ -58,9 +62,18 @@ var orderbookCmd = &cobra.Command{ log.Infof("connecting...") if err := s.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to %s", exchangeName) + return fmt.Errorf("failed to connect to %s", sessionName) } + log.Infof("connected") + defer func() { + log.Infof("closing connection...") + if err := s.Close(); err != nil { + log.WithError(err).Errorf("connection close error") + } + time.Sleep(1 * time.Second) + }() + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) return nil }, @@ -72,34 +85,11 @@ var orderUpdateCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - configFile, err := cmd.Flags().GetString("config") - if err != nil { - return err - } - - if len(configFile) == 0 { - return errors.New("--config option is required") - } - - // if config file exists, use the config loaded from the config file. - // otherwise, use a empty config object - var userConfig *bbgo.Config - if _, err := os.Stat(configFile); err == nil { - // load successfully - userConfig, err = bbgo.Load(configFile, false) - if err != nil { - return err - } - } else if os.IsNotExist(err) { - // config file doesn't exist - userConfig = &bbgo.Config{} - } else { - // other error - return err + if userConfig == nil { + return errors.New("--config option or config file is missing") } environ := bbgo.NewEnvironment() - if err := environ.ConfigureExchangeSessions(userConfig); err != nil { return err } @@ -131,8 +121,7 @@ var orderUpdateCmd = &cobra.Command{ } func init() { - // since the public data does not require trading authentication, we use --exchange option here. - orderbookCmd.Flags().String("exchange", "", "the exchange name for sync") + orderbookCmd.Flags().String("session", "", "session name") orderbookCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") orderUpdateCmd.Flags().String("session", "", "session name") diff --git a/pkg/cmd/userdatastream.go b/pkg/cmd/userdatastream.go index b90f0dd0d..30db3e65b 100644 --- a/pkg/cmd/userdatastream.go +++ b/pkg/cmd/userdatastream.go @@ -24,13 +24,13 @@ var userDataStreamCmd = &cobra.Command{ return errors.New("--config option or config file is missing") } - environ := bbgo.NewEnvironment() - if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + sessionName, err := cmd.Flags().GetString("session") + if err != nil { return err } - sessionName, err := cmd.Flags().GetString("session") - if err != nil { + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { return err } diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index 30a72a869..8c2ec40e0 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -20,7 +20,6 @@ type Update struct { //go:generate callbackgen -type Buffer type Buffer struct { - Symbol string buffer []Update finalUpdateID int64 @@ -36,9 +35,8 @@ type Buffer struct { once util.Reonce } -func NewDepthBuffer(symbol string, fetcher SnapshotFetcher) *Buffer { +func NewBuffer(fetcher SnapshotFetcher) *Buffer { return &Buffer{ - Symbol: symbol, fetcher: fetcher, resetC: make(chan struct{}, 1), } @@ -57,6 +55,13 @@ func (b *Buffer) emitReset() { } } +func (b *Buffer) Reset() { + b.mu.Lock() + b.resetSnapshot() + b.emitReset() + b.mu.Unlock() +} + // AddUpdate adds the update to the buffer or push the update to the subscriber func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { finalUpdateID := firstUpdateID @@ -95,9 +100,8 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg // if there is a missing update, we should reset the snapshot and re-fetch the snapshot if u.FirstUpdateID > b.finalUpdateID+1 { // emitReset will reset the once outside the mutex lock section + b.buffer = []Update{u} b.resetSnapshot() - b.buffer = nil - b.buffer = append(b.buffer, u) b.emitReset() b.mu.Unlock() return fmt.Errorf("there is a missing update between %d and %d", u.FirstUpdateID, b.finalUpdateID+1) diff --git a/pkg/depth/buffer_test.go b/pkg/depth/buffer_test.go index e538733db..a3cc2d569 100644 --- a/pkg/depth/buffer_test.go +++ b/pkg/depth/buffer_test.go @@ -11,7 +11,7 @@ import ( ) func TestDepthBuffer_ReadyState(t *testing.T) { - buf := NewDepthBuffer("", func() (book types.SliceOrderBook, finalID int64, err error) { + buf := NewBuffer(func() (book types.SliceOrderBook, finalID int64, err error) { return types.SliceOrderBook{ Bids: types.PriceVolumeSlice{ {Price: 100, Volume: 1}, @@ -48,7 +48,7 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { // snapshot starts from 30, // the first ready event should have a snapshot(30) and updates (31~50) var snapshotFinalID int64 = 0 - buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { + buf := NewBuffer(func() (types.SliceOrderBook, int64, error) { snapshotFinalID += 30 return types.SliceOrderBook{ Bids: types.PriceVolumeSlice{ @@ -87,7 +87,7 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) { func TestDepthBuffer_ConcurrentRun(t *testing.T) { var snapshotFinalID int64 = 0 - buf := NewDepthBuffer("", func() (types.SliceOrderBook, int64, error) { + buf := NewBuffer(func() (types.SliceOrderBook, int64, error) { snapshotFinalID += 30 time.Sleep(10 * time.Millisecond) return types.SliceOrderBook{ diff --git a/pkg/exchange/binance/depthframe.go b/pkg/exchange/binance/depthframe.go deleted file mode 100644 index 6b5435196..000000000 --- a/pkg/exchange/binance/depthframe.go +++ /dev/null @@ -1,246 +0,0 @@ -package binance - -import ( - "context" - "fmt" - "sync" - "time" - - "github.com/adshao/go-binance/v2" - "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/types" -) - -//go:generate callbackgen -type DepthFrame -type DepthFrame struct { - Symbol string - - client *binance.Client - context context.Context - - snapshotMutex sync.Mutex - snapshotDepth *DepthEvent - - bufMutex sync.Mutex - bufEvents []DepthEvent - - resetC chan struct{} - once sync.Once - - readyCallbacks []func(snapshotDepth DepthEvent, bufEvents []DepthEvent) - pushCallbacks []func(e DepthEvent) -} - -func (f *DepthFrame) reset() { - if debugBinanceDepth { - log.Infof("resetting %s depth frame", f.Symbol) - } - - f.bufMutex.Lock() - f.bufEvents = nil - f.bufMutex.Unlock() - - f.snapshotMutex.Lock() - f.snapshotDepth = nil - f.once = sync.Once{} - f.snapshotMutex.Unlock() -} - -func (f *DepthFrame) emitReset() { - select { - case f.resetC <- struct{}{}: - default: - } -} - -func (f *DepthFrame) bufferEvent(e DepthEvent) { - if debugBinanceDepth { - log.Infof("buffering %s depth event FirstUpdateID = %d, FinalUpdateID = %d", f.Symbol, e.FirstUpdateID, e.FinalUpdateID) - } - - f.bufMutex.Lock() - f.bufEvents = append(f.bufEvents, e) - f.bufMutex.Unlock() -} - -func (f *DepthFrame) loadDepthSnapshot() error { - if debugBinanceDepth { - log.Infof("buffering %s depth events...", f.Symbol) - } - - time.Sleep(3 * time.Second) - - depth, err := f.fetch(f.context) - if err != nil { - return err - } - - if len(depth.Asks) == 0 { - return fmt.Errorf("%s depth response error: empty asks", f.Symbol) - } - - if len(depth.Bids) == 0 { - return fmt.Errorf("%s depth response error: empty bids", f.Symbol) - } - - if debugBinanceDepth { - log.Infof("fetched %s depth, last update ID = %d", f.Symbol, depth.FinalUpdateID) - } - - // filter the events by the event IDs - f.bufMutex.Lock() - bufEvents := f.bufEvents - f.bufEvents = nil - f.bufMutex.Unlock() - - var events []DepthEvent - for _, e := range bufEvents { - if e.FinalUpdateID < depth.FinalUpdateID { - if debugBinanceDepth { - log.Infof("DROP %s depth event (final update id is %d older than the last update), updateID %d ~ %d (len %d)", - f.Symbol, - depth.FinalUpdateID-e.FinalUpdateID, - e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) - } - - continue - } - - if debugBinanceDepth { - log.Infof("KEEP %s depth event, updateID %d ~ %d (len %d)", - f.Symbol, - e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) - } - - events = append(events, e) - } - - // since we're buffering the update events, ideally the some of the head events - // should be older than the received depth snapshot. - // if the head event is newer than the depth we got, - // then there are something missed, we need to restart the process. - if len(events) > 0 { - // The first processed event should have U (final update ID) <= lastUpdateId+1 AND (first update id) >= lastUpdateId+1. - firstEvent := events[0] - - // valid - nextID := depth.FinalUpdateID + 1 - if firstEvent.FirstUpdateID > nextID || firstEvent.FinalUpdateID < nextID { - return fmt.Errorf("mismatch %s final update id for order book, resetting depth", f.Symbol) - } - - if debugBinanceDepth { - log.Infof("VALID first %s depth event, updateID %d ~ %d (len %d)", - f.Symbol, - firstEvent.FirstUpdateID, firstEvent.FinalUpdateID, firstEvent.FinalUpdateID-firstEvent.FirstUpdateID) - } - } - - if debugBinanceDepth { - log.Infof("READY %s depth, %d bufferred events", f.Symbol, len(events)) - } - - f.snapshotMutex.Lock() - f.snapshotDepth = depth - f.snapshotMutex.Unlock() - - f.EmitReady(*depth, events) - return nil -} - -func (f *DepthFrame) PushEvent(e DepthEvent) { - select { - case <-f.resetC: - f.reset() - default: - } - - f.snapshotMutex.Lock() - snapshot := f.snapshotDepth - f.snapshotMutex.Unlock() - - // before the snapshot is loaded, we need to buffer the events until we loaded the snapshot. - if snapshot == nil { - // buffer the events until we loaded the snapshot - f.bufferEvent(e) - - go f.once.Do(func() { - if err := f.loadDepthSnapshot(); err != nil { - log.WithError(err).Errorf("%s depth snapshot load failed, resetting..", f.Symbol) - f.emitReset() - } - }) - return - } - - // drop old events - if e.FinalUpdateID <= snapshot.FinalUpdateID { - log.Infof("DROP %s depth update event, updateID %d ~ %d (len %d)", - f.Symbol, - e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) - return - } - - if e.FirstUpdateID > snapshot.FinalUpdateID+1 { - log.Infof("MISSING %s depth update event, resetting, updateID %d ~ %d (len %d)", - f.Symbol, - e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) - - f.emitReset() - return - } - - f.snapshotMutex.Lock() - f.snapshotDepth.FinalUpdateID = e.FinalUpdateID - f.snapshotMutex.Unlock() - f.EmitPush(e) -} - -// fetch fetches the depth and convert to the depth event so that we can reuse the event structure to convert it to the global orderbook type -func (f *DepthFrame) fetch(ctx context.Context) (*DepthEvent, error) { - if debugBinanceDepth { - log.Infof("fetching %s depth snapshot", f.Symbol) - } - - response, err := f.client.NewDepthService().Symbol(f.Symbol).Do(ctx) - if err != nil { - return nil, err - } - - event := DepthEvent{ - Symbol: f.Symbol, - FirstUpdateID: 0, - FinalUpdateID: response.LastUpdateID, - } - - for _, entry := range response.Bids { - // entry.Price, Quantity: entry.Quantity - price, err := fixedpoint.NewFromString(entry.Price) - if err != nil { - return nil, err - } - - quantity, err := fixedpoint.NewFromString(entry.Quantity) - if err != nil { - return nil, err - } - - event.Bids = append(event.Bids, types.PriceVolume{Price: price, Volume: quantity}) - } - - for _, entry := range response.Asks { - price, err := fixedpoint.NewFromString(entry.Price) - if err != nil { - return nil, err - } - - quantity, err := fixedpoint.NewFromString(entry.Quantity) - if err != nil { - return nil, err - } - - event.Asks = append(event.Asks, types.PriceVolume{Price: price, Volume : quantity}) - } - - return &event, nil -} diff --git a/pkg/exchange/binance/depthframe_callbacks.go b/pkg/exchange/binance/depthframe_callbacks.go deleted file mode 100644 index df45c16b4..000000000 --- a/pkg/exchange/binance/depthframe_callbacks.go +++ /dev/null @@ -1,25 +0,0 @@ -// Code generated by "callbackgen -type DepthFrame"; DO NOT EDIT. - -package binance - -import () - -func (f *DepthFrame) OnReady(cb func(snapshotDepth DepthEvent, bufEvents []DepthEvent)) { - f.readyCallbacks = append(f.readyCallbacks, cb) -} - -func (f *DepthFrame) EmitReady(snapshotDepth DepthEvent, bufEvents []DepthEvent) { - for _, cb := range f.readyCallbacks { - cb(snapshotDepth, bufEvents) - } -} - -func (f *DepthFrame) OnPush(cb func(e DepthEvent)) { - f.pushCallbacks = append(f.pushCallbacks, cb) -} - -func (f *DepthFrame) EmitPush(e DepthEvent) { - for _, cb := range f.pushCallbacks { - cb(e) - } -} diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index b10ef41b5..9e3bdc3e4 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -2,7 +2,6 @@ package binance import ( "context" - "github.com/c9s/bbgo/pkg/util" "math/rand" "net" "net/http" @@ -11,6 +10,10 @@ import ( "sync" "time" + "github.com/c9s/bbgo/pkg/depth" + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/util" + "github.com/adshao/go-binance/v2" "github.com/adshao/go-binance/v2/futures" @@ -90,7 +93,7 @@ type Stream struct { orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent) - depthFrames map[string]*DepthFrame + depthFrames map[string]*depth.Buffer } func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { @@ -100,7 +103,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { }, Client: client, futuresClient: futuresClient, - depthFrames: make(map[string]*DepthFrame), + depthFrames: make(map[string]*depth.Buffer), } stream.OnDepthEvent(func(e *DepthEvent) { @@ -110,52 +113,69 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { f, ok := stream.depthFrames[e.Symbol] if !ok { - f = &DepthFrame{ - client: client, - context: context.Background(), - Symbol: e.Symbol, - resetC: make(chan struct{}, 1), - } + f = depth.NewBuffer(func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error) { + response, err := client.NewDepthService().Symbol(e.Symbol).Do(context.Background()) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Symbol = e.Symbol + finalUpdateID = response.LastUpdateID + for _, entry := range response.Bids { + // entry.Price, Quantity: entry.Quantity + price, err := fixedpoint.NewFromString(entry.Price) + if err != nil { + return snapshot, finalUpdateID, err + } + + quantity, err := fixedpoint.NewFromString(entry.Quantity) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity}) + } + + for _, entry := range response.Asks { + price, err := fixedpoint.NewFromString(entry.Price) + if err != nil { + return snapshot, finalUpdateID, err + } + + quantity, err := fixedpoint.NewFromString(entry.Quantity) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity}) + } + + return snapshot, finalUpdateID, nil + }) stream.depthFrames[e.Symbol] = f - f.OnReady(func(snapshotDepth DepthEvent, bufEvents []DepthEvent) { - log.Infof("depth snapshot ready: %s", snapshotDepth.String()) - - snapshot, err := snapshotDepth.OrderBook() - if err != nil { - log.WithError(err).Error("book snapshot convert error") - return - } - + f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { if valid, err := snapshot.IsValid(); !valid { - log.Errorf("depth snapshot is invalid, event: %+v, error: %v", snapshotDepth, err) + log.Errorf("depth snapshot is invalid, error: %v", err) + return } stream.EmitBookSnapshot(snapshot) - for _, e := range bufEvents { - bookUpdate, err := e.OrderBook() - if err != nil { - log.WithError(err).Error("book convert error") - return - } - - stream.EmitBookUpdate(bookUpdate) + for _, u := range updates { + stream.EmitBookUpdate(u.Object) } }) - - f.OnPush(func(e DepthEvent) { - book, err := e.OrderBook() - if err != nil { - log.WithError(err).Error("book convert error") - return - } - - stream.EmitBookUpdate(book) + f.OnPush(func(update depth.Update) { + stream.EmitBookUpdate(update.Object) }) } else { - f.PushEvent(*e) + f.AddUpdate(types.SliceOrderBook{ + Symbol: e.Symbol, + Bids: e.Bids, + Asks: e.Asks, + }, e.FirstUpdateID, e.FinalUpdateID) } }) @@ -270,7 +290,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { stream.OnDisconnect(func() { log.Infof("resetting depth snapshots...") for _, f := range stream.depthFrames { - f.emitReset() + f.Reset() } })