2024-08-17 10:00:49 +00:00
package twap
import (
"context"
"errors"
2024-08-18 05:16:55 +00:00
"fmt"
2024-08-17 10:00:49 +00:00
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
// defaultUpdateInterval is the update interval NewStreamExecutor assigns to a new executor;
// SetUpdateInterval replaces it on a per-executor basis.
var defaultUpdateInterval = time . Minute
// DoneSignal is a one-shot completion signal backed by a channel that gets
// closed by Emit. The zero value is usable: the channel is allocated lazily by
// whichever of Emit or Chan runs first.
type DoneSignal struct {
	doneC chan struct{}
	mu    sync.Mutex
}

// NewDoneSignal returns a DoneSignal whose channel is already allocated.
func NewDoneSignal() *DoneSignal {
	return &DoneSignal{
		doneC: make(chan struct{}),
	}
}

// Emit closes the underlying channel, releasing every receiver on Chan.
// Calling Emit more than once is safe; only the first call closes the channel.
func (e *DoneSignal) Emit() {
	e.mu.Lock()
	defer e.mu.Unlock()

	if e.doneC == nil {
		e.doneC = make(chan struct{})
	}

	// nothing is ever sent on doneC, so a successful receive means it is
	// already closed; closing it a second time would panic
	select {
	case <-e.doneC:
	default:
		close(e.doneC)
	}
}

// Chan returns a channel that is closed once Emit has been called.
func (e *DoneSignal) Chan() (c <-chan struct{}) {
	e.mu.Lock()
	defer e.mu.Unlock()

	// allocate lazily so a zero-value DoneSignal hands out the same channel
	// that a later Emit will close
	if e.doneC == nil {
		e.doneC = make(chan struct{})
	}
	return e.doneC
}
// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API.
// It uses a fixed target quantity to place orders.
type FixedQuantityExecutor struct {
exchange types . Exchange
// configuration fields
2024-08-18 05:16:55 +00:00
symbol string
side types . SideType
targetQuantity , sliceQuantity fixedpoint . Value
2024-08-17 10:00:49 +00:00
// updateInterval is a fixed update interval for placing new order
updateInterval time . Duration
// delayInterval is the delay interval between each order placement
delayInterval time . Duration
2024-08-18 05:16:55 +00:00
// numOfTicks is the number of price ticks behind the best bid to place the order
numOfTicks int
// stopPrice is the price limit for the order
2024-08-17 10:00:49 +00:00
// for buy-orders, the price limit is the maximum price
// for sell-orders, the price limit is the minimum price
2024-08-18 05:16:55 +00:00
stopPrice fixedpoint . Value
2024-08-17 10:00:49 +00:00
// deadlineTime is the deadline time for the order execution
deadlineTime * time . Time
executionCtx context . Context
cancelExecution context . CancelFunc
userDataStreamCtx context . Context
cancelUserDataStream context . CancelFunc
market types . Market
marketDataStream types . Stream
2024-08-19 07:37:56 +00:00
orderBook * types . StreamOrderBook
userDataStream types . Stream
2024-08-17 10:00:49 +00:00
activeMakerOrders * bbgo . ActiveOrderBook
orderStore * core . OrderStore
position * types . Position
2024-08-18 05:16:55 +00:00
tradeCollector * core . TradeCollector
2024-08-17 10:00:49 +00:00
logger logrus . FieldLogger
mu sync . Mutex
2024-08-19 07:37:56 +00:00
userDataStreamConnectC chan struct { }
marketDataStreamConnectC chan struct { }
done * DoneSignal
2024-08-17 10:00:49 +00:00
}
func NewStreamExecutor (
exchange types . Exchange ,
symbol string ,
market types . Market ,
side types . SideType ,
2024-08-18 05:16:55 +00:00
targetQuantity , sliceQuantity fixedpoint . Value ,
2024-08-17 10:00:49 +00:00
) * FixedQuantityExecutor {
2024-08-19 07:37:56 +00:00
marketDataStream := exchange . NewStream ( )
marketDataStream . SetPublicOnly ( )
marketDataStream . Subscribe ( types . BookChannel , symbol , types . SubscribeOptions {
Depth : types . DepthLevelMedium ,
} )
orderBook := types . NewStreamBook ( symbol )
orderBook . BindStream ( marketDataStream )
userDataStream := exchange . NewStream ( )
2024-08-18 05:16:55 +00:00
orderStore := core . NewOrderStore ( symbol )
position := types . NewPositionFromMarket ( market )
tradeCollector := core . NewTradeCollector ( symbol , position , orderStore )
2024-08-19 07:37:56 +00:00
orderStore . BindStream ( userDataStream )
activeMakerOrders := bbgo . NewActiveOrderBook ( symbol )
e := & FixedQuantityExecutor {
2024-08-17 10:00:49 +00:00
exchange : exchange ,
symbol : symbol ,
side : side ,
market : market ,
targetQuantity : targetQuantity ,
2024-08-18 05:16:55 +00:00
sliceQuantity : sliceQuantity ,
2024-08-17 10:00:49 +00:00
updateInterval : defaultUpdateInterval ,
logger : logrus . WithFields ( logrus . Fields {
"executor" : "twapStream" ,
"symbol" : symbol ,
} ) ,
2024-08-19 07:37:56 +00:00
marketDataStream : marketDataStream ,
orderBook : orderBook ,
userDataStream : userDataStream ,
activeMakerOrders : activeMakerOrders ,
orderStore : orderStore ,
tradeCollector : tradeCollector ,
position : position ,
done : NewDoneSignal ( ) ,
userDataStreamConnectC : make ( chan struct { } ) ,
marketDataStreamConnectC : make ( chan struct { } ) ,
2024-08-17 10:00:49 +00:00
}
2024-08-19 07:37:56 +00:00
e . tradeCollector . OnTrade ( func ( trade types . Trade , profit fixedpoint . Value , netProfit fixedpoint . Value ) {
e . logger . Info ( trade . String ( ) )
} )
e . tradeCollector . BindStream ( e . userDataStream )
activeMakerOrders . OnFilled ( e . handleFilledOrder )
activeMakerOrders . BindStream ( e . userDataStream )
e . marketDataStream . OnConnect ( func ( ) {
e . logger . Info ( "market data stream on connect" )
close ( e . marketDataStreamConnectC )
e . logger . Infof ( "marketDataStreamConnectC closed" )
} )
// private channels
e . userDataStream . OnAuth ( func ( ) {
e . logger . Info ( "user data stream on auth" )
close ( e . userDataStreamConnectC )
e . logger . Info ( "userDataStreamConnectC closed" )
} )
return e
2024-08-17 10:00:49 +00:00
}
// SetDeadlineTime stores the execution deadline. isDeadlineExceeded compares
// the current time against it.
func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) {
	deadline := t
	e.deadlineTime = &deadline
}
// SetDelayInterval stores the delay interval between order placements.
func (e *FixedQuantityExecutor) SetDelayInterval(delayInterval time.Duration) {
	e.delayInterval = delayInterval
}
// SetUpdateInterval replaces the update interval (initially defaultUpdateInterval)
// that the order updater uses for its timer.
func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration) {
	e.updateInterval = updateInterval
}
// connectMarketData connects the market data stream with the given context.
// A connect error is logged, not returned.
func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) {
	e.logger.Infof("connecting market data stream...")

	err := e.marketDataStream.Connect(ctx)
	if err == nil {
		return
	}

	e.logger.WithError(err).Errorf("market data stream connect error")
}
// connectUserData connects the user data stream with the given context.
// A connect error is logged, not returned.
func (e *FixedQuantityExecutor) connectUserData(ctx context.Context) {
	e.logger.Infof("connecting user data stream...")

	err := e.userDataStream.Connect(ctx)
	if err == nil {
		return
	}

	e.logger.WithError(err).Errorf("user data stream connect error")
}
// handleFilledOrder is registered as the OnFilled callback of the active order
// book. It logs the filled order and then cancels the execution context when
// the position has reached the target quantity.
func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) {
	summary := order.String()
	e.logger.Info(summary)

	// the filled event removes the order from the active order store;
	// every order update event must have been received before the execution is done.
	_ = e.cancelContextIfTargetQuantityFilled()
}
// cancelContextIfTargetQuantityFilled cancels the execution context once the
// absolute base position is at least the target quantity.
// It reports whether the context was cancelled.
func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool {
	filled := e.position.GetBase().Abs()
	if filled.Compare(e.targetQuantity) < 0 {
		return false
	}

	e.logger.Infof("filled target quantity, canceling the order execution context")
	e.cancelExecution()
	return true
}
// cancelActiveOrders gracefully cancels the active maker orders on the exchange,
// bounding the operation with a 30 second timeout derived from ctx.
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
	const gracefulTimeout = 30 * time.Second

	timeoutCtx, release := context.WithTimeout(ctx, gracefulTimeout)
	defer release()

	err := e.activeMakerOrders.GracefulCancel(timeoutCtx, e.exchange)
	return err
}
func ( e * FixedQuantityExecutor ) orderUpdater ( ctx context . Context ) {
2024-08-19 09:58:25 +00:00
// updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2)
2024-08-17 10:00:49 +00:00
defer func ( ) {
if err := e . cancelActiveOrders ( ctx ) ; err != nil {
e . logger . WithError ( err ) . Error ( "cancel active orders error" )
}
e . cancelUserDataStream ( )
e . done . Emit ( )
} ( )
ticker := time . NewTimer ( e . updateInterval )
defer ticker . Stop ( )
2024-08-18 05:16:55 +00:00
monitor := NewBboMonitor ( )
2024-08-17 10:00:49 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case <- e . orderBook . C :
2024-08-18 05:16:55 +00:00
changed := monitor . OnUpdateFromBook ( e . orderBook )
if ! changed {
continue
}
// orderBook.C sends a signal when any price or quantity changes in the order book
2024-08-19 09:58:25 +00:00
/ *
if ! updateLimiter . Allow ( ) {
break
}
* /
2024-08-17 10:00:49 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
e . logger . Infof ( "%s order book changed, checking order..." , e . symbol )
2024-08-18 05:16:55 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
e . logger . WithError ( err ) . Errorf ( "order update failed" )
}
2024-08-17 10:00:49 +00:00
case <- ticker . C :
2024-08-18 05:16:55 +00:00
changed := monitor . OnUpdateFromBook ( e . orderBook )
if ! changed {
continue
}
2024-08-19 09:58:25 +00:00
/ *
if ! updateLimiter . Allow ( ) {
break
}
* /
2024-08-17 10:00:49 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
2024-08-18 05:16:55 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
e . logger . WithError ( err ) . Errorf ( "order update failed" )
}
}
}
}
func ( e * FixedQuantityExecutor ) updateOrder ( ctx context . Context ) error {
book := e . orderBook . Copy ( )
sideBook := book . SideBook ( e . side )
first , ok := sideBook . First ( )
if ! ok {
return fmt . Errorf ( "empty %s %s side book" , e . symbol , e . side )
}
// if there is no gap between the first price entry and the second price entry
second , ok := sideBook . Second ( )
if ! ok {
return fmt . Errorf ( "no secoond price on the %s order book %s, can not update" , e . symbol , e . side )
}
tickSize := e . market . TickSize
numOfTicks := fixedpoint . NewFromInt ( int64 ( e . numOfTicks ) )
tickSpread := tickSize . Mul ( numOfTicks )
// check and see if we need to cancel the existing active orders
for e . activeMakerOrders . NumOfOrders ( ) > 0 {
orders := e . activeMakerOrders . Orders ( )
if len ( orders ) > 1 {
2024-08-19 07:37:56 +00:00
e . logger . Warnf ( "found more than 1 %s open orders on the orderbook" , e . symbol )
2024-08-18 05:16:55 +00:00
}
// get the first active order
order := orders [ 0 ]
orderPrice := order . Price
// quantity := fixedpoint.NewFromFloat(order.Quantity)
remainingQuantity := order . Quantity . Sub ( order . ExecutedQuantity )
if remainingQuantity . Compare ( e . market . MinQuantity ) <= 0 {
logrus . Infof ( "order remaining quantity %s is less than the market minimal quantity %s, skip updating order" , remainingQuantity . String ( ) , e . market . MinQuantity . String ( ) )
return nil
}
// if the first bid price or first ask price is the same to the current active order
// we should skip updating the order
// DO NOT UPDATE IF:
// tickSpread > 0 AND current order price == second price + tickSpread
// current order price == first price
logrus . Infof ( "orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s" , orderPrice . String ( ) , first . Price . String ( ) , second . Price . String ( ) , tickSpread . String ( ) )
switch e . side {
case types . SideTypeBuy :
if tickSpread . Sign ( ) > 0 && orderPrice == second . Price . Add ( tickSpread ) {
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
return nil
} else if orderPrice == first . Price {
logrus . Infof ( "the current order is already on the best bid price %s" , orderPrice . String ( ) )
return nil
}
case types . SideTypeSell :
if tickSpread . Sign ( ) > 0 && orderPrice == second . Price . Sub ( tickSpread ) {
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
return nil
} else if orderPrice == first . Price {
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
return nil
}
}
if err := e . cancelActiveOrders ( ctx ) ; err != nil {
e . logger . Warnf ( "cancel active orders error: %v" , err )
}
}
2024-08-19 09:58:25 +00:00
e . tradeCollector . Process ( )
2024-08-18 05:16:55 +00:00
orderForm , err := e . generateOrder ( )
if err != nil {
return err
} else if orderForm == nil {
return nil
}
createdOrder , err := e . exchange . SubmitOrder ( ctx , * orderForm )
if err != nil {
return err
}
if createdOrder != nil {
e . orderStore . Add ( * createdOrder )
e . activeMakerOrders . Add ( * createdOrder )
e . tradeCollector . Process ( )
}
return nil
}
// getNewPrice computes the price of the next order from the current order book.
//
// It starts from the first price of the executor's side book. When the
// bid/ask spread is wider than one tick, the price is moved into the spread by
// min(numOfTicks*tickSize, spread-tickSize): down for sell orders, up for buy
// orders. When stopPrice is positive, the result is clamped to it (a floor for
// sell orders, a ceiling for buy orders).
//
// On error the returned price is fixedpoint.Zero.
func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) {
	book := e.orderBook.Copy()
	sideBook := book.SideBook(e.side)

	first, ok := sideBook.First()
	if !ok {
		return fixedpoint.Zero, fmt.Errorf("empty %s %s side book", e.symbol, e.side)
	}

	spread, ok := book.Spread()
	if !ok {
		return fixedpoint.Zero, errors.New("can not calculate spread, neither bid price or ask price exists")
	}

	newPrice := first.Price

	tickSize := e.market.TickSize
	tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks)))
	if spread.Compare(tickSize) > 0 {
		// there is a gap in the spread; never move further than one tick
		// short of the opposite side
		tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize))
		switch e.side {
		case types.SideTypeSell:
			newPrice = newPrice.Sub(tickSpread)
		case types.SideTypeBuy:
			newPrice = newPrice.Add(tickSpread)
		}
	}

	if e.stopPrice.Sign() > 0 {
		switch e.side {
		case types.SideTypeSell:
			if newPrice.Compare(e.stopPrice) < 0 {
				e.logger.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
					e.symbol,
					newPrice.String(),
					e.stopPrice.String(),
					e.stopPrice.String())
				newPrice = e.stopPrice
			}

		case types.SideTypeBuy:
			if newPrice.Compare(e.stopPrice) > 0 {
				e.logger.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
					e.symbol,
					newPrice.String(),
					e.stopPrice.String(),
					e.stopPrice.String())
				newPrice = e.stopPrice
			}
		}
	}

	return newPrice, nil
}
2024-08-19 07:37:56 +00:00
// getRemainingQuantity returns the target quantity minus the absolute base
// position, i.e. the quantity that still has to be executed.
func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value {
	executed := e.position.GetBase().Abs()
	remaining := e.targetQuantity.Sub(executed)
	return remaining
}
func ( e * FixedQuantityExecutor ) isDeadlineExceeded ( ) bool {
if e . deadlineTime != nil && ! e . deadlineTime . IsZero ( ) {
return time . Since ( * e . deadlineTime ) > 0
2024-08-18 05:16:55 +00:00
}
2024-08-19 07:37:56 +00:00
return false
}
func ( e * FixedQuantityExecutor ) calculateNewOrderQuantity ( price fixedpoint . Value ) ( fixedpoint . Value , error ) {
2024-08-18 05:16:55 +00:00
minQuantity := e . market . MinQuantity
2024-08-19 07:37:56 +00:00
remainingQuantity := e . getRemainingQuantity ( )
if remainingQuantity . Sign ( ) <= 0 {
e . cancelExecution ( )
return fixedpoint . Zero , nil
}
2024-08-18 05:16:55 +00:00
2024-08-19 07:37:56 +00:00
if remainingQuantity . Compare ( minQuantity ) < 0 {
e . logger . Warnf ( "can not continue placing orders, the remaining quantity %s is less than the min quantity %s" , remainingQuantity . String ( ) , minQuantity . String ( ) )
2024-08-18 05:16:55 +00:00
2024-08-19 07:37:56 +00:00
e . cancelExecution ( )
return fixedpoint . Zero , nil
2024-08-17 10:00:49 +00:00
}
2024-08-18 05:16:55 +00:00
2024-08-19 07:37:56 +00:00
// if deadline exceeded, we should return the remaining quantity
if e . isDeadlineExceeded ( ) {
return remainingQuantity , nil
2024-08-18 05:16:55 +00:00
}
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
2024-08-19 07:37:56 +00:00
orderQuantity := fixedpoint . Min ( e . sliceQuantity , remainingQuantity )
2024-08-18 05:16:55 +00:00
2024-08-19 07:37:56 +00:00
// if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round
2024-08-18 05:16:55 +00:00
// if there are rest slices
2024-08-19 07:37:56 +00:00
nextRemainingQuantity := remainingQuantity . Sub ( e . sliceQuantity )
if nextRemainingQuantity . Sign ( ) > 0 && e . market . IsDustQuantity ( nextRemainingQuantity , price ) {
orderQuantity = remainingQuantity
}
orderQuantity = e . market . AdjustQuantityByMinNotional ( orderQuantity , price )
return orderQuantity , nil
}
func ( e * FixedQuantityExecutor ) generateOrder ( ) ( * types . SubmitOrder , error ) {
newPrice , err := e . getNewPrice ( )
if err != nil {
return nil , err
2024-08-18 05:16:55 +00:00
}
2024-08-19 07:37:56 +00:00
orderQuantity , err := e . calculateNewOrderQuantity ( newPrice )
if err != nil {
return nil , err
}
2024-08-18 05:16:55 +00:00
balances , err := e . exchange . QueryAccountBalances ( e . executionCtx )
if err != nil {
return nil , err
}
switch e . side {
case types . SideTypeSell :
// check base balance for sell, try to sell as more as possible
if b , ok := balances [ e . market . BaseCurrency ] ; ok {
orderQuantity = fixedpoint . Min ( b . Available , orderQuantity )
}
case types . SideTypeBuy :
// check base balance for sell, try to sell as more as possible
if b , ok := balances [ e . market . QuoteCurrency ] ; ok {
2024-08-19 07:37:56 +00:00
orderQuantity = e . market . AdjustQuantityByMaxAmount ( orderQuantity , newPrice , b . Available )
2024-08-18 05:16:55 +00:00
}
}
2024-08-19 07:37:56 +00:00
if e . isDeadlineExceeded ( ) {
return & types . SubmitOrder {
Symbol : e . symbol ,
Side : e . side ,
Type : types . OrderTypeMarket ,
Quantity : orderQuantity ,
Market : e . market ,
} , nil
2024-08-18 05:16:55 +00:00
}
2024-08-19 07:37:56 +00:00
return & types . SubmitOrder {
2024-08-18 05:16:55 +00:00
Symbol : e . symbol ,
Side : e . side ,
Type : types . OrderTypeLimitMaker ,
Quantity : orderQuantity ,
Price : newPrice ,
Market : e . market ,
2024-08-19 07:37:56 +00:00
TimeInForce : types . TimeInForceGTC ,
} , nil
2024-08-17 10:00:49 +00:00
}
func ( e * FixedQuantityExecutor ) Start ( ctx context . Context ) error {
2024-08-19 07:37:56 +00:00
if e . executionCtx != nil {
return errors . New ( "executionCtx is not nil, you can't start the executor twice" )
2024-08-17 10:00:49 +00:00
}
e . executionCtx , e . cancelExecution = context . WithCancel ( ctx )
e . userDataStreamCtx , e . cancelUserDataStream = context . WithCancel ( ctx )
2024-08-19 07:37:56 +00:00
go e . connectMarketData ( e . executionCtx )
go e . connectUserData ( e . userDataStreamCtx )
2024-08-17 10:00:49 +00:00
2024-08-19 07:37:56 +00:00
e . logger . Infof ( "waiting for connections ready..." )
2024-08-19 09:58:25 +00:00
if err := e . WaitForConnection ( ctx ) ; err != nil {
2024-08-19 07:37:56 +00:00
e . cancelExecution ( )
2024-08-19 09:58:25 +00:00
return err
}
e . logger . Infof ( "connections ready, starting order updater..." )
go e . orderUpdater ( e . executionCtx )
return nil
}
func ( e * FixedQuantityExecutor ) WaitForConnection ( ctx context . Context ) error {
if ! selectSignalOrTimeout ( ctx , e . marketDataStreamConnectC , 10 * time . Second ) {
2024-08-19 07:37:56 +00:00
return fmt . Errorf ( "market data stream connection timeout" )
}
2024-08-17 10:00:49 +00:00
2024-08-19 07:37:56 +00:00
if ! selectSignalOrTimeout ( ctx , e . userDataStreamConnectC , 10 * time . Second ) {
return fmt . Errorf ( "user data stream connection timeout" )
}
2024-08-17 10:00:49 +00:00
return nil
}
// Done exposes the channel of the executor's done signal; the order updater
// emits that signal after it has finished its cleanup.
func (e *FixedQuantityExecutor) Done() <-chan struct{} {
	doneC := e.done.Chan()
	return doneC
}
// Shutdown stops a running execution and waits for it to finish.
//
// It cancels the execution context, which stops the order updater; the order
// updater in turn cancels all open orders, closes the user data stream and
// emits the done signal. Shutdown returns when that signal arrives or when
// shutdownCtx is done, whichever happens first.
func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
	func() {
		e.mu.Lock()
		defer e.mu.Unlock()

		if cancel := e.cancelExecution; cancel != nil {
			cancel()
		}
	}()

	// either event ends the wait
	select {
	case <-e.done.Chan():
	case <-shutdownCtx.Done():
	}
}
2024-08-19 07:37:56 +00:00
// selectSignalOrTimeout waits for a signal on c (a receive or a close).
// It returns true when the signal arrives, and false when ctx is done or the
// timeout elapses first.
func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool {
	// an explicit timer that is stopped on return, so it does not stay
	// pending for the full timeout after an early signal or cancellation
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-c:
		return true
	case <-ctx.Done():
		return false
	case <-timer.C:
		return false
	}
}