add test and remove recovered atmoic bool

This commit is contained in:
chiahung 2023-10-13 16:50:59 +08:00
parent de1a884153
commit c5449374cd
3 changed files with 216 additions and 38 deletions

View File

@ -3,15 +3,26 @@ package grid2
import ( import (
"context" "context"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
) )
type SyncActiveOrdersOpts struct {
logger *logrus.Entry
metricsLabels prometheus.Labels
activeOrderBook *bbgo.ActiveOrderBook
orderQueryService types.ExchangeOrderQueryService
exchange types.Exchange
}
func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
// every time we activeOrdersRecoverCh receive signal, do active orders recover // every time we activeOrdersRecoverCh receive signal, do active orders recover
s.activeOrdersRecoverCh = make(chan struct{}, 1) s.activeOrdersRecoverCh = make(chan struct{}, 1)
@ -22,6 +33,14 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
ticker := time.NewTicker(interval) ticker := time.NewTicker(interval)
defer ticker.Stop() defer ticker.Stop()
opts := SyncActiveOrdersOpts{
logger: s.logger,
metricsLabels: s.newPrometheusLabels(),
activeOrderBook: s.orderExecutor.ActiveMakerOrders(),
orderQueryService: s.orderQueryService,
exchange: s.session.Exchange,
}
for { for {
select { select {
@ -29,12 +48,12 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
return return
case <-ticker.C: case <-ticker.C:
if err := s.syncActiveOrders(ctx); err != nil { if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders") log.WithError(err).Errorf("unable to sync active orders")
} }
case <-s.activeOrdersRecoverCh: case <-s.activeOrdersRecoverCh:
if err := s.syncActiveOrders(ctx); err != nil { if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders") log.WithError(err).Errorf("unable to sync active orders")
} }
@ -42,50 +61,38 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
} }
} }
func (s *Strategy) syncActiveOrders(ctx context.Context) error { func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
s.logger.Infof("[ActiveOrderRecover] syncActiveOrders") opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders")
notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute) notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute)
recovered := atomic.LoadInt32(&s.recovered) openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, opts.exchange, opts.activeOrderBook.Symbol)
if recovered == 0 {
s.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready")
return nil
}
if s.getGrid() == nil {
return nil
}
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol)
if err != nil { if err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return err return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time")
} }
metricsNumOfOpenOrders.With(s.newPrometheusLabels()).Set(float64(len(openOrders))) metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders)))
s.mu.Lock() activeOrders := opts.activeOrderBook.Orders()
defer s.mu.Unlock()
activeOrderBook := s.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order) openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders { for _, openOrder := range openOrders {
openOrdersMap[openOrder.OrderID] = openOrder openOrdersMap[openOrder.OrderID] = openOrder
} }
var errs error
// update active orders not in open orders // update active orders not in open orders
for _, activeOrder := range activeOrders { for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist { if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not. // no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID) delete(openOrdersMap, activeOrder.OrderID)
} else { } else {
s.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID) opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
if err := s.syncActiveOrder(ctx, activeOrderBook, activeOrder.OrderID); err != nil { if err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID); err != nil {
s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID) opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue continue
} }
} }
@ -98,15 +105,16 @@ func (s *Strategy) syncActiveOrders(ctx context.Context) error {
continue continue
} }
activeOrderBook.Update(openOrder) opts.activeOrderBook.Add(openOrder)
// opts.activeOrderBook.Update(openOrder)
} }
return nil return errs
} }
func (s *Strategy) syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderID uint64) error { func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error {
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{ updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: s.Symbol, Symbol: activeOrderBook.Symbol,
OrderID: strconv.FormatUint(orderID, 10), OrderID: strconv.FormatUint(orderID, 10),
}) })

View File

@ -0,0 +1,176 @@
package grid2
import (
"context"
"strconv"
"testing"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
func TestSyncActiveOrders(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
symbol := "ETHUSDT"
labels := prometheus.Labels{
"exchange": "default",
"symbol": symbol,
}
t.Run("all open orders are match with active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
}
order.Symbol = symbol
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
t.Run("there is order in active orderbook but not in open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder := order
updatedOrder.Status = types.OrderStatusFilled
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
}).Return(&updatedOrder, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(0, len(activeOrders))
})
t.Run("there is order on open orders but not in active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
CreationTime: types.Time(time.Now()),
}
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
t.Run("there is order on open order but not in active orderbook also order in active orderbook but not on open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order1 := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder1 := order1
updatedOrder1.Status = types.OrderStatusFilled
order2 := types.Order{
OrderID: 2,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
activeOrderbook.Add(order1)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order1.OrderID, 10),
}).Return(&updatedOrder1, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(2), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
}

View File

@ -9,7 +9,6 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -206,7 +205,6 @@ type Strategy struct {
tradingCtx, writeCtx context.Context tradingCtx, writeCtx context.Context
cancelWrite context.CancelFunc cancelWrite context.CancelFunc
recovered int32
activeOrdersRecoverCh chan struct{} activeOrdersRecoverCh chan struct{}
// this ensures that bbgo.Sync to lock the object // this ensures that bbgo.Sync to lock the object
@ -2028,10 +2026,6 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
} }
func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error {
defer func() {
atomic.AddInt32(&s.recovered, 1)
}()
if s.RecoverGridByScanningTrades { if s.RecoverGridByScanningTrades {
s.debugLog("recovering grid by scanning trades") s.debugLog("recovering grid by scanning trades")
return s.recoverByScanningTrades(ctx, session) return s.recoverByScanningTrades(ctx, session)