diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index eb2926bef..3c1f7e947 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -50,14 +50,39 @@ var orderbookCmd = &cobra.Command{ return fmt.Errorf("session %s not found", sessionName) } + orderBook := types.NewMutexOrderBook(symbol) + s := session.Exchange.NewStream() s.SetPublicOnly() s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) s.OnBookSnapshot(func(book types.SliceOrderBook) { log.Infof("orderbook snapshot: %s", book.String()) + orderBook.Load(book) + + if ok, err := orderBook.IsValid() ; !ok { + log.WithError(err).Panicf("invalid error book snapshot") + } + + if bid, ask, ok := orderBook.BestBidAndAsk() ; ok { + log.Infof("ASK | %f x %f / %f x %f | BID", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64()) + } }) s.OnBookUpdate(func(book types.SliceOrderBook) { log.Infof("orderbook update: %s", book.String()) + + orderBook.Update(book) + + if ok, err := orderBook.IsValid() ; !ok { + log.WithError(err).Panicf("invalid error book update") + } + + if bid, ask, ok := orderBook.BestBidAndAsk() ; ok { + log.Infof("ASK | %f x %f / %f x %f | BID", + ask.Volume.Float64(), ask.Price.Float64(), + bid.Price.Float64(), bid.Volume.Float64()) + } }) log.Infof("connecting...") diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 7aba0aaac..8ed0f365a 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -171,7 +171,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6 } func (e *Exchange) NewStream() types.Stream { - stream := NewStream(e.Client, e.futuresClient) + stream := NewStream(e, e.Client, e.futuresClient) stream.MarginSettings = e.MarginSettings stream.FuturesSettings = e.FuturesSettings return stream @@ -1050,6 +1050,46 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } } +func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (snapshot types.SliceOrderBook, finalUpdateID int64, err error) { + response, err := e.Client.NewDepthService().Symbol(symbol).Do(ctx) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Symbol = symbol + finalUpdateID = response.LastUpdateID + for _, entry := range response.Bids { + // entry.Price, Quantity: entry.Quantity + price, err := fixedpoint.NewFromString(entry.Price) + if err != nil { + return snapshot, finalUpdateID, err + } + + quantity, err := fixedpoint.NewFromString(entry.Quantity) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity}) + } + + for _, entry := range response.Asks { + price, err := fixedpoint.NewFromString(entry.Price) + if err != nil { + return snapshot, finalUpdateID, err + } + + quantity, err := fixedpoint.NewFromString(entry.Quantity) + if err != nil { + return snapshot, finalUpdateID, err + } + + snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity}) + } + + return snapshot, finalUpdateID, nil +} + func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) ([]types.KLine, error) { var allKLines []types.KLine diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 9e3bdc3e4..515ee499b 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -11,7 +11,6 @@ import ( "time" "github.com/c9s/bbgo/pkg/depth" - "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/util" "github.com/adshao/go-binance/v2" @@ -93,17 +92,17 @@ type Stream struct { orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent) - depthFrames map[string]*depth.Buffer + depthBuffers map[string]*depth.Buffer } -func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { +func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Client) *Stream { stream := &Stream{ StandardStream: types.StandardStream{ ReconnectC: make(chan struct{}, 1), }, Client: client, futuresClient: futuresClient, - depthFrames: make(map[string]*depth.Buffer), + depthBuffers: make(map[string]*depth.Buffer), } stream.OnDepthEvent(func(e *DepthEvent) { @@ -111,49 +110,12 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID) } - f, ok := stream.depthFrames[e.Symbol] + f, ok := stream.depthBuffers[e.Symbol] if !ok { - f = depth.NewBuffer(func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error) { - response, err := client.NewDepthService().Symbol(e.Symbol).Do(context.Background()) - if err != nil { - return snapshot, finalUpdateID, err - } - - snapshot.Symbol = e.Symbol - finalUpdateID = response.LastUpdateID - for _, entry := range response.Bids { - // entry.Price, Quantity: entry.Quantity - price, err := fixedpoint.NewFromString(entry.Price) - if err != nil { - return snapshot, finalUpdateID, err - } - - quantity, err := fixedpoint.NewFromString(entry.Quantity) - if err != nil { - return snapshot, finalUpdateID, err - } - - snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity}) - } - - for _, entry := range response.Asks { - price, err := fixedpoint.NewFromString(entry.Price) - if err != nil { - return snapshot, finalUpdateID, err - } - - quantity, err := fixedpoint.NewFromString(entry.Quantity) - if err != nil { - return snapshot, finalUpdateID, err - } - - snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity}) - } - - return snapshot, finalUpdateID, nil + f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) { + return ex.QueryDepth(context.Background(), e.Symbol) }) - - stream.depthFrames[e.Symbol] = f + stream.depthBuffers[e.Symbol] = f f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) { if valid, err := snapshot.IsValid(); !valid { @@ -289,7 +251,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream { stream.OnDisconnect(func() { log.Infof("resetting depth snapshots...") - for _, f := range stream.depthFrames { + for _, f := range stream.depthBuffers { f.Reset() } }) @@ -679,7 +641,7 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err } func (s *Stream) Close() error { - log.Infof("closing user data stream...") + log.Infof("closing stream...") if s.connCancel != nil { s.connCancel()