bbgo_origin/pkg/bbgo/twap_order_executor.go

469 lines
12 KiB
Go
Raw Normal View History

2021-05-14 03:53:07 +00:00
package bbgo
import (
"context"
"fmt"
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
type TwapExecution struct {
Session *ExchangeSession
Symbol string
Side types.SideType
TargetQuantity fixedpoint.Value
SliceQuantity fixedpoint.Value
StopPrice fixedpoint.Value
2021-05-14 03:53:07 +00:00
market types.Market
marketDataStream types.Stream
userDataStream types.Stream
userDataStreamCtx context.Context
cancelUserDataStream context.CancelFunc
2021-05-14 04:23:07 +00:00
orderBook *types.StreamOrderBook
currentPrice fixedpoint.Value
activePosition fixedpoint.Value
2021-05-14 03:53:07 +00:00
activeMakerOrders *LocalActiveOrderBook
orderStore *OrderStore
position *Position
executionCtx context.Context
cancelExecution context.CancelFunc
stoppedC chan struct{}
2021-05-14 03:53:07 +00:00
state int
mu sync.Mutex
}
func (e *TwapExecution) connectMarketData(ctx context.Context) {
log.Infof("connecting market data stream...")
if err := e.marketDataStream.Connect(ctx); err != nil {
log.WithError(err).Errorf("market data stream connect error")
}
}
func (e *TwapExecution) connectUserData(ctx context.Context) {
log.Infof("connecting user data stream...")
if err := e.userDataStream.Connect(ctx); err != nil {
log.WithError(err).Errorf("user data stream connect error")
}
}
func (e *TwapExecution) getSideBook() (pvs types.PriceVolumeSlice, err error) {
book := e.orderBook.Get()
switch e.Side {
case types.SideTypeSell:
pvs = book.Asks
case types.SideTypeBuy:
pvs = book.Bids
default:
err = fmt.Errorf("invalid side type: %+v", e.Side)
}
return pvs, err
}
func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, err error) {
book := e.orderBook.Get()
sideBook, err := e.getSideBook()
if err != nil {
return orderForm, err
}
first, ok := sideBook.First()
if !ok {
return orderForm, fmt.Errorf("empty %s %s side book", e.Symbol, e.Side)
}
newPrice := first.Price
spread, ok := book.Spread()
if !ok {
return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists")
}
tickSize := fixedpoint.NewFromFloat(e.market.TickSize)
if spread > tickSize {
2021-05-14 04:23:07 +00:00
log.Infof("spread %f is greater than the tick size %f, adding 1 tick to the price...", spread.Float64(), tickSize.Float64())
2021-05-14 03:53:07 +00:00
switch e.Side {
case types.SideTypeSell:
newPrice -= fixedpoint.NewFromFloat(e.market.TickSize)
case types.SideTypeBuy:
newPrice += fixedpoint.NewFromFloat(e.market.TickSize)
}
}
if e.StopPrice > 0 {
switch e.Side {
case types.SideTypeSell:
if newPrice < e.StopPrice {
log.Infof("%s order price %f is lower than the stop sell price %f, setting order price to the stop sell price %f",
e.Symbol,
newPrice.Float64(),
e.StopPrice.Float64(),
e.StopPrice.Float64())
newPrice = e.StopPrice
}
case types.SideTypeBuy:
if newPrice > e.StopPrice {
log.Infof("%s order price %f is higher than the stop buy price %f, setting order price to the stop buy price %f",
e.Symbol,
newPrice.Float64(),
e.StopPrice.Float64(),
e.StopPrice.Float64())
newPrice = e.StopPrice
}
}
}
minQuantity := fixedpoint.NewFromFloat(e.market.MinQuantity)
restQuantity := e.TargetQuantity - fixedpoint.Abs(e.position.Base)
if restQuantity < minQuantity {
return orderForm, fmt.Errorf("can not continue placing orders, rest quantity %f is less than the min quantity %f", restQuantity.Float64(), minQuantity.Float64())
}
// if the rest quantity in the next round is not enough, we should merge the rest quantity into this round
orderQuantity := e.SliceQuantity
nextRestQuantity := restQuantity - e.SliceQuantity
if nextRestQuantity < minQuantity {
orderQuantity = restQuantity
}
minNotional := fixedpoint.NewFromFloat(e.market.MinNotional)
orderAmount := newPrice.Mul(orderQuantity)
if orderAmount <= minNotional {
orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
}
2021-05-14 03:53:07 +00:00
orderForm = types.SubmitOrder{
// ClientOrderID: "",
Symbol: e.Symbol,
Side: e.Side,
Type: types.OrderTypeLimitMaker,
Quantity: orderQuantity.Float64(),
2021-05-14 03:53:07 +00:00
Price: newPrice.Float64(),
Market: e.market,
TimeInForce: "GTC",
}
return orderForm, err
}
func (e *TwapExecution) updateOrder(ctx context.Context) error {
2021-05-14 03:53:07 +00:00
sideBook, err := e.getSideBook()
if err != nil {
return err
}
first, ok := sideBook.First()
if !ok {
return fmt.Errorf("empty %s %s side book", e.Symbol, e.Side)
}
tickSize := fixedpoint.NewFromFloat(e.market.TickSize)
// check and see if we need to cancel the existing active orders
for e.activeMakerOrders.NumOfOrders() > 0 {
orders := e.activeMakerOrders.Orders()
if len(orders) > 1 {
log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol)
2021-05-14 03:53:07 +00:00
}
// get the first order
order := orders[0]
price := fixedpoint.NewFromFloat(order.Price)
quantity := fixedpoint.NewFromFloat(order.Quantity)
remainingQuantity := order.Quantity - order.ExecutedQuantity
if remainingQuantity <= e.market.MinQuantity {
log.Infof("order remaining quantity %f is less than the market minimal quantity %f, skip updating order", remainingQuantity, e.market.MinQuantity)
return nil
}
if e.StopPrice > 0 {
switch e.Side {
case types.SideTypeBuy:
if first.Price > e.StopPrice {
log.Infof("%s first bid price %f is higher than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64())
return nil
}
case types.SideTypeSell:
if first.Price < e.StopPrice {
log.Infof("%s first ask price %f is lower than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64())
return nil
}
}
}
2021-05-14 03:53:07 +00:00
// if the first bid price or first ask price is the same to the current active order
// we should skip updating the order
if first.Price == price {
// there are other orders in the same price, it means if we cancel ours, the price is still the best price.
if first.Volume > quantity {
return nil
}
// if there is no gap between the first price entry and the second price entry
second, ok := sideBook.Second()
if !ok {
return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.Symbol, e.Side)
2021-05-14 03:53:07 +00:00
}
// if there is no gap
if fixedpoint.Abs(first.Price-second.Price) == tickSize {
log.Infof("no gap between the second price %f and the first price %f (tick size = %f), skip updating",
2021-05-14 04:23:07 +00:00
first.Price.Float64(),
second.Price.Float64(),
tickSize.Float64())
2021-05-14 03:53:07 +00:00
return nil
}
}
2021-05-14 04:23:07 +00:00
e.cancelActiveOrders(ctx)
2021-05-14 03:53:07 +00:00
}
orderForm, err := e.newBestPriceMakerOrder()
if err != nil {
return err
}
createdOrders, err := e.Session.OrderExecutor.SubmitOrders(ctx, orderForm)
if err != nil {
return err
}
e.activeMakerOrders.Add(createdOrders...)
e.orderStore.Add(createdOrders...)
return nil
}
2021-05-14 04:23:07 +00:00
func (e *TwapExecution) cancelActiveOrders(ctx context.Context) {
didCancel := false
for e.activeMakerOrders.NumOfOrders() > 0 {
didCancel = true
orders := e.activeMakerOrders.Orders()
log.Infof("canceling %d open orders:", len(orders))
e.activeMakerOrders.Print()
2021-05-14 04:23:07 +00:00
if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", e.Symbol)
}
time.Sleep(3 * time.Second)
}
if didCancel {
log.Infof("orders are canceled successfully")
}
}
2021-05-14 03:53:07 +00:00
func (e *TwapExecution) orderUpdater(ctx context.Context) {
rateLimiter := rate.NewLimiter(rate.Every(time.Minute), 15)
ticker := time.NewTimer(5 * time.Second)
defer ticker.Stop()
2021-05-14 04:23:07 +00:00
// we should stop updater and clean up our open orders, if
// 1. the given context is canceled.
// 2. the base quantity equals to or greater than the target quantity
2021-05-14 04:23:07 +00:00
defer func() {
e.cancelActiveOrders(context.Background())
e.cancelUserDataStream()
e.emitDone()
2021-05-14 04:23:07 +00:00
}()
2021-05-14 03:53:07 +00:00
for {
select {
case <-ctx.Done():
return
case <-e.orderBook.C:
if !rateLimiter.Allow() {
break
}
if e.cancelContextIfTargetQuantityFilled() {
return
}
2021-05-14 03:53:07 +00:00
if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed")
}
case <-ticker.C:
if !rateLimiter.Allow() {
break
}
if e.cancelContextIfTargetQuantityFilled() {
return
}
2021-05-14 03:53:07 +00:00
if err := e.updateOrder(ctx); err != nil {
log.WithError(err).Errorf("order update failed")
}
}
}
}
func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool {
if fixedpoint.Abs(e.position.Base) >= e.TargetQuantity {
log.Infof("filled target quantity, canceling the order execution context")
e.cancelExecution()
return true
}
return false
}
2021-05-14 04:23:07 +00:00
func (e *TwapExecution) handleTradeUpdate(trade types.Trade) {
2021-05-14 03:53:07 +00:00
// ignore trades that are not in the symbol we interested
if trade.Symbol != e.Symbol {
return
}
if !e.orderStore.Exists(trade.OrderID) {
return
}
log.Info(trade.String())
2021-05-14 03:53:07 +00:00
e.position.AddTrade(trade)
log.Infof("position updated: %+v", e.position)
}
2021-05-14 04:23:07 +00:00
func (e *TwapExecution) handleFilledOrder(order types.Order) {
log.Info(order.String())
// filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done.
e.cancelContextIfTargetQuantityFilled()
2021-05-14 04:23:07 +00:00
}
func (e *TwapExecution) Run(parentCtx context.Context) error {
e.mu.Lock()
e.stoppedC = make(chan struct{})
e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx)
e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(context.Background())
e.mu.Unlock()
2021-05-14 03:53:07 +00:00
var ok bool
e.market, ok = e.Session.Market(e.Symbol)
if !ok {
return fmt.Errorf("market %s not found", e.Symbol)
}
e.marketDataStream = e.Session.Exchange.NewStream()
e.marketDataStream.SetPublicOnly()
e.marketDataStream.Subscribe(types.BookChannel, e.Symbol, types.SubscribeOptions{})
e.orderBook = types.NewStreamBook(e.Symbol)
e.orderBook.BindStream(e.marketDataStream)
go e.connectMarketData(e.executionCtx)
2021-05-14 03:53:07 +00:00
e.userDataStream = e.Session.Exchange.NewStream()
2021-05-14 04:23:07 +00:00
e.userDataStream.OnTradeUpdate(e.handleTradeUpdate)
2021-05-14 03:53:07 +00:00
e.position = &Position{
Symbol: e.Symbol,
BaseCurrency: e.market.BaseCurrency,
QuoteCurrency: e.market.QuoteCurrency,
}
e.orderStore = NewOrderStore(e.Symbol)
e.orderStore.BindStream(e.userDataStream)
e.activeMakerOrders = NewLocalActiveOrderBook()
2021-05-14 04:23:07 +00:00
e.activeMakerOrders.OnFilled(e.handleFilledOrder)
2021-05-14 03:53:07 +00:00
e.activeMakerOrders.BindStream(e.userDataStream)
go e.connectUserData(e.userDataStreamCtx)
go e.orderUpdater(e.executionCtx)
2021-05-14 03:53:07 +00:00
return nil
}
func (e *TwapExecution) emitDone() {
e.mu.Lock()
if e.stoppedC == nil {
e.stoppedC = make(chan struct{})
}
close(e.stoppedC)
e.mu.Unlock()
}
func (e *TwapExecution) Done() (c <-chan struct{}) {
e.mu.Lock()
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
if e.stoppedC == nil {
e.stoppedC = make(chan struct{})
close(e.stoppedC)
c = e.stoppedC
} else {
c = e.stoppedC
}
e.mu.Unlock()
return c
}
// Shutdown stops the execution
// If we call this method, it means the execution is still running,
// We need to:
// 1. stop the order updater (by using the execution context)
// 2. the order updater cancels all open orders and close the user data stream
func (e *TwapExecution) Shutdown(shutdownCtx context.Context) {
e.mu.Lock()
if e.cancelExecution != nil {
e.cancelExecution()
}
e.mu.Unlock()
for {
select {
case <-shutdownCtx.Done():
return
case <-e.Done():
return
}
}
}
2021-05-14 03:53:07 +00:00
type TwapOrderExecutor struct {
Session *ExchangeSession
// Execution parameters
// DelayTime is the order update delay time
DelayTime types.Duration
}
func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity, stopPrice fixedpoint.Value) (*TwapExecution, error) {
2021-05-14 03:53:07 +00:00
execution := &TwapExecution{
Session: e.Session,
Symbol: symbol,
Side: side,
TargetQuantity: targetQuantity,
SliceQuantity: sliceQuantity,
StopPrice: stopPrice,
2021-05-14 03:53:07 +00:00
}
err := execution.Run(ctx)
return execution, err
}