improve order execution graceful shutdown

This commit is contained in:
c9s 2021-05-14 14:52:19 +08:00
parent dc040bb82b
commit bb34b1002a
4 changed files with 194 additions and 26 deletions

View File

@ -162,10 +162,10 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...
// Increase the quantity if the amount is not enough, // Increase the quantity if the amount is not enough,
// this is the only increase op, later we will decrease the quantity if it meets the criteria // this is the only increase op, later we will decrease the quantity if it meets the criteria
quantity = AdjustQuantityByMinAmount(quantity, price, market.MinAmount*1.01) quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinAmount*1.01)
if c.MaxOrderAmount > 0 { if c.MaxOrderAmount > 0 {
quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64())
} }
quoteAssetQuota := math.Max(0.0, quoteBalance.Available.Float64()-c.MinQuoteBalance.Float64()) quoteAssetQuota := math.Max(0.0, quoteBalance.Available.Float64()-c.MinQuoteBalance.Float64())
@ -178,7 +178,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...
continue continue
} }
quantity = AdjustQuantityByMaxAmount(quantity, price, quoteAssetQuota) quantity = AdjustFloatQuantityByMaxAmount(quantity, price, quoteAssetQuota)
// if MaxBaseAssetBalance is enabled, we should check the current base asset balance // if MaxBaseAssetBalance is enabled, we should check the current base asset balance
if baseBalance, hasBaseAsset := balances[market.BaseCurrency]; hasBaseAsset && c.MaxBaseAssetBalance > 0 { if baseBalance, hasBaseAsset := balances[market.BaseCurrency]; hasBaseAsset && c.MaxBaseAssetBalance > 0 {
@ -226,7 +226,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...
} }
// if the amount is too small, we should increase it. // if the amount is too small, we should increase it.
quantity = AdjustQuantityByMinAmount(quantity, price, market.MinNotional*1.01) quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinNotional*1.01)
// we should not SELL too much // we should not SELL too much
quantity = math.Min(quantity, baseAssetBalance.Available.Float64()) quantity = math.Min(quantity, baseAssetBalance.Available.Float64())
@ -253,7 +253,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...
} }
if c.MaxOrderAmount > 0 { if c.MaxOrderAmount > 0 {
quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64())
} }
notional := quantity * lastPrice notional := quantity * lastPrice

View File

@ -1,6 +1,7 @@
package bbgo package bbgo
import ( import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -14,7 +15,19 @@ var (
) )
// AdjustQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount // AdjustQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount
func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 { func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount fixedpoint.Value) fixedpoint.Value {
// modify quantity for the min amount
amount := currentPrice.Mul(quantity)
if amount < minAmount {
ratio := minAmount.Div(amount)
quantity = quantity.Mul(ratio)
}
return quantity
}
// AdjustFloatQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount
func AdjustFloatQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 {
// modify quantity for the min amount // modify quantity for the min amount
amount := currentPrice * quantity amount := currentPrice * quantity
if amount < minAmount { if amount < minAmount {
@ -25,7 +38,7 @@ func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float6
return quantity return quantity
} }
func AdjustQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 { func AdjustFloatQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 {
amount := price * quantity amount := price * quantity
if amount > maxAmount { if amount > maxAmount {
ratio := maxAmount / amount ratio := maxAmount / amount

View File

@ -36,7 +36,7 @@ func TestAdjustQuantityByMinAmount(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
q := AdjustQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount) q := AdjustFloatQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount)
assert.Equal(t, test.wanted, q) assert.Equal(t, test.wanted, q)
}) })
} }

View File

@ -13,18 +13,20 @@ import (
"golang.org/x/time/rate" "golang.org/x/time/rate"
) )
const OrderExecutionReady = 1
type TwapExecution struct { type TwapExecution struct {
Session *ExchangeSession Session *ExchangeSession
Symbol string Symbol string
Side types.SideType Side types.SideType
TargetQuantity fixedpoint.Value TargetQuantity fixedpoint.Value
SliceQuantity fixedpoint.Value SliceQuantity fixedpoint.Value
StopPrice fixedpoint.Value
market types.Market market types.Market
marketDataStream types.Stream marketDataStream types.Stream
userDataStream types.Stream
userDataStream types.Stream
userDataStreamCtx context.Context
cancelUserDataStream context.CancelFunc
orderBook *types.StreamOrderBook orderBook *types.StreamOrderBook
currentPrice fixedpoint.Value currentPrice fixedpoint.Value
@ -34,6 +36,11 @@ type TwapExecution struct {
orderStore *OrderStore orderStore *OrderStore
position *Position position *Position
executionCtx context.Context
cancelExecution context.CancelFunc
stoppedC chan struct{}
state int state int
mu sync.Mutex mu sync.Mutex
@ -84,7 +91,6 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e
} }
newPrice := first.Price newPrice := first.Price
spread, ok := book.Spread() spread, ok := book.Spread()
if !ok { if !ok {
return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists") return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists")
@ -101,12 +107,55 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e
} }
} }
if e.StopPrice > 0 {
switch e.Side {
case types.SideTypeSell:
if newPrice < e.StopPrice {
log.Infof("%s order price %f is lower than the stop sell price %f, setting order price to the stop sell price %f",
e.Symbol,
newPrice.Float64(),
e.StopPrice.Float64(),
e.StopPrice.Float64())
newPrice = e.StopPrice
}
case types.SideTypeBuy:
if newPrice > e.StopPrice {
log.Infof("%s order price %f is higher than the stop buy price %f, setting order price to the stop buy price %f",
e.Symbol,
newPrice.Float64(),
e.StopPrice.Float64(),
e.StopPrice.Float64())
newPrice = e.StopPrice
}
}
}
minQuantity := fixedpoint.NewFromFloat(e.market.MinQuantity)
restQuantity := e.TargetQuantity - fixedpoint.Abs(e.position.Base)
if restQuantity < minQuantity {
return orderForm, fmt.Errorf("can not continue placing orders, rest quantity %f is less than the min quantity %f", restQuantity.Float64(), minQuantity.Float64())
}
// if the rest quantity in the next round is not enough, we should merge the rest quantity into this round
orderQuantity := e.SliceQuantity
nextRestQuantity := restQuantity - e.SliceQuantity
if nextRestQuantity < minQuantity {
orderQuantity = restQuantity
}
minNotional := fixedpoint.NewFromFloat(e.market.MinNotional)
orderAmount := newPrice.Mul(orderQuantity)
if orderAmount <= minNotional {
orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
}
orderForm = types.SubmitOrder{ orderForm = types.SubmitOrder{
// ClientOrderID: "", // ClientOrderID: "",
Symbol: e.Symbol, Symbol: e.Symbol,
Side: e.Side, Side: e.Side,
Type: types.OrderTypeLimitMaker, Type: types.OrderTypeLimitMaker,
Quantity: e.SliceQuantity.Float64(), Quantity: orderQuantity.Float64(),
Price: newPrice.Float64(), Price: newPrice.Float64(),
Market: e.market, Market: e.market,
TimeInForce: "GTC", TimeInForce: "GTC",
@ -115,6 +164,7 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e
} }
func (e *TwapExecution) updateOrder(ctx context.Context) error { func (e *TwapExecution) updateOrder(ctx context.Context) error {
sideBook, err := e.getSideBook() sideBook, err := e.getSideBook()
if err != nil { if err != nil {
return err return err
@ -132,7 +182,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
orders := e.activeMakerOrders.Orders() orders := e.activeMakerOrders.Orders()
if len(orders) > 1 { if len(orders) > 1 {
log.Warnf("there are more than 1 open orders in the strategy...") log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol)
} }
// get the first order // get the first order
@ -140,6 +190,29 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
price := fixedpoint.NewFromFloat(order.Price) price := fixedpoint.NewFromFloat(order.Price)
quantity := fixedpoint.NewFromFloat(order.Quantity) quantity := fixedpoint.NewFromFloat(order.Quantity)
remainingQuantity := order.Quantity - order.ExecutedQuantity
if remainingQuantity <= e.market.MinQuantity {
log.Infof("order remaining quantity %f is less than the market minimal quantity %f, skip updating order", remainingQuantity, e.market.MinQuantity)
return nil
}
if e.StopPrice > 0 {
switch e.Side {
case types.SideTypeBuy:
if first.Price > e.StopPrice {
log.Infof("%s first bid price %f is higher than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64())
return nil
}
case types.SideTypeSell:
if first.Price < e.StopPrice {
log.Infof("%s first ask price %f is lower than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64())
return nil
}
}
}
// if the first bid price or first ask price is the same to the current active order // if the first bid price or first ask price is the same to the current active order
// we should skip updating the order // we should skip updating the order
if first.Price == price { if first.Price == price {
@ -151,12 +224,12 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
// if there is no gap between the first price entry and the second price entry // if there is no gap between the first price entry and the second price entry
second, ok := sideBook.Second() second, ok := sideBook.Second()
if !ok { if !ok {
return fmt.Errorf("there is no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.Symbol, e.Side)
} }
// if there is no gap // if there is no gap
if fixedpoint.Abs(first.Price-second.Price) == tickSize { if fixedpoint.Abs(first.Price-second.Price) == tickSize {
log.Infof("there is no gap between the second price %f and the first price %f (tick size = %f), skip updating", log.Infof("no gap between the second price %f and the first price %f (tick size = %f), skip updating",
first.Price.Float64(), first.Price.Float64(),
second.Price.Float64(), second.Price.Float64(),
tickSize.Float64()) tickSize.Float64())
@ -187,8 +260,10 @@ func (e *TwapExecution) cancelActiveOrders(ctx context.Context) {
for e.activeMakerOrders.NumOfOrders() > 0 { for e.activeMakerOrders.NumOfOrders() > 0 {
didCancel = true didCancel = true
log.Infof("canceling open orders...")
orders := e.activeMakerOrders.Orders() orders := e.activeMakerOrders.Orders()
log.Infof("canceling %d open orders:", len(orders))
e.activeMakerOrders.Print()
if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) log.WithError(err).Errorf("can not cancel %s orders", e.Symbol)
} }
@ -205,8 +280,13 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
ticker := time.NewTimer(5 * time.Second) ticker := time.NewTimer(5 * time.Second)
defer ticker.Stop() defer ticker.Stop()
// we should stop updater and clean up our open orders, if
// 1. the given context is canceled.
// 2. the base quantity equals to or greater than the target quantity
defer func() { defer func() {
e.cancelActiveOrders(context.Background()) e.cancelActiveOrders(context.Background())
e.cancelUserDataStream()
e.emitDone()
}() }()
for { for {
@ -219,6 +299,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
break break
} }
if e.cancelContextIfTargetQuantityFilled() {
return
}
if err := e.updateOrder(ctx); err != nil { if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed") log.WithError(err).Errorf("order update failed")
} }
@ -228,6 +312,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
break break
} }
if e.cancelContextIfTargetQuantityFilled() {
return
}
if err := e.updateOrder(ctx); err != nil { if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed") log.WithError(err).Errorf("order update failed")
} }
@ -236,6 +324,15 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
} }
} }
func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool {
if fixedpoint.Abs(e.position.Base) >= e.TargetQuantity {
log.Infof("filled target quantity, canceling the order execution context")
e.cancelExecution()
return true
}
return false
}
func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
// ignore trades that are not in the symbol we interested // ignore trades that are not in the symbol we interested
if trade.Symbol != e.Symbol { if trade.Symbol != e.Symbol {
@ -246,19 +343,27 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
return return
} }
q := fixedpoint.NewFromFloat(trade.Quantity) log.Info(trade.String())
_ = q
e.position.AddTrade(trade) e.position.AddTrade(trade)
log.Infof("position updated: %+v", e.position) log.Infof("position updated: %+v", e.position)
} }
func (e *TwapExecution) handleFilledOrder(order types.Order) { func (e *TwapExecution) handleFilledOrder(order types.Order) {
log.Infof("order is filled: %s", order.String()) log.Info(order.String())
// filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done.
e.cancelContextIfTargetQuantityFilled()
} }
func (e *TwapExecution) Run(ctx context.Context) error { func (e *TwapExecution) Run(parentCtx context.Context) error {
e.mu.Lock()
e.stoppedC = make(chan struct{})
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(context.Background())
e.mu.Unlock()
var ok bool var ok bool
e.market, ok = e.Session.Market(e.Symbol) e.market, ok = e.Session.Market(e.Symbol)
if !ok { if !ok {
@ -271,7 +376,7 @@ func (e *TwapExecution) Run(ctx context.Context) error {
e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook = types.NewStreamBook(e.Symbol)
e.orderBook.BindStream(e.marketDataStream) e.orderBook.BindStream(e.marketDataStream)
go e.connectMarketData(ctx) go e.connectMarketData(e.executionCtx)
e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream = e.Session.Exchange.NewStream()
e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) e.userDataStream.OnTradeUpdate(e.handleTradeUpdate)
@ -287,11 +392,60 @@ func (e *TwapExecution) Run(ctx context.Context) error {
e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.OnFilled(e.handleFilledOrder)
e.activeMakerOrders.BindStream(e.userDataStream) e.activeMakerOrders.BindStream(e.userDataStream)
go e.connectUserData(ctx) go e.connectUserData(e.userDataStreamCtx)
go e.orderUpdater(ctx) go e.orderUpdater(e.executionCtx)
return nil return nil
} }
func (e *TwapExecution) emitDone() {
e.mu.Lock()
if e.stoppedC == nil {
e.stoppedC = make(chan struct{})
}
close(e.stoppedC)
e.mu.Unlock()
}
func (e *TwapExecution) Done() (c <-chan struct{}) {
e.mu.Lock()
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
if e.stoppedC == nil {
e.stoppedC = make(chan struct{})
close(e.stoppedC)
c = e.stoppedC
} else {
c = e.stoppedC
}
e.mu.Unlock()
return c
}
// Shutdown stops the execution
// If we call this method, it means the execution is still running,
// We need to:
// 1. stop the order updater (by using the execution context)
// 2. the order updater cancels all open orders and close the user data stream
func (e *TwapExecution) Shutdown(shutdownCtx context.Context) {
e.mu.Lock()
if e.cancelExecution != nil {
e.cancelExecution()
}
e.mu.Unlock()
for {
select {
case <-shutdownCtx.Done():
return
case <-e.Done():
return
}
}
}
type TwapOrderExecutor struct { type TwapOrderExecutor struct {
Session *ExchangeSession Session *ExchangeSession
@ -300,13 +454,14 @@ type TwapOrderExecutor struct {
DelayTime types.Duration DelayTime types.Duration
} }
func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value) (*TwapExecution, error) { func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity, stopPrice fixedpoint.Value) (*TwapExecution, error) {
execution := &TwapExecution{ execution := &TwapExecution{
Session: e.Session, Session: e.Session,
Symbol: symbol, Symbol: symbol,
Side: side, Side: side,
TargetQuantity: targetQuantity, TargetQuantity: targetQuantity,
SliceQuantity: sliceQuantity, SliceQuantity: sliceQuantity,
StopPrice: stopPrice,
} }
err := execution.Run(ctx) err := execution.Run(ctx)
return execution, err return execution, err