Merge pull request #163 from c9s/feature/mark-trade-strategy

This commit is contained in:
Yo-An Lin 2021-03-16 22:34:09 +08:00 committed by GitHub
commit 7ecb17dbe2
8 changed files with 83 additions and 17 deletions

View File

@ -13,9 +13,9 @@ func (n *NullNotifier) Notify(format string, args ...interface{}) {}
type Notifiability struct {
notifiers []Notifier
SessionChannelRouter *PatternChannelRouter
SymbolChannelRouter *PatternChannelRouter
ObjectChannelRouter *ObjectChannelRouter
SessionChannelRouter *PatternChannelRouter `json:"-"`
SymbolChannelRouter *PatternChannelRouter `json:"-"`
ObjectChannelRouter *ObjectChannelRouter `json:"-"`
}
// RouteSession routes symbol name to channel

View File

@ -92,8 +92,8 @@ func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userCo
}
}
if err := environ.ConfigureNotificationSystem(userConfig) ; err != nil {
return errors.Wrap(err,"notification configure error")
if err := environ.ConfigureNotificationSystem(userConfig); err != nil {
return errors.Wrap(err, "notification configure error")
}
return nil
@ -108,7 +108,7 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config, enableApiServer
return err
}
if err := environ.Sync(ctx) ; err != nil {
if err := environ.Sync(ctx); err != nil {
return err
}
@ -262,4 +262,3 @@ func buildAndRun(ctx context.Context, userConfig *bbgo.Config, args ...string) (
runCmd.Stderr = os.Stderr
return runCmd, runCmd.Start()
}

View File

@ -180,7 +180,7 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
return nil, err
}
numBatches := 5
numBatches := 3
limit := 1000 // max limit = 1000
offset := limit * numBatches
orderIDs := make(map[uint64]struct{}, limit*2)

View File

@ -212,7 +212,7 @@ func convertWebSocketTrade(t max.TradeUpdate) (*types.Trade, error) {
ID: int64(t.ID),
OrderID: t.OrderID,
Symbol: toGlobalSymbol(t.Market),
Exchange: "max",
Exchange: types.ExchangeMax.String(),
Price: price,
Quantity: quantity,
Side: side,
@ -246,8 +246,9 @@ func toGlobalOrderUpdate(u max.OrderUpdate) (*types.Order, error) {
Price: util.MustParseFloat(u.Price),
StopPrice: util.MustParseFloat(u.StopPrice),
TimeInForce: "GTC", // MAX only supports GTC
GroupID: u.GroupID,
},
Exchange: "max",
Exchange: types.ExchangeMax.String(),
OrderID: u.ID,
Status: toGlobalOrderStatus(u.State, executedVolume, remainingVolume),
ExecutedQuantity: executedVolume.Float64(),

View File

@ -341,7 +341,7 @@ func (s *TradeService) Load(ctx context.Context, id int64) (*types.Trade, error)
return nil, errors.Wrapf(ErrTradeNotFound, "trade id:%d not found", id)
}
func (s *TradeService) MarkStrategyID(ctx context.Context, id int64, strategyID string) error {
func (s *TradeService) Mark(ctx context.Context, id int64, strategyID string) error {
result, err := s.DB.NamedExecContext(ctx, "UPDATE `trades` SET `strategy` = :strategy WHERE `id` = :id", map[string]interface{}{
"id": id,
"strategy": strategyID,

View File

@ -36,7 +36,7 @@ func Test_tradeService(t *testing.T) {
})
assert.NoError(t, err)
err = service.MarkStrategyID(ctx, 1, "grid")
err = service.Mark(ctx, 1, "grid")
assert.NoError(t, err)
tradeRecord, err := service.Load(ctx, 1)

View File

@ -6,6 +6,7 @@ import (
"hash/fnv"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
@ -25,6 +26,14 @@ func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
// Snapshot is the grid snapshot
type Snapshot struct {
Orders []types.SubmitOrder `json:"orders,omitempty"`
FilledBuyGrids map[fixedpoint.Value]struct{} `json:"filledBuyGrids"`
FilledSellGrids map[fixedpoint.Value]struct{} `json:"filledSellGrids"`
Position *bbgo.Position `json:"position,omitempty"`
}
type Strategy struct {
// The notification system will be injected into the strategy automatically.
// This field will be injected automatically since it's a single exchange strategy.
@ -32,6 +41,8 @@ type Strategy struct {
*bbgo.Graceful `json:"-" yaml:"-"`
*bbgo.Persistence
// OrderExecutor is an interface for submitting order.
// This field will be injected automatically since it's a single exchange strategy.
bbgo.OrderExecutor `json:"-" yaml:"-"`
@ -332,6 +343,12 @@ func (s *Strategy) tradeUpdateHandler(trade types.Trade) {
if s.orderStore.Exists(trade.OrderID) {
log.Infof("received trade update of order %d: %+v", trade.OrderID, trade)
if s.TradeService != nil {
if err := s.TradeService.Mark(context.Background(), trade.ID, ID); err != nil {
log.WithError(err).Error("trade mark error")
}
}
if trade.Side == types.SideTypeSelf {
return
}
@ -404,6 +421,19 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return fmt.Errorf("upper price (%f) should not be less than lower price (%f)", s.UpperPrice.Float64(), s.LowerPrice.Float64())
}
var snapshot Snapshot
var snapshotLoaded = false
if s.Persistence != nil {
if err := s.Persistence.Load(&snapshot, ID, s.Symbol, "snapshot"); err != nil {
if err != service.ErrPersistenceNotExists {
return errors.Wrapf(err, "snapshot load error")
}
} else {
log.Infof("active order snapshot loaded")
snapshotLoaded = true
}
}
s.filledBuyGrids = make(map[fixedpoint.Value]struct{})
s.filledSellGrids = make(map[fixedpoint.Value]struct{})
@ -431,6 +461,21 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if s.Persistence != nil {
log.Infof("backing up active orders...")
submitOrders := s.activeOrders.Backup()
snapshot := Snapshot{
Orders: submitOrders,
Position: &s.position,
}
if err := s.Persistence.Save(&snapshot, ID, s.Symbol, "snapshot"); err != nil {
log.WithError(err).Error("can not save active order backups")
} else {
log.Infof("active order snapshot saved")
}
}
log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error")
@ -447,7 +492,25 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
session.Stream.OnTradeUpdate(s.tradeUpdateHandler)
session.Stream.OnStart(func() {
s.placeGridOrders(orderExecutor, session)
if snapshotLoaded && len(snapshot.Orders) > 0 {
createdOrders, err := orderExecutor.SubmitOrders(ctx, snapshot.Orders...)
if err != nil {
log.WithError(err).Error("active orders restore error")
}
s.activeOrders.Add(createdOrders...)
s.orderStore.Add(createdOrders...)
if snapshot.FilledSellGrids != nil {
s.filledSellGrids = snapshot.FilledSellGrids
}
if snapshot.FilledBuyGrids != nil {
s.filledBuyGrids = snapshot.FilledBuyGrids
}
if snapshot.Position != nil {
s.position = *snapshot.Position
}
} else {
s.placeGridOrders(orderExecutor, session)
}
})
return nil

View File

@ -96,7 +96,7 @@ type SubmitOrder struct {
Quantity float64 `json:"quantity" db:"quantity"`
Price float64 `json:"price" db:"price"`
StopPrice float64 `json:"stopPrice" db:"stop_price"`
StopPrice float64 `json:"stopPrice,omitempty" db:"stop_price"`
Market Market `json:"-" db:"-"`
@ -105,11 +105,11 @@ type SubmitOrder struct {
PriceString string `json:"-"`
QuantityString string `json:"-"`
TimeInForce string `json:"timeInForce" db:"time_in_force"` // GTC, IOC, FOK
TimeInForce string `json:"timeInForce,omitempty" db:"time_in_force"` // GTC, IOC, FOK
GroupID int64 `json:"groupID"`
GroupID int64 `json:"groupID,omitempty"`
MarginSideEffect MarginOrderSideEffectType `json:"marginSideEffect"` // AUTO_REPAY = repay, MARGIN_BUY = borrow, defaults to NO_SIDE_EFFECT
MarginSideEffect MarginOrderSideEffectType `json:"marginSideEffect,omitempty"` // AUTO_REPAY = repay, MARGIN_BUY = borrow, defaults to NO_SIDE_EFFECT
}
func (o *SubmitOrder) String() string {
@ -160,6 +160,9 @@ type Order struct {
func (o Order) Backup() SubmitOrder {
so := o.SubmitOrder
so.Quantity = o.Quantity - o.ExecutedQuantity
// ClientOrderID can not be reused
so.ClientOrderID = ""
return so
}