From 7a045a48d4b56d149e762f114e9ec897e0cb0503 Mon Sep 17 00:00:00 2001 From: zenix Date: Wed, 8 Jun 2022 12:14:53 +0900 Subject: [PATCH] fix: drift window in factorzoo, order_execution print order, refactor: use defer to mu.Unlock in depth/buffer.go --- pkg/bbgo/order_execution.go | 8 ++++---- pkg/depth/buffer.go | 8 ++------ pkg/strategy/factorzoo/strategy.go | 6 +++++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index a4b56fff6..940b69df5 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -85,9 +85,9 @@ func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder) // pass submit order as an interface object. channel, ok := e.RouteObject(&order) if ok { - e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %f @ %f", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order.Price.Float64(), &order) + e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %f @ %f, order: %v", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order.Price.Float64(), &order) } else { - e.Notify(":memo: Submitting %s %s %s order with quantity: %f @ %f", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order.Price.Float64(), &order) + e.Notify(":memo: Submitting %s %s %s order with quantity: %f @ %f, order: %v", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order.Price.Float64(), &order) } } } @@ -102,9 +102,9 @@ func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...type // pass submit order as an interface object. channel, ok := e.RouteObject(&order) if ok { - e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %f", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order) + e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %f, order: %v", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), &order) } else { - e.Notify(":memo: Submitting %s %s %s order with quantity: %f", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), order) + e.Notify(":memo: Submitting %s %s %s order with quantity: %f: %v", order.Symbol, order.Type, order.Side, order.Quantity.Float64(), &order) } log.Infof("submitting order: %s", order.String()) diff --git a/pkg/depth/buffer.go b/pkg/depth/buffer.go index df635da70..26e74edf7 100644 --- a/pkg/depth/buffer.go +++ b/pkg/depth/buffer.go @@ -94,6 +94,7 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg // we lock here because there might be 2+ calls to the AddUpdate method // we don't want to reset sync.Once 2 times here b.mu.Lock() + defer b.mu.Unlock() select { case <-b.resetC: log.Warnf("received depth reset signal, resetting...") @@ -109,7 +110,6 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg b.once.Do(func() { go b.tryFetch() }) - b.mu.Unlock() return nil } @@ -120,7 +120,6 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg finalUpdateID = b.finalUpdateID b.resetSnapshot() b.emitReset() - b.mu.Unlock() return fmt.Errorf("found missing update between finalUpdateID %d and firstUpdateID %d, diff: %d", finalUpdateID+1, u.FirstUpdateID, @@ -130,7 +129,6 @@ func (b *Buffer) AddUpdate(o types.SliceOrderBook, firstUpdateID int64, finalArg log.Debugf("depth update id %d -> %d", b.finalUpdateID, u.FinalUpdateID) b.finalUpdateID = u.FinalUpdateID b.EmitPush(u) - b.mu.Unlock() return nil } @@ -143,13 +141,13 @@ func (b *Buffer) fetchAndPush() error { log.Debugf("fetched depth snapshot, final update id %d", finalUpdateID) b.mu.Lock() + defer b.mu.Unlock() if len(b.buffer) > 0 { // the snapshot is too early if finalUpdateID < b.buffer[0].FirstUpdateID { b.resetSnapshot() b.emitReset() - b.mu.Unlock() return fmt.Errorf("depth snapshot is too early, final update %d is < the first update id %d", finalUpdateID, b.buffer[0].FirstUpdateID) } } @@ -164,7 +162,6 @@ func (b *Buffer) fetchAndPush() error { if u.FirstUpdateID > finalUpdateID+1 { b.resetSnapshot() b.emitReset() - b.mu.Unlock() return fmt.Errorf("there is a missing depth update, the update id %d > final update id %d + 1", u.FirstUpdateID, finalUpdateID) } @@ -183,7 +180,6 @@ func (b *Buffer) fetchAndPush() error { // set the snapshot b.snapshot = &book - b.mu.Unlock() b.EmitReady(book, pushUpdates) return nil } diff --git a/pkg/strategy/factorzoo/strategy.go b/pkg/strategy/factorzoo/strategy.go index 0e86843ef..f4b049418 100644 --- a/pkg/strategy/factorzoo/strategy.go +++ b/pkg/strategy/factorzoo/strategy.go @@ -154,7 +154,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // s.pvDivergence.OnUpdate(func(corr float64) { // //fmt.Printf("now we've got corr: %f\n", corr) // }) - drift := &indicator.Drift{IntervalWindow: types.IntervalWindow{Window: 14, Interval: s.Interval}} + windowSize := 360/s.Interval.Minutes() + if windowSize == 0 { + windowSize = 3 + } + drift := &indicator.Drift{IntervalWindow: types.IntervalWindow{Window: windowSize, Interval: s.Interval}} drift.Bind(st) s.Alpha = [][]float64{{}, {}, {}, {}, {}, {}}