From 714d61a829c5698d41e88736c79ec45d899a20c5 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 16 Mar 2021 18:39:18 +0800 Subject: [PATCH] add grid restore behavior --- pkg/bbgo/notifier.go | 6 ++--- pkg/cmd/run.go | 7 +++-- pkg/exchange/max/exchange.go | 2 +- pkg/exchange/max/stream.go | 5 ++-- pkg/strategy/grid/strategy.go | 50 ++++++++++++++++++++++++++++++++++- pkg/types/order.go | 11 +++++--- 6 files changed, 66 insertions(+), 15 deletions(-) diff --git a/pkg/bbgo/notifier.go b/pkg/bbgo/notifier.go index 0f6470a42..d1996e296 100644 --- a/pkg/bbgo/notifier.go +++ b/pkg/bbgo/notifier.go @@ -13,9 +13,9 @@ func (n *NullNotifier) Notify(format string, args ...interface{}) {} type Notifiability struct { notifiers []Notifier - SessionChannelRouter *PatternChannelRouter - SymbolChannelRouter *PatternChannelRouter - ObjectChannelRouter *ObjectChannelRouter + SessionChannelRouter *PatternChannelRouter `json:"-"` + SymbolChannelRouter *PatternChannelRouter `json:"-"` + ObjectChannelRouter *ObjectChannelRouter `json:"-"` } // RouteSession routes symbol name to channel diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index e604df702..f3c135cdf 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -92,8 +92,8 @@ func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userCo } } - if err := environ.ConfigureNotificationSystem(userConfig) ; err != nil { - return errors.Wrap(err,"notification configure error") + if err := environ.ConfigureNotificationSystem(userConfig); err != nil { + return errors.Wrap(err, "notification configure error") } return nil @@ -108,7 +108,7 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config, enableApiServer return err } - if err := environ.Sync(ctx) ; err != nil { + if err := environ.Sync(ctx); err != nil { return err } @@ -262,4 +262,3 @@ func buildAndRun(ctx context.Context, userConfig *bbgo.Config, args ...string) ( runCmd.Stderr = os.Stderr return runCmd, runCmd.Start() } - diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 6a8082c35..39e9f53da 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -180,7 +180,7 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, return nil, err } - numBatches := 5 + numBatches := 3 limit := 1000 // max limit = 1000 offset := limit * numBatches orderIDs := make(map[uint64]struct{}, limit*2) diff --git a/pkg/exchange/max/stream.go b/pkg/exchange/max/stream.go index 212c71c29..5609831f0 100644 --- a/pkg/exchange/max/stream.go +++ b/pkg/exchange/max/stream.go @@ -212,7 +212,7 @@ func convertWebSocketTrade(t max.TradeUpdate) (*types.Trade, error) { ID: int64(t.ID), OrderID: t.OrderID, Symbol: toGlobalSymbol(t.Market), - Exchange: "max", + Exchange: types.ExchangeMax.String(), Price: price, Quantity: quantity, Side: side, @@ -246,8 +246,9 @@ func toGlobalOrderUpdate(u max.OrderUpdate) (*types.Order, error) { Price: util.MustParseFloat(u.Price), StopPrice: util.MustParseFloat(u.StopPrice), TimeInForce: "GTC", // MAX only supports GTC + GroupID: u.GroupID, }, - Exchange: "max", + Exchange: types.ExchangeMax.String(), OrderID: u.ID, Status: toGlobalOrderStatus(u.State, executedVolume, remainingVolume), ExecutedQuantity: executedVolume.Float64(), diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 590661957..f1a69a6d9 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -6,6 +6,7 @@ import ( "hash/fnv" "sync" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/bbgo" @@ -25,6 +26,13 @@ func init() { bbgo.RegisterStrategy(ID, &Strategy{}) } +// Snapshot is the grid snapshot +type Snapshot struct { + Orders []types.SubmitOrder `json:"orders,omitempty"` + FilledBuyGrids map[fixedpoint.Value]struct{} `json:"filledBuyGrids"` + FilledSellGrids map[fixedpoint.Value]struct{} `json:"filledSellGrids"` +} + type Strategy struct { // The notification system will be injected into the strategy automatically. // This field will be injected automatically since it's a single exchange strategy. @@ -32,6 +40,8 @@ type Strategy struct { *bbgo.Graceful `json:"-" yaml:"-"` + *bbgo.Persistence + // OrderExecutor is an interface for submitting order. // This field will be injected automatically since it's a single exchange strategy. bbgo.OrderExecutor `json:"-" yaml:"-"` @@ -410,6 +420,19 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return fmt.Errorf("upper price (%f) should not be less than lower price (%f)", s.UpperPrice.Float64(), s.LowerPrice.Float64()) } + var snapshot Snapshot + var snapshotLoaded = false + if s.Persistence != nil { + if err := s.Persistence.Load(&snapshot, ID, s.Symbol, "snapshot"); err != nil { + if err != service.ErrPersistenceNotExists { + return errors.Wrapf(err, "snapshot load error") + } + } else { + log.Infof("active order snapshot loaded") + snapshotLoaded = true + } + } + s.filledBuyGrids = make(map[fixedpoint.Value]struct{}) s.filledSellGrids = make(map[fixedpoint.Value]struct{}) @@ -437,6 +460,20 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() + if s.Persistence != nil { + log.Infof("backing up active orders...") + submitOrders := s.activeOrders.Backup() + snapshot := Snapshot{ + Orders: submitOrders, + } + + if err := s.Persistence.Save(&snapshot, ID, s.Symbol, "snapshot"); err != nil { + log.WithError(err).Error("can not save active order backups") + } else { + log.Infof("active order snapshot saved") + } + } + log.Infof("canceling active orders...") if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { log.WithError(err).Errorf("cancel order error") @@ -453,7 +490,18 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se session.Stream.OnTradeUpdate(s.tradeUpdateHandler) session.Stream.OnStart(func() { - s.placeGridOrders(orderExecutor, session) + if snapshotLoaded && len(snapshot.Orders) > 0 { + createdOrders, err := orderExecutor.SubmitOrders(ctx, snapshot.Orders...) + if err != nil { + log.WithError(err).Error("active orders restore error") + } + s.activeOrders.Add(createdOrders...) + s.orderStore.Add(createdOrders...) + s.filledSellGrids = snapshot.FilledSellGrids + s.filledBuyGrids = snapshot.FilledBuyGrids + } else { + s.placeGridOrders(orderExecutor, session) + } }) return nil diff --git a/pkg/types/order.go b/pkg/types/order.go index 6abf47b4a..644a6753d 100644 --- a/pkg/types/order.go +++ b/pkg/types/order.go @@ -96,7 +96,7 @@ type SubmitOrder struct { Quantity float64 `json:"quantity" db:"quantity"` Price float64 `json:"price" db:"price"` - StopPrice float64 `json:"stopPrice" db:"stop_price"` + StopPrice float64 `json:"stopPrice,omitempty" db:"stop_price"` Market Market `json:"-" db:"-"` @@ -105,11 +105,11 @@ type SubmitOrder struct { PriceString string `json:"-"` QuantityString string `json:"-"` - TimeInForce string `json:"timeInForce" db:"time_in_force"` // GTC, IOC, FOK + TimeInForce string `json:"timeInForce,omitempty" db:"time_in_force"` // GTC, IOC, FOK - GroupID int64 `json:"groupID"` + GroupID int64 `json:"groupID,omitempty"` - MarginSideEffect MarginOrderSideEffectType `json:"marginSideEffect"` // AUTO_REPAY = repay, MARGIN_BUY = borrow, defaults to NO_SIDE_EFFECT + MarginSideEffect MarginOrderSideEffectType `json:"marginSideEffect,omitempty"` // AUTO_REPAY = repay, MARGIN_BUY = borrow, defaults to NO_SIDE_EFFECT } func (o *SubmitOrder) String() string { @@ -160,6 +160,9 @@ type Order struct { func (o Order) Backup() SubmitOrder { so := o.SubmitOrder so.Quantity = o.Quantity - o.ExecutedQuantity + + // ClientOrderID can not be reused + so.ClientOrderID = "" return so }