call Reset instead of replacing the whole map

the reason is that we have the update worker, which is already started.
This commit is contained in:
c9s 2021-01-24 14:09:07 +08:00
parent 2b441ad3bc
commit 50fc1fd3ac
2 changed files with 25 additions and 13 deletions

View File

@ -11,7 +11,8 @@ import (
//go:generate callbackgen -type DepthFrame //go:generate callbackgen -type DepthFrame
type DepthFrame struct { type DepthFrame struct {
client *binance.Client client *binance.Client
context context.Context
mu sync.Mutex mu sync.Mutex
once sync.Once once sync.Once
@ -26,12 +27,12 @@ type DepthFrame struct {
func (f *DepthFrame) Reset() { func (f *DepthFrame) Reset() {
f.mu.Lock() f.mu.Lock()
f.SnapshotDepth = nil f.SnapshotDepth = nil
f.once = sync.Once{} f.BufEvents = nil
f.mu.Unlock() f.mu.Unlock()
} }
func (f *DepthFrame) loadDepthSnapshot() { func (f *DepthFrame) loadDepthSnapshot() {
depth, err := f.fetch(context.Background()) depth, err := f.fetch(f.context)
if err != nil { if err != nil {
return return
} }
@ -75,6 +76,7 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
// before the snapshot is loaded, we need to buffer the events until we loaded the snapshot. // before the snapshot is loaded, we need to buffer the events until we loaded the snapshot.
if f.SnapshotDepth == nil { if f.SnapshotDepth == nil {
// buffer the events until we loaded the snapshot
f.BufEvents = append(f.BufEvents, e) f.BufEvents = append(f.BufEvents, e)
f.mu.Unlock() f.mu.Unlock()
@ -84,16 +86,15 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
log.Infof("starting depth snapshot updater for %s market", f.Symbol) log.Infof("starting depth snapshot updater for %s market", f.Symbol)
} }
ctx := context.Background()
f.loadDepthSnapshot() f.loadDepthSnapshot()
ticker := time.NewTicker(1*time.Minute + time.Duration(rand.Intn(10))*time.Millisecond) ticker := time.NewTicker(1*time.Minute + time.Duration(rand.Intn(10))*time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ctx.Done(): case <-f.context.Done():
return return
case <-ticker.C: case <-ticker.C:
f.loadDepthSnapshot() f.loadDepthSnapshot()
} }
@ -110,6 +111,10 @@ func (f *DepthFrame) PushEvent(e DepthEvent) {
// if the first update ID > final update ID + 1, it means something is missing, we need to reload. // if the first update ID > final update ID + 1, it means something is missing, we need to reload.
if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 { if e.FirstUpdateID > f.SnapshotDepth.FinalUpdateID+1 {
if debugBinanceDepth {
log.Warnf("event first update id %d > final update id + 1 (%d), resetting snapshot", e.FirstUpdateID, f.SnapshotDepth.FirstUpdateID+1)
}
f.SnapshotDepth = nil f.SnapshotDepth = nil
f.mu.Unlock() f.mu.Unlock()
return return

View File

@ -74,24 +74,28 @@ func NewStream(client *binance.Client) *Stream {
depthFrames: make(map[string]*DepthFrame), depthFrames: make(map[string]*DepthFrame),
} }
stream.OnConnect(func() {
// clear the previous frames
stream.depthFrames = make(map[string]*DepthFrame)
})
stream.OnDepthEvent(func(e *DepthEvent) { stream.OnDepthEvent(func(e *DepthEvent) {
f, ok := stream.depthFrames[e.Symbol] f, ok := stream.depthFrames[e.Symbol]
if !ok { if !ok {
f = &DepthFrame{ f = &DepthFrame{
client: client, client: client,
context: context.Background(),
Symbol: e.Symbol, Symbol: e.Symbol,
} }
stream.depthFrames[e.Symbol] = f
f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) { f.OnReady(func(e DepthEvent, bufEvents []DepthEvent) {
snapshot, err := e.OrderBook() snapshot, err := e.OrderBook()
if err != nil { if err != nil {
log.WithError(err).Error("book convert error") log.WithError(err).Error("book convert error")
return return
} }
if !snapshot.IsValid() {
log.Warnf("depth snapshot is invalid, event: %+v", e)
}
stream.EmitBookSnapshot(snapshot) stream.EmitBookSnapshot(snapshot)
for _, e := range bufEvents { for _, e := range bufEvents {
@ -114,7 +118,6 @@ func NewStream(client *binance.Client) *Stream {
stream.EmitBookUpdate(book) stream.EmitBookUpdate(book)
}) })
stream.depthFrames[e.Symbol] = f
} else { } else {
f.PushEvent(*e) f.PushEvent(*e)
} }
@ -168,6 +171,11 @@ func NewStream(client *binance.Client) *Stream {
}) })
stream.OnConnect(func() { stream.OnConnect(func() {
// reset the previous frames
for _, f := range stream.depthFrames {
f.Reset()
}
var params []string var params []string
for _, subscription := range stream.Subscriptions { for _, subscription := range stream.Subscriptions {
params = append(params, convertSubscription(subscription)) params = append(params, convertSubscription(subscription))
@ -452,4 +460,3 @@ func maskListenKey(listenKey string) string {
} }
//go:generate callbackgen -type DepthFrame //go:generate callbackgen -type DepthFrame