2024-08-17 05:29:27 +00:00
package twap
2021-05-14 03:53:07 +00:00
import (
"context"
"fmt"
"sync"
"time"
"github.com/pkg/errors"
2024-08-17 05:29:27 +00:00
"github.com/sirupsen/logrus"
2021-05-14 03:53:07 +00:00
"golang.org/x/time/rate"
2022-01-06 16:10:40 +00:00
2024-08-17 05:29:27 +00:00
"github.com/c9s/bbgo/pkg/bbgo"
2023-07-04 13:42:24 +00:00
"github.com/c9s/bbgo/pkg/core"
2022-01-06 16:10:40 +00:00
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
2021-05-14 03:53:07 +00:00
)
2024-08-17 06:09:25 +00:00
// StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream.
type StreamExecutor struct {
2024-08-17 05:29:27 +00:00
Session * bbgo . ExchangeSession
2021-05-14 03:53:07 +00:00
Symbol string
Side types . SideType
TargetQuantity fixedpoint . Value
SliceQuantity fixedpoint . Value
2021-05-14 06:52:19 +00:00
StopPrice fixedpoint . Value
2021-05-14 07:35:11 +00:00
NumOfTicks int
UpdateInterval time . Duration
2021-05-15 01:46:07 +00:00
DeadlineTime time . Time
2021-05-14 03:53:07 +00:00
market types . Market
marketDataStream types . Stream
2021-05-14 06:52:19 +00:00
userDataStream types . Stream
userDataStreamCtx context . Context
cancelUserDataStream context . CancelFunc
2021-05-14 04:23:07 +00:00
orderBook * types . StreamOrderBook
currentPrice fixedpoint . Value
activePosition fixedpoint . Value
2021-05-14 03:53:07 +00:00
2024-08-17 05:29:27 +00:00
activeMakerOrders * bbgo . ActiveOrderBook
2023-07-04 13:42:24 +00:00
orderStore * core . OrderStore
2021-12-11 11:16:16 +00:00
position * types . Position
2021-05-14 03:53:07 +00:00
2021-05-14 06:52:19 +00:00
executionCtx context . Context
cancelExecution context . CancelFunc
stoppedC chan struct { }
2021-05-14 03:53:07 +00:00
mu sync . Mutex
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) connectMarketData ( ctx context . Context ) {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "connecting market data stream..." )
2021-05-14 03:53:07 +00:00
if err := e . marketDataStream . Connect ( ctx ) ; err != nil {
2024-08-17 05:29:27 +00:00
logrus . WithError ( err ) . Errorf ( "market data stream connect error" )
2021-05-14 03:53:07 +00:00
}
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) connectUserData ( ctx context . Context ) {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "connecting user data stream..." )
2021-05-14 03:53:07 +00:00
if err := e . userDataStream . Connect ( ctx ) ; err != nil {
2024-08-17 05:29:27 +00:00
logrus . WithError ( err ) . Errorf ( "user data stream connect error" )
2021-05-14 03:53:07 +00:00
}
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) newBestPriceOrder ( ) ( orderForm types . SubmitOrder , err error ) {
2021-05-22 04:18:08 +00:00
book := e . orderBook . Copy ( )
2021-05-22 16:42:44 +00:00
sideBook := book . SideBook ( e . Side )
2021-05-14 03:53:07 +00:00
first , ok := sideBook . First ( )
if ! ok {
return orderForm , fmt . Errorf ( "empty %s %s side book" , e . Symbol , e . Side )
}
newPrice := first . Price
spread , ok := book . Spread ( )
if ! ok {
return orderForm , errors . New ( "can not calculate spread, neither bid price or ask price exists" )
}
2021-05-14 07:35:11 +00:00
// for example, we have tickSize = 0.01, and spread is 28.02 - 28.00 = 0.02
// assign tickSpread = min(spread - tickSize, tickSpread)
//
// if number of ticks = 0, than the tickSpread is 0
// tickSpread = min(0.02 - 0.01, 0)
// price = first bid price 28.00 + tickSpread (0.00) = 28.00
//
// if number of ticks = 1, than the tickSpread is 0.01
// tickSpread = min(0.02 - 0.01, 0.01)
// price = first bid price 28.00 + tickSpread (0.01) = 28.01
//
// if number of ticks = 2, than the tickSpread is 0.02
// tickSpread = min(0.02 - 0.01, 0.02)
// price = first bid price 28.00 + tickSpread (0.01) = 28.01
2022-02-03 11:19:56 +00:00
tickSize := e . market . TickSize
tickSpread := tickSize . Mul ( fixedpoint . NewFromInt ( int64 ( e . NumOfTicks ) ) )
if spread . Compare ( tickSize ) > 0 {
2021-05-14 07:35:11 +00:00
// there is a gap in the spread
2022-02-03 11:19:56 +00:00
tickSpread = fixedpoint . Min ( tickSpread , spread . Sub ( tickSize ) )
2021-05-14 03:53:07 +00:00
switch e . Side {
case types . SideTypeSell :
2022-02-03 11:19:56 +00:00
newPrice = newPrice . Sub ( tickSpread )
2021-05-14 03:53:07 +00:00
case types . SideTypeBuy :
2022-02-03 11:19:56 +00:00
newPrice = newPrice . Add ( tickSpread )
2021-05-14 03:53:07 +00:00
}
}
2022-02-03 11:19:56 +00:00
if e . StopPrice . Sign ( ) > 0 {
2021-05-14 06:52:19 +00:00
switch e . Side {
case types . SideTypeSell :
2022-02-03 11:19:56 +00:00
if newPrice . Compare ( e . StopPrice ) < 0 {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s" ,
2021-05-14 06:52:19 +00:00
e . Symbol ,
2022-02-03 11:19:56 +00:00
newPrice . String ( ) ,
e . StopPrice . String ( ) ,
e . StopPrice . String ( ) )
2021-05-14 06:52:19 +00:00
newPrice = e . StopPrice
}
case types . SideTypeBuy :
2022-02-03 11:19:56 +00:00
if newPrice . Compare ( e . StopPrice ) > 0 {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s" ,
2021-05-14 06:52:19 +00:00
e . Symbol ,
2022-02-03 11:19:56 +00:00
newPrice . String ( ) ,
e . StopPrice . String ( ) ,
e . StopPrice . String ( ) )
2021-05-14 06:52:19 +00:00
newPrice = e . StopPrice
}
}
}
2022-02-03 11:19:56 +00:00
minQuantity := e . market . MinQuantity
2022-01-08 16:35:45 +00:00
base := e . position . GetBase ( )
2021-05-15 02:06:45 +00:00
2022-02-03 11:19:56 +00:00
restQuantity := e . TargetQuantity . Sub ( base . Abs ( ) )
2021-05-15 02:00:32 +00:00
2022-02-03 11:19:56 +00:00
if restQuantity . Sign ( ) <= 0 {
2021-05-15 02:00:32 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
}
2022-02-03 11:19:56 +00:00
if restQuantity . Compare ( minQuantity ) < 0 {
return orderForm , fmt . Errorf ( "can not continue placing orders, rest quantity %s is less than the min quantity %s" , restQuantity . String ( ) , minQuantity . String ( ) )
2021-05-14 06:52:19 +00:00
}
2021-05-15 01:23:41 +00:00
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
orderQuantity := fixedpoint . Min ( e . SliceQuantity , restQuantity )
2021-05-14 06:52:19 +00:00
// if the rest quantity in the next round is not enough, we should merge the rest quantity into this round
2021-05-15 01:23:41 +00:00
// if there are rest slices
2022-02-03 11:19:56 +00:00
nextRestQuantity := restQuantity . Sub ( e . SliceQuantity )
if nextRestQuantity . Sign ( ) > 0 && nextRestQuantity . Compare ( minQuantity ) < 0 {
2021-05-14 06:52:19 +00:00
orderQuantity = restQuantity
}
2022-02-03 11:19:56 +00:00
minNotional := e . market . MinNotional
2024-08-17 05:29:27 +00:00
orderQuantity = bbgo . AdjustQuantityByMinAmount ( orderQuantity , newPrice , minNotional )
2021-05-15 01:46:07 +00:00
switch e . Side {
case types . SideTypeSell :
// check base balance for sell, try to sell as more as possible
2022-04-23 07:43:11 +00:00
if b , ok := e . Session . GetAccount ( ) . Balance ( e . market . BaseCurrency ) ; ok {
2021-05-15 01:46:07 +00:00
orderQuantity = fixedpoint . Min ( b . Available , orderQuantity )
}
case types . SideTypeBuy :
// check base balance for sell, try to sell as more as possible
2022-04-23 07:43:11 +00:00
if b , ok := e . Session . GetAccount ( ) . Balance ( e . market . QuoteCurrency ) ; ok {
2024-08-17 05:29:27 +00:00
orderQuantity = bbgo . AdjustQuantityByMaxAmount ( orderQuantity , newPrice , b . Available )
2021-05-15 01:46:07 +00:00
}
}
2024-08-17 05:29:27 +00:00
if ! e . DeadlineTime . IsZero ( ) {
2021-05-15 01:46:07 +00:00
now := time . Now ( )
if now . After ( e . DeadlineTime ) {
orderForm = types . SubmitOrder {
Symbol : e . Symbol ,
Side : e . Side ,
Type : types . OrderTypeMarket ,
2022-02-03 11:19:56 +00:00
Quantity : restQuantity ,
2021-05-15 01:46:07 +00:00
Market : e . market ,
}
return orderForm , nil
}
2021-05-14 06:52:19 +00:00
}
2021-05-14 03:53:07 +00:00
orderForm = types . SubmitOrder {
// ClientOrderID: "",
Symbol : e . Symbol ,
Side : e . Side ,
Type : types . OrderTypeLimitMaker ,
2022-02-03 11:19:56 +00:00
Quantity : orderQuantity ,
Price : newPrice ,
2021-05-14 03:53:07 +00:00
Market : e . market ,
TimeInForce : "GTC" ,
}
return orderForm , err
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) updateOrder ( ctx context . Context ) error {
2021-05-22 16:42:44 +00:00
book := e . orderBook . Copy ( )
sideBook := book . SideBook ( e . Side )
2021-05-14 03:53:07 +00:00
first , ok := sideBook . First ( )
if ! ok {
return fmt . Errorf ( "empty %s %s side book" , e . Symbol , e . Side )
}
2021-05-18 05:38:23 +00:00
// if there is no gap between the first price entry and the second price entry
second , ok := sideBook . Second ( )
if ! ok {
return fmt . Errorf ( "no secoond price on the %s order book %s, can not update" , e . Symbol , e . Side )
}
2022-02-03 11:19:56 +00:00
tickSize := e . market . TickSize
numOfTicks := fixedpoint . NewFromInt ( int64 ( e . NumOfTicks ) )
tickSpread := tickSize . Mul ( numOfTicks )
2021-05-14 03:53:07 +00:00
// check and see if we need to cancel the existing active orders
for e . activeMakerOrders . NumOfOrders ( ) > 0 {
orders := e . activeMakerOrders . Orders ( )
if len ( orders ) > 1 {
2024-08-17 05:29:27 +00:00
logrus . Warnf ( "more than 1 %s open orders in the strategy..." , e . Symbol )
2021-05-14 03:53:07 +00:00
}
// get the first order
order := orders [ 0 ]
2022-02-03 11:19:56 +00:00
orderPrice := order . Price
2021-05-18 05:38:23 +00:00
// quantity := fixedpoint.NewFromFloat(order.Quantity)
2021-05-14 03:53:07 +00:00
2022-02-03 11:19:56 +00:00
remainingQuantity := order . Quantity . Sub ( order . ExecutedQuantity )
if remainingQuantity . Compare ( e . market . MinQuantity ) <= 0 {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "order remaining quantity %s is less than the market minimal quantity %s, skip updating order" , remainingQuantity . String ( ) , e . market . MinQuantity . String ( ) )
2021-05-14 06:52:19 +00:00
return nil
}
2021-05-14 03:53:07 +00:00
// if the first bid price or first ask price is the same to the current active order
// we should skip updating the order
2021-05-18 05:38:23 +00:00
// DO NOT UPDATE IF:
// tickSpread > 0 AND current order price == second price + tickSpread
// current order price == first price
2024-08-17 05:29:27 +00:00
logrus . Infof ( "orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s" , orderPrice . String ( ) , first . Price . String ( ) , second . Price . String ( ) , tickSpread . String ( ) )
2021-05-14 03:53:07 +00:00
2021-05-18 05:38:23 +00:00
switch e . Side {
case types . SideTypeBuy :
2022-02-03 11:19:56 +00:00
if tickSpread . Sign ( ) > 0 && orderPrice == second . Price . Add ( tickSpread ) {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
2021-05-18 05:38:23 +00:00
return nil
} else if orderPrice == first . Price {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "the current order is already on the best bid price %s" , orderPrice . String ( ) )
2021-05-18 05:38:23 +00:00
return nil
2021-05-14 03:53:07 +00:00
}
2021-05-18 05:38:23 +00:00
case types . SideTypeSell :
2022-02-03 11:19:56 +00:00
if tickSpread . Sign ( ) > 0 && orderPrice == second . Price . Sub ( tickSpread ) {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
2021-05-18 05:38:23 +00:00
return nil
} else if orderPrice == first . Price {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "the current order is already on the best ask price %s" , orderPrice . String ( ) )
2021-05-14 03:53:07 +00:00
return nil
}
}
2022-01-06 17:34:23 +00:00
e . cancelActiveOrders ( )
2021-05-14 03:53:07 +00:00
}
2021-05-15 01:46:07 +00:00
orderForm , err := e . newBestPriceOrder ( )
2021-05-14 03:53:07 +00:00
if err != nil {
return err
}
createdOrders , err := e . Session . OrderExecutor . SubmitOrders ( ctx , orderForm )
if err != nil {
return err
}
e . activeMakerOrders . Add ( createdOrders ... )
e . orderStore . Add ( createdOrders ... )
return nil
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) cancelActiveOrders ( ) {
2022-02-15 05:55:19 +00:00
gracefulCtx , gracefulCancel := context . WithTimeout ( context . TODO ( ) , 30 * time . Second )
2022-01-06 17:34:23 +00:00
defer gracefulCancel ( )
e . activeMakerOrders . GracefulCancel ( gracefulCtx , e . Session . Exchange )
2021-05-14 04:23:07 +00:00
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) orderUpdater ( ctx context . Context ) {
2021-05-15 01:23:41 +00:00
updateLimiter := rate . NewLimiter ( rate . Every ( 3 * time . Second ) , 1 )
2021-05-14 07:35:11 +00:00
ticker := time . NewTimer ( e . UpdateInterval )
2021-05-14 03:53:07 +00:00
defer ticker . Stop ( )
2021-05-14 04:23:07 +00:00
2021-05-14 06:52:19 +00:00
// we should stop updater and clean up our open orders, if
// 1. the given context is canceled.
// 2. the base quantity equals to or greater than the target quantity
2021-05-14 04:23:07 +00:00
defer func ( ) {
2022-01-06 17:34:23 +00:00
e . cancelActiveOrders ( )
2021-05-14 06:52:19 +00:00
e . cancelUserDataStream ( )
e . emitDone ( )
2021-05-14 04:23:07 +00:00
} ( )
2021-05-14 03:53:07 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case <- e . orderBook . C :
2021-05-15 01:20:46 +00:00
if ! updateLimiter . Allow ( ) {
2021-05-14 03:53:07 +00:00
break
}
2021-05-14 06:52:19 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
2024-08-17 05:29:27 +00:00
logrus . Infof ( "%s order book changed, checking order..." , e . Symbol )
2021-05-14 03:53:07 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
2024-08-17 05:29:27 +00:00
logrus . WithError ( err ) . Errorf ( "order update failed" )
2021-05-14 03:53:07 +00:00
}
case <- ticker . C :
2021-05-15 01:20:46 +00:00
if ! updateLimiter . Allow ( ) {
2021-05-14 03:53:07 +00:00
break
}
2021-05-14 06:52:19 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
2021-05-14 03:53:07 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
2024-08-17 05:29:27 +00:00
logrus . WithError ( err ) . Errorf ( "order update failed" )
2021-05-14 03:53:07 +00:00
}
}
}
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) cancelContextIfTargetQuantityFilled ( ) bool {
2022-01-08 16:35:45 +00:00
base := e . position . GetBase ( )
2021-05-15 02:06:45 +00:00
2022-02-03 11:19:56 +00:00
if base . Abs ( ) . Compare ( e . TargetQuantity ) >= 0 {
2024-08-17 05:29:27 +00:00
logrus . Infof ( "filled target quantity, canceling the order execution context" )
2021-05-14 06:52:19 +00:00
e . cancelExecution ( )
return true
}
2024-08-17 10:00:49 +00:00
2021-05-14 06:52:19 +00:00
return false
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) handleTradeUpdate ( trade types . Trade ) {
2021-05-14 03:53:07 +00:00
// ignore trades that are not in the symbol we interested
if trade . Symbol != e . Symbol {
return
}
if ! e . orderStore . Exists ( trade . OrderID ) {
return
}
2024-08-17 05:29:27 +00:00
logrus . Info ( trade . String ( ) )
2021-05-14 03:53:07 +00:00
e . position . AddTrade ( trade )
2024-08-17 05:29:27 +00:00
logrus . Infof ( "position updated: %+v" , e . position )
2021-05-14 03:53:07 +00:00
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) handleFilledOrder ( order types . Order ) {
2024-08-17 05:29:27 +00:00
logrus . Info ( order . String ( ) )
2021-05-14 06:52:19 +00:00
// filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done.
e . cancelContextIfTargetQuantityFilled ( )
2021-05-14 04:23:07 +00:00
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) Run ( parentCtx context . Context ) error {
2021-05-14 06:52:19 +00:00
e . mu . Lock ( )
e . stoppedC = make ( chan struct { } )
e . executionCtx , e . cancelExecution = context . WithCancel ( parentCtx )
e . userDataStreamCtx , e . cancelUserDataStream = context . WithCancel ( context . Background ( ) )
e . mu . Unlock ( )
2021-05-14 07:35:11 +00:00
if e . UpdateInterval == 0 {
e . UpdateInterval = 10 * time . Second
}
2021-05-14 03:53:07 +00:00
var ok bool
e . market , ok = e . Session . Market ( e . Symbol )
if ! ok {
return fmt . Errorf ( "market %s not found" , e . Symbol )
}
e . marketDataStream = e . Session . Exchange . NewStream ( )
e . marketDataStream . SetPublicOnly ( )
e . marketDataStream . Subscribe ( types . BookChannel , e . Symbol , types . SubscribeOptions { } )
2024-08-24 05:45:01 +00:00
e . orderBook = types . NewStreamBook ( e . Symbol , e . Session . ExchangeName )
2021-05-14 03:53:07 +00:00
e . orderBook . BindStream ( e . marketDataStream )
e . userDataStream = e . Session . Exchange . NewStream ( )
2021-05-14 04:23:07 +00:00
e . userDataStream . OnTradeUpdate ( e . handleTradeUpdate )
2024-08-17 10:00:49 +00:00
2021-12-11 11:16:16 +00:00
e . position = & types . Position {
2021-05-14 03:53:07 +00:00
Symbol : e . Symbol ,
BaseCurrency : e . market . BaseCurrency ,
QuoteCurrency : e . market . QuoteCurrency ,
}
2023-07-04 13:42:24 +00:00
e . orderStore = core . NewOrderStore ( e . Symbol )
2021-05-14 03:53:07 +00:00
e . orderStore . BindStream ( e . userDataStream )
2024-08-17 05:29:27 +00:00
e . activeMakerOrders = bbgo . NewActiveOrderBook ( e . Symbol )
2021-05-14 04:23:07 +00:00
e . activeMakerOrders . OnFilled ( e . handleFilledOrder )
2021-05-14 03:53:07 +00:00
e . activeMakerOrders . BindStream ( e . userDataStream )
2024-08-17 10:00:49 +00:00
go e . connectMarketData ( e . executionCtx )
2021-05-14 06:52:19 +00:00
go e . connectUserData ( e . userDataStreamCtx )
go e . orderUpdater ( e . executionCtx )
2021-05-14 03:53:07 +00:00
return nil
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) emitDone ( ) {
2021-05-14 06:52:19 +00:00
e . mu . Lock ( )
if e . stoppedC == nil {
e . stoppedC = make ( chan struct { } )
}
close ( e . stoppedC )
e . mu . Unlock ( )
}
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) Done ( ) ( c <- chan struct { } ) {
2021-05-14 06:52:19 +00:00
e . mu . Lock ( )
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
if e . stoppedC == nil {
e . stoppedC = make ( chan struct { } )
close ( e . stoppedC )
c = e . stoppedC
} else {
c = e . stoppedC
}
e . mu . Unlock ( )
return c
}
// Shutdown stops the execution
// If we call this method, it means the execution is still running,
// We need to:
// 1. stop the order updater (by using the execution context)
// 2. the order updater cancels all open orders and close the user data stream
2024-08-17 06:09:25 +00:00
func ( e * StreamExecutor ) Shutdown ( shutdownCtx context . Context ) {
2021-05-14 06:52:19 +00:00
e . mu . Lock ( )
if e . cancelExecution != nil {
e . cancelExecution ( )
}
e . mu . Unlock ( )
for {
select {
case <- shutdownCtx . Done ( ) :
return
case <- e . Done ( ) :
return
}
}
}