FIX: new SetSnapshot method for depthBuffer and use it on snapshot event

This commit is contained in:
kbearXD 2024-11-20 00:00:11 +08:00
parent 8641640ee7
commit 93f68bc3d7
3 changed files with 55 additions and 9 deletions

View File

@ -75,6 +75,32 @@ func (b *Buffer) Reset() {
b.mu.Unlock() b.mu.Unlock()
} }
func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID
if len(finalArgs) > 0 {
finalUpdateID = finalArgs[0]
}
b.mu.Lock()
if b.finalUpdateID >= finalUpdateID {
b.mu.Unlock()
return nil
}
// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID
// set the snapshot
b.snapshot = &snapshot
b.mu.Unlock()
// should unlock first then call ready
b.EmitReady(snapshot, nil)
return nil
}
// AddUpdate adds the update to the buffer or push the update to the subscriber // AddUpdate adds the update to the buffer or push the update to the subscriber
func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID finalUpdateID := firstUpdateID

View File

@ -13,8 +13,15 @@ import (
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
const Buy = 1 const (
const Sell = -1 Buy = 1
Sell = -1
)
const (
BookEventSnapshot string = "snapshot"
BookEventUpdate string = "update"
)
var parserPool fastjson.ParserPool var parserPool fastjson.ParserPool

View File

@ -255,15 +255,28 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
return return
} }
if e.Event == max.BookEventSnapshot {
if err := f.SetSnapshot(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("failed to set %s snapshot", e.Market)
}
} else {
if err := f.AddUpdate(types.SliceOrderBook{ if err := f.AddUpdate(types.SliceOrderBook{
Symbol: symbol, Symbol: symbol,
Time: e.Time(), Time: e.Time(),
Bids: e.Bids, Bids: e.Bids,
Asks: e.Asks, Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil { }, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market) log.WithError(err).Errorf("found missing %s update event", e.Market)
} }
} }
}
} }
func (s *Stream) String() string { func (s *Stream) String() string {