mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
twap: move twap execution to a single package
This commit is contained in:
parent
daa1def6d9
commit
621a2b86cf
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
"github.com/c9s/bbgo/pkg/twap"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
@ -255,7 +256,7 @@ var executeOrderCmd = &cobra.Command{
|
||||||
executionCtx, cancelExecution := context.WithCancel(ctx)
|
executionCtx, cancelExecution := context.WithCancel(ctx)
|
||||||
defer cancelExecution()
|
defer cancelExecution()
|
||||||
|
|
||||||
execution := &bbgo.TwapExecution{
|
execution := &twap.Execution{
|
||||||
Session: session,
|
Session: session,
|
||||||
Symbol: symbol,
|
Symbol: symbol,
|
||||||
Side: side,
|
Side: side,
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package bbgo
|
package twap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -7,16 +7,17 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
log "github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/core"
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type TwapExecution struct {
|
type Execution struct {
|
||||||
Session *ExchangeSession
|
Session *bbgo.ExchangeSession
|
||||||
Symbol string
|
Symbol string
|
||||||
Side types.SideType
|
Side types.SideType
|
||||||
TargetQuantity fixedpoint.Value
|
TargetQuantity fixedpoint.Value
|
||||||
|
@ -37,7 +38,7 @@ type TwapExecution struct {
|
||||||
currentPrice fixedpoint.Value
|
currentPrice fixedpoint.Value
|
||||||
activePosition fixedpoint.Value
|
activePosition fixedpoint.Value
|
||||||
|
|
||||||
activeMakerOrders *ActiveOrderBook
|
activeMakerOrders *bbgo.ActiveOrderBook
|
||||||
orderStore *core.OrderStore
|
orderStore *core.OrderStore
|
||||||
position *types.Position
|
position *types.Position
|
||||||
|
|
||||||
|
@ -51,21 +52,21 @@ type TwapExecution struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) connectMarketData(ctx context.Context) {
|
func (e *Execution) connectMarketData(ctx context.Context) {
|
||||||
log.Infof("connecting market data stream...")
|
logrus.Infof("connecting market data stream...")
|
||||||
if err := e.marketDataStream.Connect(ctx); err != nil {
|
if err := e.marketDataStream.Connect(ctx); err != nil {
|
||||||
log.WithError(err).Errorf("market data stream connect error")
|
logrus.WithError(err).Errorf("market data stream connect error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) connectUserData(ctx context.Context) {
|
func (e *Execution) connectUserData(ctx context.Context) {
|
||||||
log.Infof("connecting user data stream...")
|
logrus.Infof("connecting user data stream...")
|
||||||
if err := e.userDataStream.Connect(ctx); err != nil {
|
if err := e.userDataStream.Connect(ctx); err != nil {
|
||||||
log.WithError(err).Errorf("user data stream connect error")
|
logrus.WithError(err).Errorf("user data stream connect error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) {
|
func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) {
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.Side)
|
sideBook := book.SideBook(e.Side)
|
||||||
|
|
||||||
|
@ -111,7 +112,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
|
||||||
switch e.Side {
|
switch e.Side {
|
||||||
case types.SideTypeSell:
|
case types.SideTypeSell:
|
||||||
if newPrice.Compare(e.StopPrice) < 0 {
|
if newPrice.Compare(e.StopPrice) < 0 {
|
||||||
log.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
|
logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
|
||||||
e.Symbol,
|
e.Symbol,
|
||||||
newPrice.String(),
|
newPrice.String(),
|
||||||
e.StopPrice.String(),
|
e.StopPrice.String(),
|
||||||
|
@ -121,7 +122,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
|
||||||
|
|
||||||
case types.SideTypeBuy:
|
case types.SideTypeBuy:
|
||||||
if newPrice.Compare(e.StopPrice) > 0 {
|
if newPrice.Compare(e.StopPrice) > 0 {
|
||||||
log.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
|
logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
|
||||||
e.Symbol,
|
e.Symbol,
|
||||||
newPrice.String(),
|
newPrice.String(),
|
||||||
e.StopPrice.String(),
|
e.StopPrice.String(),
|
||||||
|
@ -157,7 +158,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
|
||||||
}
|
}
|
||||||
|
|
||||||
minNotional := e.market.MinNotional
|
minNotional := e.market.MinNotional
|
||||||
orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
|
orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
|
||||||
|
|
||||||
switch e.Side {
|
switch e.Side {
|
||||||
case types.SideTypeSell:
|
case types.SideTypeSell:
|
||||||
|
@ -169,11 +170,11 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
|
||||||
case types.SideTypeBuy:
|
case types.SideTypeBuy:
|
||||||
// check base balance for sell, try to sell as more as possible
|
// check base balance for sell, try to sell as more as possible
|
||||||
if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok {
|
if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok {
|
||||||
orderQuantity = AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
|
orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.DeadlineTime != emptyTime {
|
if !e.DeadlineTime.IsZero() {
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
if now.After(e.DeadlineTime) {
|
if now.After(e.DeadlineTime) {
|
||||||
orderForm = types.SubmitOrder{
|
orderForm = types.SubmitOrder{
|
||||||
|
@ -200,7 +201,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er
|
||||||
return orderForm, err
|
return orderForm, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) updateOrder(ctx context.Context) error {
|
func (e *Execution) updateOrder(ctx context.Context) error {
|
||||||
book := e.orderBook.Copy()
|
book := e.orderBook.Copy()
|
||||||
sideBook := book.SideBook(e.Side)
|
sideBook := book.SideBook(e.Side)
|
||||||
|
|
||||||
|
@ -224,7 +225,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
|
||||||
orders := e.activeMakerOrders.Orders()
|
orders := e.activeMakerOrders.Orders()
|
||||||
|
|
||||||
if len(orders) > 1 {
|
if len(orders) > 1 {
|
||||||
log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol)
|
logrus.Warnf("more than 1 %s open orders in the strategy...", e.Symbol)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the first order
|
// get the first order
|
||||||
|
@ -234,7 +235,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
|
||||||
|
|
||||||
remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity)
|
remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity)
|
||||||
if remainingQuantity.Compare(e.market.MinQuantity) <= 0 {
|
if remainingQuantity.Compare(e.market.MinQuantity) <= 0 {
|
||||||
log.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String())
|
logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,24 +244,24 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
|
||||||
// DO NOT UPDATE IF:
|
// DO NOT UPDATE IF:
|
||||||
// tickSpread > 0 AND current order price == second price + tickSpread
|
// tickSpread > 0 AND current order price == second price + tickSpread
|
||||||
// current order price == first price
|
// current order price == first price
|
||||||
log.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String())
|
logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String())
|
||||||
|
|
||||||
switch e.Side {
|
switch e.Side {
|
||||||
case types.SideTypeBuy:
|
case types.SideTypeBuy:
|
||||||
if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) {
|
if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) {
|
||||||
log.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
} else if orderPrice == first.Price {
|
} else if orderPrice == first.Price {
|
||||||
log.Infof("the current order is already on the best bid price %s", orderPrice.String())
|
logrus.Infof("the current order is already on the best bid price %s", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
case types.SideTypeSell:
|
case types.SideTypeSell:
|
||||||
if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) {
|
if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) {
|
||||||
log.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
} else if orderPrice == first.Price {
|
} else if orderPrice == first.Price {
|
||||||
log.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
logrus.Infof("the current order is already on the best ask price %s", orderPrice.String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -283,13 +284,13 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) cancelActiveOrders() {
|
func (e *Execution) cancelActiveOrders() {
|
||||||
gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second)
|
||||||
defer gracefulCancel()
|
defer gracefulCancel()
|
||||||
e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange)
|
e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) orderUpdater(ctx context.Context) {
|
func (e *Execution) orderUpdater(ctx context.Context) {
|
||||||
updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
|
updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1)
|
||||||
ticker := time.NewTimer(e.UpdateInterval)
|
ticker := time.NewTimer(e.UpdateInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
@ -317,9 +318,9 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("%s order book changed, checking order...", e.Symbol)
|
logrus.Infof("%s order book changed, checking order...", e.Symbol)
|
||||||
if err := e.updateOrder(ctx); err != nil {
|
if err := e.updateOrder(ctx); err != nil {
|
||||||
log.WithError(err).Errorf("order update failed")
|
logrus.WithError(err).Errorf("order update failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -332,25 +333,25 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := e.updateOrder(ctx); err != nil {
|
if err := e.updateOrder(ctx); err != nil {
|
||||||
log.WithError(err).Errorf("order update failed")
|
logrus.WithError(err).Errorf("order update failed")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool {
|
func (e *Execution) cancelContextIfTargetQuantityFilled() bool {
|
||||||
base := e.position.GetBase()
|
base := e.position.GetBase()
|
||||||
|
|
||||||
if base.Abs().Compare(e.TargetQuantity) >= 0 {
|
if base.Abs().Compare(e.TargetQuantity) >= 0 {
|
||||||
log.Infof("filled target quantity, canceling the order execution context")
|
logrus.Infof("filled target quantity, canceling the order execution context")
|
||||||
e.cancelExecution()
|
e.cancelExecution()
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
|
func (e *Execution) handleTradeUpdate(trade types.Trade) {
|
||||||
// ignore trades that are not in the symbol we interested
|
// ignore trades that are not in the symbol we interested
|
||||||
if trade.Symbol != e.Symbol {
|
if trade.Symbol != e.Symbol {
|
||||||
return
|
return
|
||||||
|
@ -360,21 +361,21 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info(trade.String())
|
logrus.Info(trade.String())
|
||||||
|
|
||||||
e.position.AddTrade(trade)
|
e.position.AddTrade(trade)
|
||||||
log.Infof("position updated: %+v", e.position)
|
logrus.Infof("position updated: %+v", e.position)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) handleFilledOrder(order types.Order) {
|
func (e *Execution) handleFilledOrder(order types.Order) {
|
||||||
log.Info(order.String())
|
logrus.Info(order.String())
|
||||||
|
|
||||||
// filled event triggers the order removal from the active order store
|
// filled event triggers the order removal from the active order store
|
||||||
// we need to ensure we received every order update event before the execution is done.
|
// we need to ensure we received every order update event before the execution is done.
|
||||||
e.cancelContextIfTargetQuantityFilled()
|
e.cancelContextIfTargetQuantityFilled()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) Run(parentCtx context.Context) error {
|
func (e *Execution) Run(parentCtx context.Context) error {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
e.stoppedC = make(chan struct{})
|
e.stoppedC = make(chan struct{})
|
||||||
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
|
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
|
||||||
|
@ -409,7 +410,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error {
|
||||||
|
|
||||||
e.orderStore = core.NewOrderStore(e.Symbol)
|
e.orderStore = core.NewOrderStore(e.Symbol)
|
||||||
e.orderStore.BindStream(e.userDataStream)
|
e.orderStore.BindStream(e.userDataStream)
|
||||||
e.activeMakerOrders = NewActiveOrderBook(e.Symbol)
|
e.activeMakerOrders = bbgo.NewActiveOrderBook(e.Symbol)
|
||||||
e.activeMakerOrders.OnFilled(e.handleFilledOrder)
|
e.activeMakerOrders.OnFilled(e.handleFilledOrder)
|
||||||
e.activeMakerOrders.BindStream(e.userDataStream)
|
e.activeMakerOrders.BindStream(e.userDataStream)
|
||||||
|
|
||||||
|
@ -418,7 +419,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) emitDone() {
|
func (e *Execution) emitDone() {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
if e.stoppedC == nil {
|
if e.stoppedC == nil {
|
||||||
e.stoppedC = make(chan struct{})
|
e.stoppedC = make(chan struct{})
|
||||||
|
@ -427,7 +428,7 @@ func (e *TwapExecution) emitDone() {
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *TwapExecution) Done() (c <-chan struct{}) {
|
func (e *Execution) Done() (c <-chan struct{}) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
|
||||||
if e.stoppedC == nil {
|
if e.stoppedC == nil {
|
||||||
|
@ -447,7 +448,7 @@ func (e *TwapExecution) Done() (c <-chan struct{}) {
|
||||||
// We need to:
|
// We need to:
|
||||||
// 1. stop the order updater (by using the execution context)
|
// 1. stop the order updater (by using the execution context)
|
||||||
// 2. the order updater cancels all open orders and close the user data stream
|
// 2. the order updater cancels all open orders and close the user data stream
|
||||||
func (e *TwapExecution) Shutdown(shutdownCtx context.Context) {
|
func (e *Execution) Shutdown(shutdownCtx context.Context) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
if e.cancelExecution != nil {
|
if e.cancelExecution != nil {
|
||||||
e.cancelExecution()
|
e.cancelExecution()
|
Loading…
Reference in New Issue
Block a user