FEATURE: merge active orders recover and grid recover logic

This commit is contained in:
chiahung 2023-10-20 14:34:08 +08:00
parent f8c47f72bf
commit 4a1fe80f5e
7 changed files with 353 additions and 852 deletions

View File

@ -1,159 +0,0 @@
package grid2
import (
"context"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
)
type SyncActiveOrdersOpts struct {
logger *logrus.Entry
metricsLabels prometheus.Labels
activeOrderBook *bbgo.ActiveOrderBook
orderQueryService types.ExchangeOrderQueryService
exchange types.Exchange
}
func (s *Strategy) initializeRecoverCh() bool {
s.mu.Lock()
defer s.mu.Unlock()
isInitialize := false
if s.activeOrdersRecoverC == nil {
s.logger.Info("initializing recover channel")
s.activeOrdersRecoverC = make(chan struct{}, 1)
} else {
s.logger.Info("recover channel is already initialized, trigger active orders recover")
isInitialize = true
select {
case s.activeOrdersRecoverC <- struct{}{}:
s.logger.Info("trigger active orders recover")
default:
s.logger.Info("activeOrdersRecoverC is full")
}
}
return isInitialize
}
func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
// every time we activeOrdersRecoverC receive signal, do active orders recover
if isInitialize := s.initializeRecoverCh(); isInitialize {
return
}
// make ticker's interval random in 25 min ~ 35 min
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[ActiveOrderRecover] interval: %s", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
opts := SyncActiveOrdersOpts{
logger: s.logger,
metricsLabels: s.newPrometheusLabels(),
activeOrderBook: s.orderExecutor.ActiveMakerOrders(),
orderQueryService: s.orderQueryService,
exchange: s.session.Exchange,
}
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
}
case <-s.activeOrdersRecoverC:
if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
}
}
}
}
func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders")
notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, opts.exchange, opts.activeOrderBook.Symbol)
if err != nil {
opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time")
}
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders)))
}
activeOrders := opts.activeOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrdersMap[openOrder.OrderID] = openOrder
}
var errs error
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID)
} else {
opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
// sleep 100ms to avoid DDOS
time.Sleep(100 * time.Millisecond)
if err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID); err != nil {
opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue
}
}
}
// update open orders not in active orders
for _, openOrder := range openOrdersMap {
// we don't add open orders into active orderbook if updated in 5 min
if openOrder.UpdateTime.After(notAddNonExistingOpenOrdersAfter) {
continue
}
opts.activeOrderBook.Add(openOrder)
// opts.activeOrderBook.Update(openOrder)
}
return errs
}
func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error {
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: activeOrderBook.Symbol,
OrderID: strconv.FormatUint(orderID, 10),
})
if err != nil {
return err
}
activeOrderBook.Update(*updatedOrder)
return nil
}

View File

@ -1,176 +0,0 @@
package grid2
import (
"context"
"strconv"
"testing"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
)
func TestSyncActiveOrders(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
symbol := "ETHUSDT"
labels := prometheus.Labels{
"exchange": "default",
"symbol": symbol,
}
t.Run("all open orders are match with active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
}
order.Symbol = symbol
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
t.Run("there is order in active orderbook but not in open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder := order
updatedOrder.Status = types.OrderStatusFilled
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
}).Return(&updatedOrder, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(0, len(activeOrders))
})
t.Run("there is order on open orders but not in active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
CreationTime: types.Time(time.Now()),
}
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(1), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
t.Run("there is order on open order but not in active orderbook also order in active orderbook but not on open orders", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order1 := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
updatedOrder1 := order1
updatedOrder1.Status = types.OrderStatusFilled
order2 := types.Order{
OrderID: 2,
Status: types.OrderStatusNew,
SubmitOrder: types.SubmitOrder{
Symbol: symbol,
},
}
activeOrderbook.Add(order1)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order1.OrderID, 10),
}).Return(&updatedOrder1, nil)
assert.NoError(syncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
assert.Equal(1, len(activeOrders))
assert.Equal(uint64(2), activeOrders[0].OrderID)
assert.Equal(types.OrderStatusNew, activeOrders[0].Status)
})
}

View File

@ -22,7 +22,6 @@ type GridProfitStats struct {
TotalFee map[string]fixedpoint.Value `json:"totalFee,omitempty"` TotalFee map[string]fixedpoint.Value `json:"totalFee,omitempty"`
Volume fixedpoint.Value `json:"volume,omitempty"` Volume fixedpoint.Value `json:"volume,omitempty"`
Market types.Market `json:"market,omitempty"` Market types.Market `json:"market,omitempty"`
ProfitEntries []*GridProfit `json:"profitEntries,omitempty"`
Since *time.Time `json:"since,omitempty"` Since *time.Time `json:"since,omitempty"`
InitialOrderID uint64 `json:"initialOrderID"` InitialOrderID uint64 `json:"initialOrderID"`
} }
@ -38,7 +37,6 @@ func newGridProfitStats(market types.Market) *GridProfitStats {
TotalFee: make(map[string]fixedpoint.Value), TotalFee: make(map[string]fixedpoint.Value),
Volume: fixedpoint.Zero, Volume: fixedpoint.Zero,
Market: market, Market: market,
ProfitEntries: nil,
} }
} }
@ -69,8 +67,6 @@ func (s *GridProfitStats) AddProfit(profit *GridProfit) {
case s.Market.BaseCurrency: case s.Market.BaseCurrency:
s.TotalBaseProfit = s.TotalBaseProfit.Add(profit.Profit) s.TotalBaseProfit = s.TotalBaseProfit.Add(profit.Profit)
} }
s.ProfitEntries = append(s.ProfitEntries, profit)
} }
func (s *GridProfitStats) SlackAttachment() slack.Attachment { func (s *GridProfitStats) SlackAttachment() slack.Attachment {

View File

@ -6,41 +6,121 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
) )
func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.ExchangeSession) error { /*
defer func() { Background knowledge
s.updateGridNumOfOrdersMetricsWithLock() 1. active orderbook add orders only when receive new order event or call Add/Update method manually
}() 2. active orderbook remove orders only when receive filled/cancelled event or call Remove/Update method manually
historyService, implemented := session.Exchange.(types.ExchangeTradeHistoryService) As a result
1. at the same twin-order-price, there is order in open orders but not in active orderbook
- not receive new order event
=> add order into active orderbook
2. at the same twin-order-price, there is order in active orderbook but not in open orders
- not receive filled event
=> query the filled order and call Update method
3. at the same twin-order-price, there is no order in open orders and no order in active orderbook
- failed to create the order
=> query the last order from trades to emit filled, and it will submit again
- not receive new order event and the order filled before we find it.
=> query the untracked order (also is the last order) from trades to emit filled and it will submit the reversed order
4. at the same twin-order-price, there are different orders in open orders and active orderbook
- should not happen !!!
=> log error
5. at the same twin-order-price, there is the same order in open orders and active orderbook
- normal case
=> no need to do anything
After killing pod, active orderbook must be empty. we can think it is the same as not receive new event.
Process
1. build twin orderbook with pins and open orders.
2. build twin orderbook with pins and active orders.
3. compare above twin orderbooks to add open orders into active orderbook and update active orders.
4. run grid recover to make sure all the twin price has its order.
*/
func (s *Strategy) initializeRecoverC() bool {
s.mu.Lock()
defer s.mu.Unlock()
isInitialize := false
if s.recoverC == nil {
s.logger.Info("initializing recover channel")
s.recoverC = make(chan struct{}, 1)
} else {
s.logger.Info("recover channel is already initialized, trigger active orders recover")
isInitialize = true
select {
case s.recoverC <- struct{}{}:
s.logger.Info("trigger active orders recover")
default:
s.logger.Info("activeOrdersRecoverC is full")
}
}
return isInitialize
}
func (s *Strategy) recoverPeriodically(ctx context.Context) {
// every time we activeOrdersRecoverC receive signal, do active orders recover
if isInitialize := s.initializeRecoverC(); isInitialize {
return
}
// make ticker's interval random in 25 min ~ 35 min
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[Recover] interval: %s", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("failed to recover")
}
case <-s.recoverC:
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("failed to recover")
}
}
}
}
func (s *Strategy) recover(ctx context.Context) error {
historyService, implemented := s.session.Exchange.(types.ExchangeTradeHistoryService)
// if the exchange doesn't support ExchangeTradeHistoryService, do not run recover // if the exchange doesn't support ExchangeTradeHistoryService, do not run recover
if !implemented { if !implemented {
s.logger.Warn("ExchangeTradeHistoryService is not implemented, can not recover grid") s.logger.Warn("ExchangeTradeHistoryService is not implemented, can not recover grid")
return nil return nil
} }
openOrders, err := session.Exchange.QueryOpenOrders(ctx, s.Symbol) activeOrderBook := s.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol)
if err != nil { if err != nil {
return errors.Wrapf(err, "unable to query open orders when recovering") return err
} }
s.logger.Infof("found %d open orders left on the %s order book", len(openOrders), s.Symbol) // check if it's new strategy or need to recover
if len(activeOrders) == 0 && len(openOrders) == 0 && s.GridProfitStats.InitialOrderID == 0 {
if s.GridProfitStats.InitialOrderID != 0 { // even though there is no open orders and initial orderID is 0
s.logger.Info("InitialOrderID is already there, need to recover") // we still need to query trades to make sure if we need to recover or not
} else if len(openOrders) != 0 {
s.logger.Info("even though InitialOrderID is 0, there are open orders so need to recover")
} else {
s.logger.Info("InitialOrderID is 0 and there is no open orders, query trades to check it")
// initial order id may be new strategy or lost data in redis, so we need to check trades + open orders
// if there are open orders or trades, we need to recover
trades, err := historyService.QueryTrades(ctx, s.Symbol, &types.TradeQueryOptions{ trades, err := historyService.QueryTrades(ctx, s.Symbol, &types.TradeQueryOptions{
// from 1, because some API will ignore 0 last trade id // from 1, because some API will ignore 0 last trade id
LastTradeID: 1, LastTradeID: 1,
@ -53,181 +133,153 @@ func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.Ex
} }
if len(trades) == 0 { if len(trades) == 0 {
s.logger.Info("0 trades found, it's a new strategy so no need to recover") s.logger.Info("no open order, no active order, no trade, it's a new strategy so no need to recover")
return nil return nil
} }
} }
s.logger.Infof("start recovering") s.logger.Info("start recovering")
filledOrders, err := s.getFilledOrdersByScanningTrades(ctx, historyService, s.orderQueryService, openOrders)
if err != nil {
return errors.Wrap(err, "grid recover error")
}
s.debugOrders("emit filled orders", filledOrders)
// add open orders into avtive maker orders if s.getGrid() == nil {
s.addOrdersToActiveOrderBook(openOrders) s.setGrid(s.newGrid())
// emit the filled orders
activeOrderBook := s.orderExecutor.ActiveMakerOrders()
for _, filledOrder := range filledOrders {
activeOrderBook.EmitFilled(filledOrder)
} }
// emit ready after recover s.mu.Lock()
s.EmitGridReady() defer s.mu.Unlock()
// debug and send metrics pins := s.getGrid().Pins
// wait for the reverse order to be placed
time.Sleep(2 * time.Second)
debugGrid(s.logger, s.grid, s.orderExecutor.ActiveMakerOrders())
defer bbgo.Sync(ctx, s) activeOrdersInTwinOrderBook, err := s.buildTwinOrderBook(pins, activeOrders)
openOrdersInTwinOrderBook, err := s.buildTwinOrderBook(pins, openOrders)
if s.EnableProfitFixer { s.logger.Infof("active orders' twin orderbook\n%s", activeOrdersInTwinOrderBook.String())
until := time.Now() s.logger.Infof("open orders in twin orderbook\n%s", openOrdersInTwinOrderBook.String())
since := until.Add(-7 * 24 * time.Hour)
if s.FixProfitSince != nil { // remove index 0, because twin orderbook's price is from the second one
since = s.FixProfitSince.Time() pins = pins[1:]
var noTwinOrderPins []fixedpoint.Value
for _, pin := range pins {
v := fixedpoint.Value(pin)
activeOrder := activeOrdersInTwinOrderBook.GetTwinOrder(v)
openOrder := openOrdersInTwinOrderBook.GetTwinOrder(v)
if activeOrder == nil || openOrder == nil {
return fmt.Errorf("there is no any twin order at this pin, can not recover")
} }
fixer := newProfitFixer(s.grid, s.Symbol, historyService) var activeOrderID uint64 = 0
fixer.SetLogger(s.logger) if activeOrder.Exist() {
activeOrderID = activeOrder.GetOrder().OrderID
}
// set initial order ID = 0 instead of s.GridProfitStats.InitialOrderID because the order ID could be incorrect var openOrderID uint64 = 0
if err := fixer.Fix(ctx, since, until, 0, s.GridProfitStats); err != nil { if openOrder.Exist() {
openOrderID = openOrder.GetOrder().OrderID
}
// case 3
if activeOrderID == 0 && openOrderID == 0 {
noTwinOrderPins = append(noTwinOrderPins, v)
continue
}
// case 1
if activeOrderID == 0 {
activeOrderBook.Add(openOrder.GetOrder())
// also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid
activeOrdersInTwinOrderBook.AddTwinOrder(v, openOrder)
continue
}
// case 2
if openOrderID == 0 {
syncActiveOrder(ctx, activeOrderBook, s.orderQueryService, activeOrder.GetOrder().OrderID)
continue
}
// case 4
if activeOrderID != openOrderID {
return fmt.Errorf("there are two different orders in the same pin, can not recover")
}
// case 5
// do nothing
}
s.logger.Infof("twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String())
if err := s.recoverEmptyGrid(ctx, activeOrdersInTwinOrderBook, historyService, s.orderQueryService); err != nil {
s.logger.WithError(err).Error("failed to recover empty grid")
return err return err
} }
s.logger.Infof("fixed profitStats: %#v", s.GridProfitStats) s.logger.Infof("twin orderbook after recovering\n%s", activeOrdersInTwinOrderBook.String())
s.EmitGridProfit(s.GridProfitStats, nil) if activeOrdersInTwinOrderBook.EmptyTwinOrderSize() > 0 {
return fmt.Errorf("there is still empty grid in twin orderbook")
} }
for _, pin := range noTwinOrderPins {
twinOrder := activeOrdersInTwinOrderBook.GetTwinOrder(pin)
if twinOrder == nil {
return fmt.Errorf("should not get nil twin order after recovering empty grid, check it")
}
if !twinOrder.Exist() {
return fmt.Errorf("should not get empty twin order after recovering empty grid, check it")
}
activeOrderBook.EmitFilled(twinOrder.GetOrder())
time.Sleep(100 * time.Millisecond)
}
s.EmitGridReady()
time.Sleep(2 * time.Second)
debugGrid(s.logger, s.grid, s.orderExecutor.ActiveMakerOrders())
bbgo.Sync(ctx, s)
return nil return nil
} }
func (s *Strategy) getFilledOrdersByScanningTrades(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, openOrdersOnGrid []types.Order) ([]types.Order, error) { func (s *Strategy) buildTwinOrderBook(pins []Pin, orders []types.Order) (*TwinOrderBook, error) {
// set grid book := NewTwinOrderBook(pins)
grid := s.newGrid()
s.setGrid(grid)
expectedNumOfOrders := s.GridNum - 1 for _, order := range orders {
numGridOpenOrders := int64(len(openOrdersOnGrid)) if err := book.AddOrder(order); err != nil {
s.debugLog("open orders nums: %d, expected nums: %d", numGridOpenOrders, expectedNumOfOrders) return nil, err
if expectedNumOfOrders == numGridOpenOrders { }
// no need to recover, only need to add open orders back to active order book
return nil, nil
} else if expectedNumOfOrders < numGridOpenOrders {
return nil, fmt.Errorf("amount of grid's open orders should not > amount of expected grid's orders")
} }
// 1. build twin-order map return book, nil
twinOrdersOpen, err := s.buildTwinOrderMap(grid.Pins, openOrdersOnGrid)
if err != nil {
return nil, errors.Wrapf(err, "failed to build pin order map with open orders")
}
// 2. build the filled twin-order map by querying trades
expectedFilledNum := int(expectedNumOfOrders - numGridOpenOrders)
twinOrdersFilled, err := s.buildFilledTwinOrderMapFromTrades(ctx, queryTradesService, queryOrderService, twinOrdersOpen, expectedFilledNum)
if err != nil {
return nil, errors.Wrapf(err, "failed to build filled pin order map")
}
// 3. get the filled orders from twin-order map
filledOrders := twinOrdersFilled.AscendingOrders()
// 4. verify the grid
if err := s.verifyFilledTwinGrid(s.grid.Pins, twinOrdersOpen, filledOrders); err != nil {
return nil, errors.Wrapf(err, "verify grid with error")
}
return filledOrders, nil
} }
func (s *Strategy) verifyFilledTwinGrid(pins []Pin, twinOrders TwinOrderMap, filledOrders []types.Order) error { func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error {
s.debugLog("verifying filled grid - pins: %+v", pins) updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
s.debugOrders("verifying filled grid - filled orders", filledOrders) Symbol: activeOrderBook.Symbol,
s.debugLog("verifying filled grid - open twin orders:\n%s", twinOrders.String()) OrderID: strconv.FormatUint(orderID, 10),
})
if err := s.addOrdersIntoTwinOrderMap(twinOrders, filledOrders); err != nil { if err != nil {
return errors.Wrapf(err, "verifying filled grid error when add orders into twin order map") return err
} }
s.debugLog("verifying filled grid - filled twin orders:\n%+v", twinOrders.String()) activeOrderBook.Update(*updatedOrder)
for i, pin := range pins {
// we use twinOrderMap to make sure there are no duplicated order at one grid, and we use the sell price as key so we skip the pins[0] which is only for buy price
if i == 0 {
continue
}
twin, exist := twinOrders[fixedpoint.Value(pin)]
if !exist {
return fmt.Errorf("there is no order at price (%+v)", pin)
}
if !twin.Exist() {
return fmt.Errorf("all the price need a twin")
}
if !twin.IsValid() {
return fmt.Errorf("all the twins need to be valid")
}
}
return nil return nil
} }
// buildTwinOrderMap build the pin-order map with grid and open orders. func (s *Strategy) recoverEmptyGrid(ctx context.Context, twinOrderBook *TwinOrderBook, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService) error {
// The keys of this map contains all required pins of this grid. if twinOrderBook.EmptyTwinOrderSize() == 0 {
// If the Order of the pin is empty types.Order (OrderID == 0), it means there is no open orders at this pin. s.logger.Info("no empty grid")
func (s *Strategy) buildTwinOrderMap(pins []Pin, openOrders []types.Order) (TwinOrderMap, error) { return nil
twinOrderMap := make(TwinOrderMap)
for i, pin := range pins {
// twin order map only use sell price as key, so skip 0
if i == 0 {
continue
} }
twinOrderMap[fixedpoint.Value(pin)] = TwinOrder{} existedOrders := twinOrderBook.SyncOrderMap()
}
for _, openOrder := range openOrders {
twinKey, err := findTwinOrderMapKey(s.grid, openOrder)
if err != nil {
return nil, errors.Wrapf(err, "failed to build twin order map")
}
twinOrder, exist := twinOrderMap[twinKey]
if !exist {
return nil, fmt.Errorf("the price of the openOrder (id: %d) is not in pins", openOrder.OrderID)
}
if twinOrder.Exist() {
return nil, fmt.Errorf("there are multiple order in a twin")
}
twinOrder.SetOrder(openOrder)
twinOrderMap[twinKey] = twinOrder
}
return twinOrderMap, nil
}
// buildFilledTwinOrderMapFromTrades will query the trades from last 24 hour and use them to build a pin order map
// It will skip the orders on pins at which open orders are already
func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, twinOrdersOpen TwinOrderMap, expectedFillNum int) (TwinOrderMap, error) {
twinOrdersFilled := make(TwinOrderMap)
// existedOrders is used to avoid re-query the same orders
existedOrders := twinOrdersOpen.SyncOrderMap()
// get the filled orders when bbgo is down in order from trades
until := time.Now() until := time.Now()
// the first query only query the last 1 hour, because mostly shutdown and recovery happens within 1 hour
since := until.Add(-1 * time.Hour) since := until.Add(-1 * time.Hour)
// hard limit for recover // hard limit for recover
recoverSinceLimit := time.Date(2023, time.March, 10, 0, 0, 0, 0, time.UTC) recoverSinceLimit := time.Date(2023, time.March, 10, 0, 0, 0, 0, time.UTC)
@ -237,15 +289,15 @@ func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryT
} }
for { for {
if err := s.queryTradesToUpdateTwinOrdersMap(ctx, queryTradesService, queryOrderService, twinOrdersOpen, twinOrdersFilled, existedOrders, since, until); err != nil { if err := s.queryTradesToUpdateTwinOrderBook(ctx, twinOrderBook, queryTradesService, queryOrderService, existedOrders, since, until); err != nil {
return nil, errors.Wrapf(err, "failed to query trades to update twin orders map") return errors.Wrapf(err, "failed to query trades to update twin orderbook")
} }
until = since until = since
since = until.Add(-6 * time.Hour) since = until.Add(-6 * time.Hour)
if len(twinOrdersFilled) >= expectedFillNum { if twinOrderBook.EmptyTwinOrderSize() == 0 {
s.logger.Infof("stop querying trades because twin orders filled (%d) >= expected filled nums (%d)", len(twinOrdersFilled), expectedFillNum) s.logger.Infof("stop querying trades because there is no empty twin order on twin orderbook")
break break
} }
@ -260,10 +312,10 @@ func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryT
} }
} }
return twinOrdersFilled, nil return nil
} }
func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, twinOrdersOpen, twinOrdersFilled TwinOrderMap, existedOrders *types.SyncOrderMap, since, until time.Time) error { func (s *Strategy) queryTradesToUpdateTwinOrderBook(ctx context.Context, twinOrderBook *TwinOrderBook, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, existedOrders *types.SyncOrderMap, since, until time.Time) error {
var fromTradeID uint64 = 0 var fromTradeID uint64 = 0
var limit int64 = 1000 var limit int64 = 1000
for { for {
@ -275,9 +327,8 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr
}) })
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to query trades to recover the grid with open orders") return errors.Wrapf(err, "failed to query trades to recover the grid")
} }
s.debugLog("QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades)) s.debugLog("QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades))
for _, trade := range trades { for _, trade := range trades {
@ -306,31 +357,9 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr
// add 1 to avoid duplicate // add 1 to avoid duplicate
fromTradeID = trade.ID + 1 fromTradeID = trade.ID + 1
twinOrderKey, err := findTwinOrderMapKey(s.grid, *order) if err := twinOrderBook.AddOrder(*order); err != nil {
if err != nil { return errors.Wrapf(err, "failed to add queried order into twin orderbook")
return errors.Wrapf(err, "failed to find grid order map's key when recover")
} }
twinOrderOpen, exist := twinOrdersOpen[twinOrderKey]
if !exist {
return fmt.Errorf("the price of the order with the same GroupID is not in pins")
}
if twinOrderOpen.Exist() {
continue
}
if twinOrder, exist := twinOrdersFilled[twinOrderKey]; exist {
to := twinOrder.GetOrder()
if to.UpdateTime.Time().After(order.UpdateTime.Time()) {
s.logger.Infof("twinOrder's update time (%s) should not be after order's update time (%s)", to.UpdateTime, order.UpdateTime)
continue
}
}
twinOrder := TwinOrder{}
twinOrder.SetOrder(*order)
twinOrdersFilled[twinOrderKey] = twinOrder
} }
// stop condition // stop condition
@ -339,24 +368,3 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr
} }
} }
} }
func (s *Strategy) addOrdersIntoTwinOrderMap(twinOrders TwinOrderMap, orders []types.Order) error {
for _, order := range orders {
k, err := findTwinOrderMapKey(s.grid, order)
if err != nil {
return errors.Wrap(err, "failed to add orders into twin order map")
}
if v, exist := twinOrders[k]; !exist {
return fmt.Errorf("the price (%+v) is not in pins", k)
} else if v.Exist() {
return fmt.Errorf("there is already a twin order at this price (%+v)", k)
} else {
twin := TwinOrder{}
twin.SetOrder(order)
twinOrders[k] = twin
}
}
return nil
}

View File

@ -1,295 +0,0 @@
package grid2
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strconv"
"testing"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)
type TestData struct {
Market types.Market `json:"market" yaml:"market"`
Strategy Strategy `json:"strategy" yaml:"strategy"`
OpenOrders []types.Order `json:"openOrders" yaml:"openOrders"`
ClosedOrders []types.Order `json:"closedOrders" yaml:"closedOrders"`
Trades []types.Trade `json:"trades" yaml:"trades"`
}
type TestDataService struct {
Orders map[string]types.Order
Trades []types.Trade
}
func (t *TestDataService) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) {
var i int = 0
if options.LastTradeID != 0 {
for idx, trade := range t.Trades {
if trade.ID < options.LastTradeID {
continue
}
i = idx
break
}
}
var trades []types.Trade
l := len(t.Trades)
for ; i < l && len(trades) < int(options.Limit); i++ {
trades = append(trades, t.Trades[i])
}
return trades, nil
}
func (t *TestDataService) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.Order, error) {
if len(q.OrderID) == 0 {
return nil, fmt.Errorf("order id should not be empty")
}
order, exist := t.Orders[q.OrderID]
if !exist {
return nil, fmt.Errorf("order not found")
}
return &order, nil
}
// dummy method for interface
func (t *TestDataService) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
return nil, nil
}
// dummy method for interface
func (t *TestDataService) QueryOrderTrades(ctx context.Context, q types.OrderQuery) ([]types.Trade, error) {
return nil, nil
}
func NewStrategy(t *TestData) *Strategy {
s := t.Strategy
s.Debug = true
s.Initialize()
s.Market = t.Market
s.Position = types.NewPositionFromMarket(t.Market)
s.orderExecutor = bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, t.Market.Symbol, ID, s.InstanceID(), s.Position)
return &s
}
func NewTestDataService(t *TestData) *TestDataService {
var orders map[string]types.Order = make(map[string]types.Order)
for _, order := range t.OpenOrders {
orders[strconv.FormatUint(order.OrderID, 10)] = order
}
for _, order := range t.ClosedOrders {
orders[strconv.FormatUint(order.OrderID, 10)] = order
}
trades := t.Trades
sort.Slice(t.Trades, func(i, j int) bool {
return trades[i].ID < trades[j].ID
})
return &TestDataService{
Orders: orders,
Trades: trades,
}
}
func readSpec(fileName string) (*TestData, error) {
content, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}
market := types.Market{}
if err := json.Unmarshal(content, &market); err != nil {
return nil, err
}
strategy := Strategy{}
if err := json.Unmarshal(content, &strategy); err != nil {
return nil, err
}
data := TestData{
Market: market,
Strategy: strategy,
}
return &data, nil
}
func readOrdersFromCSV(fileName string) ([]types.Order, error) {
csvFile, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer csvFile.Close()
csvReader := csv.NewReader(csvFile)
keys, err := csvReader.Read()
if err != nil {
return nil, err
}
var orders []types.Order
for {
row, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if len(row) != len(keys) {
return nil, fmt.Errorf("length of row should be equal to length of keys")
}
var m map[string]interface{} = make(map[string]interface{})
for i, key := range keys {
if key == "orderID" {
x, err := strconv.ParseUint(row[i], 10, 64)
if err != nil {
return nil, err
}
m[key] = x
} else {
m[key] = row[i]
}
}
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
order := types.Order{}
if err = json.Unmarshal(b, &order); err != nil {
return nil, err
}
orders = append(orders, order)
}
return orders, nil
}
func readTradesFromCSV(fileName string) ([]types.Trade, error) {
csvFile, err := os.Open(fileName)
if err != nil {
return nil, err
}
defer csvFile.Close()
csvReader := csv.NewReader(csvFile)
keys, err := csvReader.Read()
if err != nil {
return nil, err
}
var trades []types.Trade
for {
row, err := csvReader.Read()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if len(row) != len(keys) {
return nil, fmt.Errorf("length of row should be equal to length of keys")
}
var m map[string]interface{} = make(map[string]interface{})
for i, key := range keys {
switch key {
case "id", "orderID":
x, err := strconv.ParseUint(row[i], 10, 64)
if err != nil {
return nil, err
}
m[key] = x
default:
m[key] = row[i]
}
}
b, err := json.Marshal(m)
if err != nil {
return nil, err
}
trade := types.Trade{}
if err = json.Unmarshal(b, &trade); err != nil {
return nil, err
}
trades = append(trades, trade)
}
return trades, nil
}
func readTestDataFrom(fileDir string) (*TestData, error) {
data, err := readSpec(fmt.Sprintf("%s/spec", fileDir))
if err != nil {
return nil, err
}
openOrders, err := readOrdersFromCSV(fmt.Sprintf("%s/open_orders.csv", fileDir))
if err != nil {
return nil, err
}
closedOrders, err := readOrdersFromCSV(fmt.Sprintf("%s/closed_orders.csv", fileDir))
if err != nil {
return nil, err
}
trades, err := readTradesFromCSV(fmt.Sprintf("%s/trades.csv", fileDir))
if err != nil {
return nil, err
}
data.OpenOrders = openOrders
data.ClosedOrders = closedOrders
data.Trades = trades
return data, nil
}
func TestRecoverByScanningTrades(t *testing.T) {
assert := assert.New(t)
t.Run("test case 1", func(t *testing.T) {
fileDir := "recovery_testcase/testcase1/"
data, err := readTestDataFrom(fileDir)
if !assert.NoError(err) {
return
}
testService := NewTestDataService(data)
strategy := NewStrategy(data)
filledOrders, err := strategy.getFilledOrdersByScanningTrades(context.Background(), testService, testService, data.OpenOrders)
if !assert.NoError(err) {
return
}
assert.Len(filledOrders, 0)
})
}

View File

@ -204,7 +204,7 @@ type Strategy struct {
tradingCtx, writeCtx context.Context tradingCtx, writeCtx context.Context
cancelWrite context.CancelFunc cancelWrite context.CancelFunc
activeOrdersRecoverC chan struct{} recoverC chan struct{}
// this ensures that bbgo.Sync to lock the object // this ensures that bbgo.Sync to lock the object
sync.Mutex sync.Mutex
@ -1953,7 +1953,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
return return
} }
s.recoverActiveOrdersPeriodically(ctx) s.recoverPeriodically(ctx)
}) })
} else { } else {
s.startProcess(ctx, session) s.startProcess(ctx, session)
@ -1968,7 +1968,7 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
if s.RecoverOrdersWhenStart { if s.RecoverOrdersWhenStart {
// do recover only when triggerPrice is not set and not in the back-test mode // do recover only when triggerPrice is not set and not in the back-test mode
s.logger.Infof("recoverWhenStart is set, trying to recover grid orders...") s.logger.Infof("recoverWhenStart is set, trying to recover grid orders...")
if err := s.recoverGrid(ctx, session); err != nil { if err := s.recover(ctx); err != nil {
// if recover fail, return and do not open grid // if recover fail, return and do not open grid
s.logger.WithError(err).Error("failed to start process, recover error") s.logger.WithError(err).Error("failed to start process, recover error")
s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error")) s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error"))
@ -1985,6 +1985,7 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
return nil return nil
} }
/*
func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error {
if s.RecoverGridByScanningTrades { if s.RecoverGridByScanningTrades {
s.debugLog("recovering grid by scanning trades") s.debugLog("recovering grid by scanning trades")
@ -1994,6 +1995,7 @@ func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSessio
s.debugLog("recovering grid by scanning orders") s.debugLog("recovering grid by scanning orders")
return s.recoverByScanningOrders(ctx, session) return s.recoverByScanningOrders(ctx, session)
} }
*/
func (s *Strategy) recoverByScanningOrders(ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) recoverByScanningOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol) openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)

View File

@ -111,3 +111,128 @@ func (m TwinOrderMap) String() string {
sb.WriteString("================== END OF PIN ORDER MAP ==================\n") sb.WriteString("================== END OF PIN ORDER MAP ==================\n")
return sb.String() return sb.String()
} }
type TwinOrderBook struct {
// sort in asc order
pins []fixedpoint.Value
// pin index, use to find the next or last pin in desc order
pinIdx map[fixedpoint.Value]int
// orderbook
m map[fixedpoint.Value]*TwinOrder
size int
}
func NewTwinOrderBook(pins []Pin) *TwinOrderBook {
var v []fixedpoint.Value
for _, pin := range pins {
v = append(v, fixedpoint.Value(pin))
}
// sort it in asc order
sort.Slice(v, func(i, j int) bool {
return v[j].Compare(v[i]) > 0
})
pinIdx := make(map[fixedpoint.Value]int)
m := make(map[fixedpoint.Value]*TwinOrder)
for i, pin := range v {
m[pin] = &TwinOrder{}
pinIdx[pin] = i
}
ob := TwinOrderBook{
pins: v,
pinIdx: pinIdx,
m: m,
size: 0,
}
return &ob
}
func (book *TwinOrderBook) String() string {
var sb strings.Builder
sb.WriteString("================== TWIN ORDER MAP ==================\n")
for _, pin := range book.pins {
twin := book.m[fixedpoint.Value(pin)]
twinOrder := twin.GetOrder()
sb.WriteString(fmt.Sprintf("-> %8s) %s\n", pin, twinOrder.String()))
}
sb.WriteString("================== END OF PIN ORDER MAP ==================\n")
return sb.String()
}
func (book *TwinOrderBook) GetTwinOrderPin(order types.Order) (fixedpoint.Value, error) {
idx, exist := book.pinIdx[order.Price]
if !exist {
return fixedpoint.Zero, fmt.Errorf("the order's (%d) price (%s) is not in pins", order.OrderID, order.Price)
}
if order.Side == types.SideTypeBuy {
idx++
if idx >= len(book.pins) {
return fixedpoint.Zero, fmt.Errorf("this order's twin order price is not in pins, %+v", order)
}
} else if order.Side == types.SideTypeSell {
if idx == 0 {
return fixedpoint.Zero, fmt.Errorf("this order's twin order price is at zero index, %+v", order)
}
// do nothing
} else {
// should not happen
return fixedpoint.Zero, fmt.Errorf("the order's (%d) side (%s) is not supported", order.OrderID, order.Side)
}
return book.pins[idx], nil
}
func (book *TwinOrderBook) AddOrder(order types.Order) error {
pin, err := book.GetTwinOrderPin(order)
if err != nil {
return err
}
twinOrder, exist := book.m[pin]
if !exist {
// should not happen
return fmt.Errorf("no any empty twin order at pins, should not happen, check it")
}
if !twinOrder.Exist() {
book.size++
}
twinOrder.SetOrder(order)
return nil
}
func (book *TwinOrderBook) GetTwinOrder(pin fixedpoint.Value) *TwinOrder {
return book.m[pin]
}
func (book *TwinOrderBook) AddTwinOrder(pin fixedpoint.Value, order *TwinOrder) {
book.m[pin] = order
}
func (book *TwinOrderBook) Size() int {
return book.size
}
func (book *TwinOrderBook) EmptyTwinOrderSize() int {
return len(book.pins) - 1 - book.size
}
func (book *TwinOrderBook) SyncOrderMap() *types.SyncOrderMap {
orderMap := types.NewSyncOrderMap()
for _, twin := range book.m {
if twin.Exist() {
orderMap.Add(twin.GetOrder())
}
}
return orderMap
}