618 lines
17 KiB
Go
618 lines
17 KiB
Go
package twap
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
"golang.org/x/time/rate"
|
|
|
|
"git.qtrade.icu/lychiyu/bbgo/pkg/bbgo"
|
|
"git.qtrade.icu/lychiyu/bbgo/pkg/core"
|
|
"git.qtrade.icu/lychiyu/bbgo/pkg/fixedpoint"
|
|
"git.qtrade.icu/lychiyu/bbgo/pkg/types"
|
|
)
|
|
|
|
// defaultUpdateInterval is the update interval a newly constructed executor
// starts with; SetUpdateInterval overrides it per executor.
var defaultUpdateInterval = 60 * time.Second
|
|
|
|
// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API.
|
|
// It uses a fixed target quantity to place orders.
|
|
type FixedQuantityExecutor struct {
|
|
exchange types.Exchange
|
|
|
|
// configuration fields
|
|
|
|
symbol string
|
|
side types.SideType
|
|
targetQuantity, sliceQuantity fixedpoint.Value
|
|
|
|
// updateInterval is a fixed update interval for placing new order
|
|
updateInterval time.Duration
|
|
|
|
// delayInterval is the delay interval between each order placement
|
|
delayInterval time.Duration
|
|
|
|
// numOfTicks is the number of price ticks behind the best bid to place the order
|
|
numOfTicks int
|
|
|
|
// stopPrice is the price limit for the order
|
|
// for buy-orders, the price limit is the maximum price
|
|
// for sell-orders, the price limit is the minimum price
|
|
stopPrice fixedpoint.Value
|
|
|
|
// deadlineTime is the deadline time for the order execution
|
|
deadlineTime *time.Time
|
|
|
|
executionCtx context.Context
|
|
cancelExecution context.CancelFunc
|
|
|
|
userDataStreamCtx context.Context
|
|
cancelUserDataStream context.CancelFunc
|
|
|
|
market types.Market
|
|
marketDataStream types.Stream
|
|
|
|
orderBook *types.StreamOrderBook
|
|
|
|
userDataStream types.Stream
|
|
|
|
orderUpdateRateLimit *rate.Limiter
|
|
activeMakerOrders *bbgo.ActiveOrderBook
|
|
orderStore *core.OrderStore
|
|
position *types.Position
|
|
tradeCollector *core.TradeCollector
|
|
|
|
logger logrus.FieldLogger
|
|
|
|
mu sync.Mutex
|
|
|
|
userDataStreamConnectC chan struct{}
|
|
marketDataStreamConnectC chan struct{}
|
|
done *DoneSignal
|
|
}
|
|
|
|
func NewFixedQuantityExecutor(
|
|
exchange types.Exchange,
|
|
symbol string,
|
|
market types.Market,
|
|
side types.SideType,
|
|
targetQuantity, sliceQuantity fixedpoint.Value,
|
|
) *FixedQuantityExecutor {
|
|
|
|
marketDataStream := exchange.NewStream()
|
|
marketDataStream.SetPublicOnly()
|
|
marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
|
|
Depth: types.DepthLevelMedium,
|
|
})
|
|
|
|
orderBook := types.NewStreamBook(symbol, exchange.Name())
|
|
orderBook.BindStream(marketDataStream)
|
|
|
|
userDataStream := exchange.NewStream()
|
|
orderStore := core.NewOrderStore(symbol)
|
|
position := types.NewPositionFromMarket(market)
|
|
tradeCollector := core.NewTradeCollector(symbol, position, orderStore)
|
|
orderStore.BindStream(userDataStream)
|
|
|
|
activeMakerOrders := bbgo.NewActiveOrderBook(symbol)
|
|
|
|
e := &FixedQuantityExecutor{
|
|
exchange: exchange,
|
|
symbol: symbol,
|
|
side: side,
|
|
market: market,
|
|
targetQuantity: targetQuantity,
|
|
sliceQuantity: sliceQuantity,
|
|
updateInterval: defaultUpdateInterval,
|
|
logger: logrus.WithFields(logrus.Fields{
|
|
"executor": "twapStream",
|
|
"symbol": symbol,
|
|
}),
|
|
|
|
marketDataStream: marketDataStream,
|
|
orderBook: orderBook,
|
|
|
|
userDataStream: userDataStream,
|
|
|
|
activeMakerOrders: activeMakerOrders,
|
|
orderStore: orderStore,
|
|
tradeCollector: tradeCollector,
|
|
position: position,
|
|
done: NewDoneSignal(),
|
|
|
|
userDataStreamConnectC: make(chan struct{}),
|
|
marketDataStreamConnectC: make(chan struct{}),
|
|
}
|
|
|
|
e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
|
|
e.logger.Info(trade.String())
|
|
})
|
|
e.tradeCollector.BindStream(e.userDataStream)
|
|
|
|
activeMakerOrders.OnFilled(e.handleFilledOrder)
|
|
activeMakerOrders.BindStream(e.userDataStream)
|
|
|
|
e.marketDataStream.OnConnect(func() {
|
|
e.logger.Info("market data stream on connect")
|
|
close(e.marketDataStreamConnectC)
|
|
e.logger.Infof("marketDataStreamConnectC closed")
|
|
})
|
|
|
|
// private channels
|
|
e.userDataStream.OnAuth(func() {
|
|
e.logger.Info("user data stream on auth")
|
|
close(e.userDataStreamConnectC)
|
|
e.logger.Info("userDataStreamConnectC closed")
|
|
})
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) {
|
|
e.deadlineTime = &t
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetDelayInterval(delayInterval time.Duration) {
|
|
e.delayInterval = delayInterval
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration) {
|
|
e.updateInterval = updateInterval
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetNumOfTicks(numOfTicks int) {
|
|
e.numOfTicks = numOfTicks
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetStopPrice(price fixedpoint.Value) {
|
|
e.stopPrice = price
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) {
|
|
e.logger.Infof("connecting market data stream...")
|
|
if err := e.marketDataStream.Connect(ctx); err != nil {
|
|
e.logger.WithError(err).Errorf("market data stream connect error")
|
|
}
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) connectUserData(ctx context.Context) {
|
|
e.logger.Infof("connecting user data stream...")
|
|
if err := e.userDataStream.Connect(ctx); err != nil {
|
|
e.logger.WithError(err).Errorf("user data stream connect error")
|
|
}
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) {
|
|
e.logger.Info(order.String())
|
|
|
|
// filled event triggers the order removal from the active order store
|
|
// we need to ensure we received every order update event before the execution is done.
|
|
e.cancelContextIfTargetQuantityFilled()
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool {
|
|
// ensure that the trades are processed
|
|
e.tradeCollector.Process()
|
|
|
|
// now get the base quantity from the position
|
|
base := e.position.GetBase()
|
|
|
|
if base.Abs().Sub(e.targetQuantity).Compare(e.market.MinQuantity.Neg()) >= 0 {
|
|
e.logger.Infof("position is filled with target quantity, canceling the order execution context")
|
|
e.cancelExecution()
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) SetOrderUpdateRateLimit(rateLimit *rate.Limiter) {
|
|
e.orderUpdateRateLimit = rateLimit
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error {
|
|
gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second)
|
|
defer gracefulCancel()
|
|
return e.activeMakerOrders.GracefulCancel(gracefulCtx, e.exchange)
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) {
|
|
defer func() {
|
|
if err := e.cancelActiveOrders(ctx); err != nil {
|
|
e.logger.WithError(err).Error("cancel active orders error")
|
|
}
|
|
|
|
e.cancelUserDataStream()
|
|
e.done.Emit()
|
|
}()
|
|
|
|
ticker := time.NewTimer(e.updateInterval)
|
|
defer ticker.Stop()
|
|
|
|
monitor := NewBboMonitor()
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
|
|
case <-e.orderBook.C:
|
|
changed := monitor.UpdateFromBook(e.orderBook)
|
|
if !changed {
|
|
continue
|
|
}
|
|
|
|
// orderBook.C sends a signal when any price or quantity changes in the order book
|
|
if e.cancelContextIfTargetQuantityFilled() {
|
|
return
|
|
}
|
|
|
|
e.logger.Infof("%s order book changed, checking order...", e.symbol)
|
|
|
|
if err := e.updateOrder(ctx); err != nil {
|
|
e.logger.WithError(err).Errorf("order update failed")
|
|
}
|
|
|
|
case <-ticker.C:
|
|
changed := monitor.UpdateFromBook(e.orderBook)
|
|
if !changed {
|
|
continue
|
|
}
|
|
|
|
if e.cancelContextIfTargetQuantityFilled() {
|
|
return
|
|
}
|
|
|
|
if err := e.updateOrder(ctx); err != nil {
|
|
e.logger.WithError(err).Errorf("order update failed")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
|
|
if e.orderUpdateRateLimit != nil && !e.orderUpdateRateLimit.Allow() {
|
|
e.logger.Infof("rate limit exceeded, skip updating order")
|
|
return nil
|
|
}
|
|
|
|
book := e.orderBook.Copy()
|
|
sideBook := book.SideBook(e.side)
|
|
|
|
first, ok := sideBook.First()
|
|
if !ok {
|
|
return fmt.Errorf("empty %s %s side book", e.symbol, e.side)
|
|
}
|
|
|
|
// if there is no gap between the first price entry and the second price entry
|
|
second, ok := sideBook.Second()
|
|
if !ok {
|
|
return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.symbol, e.side)
|
|
}
|
|
|
|
tickSize := e.market.TickSize
|
|
numOfTicks := fixedpoint.NewFromInt(int64(e.numOfTicks))
|
|
tickSpread := tickSize.Mul(numOfTicks)
|
|
|
|
// check and see if we need to cancel the existing active orders
|
|
for e.activeMakerOrders.NumOfOrders() > 0 {
|
|
orders := e.activeMakerOrders.Orders()
|
|
if len(orders) > 1 {
|
|
e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol)
|
|
}
|
|
|
|
// get the first active order
|
|
order := orders[0]
|
|
orderPrice := order.Price
|
|
// quantity := fixedpoint.NewFromFloat(order.Quantity)
|
|
|
|
remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity)
|
|
if remainingQuantity.Compare(e.market.MinQuantity) <= 0 {
|
|
logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String())
|
|
return nil
|
|
}
|
|
|
|
// if the first bid price or first ask price is the same to the current active order
|
|
// we should skip updating the order
|
|
// DO NOT UPDATE IF:
|
|
// tickSpread > 0 AND current order price == second price + tickSpread
|
|
// current order price == first price
|
|
logrus.Infof("orderPrice = %s, best price = %s, second level price = %s, tickSpread = %s",
|
|
orderPrice.String(),
|
|
first.Price.String(),
|
|
second.Price.String(),
|
|
tickSpread.String())
|
|
|
|
switch e.side {
|
|
case types.SideTypeBuy:
|
|
if tickSpread.Sign() > 0 && orderPrice.Compare(second.Price.Add(tickSpread)) == 0 {
|
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
|
return nil
|
|
} else if orderPrice == first.Price {
|
|
e.logger.Infof("the current order is already on the best bid price %s, skip update", orderPrice.String())
|
|
return nil
|
|
}
|
|
|
|
case types.SideTypeSell:
|
|
if tickSpread.Sign() > 0 && orderPrice.Compare(second.Price.Sub(tickSpread)) == 0 {
|
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
|
return nil
|
|
} else if orderPrice == first.Price {
|
|
e.logger.Infof("the current order is already on the best ask price %s, skip update", orderPrice.String())
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if err := e.cancelActiveOrders(ctx); err != nil {
|
|
e.logger.Warnf("cancel active orders error: %v", err)
|
|
}
|
|
}
|
|
|
|
e.tradeCollector.Process()
|
|
|
|
if e.delayInterval > 0 {
|
|
time.Sleep(e.delayInterval)
|
|
}
|
|
|
|
orderForm, err := e.generateOrder()
|
|
if err != nil {
|
|
return err
|
|
} else if orderForm == nil {
|
|
return nil
|
|
}
|
|
|
|
return e.submitOrder(ctx, *orderForm)
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) submitOrder(ctx context.Context, orderForm types.SubmitOrder) error {
|
|
createdOrder, err := e.exchange.SubmitOrder(ctx, orderForm)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if createdOrder != nil {
|
|
e.orderStore.Add(*createdOrder)
|
|
e.activeMakerOrders.Add(*createdOrder)
|
|
e.tradeCollector.Process()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) {
|
|
newPrice := fixedpoint.Zero
|
|
book := e.orderBook.Copy()
|
|
sideBook := book.SideBook(e.side)
|
|
|
|
first, ok := sideBook.First()
|
|
if !ok {
|
|
return newPrice, fmt.Errorf("empty %s %s side book", e.symbol, e.side)
|
|
}
|
|
|
|
newPrice = first.Price
|
|
spread, ok := book.Spread()
|
|
if !ok {
|
|
return newPrice, errors.New("can not calculate spread, neither bid price or ask price exists")
|
|
}
|
|
|
|
tickSize := e.market.TickSize
|
|
tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks)))
|
|
if spread.Compare(tickSize) > 0 {
|
|
// there is a gap in the spread
|
|
tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize))
|
|
switch e.side {
|
|
case types.SideTypeSell:
|
|
newPrice = newPrice.Sub(tickSpread)
|
|
case types.SideTypeBuy:
|
|
newPrice = newPrice.Add(tickSpread)
|
|
}
|
|
}
|
|
|
|
if e.stopPrice.Sign() > 0 {
|
|
switch e.side {
|
|
case types.SideTypeSell:
|
|
if newPrice.Compare(e.stopPrice) < 0 {
|
|
logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s",
|
|
e.symbol,
|
|
newPrice.String(),
|
|
e.stopPrice.String(),
|
|
e.stopPrice.String())
|
|
newPrice = e.stopPrice
|
|
}
|
|
|
|
case types.SideTypeBuy:
|
|
if newPrice.Compare(e.stopPrice) > 0 {
|
|
logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s",
|
|
e.symbol,
|
|
newPrice.String(),
|
|
e.stopPrice.String(),
|
|
e.stopPrice.String())
|
|
newPrice = e.stopPrice
|
|
}
|
|
}
|
|
}
|
|
|
|
return newPrice, nil
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value {
|
|
base := e.position.GetBase()
|
|
return e.targetQuantity.Sub(base.Abs())
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) isDeadlineExceeded() bool {
|
|
if e.deadlineTime != nil && !e.deadlineTime.IsZero() {
|
|
return time.Since(*e.deadlineTime) > 0
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) {
|
|
minQuantity := e.market.MinQuantity
|
|
remainingQuantity := e.getRemainingQuantity()
|
|
|
|
if remainingQuantity.Sign() <= 0 {
|
|
e.cancelExecution()
|
|
return fixedpoint.Zero, nil
|
|
}
|
|
|
|
if remainingQuantity.Compare(minQuantity) < 0 {
|
|
e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String())
|
|
|
|
e.cancelExecution()
|
|
return fixedpoint.Zero, nil
|
|
}
|
|
|
|
// if deadline exceeded, we should return the remaining quantity
|
|
if e.isDeadlineExceeded() {
|
|
return remainingQuantity, nil
|
|
}
|
|
|
|
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
|
|
orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity)
|
|
|
|
// if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round
|
|
// if there are rest slices
|
|
nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity)
|
|
|
|
if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) {
|
|
orderQuantity = remainingQuantity
|
|
}
|
|
|
|
orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price)
|
|
return orderQuantity, nil
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) {
|
|
newPrice, err := e.getNewPrice()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
orderQuantity, err := e.calculateNewOrderQuantity(newPrice)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
balances, err := e.exchange.QueryAccountBalances(e.executionCtx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch e.side {
|
|
case types.SideTypeSell:
|
|
// check base balance for sell, try to sell as more as possible
|
|
if b, ok := balances[e.market.BaseCurrency]; ok {
|
|
orderQuantity = fixedpoint.Min(b.Available, orderQuantity)
|
|
}
|
|
|
|
case types.SideTypeBuy:
|
|
// check base balance for sell, try to sell as more as possible
|
|
if b, ok := balances[e.market.QuoteCurrency]; ok {
|
|
orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
|
|
}
|
|
}
|
|
|
|
if e.isDeadlineExceeded() {
|
|
return &types.SubmitOrder{
|
|
Symbol: e.symbol,
|
|
Side: e.side,
|
|
Type: types.OrderTypeMarket,
|
|
Quantity: orderQuantity,
|
|
Market: e.market,
|
|
}, nil
|
|
}
|
|
|
|
return &types.SubmitOrder{
|
|
Symbol: e.symbol,
|
|
Side: e.side,
|
|
Type: types.OrderTypeLimitMaker,
|
|
Quantity: orderQuantity,
|
|
Price: newPrice,
|
|
Market: e.market,
|
|
TimeInForce: types.TimeInForceGTC,
|
|
}, nil
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) Start(ctx context.Context) error {
|
|
if e.executionCtx != nil {
|
|
return errors.New("executionCtx is not nil, you can't start the executor twice")
|
|
}
|
|
|
|
e.executionCtx, e.cancelExecution = context.WithCancel(ctx)
|
|
e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx)
|
|
|
|
go e.connectMarketData(e.executionCtx)
|
|
go e.connectUserData(e.userDataStreamCtx)
|
|
|
|
e.logger.Infof("waiting for connections ready...")
|
|
|
|
if err := e.WaitForConnection(ctx); err != nil {
|
|
e.cancelExecution()
|
|
return err
|
|
}
|
|
|
|
e.logger.Infof("connections ready, starting order updater...")
|
|
|
|
go e.orderUpdater(e.executionCtx)
|
|
return nil
|
|
}
|
|
|
|
func (e *FixedQuantityExecutor) WaitForConnection(ctx context.Context) error {
|
|
if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) {
|
|
return fmt.Errorf("market data stream connection timeout")
|
|
}
|
|
|
|
if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) {
|
|
return fmt.Errorf("user data stream connection timeout")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Done returns a channel that emits a signal when the execution is done.
|
|
func (e *FixedQuantityExecutor) Done() <-chan struct{} {
|
|
return e.done.Chan()
|
|
}
|
|
|
|
// Shutdown stops the execution
|
|
// If we call this method, it means the execution is still running,
|
|
// We need it to:
|
|
// 1. Stop the order updater (by using the execution context)
|
|
// 2. The order updater cancels all open orders and closes the user data stream
|
|
func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
|
|
e.tradeCollector.Process()
|
|
|
|
e.mu.Lock()
|
|
if e.cancelExecution != nil {
|
|
e.cancelExecution()
|
|
}
|
|
e.mu.Unlock()
|
|
|
|
for {
|
|
select {
|
|
|
|
case <-shutdownCtx.Done():
|
|
return
|
|
|
|
case <-e.done.Chan():
|
|
return
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
// selectSignalOrTimeout waits for a signal on c (a received value or a closed
// channel). It returns true if the signal arrives first, and false if ctx is
// done or timeout elapses first.
func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool {
	// An explicit timer is stopped on return, so it does not stay pending for
	// the whole timeout when the signal or the context wins.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return false
	case <-timer.C:
		return false
	case <-c:
		return true
	}
}
|