fix stream book usage

This commit is contained in:
c9s 2021-01-25 14:13:39 +08:00
parent 1aefbbfddc
commit b99c01a03f
3 changed files with 43 additions and 25 deletions

View File

@ -58,15 +58,22 @@ var rootCmd = &cobra.Command{
streambook := types.NewStreamBook(symbol)
streambook.BindStream(stream)
streambook.OnUpdate(func(book *types.OrderBook) {
bestBid, hasBid := book.BestBid()
bestAsk, hasAsk := book.BestAsk()
if valid, err := book.IsValid(); !valid {
go func() {
for {
select {
case <-ctx.Done():
return
case <-streambook.C:
if valid, err := streambook.IsValid(); !valid {
log.Errorf("order book is invalid, error: %v", err)
return
}
bestBid, hasBid := streambook.BestBid()
bestAsk, hasAsk := streambook.BestAsk()
if hasBid && hasAsk {
log.Infof("================================")
log.Infof("best ask %f % -12f",
@ -78,7 +85,10 @@ var rootCmd = &cobra.Command{
bestBid.Volume.Float64(),
)
}
})
}
}
}()
log.Info("connecting websocket...")
if err := stream.Connect(ctx); err != nil {

View File

@ -90,7 +90,7 @@ func NewStream(client *binance.Client) *Stream {
f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) {
snapshot, err := e.OrderBook()
if err != nil {
log.WithError(err).Error("book convert error")
log.WithError(err).Error("book snapshot convert error")
return
}

View File

@ -171,8 +171,8 @@ func (b *OrderBook) PriceVolumesBySide(side SideType) PriceVolumeSlice {
func (b *OrderBook) Copy() (book OrderBook) {
book = *b
book.Bids = b.Bids.Copy()
book.Asks = b.Asks.Copy()
book.Bids = book.Bids.Copy()
book.Asks = book.Asks.Copy()
return book
}
@ -250,20 +250,28 @@ func (b *MutexOrderBook) Load(book OrderBook) {
b.Lock()
defer b.Unlock()
b.Reset()
b.update(book)
b.OrderBook.Reset()
b.OrderBook.update(book)
b.EmitLoad(b.OrderBook)
}
func (b *MutexOrderBook) Reset() {
b.Lock()
b.OrderBook.Reset()
b.Unlock()
}
func (b *MutexOrderBook) Get() OrderBook {
b.Lock()
defer b.Unlock()
return b.OrderBook.Copy()
}
func (b *MutexOrderBook) Update(book OrderBook) {
func (b *MutexOrderBook) Update(update OrderBook) {
b.Lock()
defer b.Unlock()
b.update(book)
b.OrderBook.update(update)
b.EmitUpdate(b.OrderBook)
}