Merge pull request #1835 from c9s/kbearXD/max/not-use-snapshot-event
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

FEATURE: [max] new  method for depth buffer to set snapshot
This commit is contained in:
kbearXD 2024-11-20 14:04:59 +08:00 committed by GitHub
commit 505decce48
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 55 additions and 9 deletions

View File

@ -75,6 +75,32 @@ func (b *Buffer) Reset() {
b.mu.Unlock() b.mu.Unlock()
} }
func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID
if len(finalArgs) > 0 {
finalUpdateID = finalArgs[0]
}
b.mu.Lock()
if b.finalUpdateID >= finalUpdateID {
b.mu.Unlock()
return nil
}
// set the final update ID so that we will know if there is an update missing
b.finalUpdateID = finalUpdateID
// set the snapshot
b.snapshot = &snapshot
b.mu.Unlock()
// should unlock first then call ready
b.EmitReady(snapshot, nil)
return nil
}
// AddUpdate adds the update to the buffer or push the update to the subscriber // AddUpdate adds the update to the buffer or push the update to the subscriber
func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error {
finalUpdateID := firstUpdateID finalUpdateID := firstUpdateID

View File

@ -13,8 +13,15 @@ import (
var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length")
const Buy = 1 const (
const Sell = -1 Buy = 1
Sell = -1
)
const (
BookEventSnapshot string = "snapshot"
BookEventUpdate string = "update"
)
var parserPool fastjson.ParserPool var parserPool fastjson.ParserPool

View File

@ -255,15 +255,28 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) {
return return
} }
if e.Event == max.BookEventSnapshot {
if err := f.SetSnapshot(types.SliceOrderBook{
Symbol: symbol,
Time: e.Time(),
Bids: e.Bids,
Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("failed to set %s snapshot", e.Market)
}
} else {
if err := f.AddUpdate(types.SliceOrderBook{ if err := f.AddUpdate(types.SliceOrderBook{
Symbol: symbol, Symbol: symbol,
Time: e.Time(), Time: e.Time(),
Bids: e.Bids, Bids: e.Bids,
Asks: e.Asks, Asks: e.Asks,
LastUpdateId: e.LastUpdateID,
}, e.FirstUpdateID, e.LastUpdateID); err != nil { }, e.FirstUpdateID, e.LastUpdateID); err != nil {
log.WithError(err).Errorf("found missing %s update event", e.Market) log.WithError(err).Errorf("found missing %s update event", e.Market)
} }
} }
}
} }
func (s *Stream) String() string { func (s *Stream) String() string {