2021-05-14 03:53:07 +00:00
package bbgo
import (
"context"
"fmt"
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
)
type TwapExecution struct {
Session * ExchangeSession
Symbol string
Side types . SideType
TargetQuantity fixedpoint . Value
SliceQuantity fixedpoint . Value
2021-05-14 06:52:19 +00:00
StopPrice fixedpoint . Value
2021-05-14 07:35:11 +00:00
NumOfTicks int
UpdateInterval time . Duration
2021-05-15 01:46:07 +00:00
DeadlineTime time . Time
2021-05-14 03:53:07 +00:00
market types . Market
marketDataStream types . Stream
2021-05-14 06:52:19 +00:00
userDataStream types . Stream
userDataStreamCtx context . Context
cancelUserDataStream context . CancelFunc
2021-05-14 04:23:07 +00:00
orderBook * types . StreamOrderBook
currentPrice fixedpoint . Value
activePosition fixedpoint . Value
2021-05-14 03:53:07 +00:00
activeMakerOrders * LocalActiveOrderBook
orderStore * OrderStore
position * Position
2021-05-14 06:52:19 +00:00
executionCtx context . Context
cancelExecution context . CancelFunc
stoppedC chan struct { }
2021-05-14 03:53:07 +00:00
state int
mu sync . Mutex
}
func ( e * TwapExecution ) connectMarketData ( ctx context . Context ) {
log . Infof ( "connecting market data stream..." )
if err := e . marketDataStream . Connect ( ctx ) ; err != nil {
log . WithError ( err ) . Errorf ( "market data stream connect error" )
}
}
func ( e * TwapExecution ) connectUserData ( ctx context . Context ) {
log . Infof ( "connecting user data stream..." )
if err := e . userDataStream . Connect ( ctx ) ; err != nil {
log . WithError ( err ) . Errorf ( "user data stream connect error" )
}
}
func ( e * TwapExecution ) getSideBook ( ) ( pvs types . PriceVolumeSlice , err error ) {
book := e . orderBook . Get ( )
switch e . Side {
case types . SideTypeSell :
pvs = book . Asks
case types . SideTypeBuy :
pvs = book . Bids
default :
err = fmt . Errorf ( "invalid side type: %+v" , e . Side )
}
return pvs , err
}
2021-05-15 01:46:07 +00:00
func ( e * TwapExecution ) newBestPriceOrder ( ) ( orderForm types . SubmitOrder , err error ) {
2021-05-14 03:53:07 +00:00
book := e . orderBook . Get ( )
sideBook , err := e . getSideBook ( )
if err != nil {
return orderForm , err
}
first , ok := sideBook . First ( )
if ! ok {
return orderForm , fmt . Errorf ( "empty %s %s side book" , e . Symbol , e . Side )
}
newPrice := first . Price
spread , ok := book . Spread ( )
if ! ok {
return orderForm , errors . New ( "can not calculate spread, neither bid price or ask price exists" )
}
2021-05-14 07:35:11 +00:00
// for example, we have tickSize = 0.01, and spread is 28.02 - 28.00 = 0.02
// assign tickSpread = min(spread - tickSize, tickSpread)
//
// if number of ticks = 0, than the tickSpread is 0
// tickSpread = min(0.02 - 0.01, 0)
// price = first bid price 28.00 + tickSpread (0.00) = 28.00
//
// if number of ticks = 1, than the tickSpread is 0.01
// tickSpread = min(0.02 - 0.01, 0.01)
// price = first bid price 28.00 + tickSpread (0.01) = 28.01
//
// if number of ticks = 2, than the tickSpread is 0.02
// tickSpread = min(0.02 - 0.01, 0.02)
// price = first bid price 28.00 + tickSpread (0.01) = 28.01
2021-05-14 03:53:07 +00:00
tickSize := fixedpoint . NewFromFloat ( e . market . TickSize )
2021-05-14 07:35:11 +00:00
tickSpread := tickSize . MulInt ( e . NumOfTicks )
2021-05-14 03:53:07 +00:00
if spread > tickSize {
2021-05-14 07:35:11 +00:00
// there is a gap in the spread
tickSpread = fixedpoint . Min ( tickSpread , spread - tickSize )
2021-05-14 03:53:07 +00:00
switch e . Side {
case types . SideTypeSell :
2021-05-14 07:35:11 +00:00
newPrice -= tickSpread
2021-05-14 03:53:07 +00:00
case types . SideTypeBuy :
2021-05-14 07:35:11 +00:00
newPrice += tickSpread
2021-05-14 03:53:07 +00:00
}
}
2021-05-14 06:52:19 +00:00
if e . StopPrice > 0 {
switch e . Side {
case types . SideTypeSell :
if newPrice < e . StopPrice {
log . Infof ( "%s order price %f is lower than the stop sell price %f, setting order price to the stop sell price %f" ,
e . Symbol ,
newPrice . Float64 ( ) ,
e . StopPrice . Float64 ( ) ,
e . StopPrice . Float64 ( ) )
newPrice = e . StopPrice
}
case types . SideTypeBuy :
if newPrice > e . StopPrice {
log . Infof ( "%s order price %f is higher than the stop buy price %f, setting order price to the stop buy price %f" ,
e . Symbol ,
newPrice . Float64 ( ) ,
e . StopPrice . Float64 ( ) ,
e . StopPrice . Float64 ( ) )
newPrice = e . StopPrice
}
}
}
minQuantity := fixedpoint . NewFromFloat ( e . market . MinQuantity )
2021-05-15 02:06:45 +00:00
e . position . Lock ( )
base := e . position . Base
e . position . Unlock ( )
restQuantity := e . TargetQuantity - fixedpoint . Abs ( base )
2021-05-15 02:00:32 +00:00
if restQuantity == 0 {
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
}
2021-05-14 06:52:19 +00:00
if restQuantity < minQuantity {
return orderForm , fmt . Errorf ( "can not continue placing orders, rest quantity %f is less than the min quantity %f" , restQuantity . Float64 ( ) , minQuantity . Float64 ( ) )
}
2021-05-15 01:23:41 +00:00
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
orderQuantity := fixedpoint . Min ( e . SliceQuantity , restQuantity )
2021-05-14 06:52:19 +00:00
// if the rest quantity in the next round is not enough, we should merge the rest quantity into this round
2021-05-15 01:23:41 +00:00
// if there are rest slices
2021-05-14 06:52:19 +00:00
nextRestQuantity := restQuantity - e . SliceQuantity
2021-05-15 01:23:41 +00:00
if nextRestQuantity > 0 && nextRestQuantity < minQuantity {
2021-05-14 06:52:19 +00:00
orderQuantity = restQuantity
}
minNotional := fixedpoint . NewFromFloat ( e . market . MinNotional )
2021-05-15 01:46:07 +00:00
orderQuantity = AdjustQuantityByMinAmount ( orderQuantity , newPrice , minNotional )
switch e . Side {
case types . SideTypeSell :
// check base balance for sell, try to sell as more as possible
if b , ok := e . Session . Account . Balance ( e . market . BaseCurrency ) ; ok {
orderQuantity = fixedpoint . Min ( b . Available , orderQuantity )
}
case types . SideTypeBuy :
// check base balance for sell, try to sell as more as possible
if b , ok := e . Session . Account . Balance ( e . market . QuoteCurrency ) ; ok {
orderQuantity = AdjustQuantityByMaxAmount ( orderQuantity , newPrice , b . Available )
}
}
if e . DeadlineTime != emptyTime {
now := time . Now ( )
if now . After ( e . DeadlineTime ) {
orderForm = types . SubmitOrder {
Symbol : e . Symbol ,
Side : e . Side ,
Type : types . OrderTypeMarket ,
Quantity : restQuantity . Float64 ( ) ,
Market : e . market ,
}
return orderForm , nil
}
2021-05-14 06:52:19 +00:00
}
2021-05-14 03:53:07 +00:00
orderForm = types . SubmitOrder {
// ClientOrderID: "",
Symbol : e . Symbol ,
Side : e . Side ,
Type : types . OrderTypeLimitMaker ,
2021-05-14 06:52:19 +00:00
Quantity : orderQuantity . Float64 ( ) ,
2021-05-14 03:53:07 +00:00
Price : newPrice . Float64 ( ) ,
Market : e . market ,
TimeInForce : "GTC" ,
}
return orderForm , err
}
func ( e * TwapExecution ) updateOrder ( ctx context . Context ) error {
2021-05-14 06:52:19 +00:00
2021-05-14 03:53:07 +00:00
sideBook , err := e . getSideBook ( )
if err != nil {
return err
}
first , ok := sideBook . First ( )
if ! ok {
return fmt . Errorf ( "empty %s %s side book" , e . Symbol , e . Side )
}
tickSize := fixedpoint . NewFromFloat ( e . market . TickSize )
// check and see if we need to cancel the existing active orders
for e . activeMakerOrders . NumOfOrders ( ) > 0 {
orders := e . activeMakerOrders . Orders ( )
if len ( orders ) > 1 {
2021-05-14 06:52:19 +00:00
log . Warnf ( "more than 1 %s open orders in the strategy..." , e . Symbol )
2021-05-14 03:53:07 +00:00
}
// get the first order
order := orders [ 0 ]
price := fixedpoint . NewFromFloat ( order . Price )
quantity := fixedpoint . NewFromFloat ( order . Quantity )
2021-05-14 06:52:19 +00:00
remainingQuantity := order . Quantity - order . ExecutedQuantity
if remainingQuantity <= e . market . MinQuantity {
log . Infof ( "order remaining quantity %f is less than the market minimal quantity %f, skip updating order" , remainingQuantity , e . market . MinQuantity )
return nil
}
if e . StopPrice > 0 {
switch e . Side {
case types . SideTypeBuy :
if first . Price > e . StopPrice {
log . Infof ( "%s first bid price %f is higher than the stop price %f, skip updating order" , e . Symbol , first . Price . Float64 ( ) , e . StopPrice . Float64 ( ) )
return nil
}
case types . SideTypeSell :
if first . Price < e . StopPrice {
log . Infof ( "%s first ask price %f is lower than the stop price %f, skip updating order" , e . Symbol , first . Price . Float64 ( ) , e . StopPrice . Float64 ( ) )
return nil
}
}
}
2021-05-14 03:53:07 +00:00
// if the first bid price or first ask price is the same to the current active order
// we should skip updating the order
if first . Price == price {
// there are other orders in the same price, it means if we cancel ours, the price is still the best price.
if first . Volume > quantity {
return nil
}
// if there is no gap between the first price entry and the second price entry
second , ok := sideBook . Second ( )
if ! ok {
2021-05-14 06:52:19 +00:00
return fmt . Errorf ( "no secoond price on the %s order book %s, can not update" , e . Symbol , e . Side )
2021-05-14 03:53:07 +00:00
}
// if there is no gap
2021-05-14 07:35:11 +00:00
gap := fixedpoint . Abs ( first . Price - second . Price )
if gap > tickSize . MulInt ( e . NumOfTicks ) {
// found gap, we should update our price
} else {
2021-05-14 06:52:19 +00:00
log . Infof ( "no gap between the second price %f and the first price %f (tick size = %f), skip updating" ,
2021-05-14 04:23:07 +00:00
first . Price . Float64 ( ) ,
second . Price . Float64 ( ) ,
tickSize . Float64 ( ) )
2021-05-14 03:53:07 +00:00
return nil
}
}
2021-05-14 04:23:07 +00:00
e . cancelActiveOrders ( ctx )
2021-05-14 03:53:07 +00:00
}
2021-05-15 01:46:07 +00:00
orderForm , err := e . newBestPriceOrder ( )
2021-05-14 03:53:07 +00:00
if err != nil {
return err
}
createdOrders , err := e . Session . OrderExecutor . SubmitOrders ( ctx , orderForm )
if err != nil {
return err
}
e . activeMakerOrders . Add ( createdOrders ... )
e . orderStore . Add ( createdOrders ... )
return nil
}
2021-05-14 04:23:07 +00:00
func ( e * TwapExecution ) cancelActiveOrders ( ctx context . Context ) {
didCancel := false
for e . activeMakerOrders . NumOfOrders ( ) > 0 {
didCancel = true
orders := e . activeMakerOrders . Orders ( )
2021-05-14 06:52:19 +00:00
log . Infof ( "canceling %d open orders:" , len ( orders ) )
e . activeMakerOrders . Print ( )
2021-05-14 04:23:07 +00:00
if err := e . Session . Exchange . CancelOrders ( ctx , orders ... ) ; err != nil {
log . WithError ( err ) . Errorf ( "can not cancel %s orders" , e . Symbol )
}
time . Sleep ( 3 * time . Second )
}
if didCancel {
log . Infof ( "orders are canceled successfully" )
}
}
2021-05-14 03:53:07 +00:00
func ( e * TwapExecution ) orderUpdater ( ctx context . Context ) {
2021-05-15 01:23:41 +00:00
updateLimiter := rate . NewLimiter ( rate . Every ( 3 * time . Second ) , 1 )
2021-05-14 07:35:11 +00:00
ticker := time . NewTimer ( e . UpdateInterval )
2021-05-14 03:53:07 +00:00
defer ticker . Stop ( )
2021-05-14 04:23:07 +00:00
2021-05-14 06:52:19 +00:00
// we should stop updater and clean up our open orders, if
// 1. the given context is canceled.
// 2. the base quantity equals to or greater than the target quantity
2021-05-14 04:23:07 +00:00
defer func ( ) {
e . cancelActiveOrders ( context . Background ( ) )
2021-05-14 06:52:19 +00:00
e . cancelUserDataStream ( )
e . emitDone ( )
2021-05-14 04:23:07 +00:00
} ( )
2021-05-14 03:53:07 +00:00
for {
select {
case <- ctx . Done ( ) :
return
case <- e . orderBook . C :
2021-05-15 01:20:46 +00:00
if ! updateLimiter . Allow ( ) {
2021-05-14 03:53:07 +00:00
break
}
2021-05-14 06:52:19 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
2021-05-15 01:29:44 +00:00
log . Infof ( "%s order book changed, checking order..." , e . Symbol )
2021-05-14 03:53:07 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
log . WithError ( err ) . Errorf ( "order update failed" )
}
case <- ticker . C :
2021-05-15 01:20:46 +00:00
if ! updateLimiter . Allow ( ) {
2021-05-14 03:53:07 +00:00
break
}
2021-05-14 06:52:19 +00:00
if e . cancelContextIfTargetQuantityFilled ( ) {
return
}
2021-05-14 03:53:07 +00:00
if err := e . updateOrder ( ctx ) ; err != nil {
log . WithError ( err ) . Errorf ( "order update failed" )
}
}
}
}
2021-05-14 06:52:19 +00:00
func ( e * TwapExecution ) cancelContextIfTargetQuantityFilled ( ) bool {
2021-05-15 02:06:45 +00:00
e . position . Lock ( )
base := e . position . Base
e . position . Unlock ( )
if fixedpoint . Abs ( base ) >= e . TargetQuantity {
2021-05-14 06:52:19 +00:00
log . Infof ( "filled target quantity, canceling the order execution context" )
e . cancelExecution ( )
return true
}
return false
}
2021-05-14 04:23:07 +00:00
func ( e * TwapExecution ) handleTradeUpdate ( trade types . Trade ) {
2021-05-14 03:53:07 +00:00
// ignore trades that are not in the symbol we interested
if trade . Symbol != e . Symbol {
return
}
if ! e . orderStore . Exists ( trade . OrderID ) {
return
}
2021-05-14 06:52:19 +00:00
log . Info ( trade . String ( ) )
2021-05-14 03:53:07 +00:00
e . position . AddTrade ( trade )
log . Infof ( "position updated: %+v" , e . position )
}
2021-05-14 04:23:07 +00:00
func ( e * TwapExecution ) handleFilledOrder ( order types . Order ) {
2021-05-14 06:52:19 +00:00
log . Info ( order . String ( ) )
// filled event triggers the order removal from the active order store
// we need to ensure we received every order update event before the execution is done.
e . cancelContextIfTargetQuantityFilled ( )
2021-05-14 04:23:07 +00:00
}
2021-05-14 06:52:19 +00:00
func ( e * TwapExecution ) Run ( parentCtx context . Context ) error {
e . mu . Lock ( )
e . stoppedC = make ( chan struct { } )
e . executionCtx , e . cancelExecution = context . WithCancel ( parentCtx )
e . userDataStreamCtx , e . cancelUserDataStream = context . WithCancel ( context . Background ( ) )
e . mu . Unlock ( )
2021-05-14 07:35:11 +00:00
if e . UpdateInterval == 0 {
e . UpdateInterval = 10 * time . Second
}
2021-05-14 03:53:07 +00:00
var ok bool
e . market , ok = e . Session . Market ( e . Symbol )
if ! ok {
return fmt . Errorf ( "market %s not found" , e . Symbol )
}
e . marketDataStream = e . Session . Exchange . NewStream ( )
e . marketDataStream . SetPublicOnly ( )
e . marketDataStream . Subscribe ( types . BookChannel , e . Symbol , types . SubscribeOptions { } )
e . orderBook = types . NewStreamBook ( e . Symbol )
e . orderBook . BindStream ( e . marketDataStream )
2021-05-14 06:52:19 +00:00
go e . connectMarketData ( e . executionCtx )
2021-05-14 03:53:07 +00:00
e . userDataStream = e . Session . Exchange . NewStream ( )
2021-05-14 04:23:07 +00:00
e . userDataStream . OnTradeUpdate ( e . handleTradeUpdate )
2021-05-14 03:53:07 +00:00
e . position = & Position {
Symbol : e . Symbol ,
BaseCurrency : e . market . BaseCurrency ,
QuoteCurrency : e . market . QuoteCurrency ,
}
e . orderStore = NewOrderStore ( e . Symbol )
e . orderStore . BindStream ( e . userDataStream )
e . activeMakerOrders = NewLocalActiveOrderBook ( )
2021-05-14 04:23:07 +00:00
e . activeMakerOrders . OnFilled ( e . handleFilledOrder )
2021-05-14 03:53:07 +00:00
e . activeMakerOrders . BindStream ( e . userDataStream )
2021-05-14 06:52:19 +00:00
go e . connectUserData ( e . userDataStreamCtx )
go e . orderUpdater ( e . executionCtx )
2021-05-14 03:53:07 +00:00
return nil
}
2021-05-14 06:52:19 +00:00
func ( e * TwapExecution ) emitDone ( ) {
e . mu . Lock ( )
if e . stoppedC == nil {
e . stoppedC = make ( chan struct { } )
}
close ( e . stoppedC )
e . mu . Unlock ( )
}
func ( e * TwapExecution ) Done ( ) ( c <- chan struct { } ) {
e . mu . Lock ( )
// if the channel is not allocated, it means it's not started yet, we need to return a closed channel
if e . stoppedC == nil {
e . stoppedC = make ( chan struct { } )
close ( e . stoppedC )
c = e . stoppedC
} else {
c = e . stoppedC
}
e . mu . Unlock ( )
return c
}
// Shutdown stops the execution
// If we call this method, it means the execution is still running,
// We need to:
// 1. stop the order updater (by using the execution context)
// 2. the order updater cancels all open orders and close the user data stream
func ( e * TwapExecution ) Shutdown ( shutdownCtx context . Context ) {
e . mu . Lock ( )
if e . cancelExecution != nil {
e . cancelExecution ( )
}
e . mu . Unlock ( )
for {
select {
case <- shutdownCtx . Done ( ) :
return
case <- e . Done ( ) :
return
}
}
}