binance: refactor binance depthBuffer with depth query

This commit is contained in:
c9s 2021-12-25 02:10:13 +08:00
parent 7e7115b18f
commit 217499528d
3 changed files with 75 additions and 48 deletions

View File

@ -50,14 +50,39 @@ var orderbookCmd = &cobra.Command{
return fmt.Errorf("session %s not found", sessionName)
}
orderBook := types.NewMutexOrderBook(symbol)
s := session.Exchange.NewStream()
s.SetPublicOnly()
s.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
s.OnBookSnapshot(func(book types.SliceOrderBook) {
log.Infof("orderbook snapshot: %s", book.String())
orderBook.Load(book)
if ok, err := orderBook.IsValid() ; !ok {
log.WithError(err).Panicf("invalid error book snapshot")
}
if bid, ask, ok := orderBook.BestBidAndAsk() ; ok {
log.Infof("ASK | %f x %f / %f x %f | BID",
ask.Volume.Float64(), ask.Price.Float64(),
bid.Price.Float64(), bid.Volume.Float64())
}
})
s.OnBookUpdate(func(book types.SliceOrderBook) {
log.Infof("orderbook update: %s", book.String())
orderBook.Update(book)
if ok, err := orderBook.IsValid() ; !ok {
log.WithError(err).Panicf("invalid error book update")
}
if bid, ask, ok := orderBook.BestBidAndAsk() ; ok {
log.Infof("ASK | %f x %f / %f x %f | BID",
ask.Volume.Float64(), ask.Price.Float64(),
bid.Price.Float64(), bid.Volume.Float64())
}
})
log.Infof("connecting...")

View File

@ -171,7 +171,7 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
}
func (e *Exchange) NewStream() types.Stream {
stream := NewStream(e.Client, e.futuresClient)
stream := NewStream(e, e.Client, e.futuresClient)
stream.MarginSettings = e.MarginSettings
stream.FuturesSettings = e.FuturesSettings
return stream
@ -1050,6 +1050,46 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
}
func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
response, err := e.Client.NewDepthService().Symbol(symbol).Do(ctx)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Symbol = symbol
finalUpdateID = response.LastUpdateID
for _, entry := range response.Bids {
// entry.Price, Quantity: entry.Quantity
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity})
}
for _, entry := range response.Asks {
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity})
}
return snapshot, finalUpdateID, nil
}
func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) ([]types.KLine, error) {
var allKLines []types.KLine

View File

@ -11,7 +11,6 @@ import (
"time"
"github.com/c9s/bbgo/pkg/depth"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/util"
"github.com/adshao/go-binance/v2"
@ -93,17 +92,17 @@ type Stream struct {
orderTradeUpdateEventCallbacks []func(e *OrderTradeUpdateEvent)
depthFrames map[string]*depth.Buffer
depthBuffers map[string]*depth.Buffer
}
func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
func NewStream(ex *Exchange, client *binance.Client, futuresClient *futures.Client) *Stream {
stream := &Stream{
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
Client: client,
futuresClient: futuresClient,
depthFrames: make(map[string]*depth.Buffer),
depthBuffers: make(map[string]*depth.Buffer),
}
stream.OnDepthEvent(func(e *DepthEvent) {
@ -111,49 +110,12 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
log.Infof("received %s depth event updateID %d ~ %d (len %d)", e.Symbol, e.FirstUpdateID, e.FinalUpdateID, e.FinalUpdateID-e.FirstUpdateID)
}
f, ok := stream.depthFrames[e.Symbol]
f, ok := stream.depthBuffers[e.Symbol]
if !ok {
f = depth.NewBuffer(func() (snapshot types.SliceOrderBook, finalUpdateID int64, err error) {
response, err := client.NewDepthService().Symbol(e.Symbol).Do(context.Background())
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Symbol = e.Symbol
finalUpdateID = response.LastUpdateID
for _, entry := range response.Bids {
// entry.Price, Quantity: entry.Quantity
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Bids = append(snapshot.Bids, types.PriceVolume{Price: price, Volume: quantity})
}
for _, entry := range response.Asks {
price, err := fixedpoint.NewFromString(entry.Price)
if err != nil {
return snapshot, finalUpdateID, err
}
quantity, err := fixedpoint.NewFromString(entry.Quantity)
if err != nil {
return snapshot, finalUpdateID, err
}
snapshot.Asks = append(snapshot.Asks, types.PriceVolume{Price: price, Volume: quantity})
}
return snapshot, finalUpdateID, nil
f = depth.NewBuffer(func() (types.SliceOrderBook, int64, error) {
return ex.QueryDepth(context.Background(), e.Symbol)
})
stream.depthFrames[e.Symbol] = f
stream.depthBuffers[e.Symbol] = f
f.OnReady(func(snapshot types.SliceOrderBook, updates []depth.Update) {
if valid, err := snapshot.IsValid(); !valid {
@ -289,7 +251,7 @@ func NewStream(client *binance.Client, futuresClient *futures.Client) *Stream {
stream.OnDisconnect(func() {
log.Infof("resetting depth snapshots...")
for _, f := range stream.depthFrames {
for _, f := range stream.depthBuffers {
f.Reset()
}
})
@ -679,7 +641,7 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
}
func (s *Stream) Close() error {
log.Infof("closing user data stream...")
log.Infof("closing stream...")
if s.connCancel != nil {
s.connCancel()