From 93f68bc3d741a62b83750b9f77440ac242dbe852 Mon Sep 17 00:00:00 2001 From: kbearXD Date: Wed, 20 Nov 2024 00:00:11 +0800 Subject: [PATCH] FIX: new SetSnapshot method for depthBuffer and use it on snapshot event --- pkg/depth/buffer.go | 26 +++++++++++++++++++++++ pkg/exchange/max/maxapi/public_parser.go | 11 ++++++++-- pkg/exchange/max/stream.go | 27 ++++++++++++++++++------ 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index 05ab39faa..c6d36c197 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -75,6 +75,32 @@ func (b *Buffer) Reset() { b.mu.Unlock() } +func (b *Buffer) SetSnapshot(snapshot types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { + finalUpdateID := firstUpdateID + if len(finalArgs) > 0 { + finalUpdateID = finalArgs[0] + } + + b.mu.Lock() + + if b.finalUpdateID >= finalUpdateID { + b.mu.Unlock() + return nil + } + + // set the final update ID so that we will know if there is an update missing + b.finalUpdateID = finalUpdateID + + // set the snapshot + b.snapshot = &snapshot + + b.mu.Unlock() + + // should unlock first then call ready + b.EmitReady(snapshot, nil) + return nil +} + // AddUpdate adds the update to the buffer or push the update to the subscriber func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArgs ...int64) error { finalUpdateID := firstUpdateID diff --git a/pkg/exchange/max/maxapi/public_parser.go b/pkg/exchange/max/maxapi/public_parser.go index 55b7fe2d8..d9c6ace92 100644 --- a/pkg/exchange/max/maxapi/public_parser.go +++ b/pkg/exchange/max/maxapi/public_parser.go @@ -13,8 +13,15 @@ import ( var ErrIncorrectBookEntryElementLength = errors.New("incorrect book entry element length") -const Buy = 1 -const Sell = -1 +const ( + Buy = 1 + Sell = -1 +) + +const ( + BookEventSnapshot string = "snapshot" + BookEventUpdate string = "update" +) var parserPool fastjson.ParserPool diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index d29d46570..9a78732b2 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -255,13 +255,26 @@ func (s *Stream) handleBookEvent(ex *Exchange) func(e max.BookEvent) { return } - if err := f.AddUpdate(types.SliceOrderBook{ - Symbol: symbol, - Time: e.Time(), - Bids: e.Bids, - Asks: e.Asks, - }, e.FirstUpdateID, e.LastUpdateID); err != nil { - log.WithError(err).Errorf("found missing %s update event", e.Market) + if e.Event == max.BookEventSnapshot { + if err := f.SetSnapshot(types.SliceOrderBook{ + Symbol: symbol, + Time: e.Time(), + Bids: e.Bids, + Asks: e.Asks, + LastUpdateId: e.LastUpdateID, + }, e.FirstUpdateID, e.LastUpdateID); err != nil { + log.WithError(err).Errorf("failed to set %s snapshot", e.Market) + } + } else { + if err := f.AddUpdate(types.SliceOrderBook{ + Symbol: symbol, + Time: e.Time(), + Bids: e.Bids, + Asks: e.Asks, + LastUpdateId: e.LastUpdateID, + }, e.FirstUpdateID, e.LastUpdateID); err != nil { + log.WithError(err).Errorf("found missing %s update event", e.Market) + } } } }