diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 59ee81639..64df59c00 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -27,7 +27,9 @@ var quantityReduceDelta = fixedpoint.NewFromFloat(0.005) // This is for the maximum retries const submitOrderRetryLimit = 5 +// BaseOrderExecutor provides the common accessors for order executor type BaseOrderExecutor struct { + exchange types.Exchange session *ExchangeSession activeMakerOrders *ActiveOrderBook orderStore *core.OrderStore @@ -43,8 +45,8 @@ func (e *BaseOrderExecutor) ActiveMakerOrders() *ActiveOrderBook { // GracefulCancel cancels all active maker orders if orders are not given, otherwise cancel all the given orders func (e *BaseOrderExecutor) GracefulCancel(ctx context.Context, orders ...types.Order) error { - if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange, orders...); err != nil { - return errors.Wrap(err, "graceful cancel error") + if err := e.activeMakerOrders.GracefulCancel(ctx, e.exchange, orders...); err != nil { + return errors.Wrap(err, "graceful cancel order error") } return nil @@ -83,6 +85,7 @@ func NewGeneralOrderExecutor( executor := &GeneralOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ + exchange: session.Exchange, session: session, exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(symbol), @@ -112,7 +115,7 @@ func (e *GeneralOrderExecutor) SetMaxRetries(maxRetries uint) { } func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) { - marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService) + marginService, ok := e.exchange.(types.MarginBorrowRepayService) if !ok { log.Warnf("session %s (%T) exchange does not support MarginBorrowRepayService", e.session.Name, e.session.Exchange) return diff --git a/pkg/twap/v2/bbomonitor.go b/pkg/twap/v2/bbomonitor.go new file mode 100644 index 000000000..5081fc9bb --- /dev/null +++ b/pkg/twap/v2/bbomonitor.go @@ -0,0 +1,53 @@ +package twap + +import ( + "time" + + "github.com/c9s/bbgo/pkg/types" +) + +// BboMonitor monitors the best bid and ask price and volume. +// +//go:generate callbackgen -type BboMonitor +type BboMonitor struct { + Bid types.PriceVolume + Ask types.PriceVolume + UpdatedTime time.Time + + updateCallbacks []func(bid, ask types.PriceVolume) +} + +func NewBboMonitor() *BboMonitor { + return &BboMonitor{} +} + +func (m *BboMonitor) OnUpdateFromBook(book *types.StreamOrderBook) bool { + bestBid, ok1 := book.BestBid() + bestAsk, ok2 := book.BestAsk() + if !ok1 || !ok2 { + return false + } + + return m.Update(bestBid, bestAsk, book.LastUpdateTime()) +} + +func (m *BboMonitor) Update(bid, ask types.PriceVolume, t time.Time) bool { + changed := false + if m.Bid.Price.Compare(bid.Price) != 0 || m.Bid.Volume.Compare(bid.Volume) != 0 { + changed = true + } + + if m.Ask.Price.Compare(ask.Price) != 0 || m.Ask.Volume.Compare(ask.Volume) != 0 { + changed = true + } + + m.Bid = bid + m.Ask = ask + m.UpdatedTime = t + + if changed { + m.EmitUpdate(bid, ask) + } + + return changed +} diff --git a/pkg/twap/v2/bbomonitor_callbacks.go b/pkg/twap/v2/bbomonitor_callbacks.go new file mode 100644 index 000000000..4e413eaab --- /dev/null +++ b/pkg/twap/v2/bbomonitor_callbacks.go @@ -0,0 +1,17 @@ +// Code generated by "callbackgen -type BboMonitor"; DO NOT EDIT. + +package twap + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (m *BboMonitor) OnUpdate(cb func(bid types.PriceVolume, ask types.PriceVolume)) { + m.updateCallbacks = append(m.updateCallbacks, cb) +} + +func (m *BboMonitor) EmitUpdate(bid types.PriceVolume, ask types.PriceVolume) { + for _, cb := range m.updateCallbacks { + cb(bid, ask) + } +} diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 1f76176bc..b05d7e410 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -3,6 +3,7 @@ package twap import ( "context" "errors" + "fmt" "sync" "time" @@ -60,9 +61,9 @@ type FixedQuantityExecutor struct { // configuration fields - symbol string - side types.SideType - targetQuantity fixedpoint.Value + symbol string + side types.SideType + targetQuantity, sliceQuantity fixedpoint.Value // updateInterval is a fixed update interval for placing new order updateInterval time.Duration @@ -70,10 +71,13 @@ type FixedQuantityExecutor struct { // delayInterval is the delay interval between each order placement delayInterval time.Duration - // priceLimit is the price limit for the order + // numOfTicks is the number of price ticks behind the best bid to place the order + numOfTicks int + + // stopPrice is the price limit for the order // for buy-orders, the price limit is the maximum price // for sell-orders, the price limit is the minimum price - priceLimit fixedpoint.Value + stopPrice fixedpoint.Value // deadlineTime is the deadline time for the order execution deadlineTime *time.Time @@ -92,6 +96,7 @@ type FixedQuantityExecutor struct { activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position + tradeCollector *core.TradeCollector logger logrus.FieldLogger @@ -105,22 +110,28 @@ func NewStreamExecutor( symbol string, market types.Market, side types.SideType, - targetQuantity fixedpoint.Value, + targetQuantity, sliceQuantity fixedpoint.Value, ) *FixedQuantityExecutor { + orderStore := core.NewOrderStore(symbol) + position := types.NewPositionFromMarket(market) + tradeCollector := core.NewTradeCollector(symbol, position, orderStore) return &FixedQuantityExecutor{ exchange: exchange, symbol: symbol, side: side, market: market, - position: types.NewPositionFromMarket(market), targetQuantity: targetQuantity, + sliceQuantity: sliceQuantity, updateInterval: defaultUpdateInterval, logger: logrus.WithFields(logrus.Fields{ "executor": "twapStream", "symbol": symbol, }), - done: NewDoneSignal(), + orderStore: orderStore, + tradeCollector: tradeCollector, + position: position, + done: NewDoneSignal(), } } @@ -177,7 +188,6 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) - _ = updateLimiter defer func() { if err := e.cancelActiveOrders(ctx); err != nil { @@ -191,12 +201,20 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { ticker := time.NewTimer(e.updateInterval) defer ticker.Stop() + monitor := NewBboMonitor() + for { select { case <-ctx.Done(): return case <-e.orderBook.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + + // orderBook.C sends a signal when any price or quantity changes in the order book if !updateLimiter.Allow() { break } @@ -207,13 +225,16 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { e.logger.Infof("%s order book changed, checking order...", e.symbol) - /* - if err := e.updateOrder(ctx); err != nil { - e.logger.WithError(err).Errorf("order update failed") - } - */ + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } case <-ticker.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + if !updateLimiter.Allow() { break } @@ -222,15 +243,240 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { return } - /* - if err := e.updateOrder(ctx); err != nil { - e.logger.WithError(err).Errorf("order update failed") - } - */ + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } } } } +func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + // if there is no gap between the first price entry and the second price entry + second, ok := sideBook.Second() + if !ok { + return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.symbol, e.side) + } + + tickSize := e.market.TickSize + numOfTicks := fixedpoint.NewFromInt(int64(e.numOfTicks)) + tickSpread := tickSize.Mul(numOfTicks) + + // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { + orders := e.activeMakerOrders.Orders() + + if len(orders) > 1 { + logrus.Warnf("more than 1 %s open orders in the strategy...", e.symbol) + } + + // get the first active order + order := orders[0] + orderPrice := order.Price + // quantity := fixedpoint.NewFromFloat(order.Quantity) + + remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) + if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + return nil + } + + // if the first bid price or first ask price is the same to the current active order + // we should skip updating the order + // DO NOT UPDATE IF: + // tickSpread > 0 AND current order price == second price + tickSpread + // current order price == first price + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + + switch e.side { + case types.SideTypeBuy: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) + return nil + } + + case types.SideTypeSell: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } + } + + if err := e.cancelActiveOrders(ctx); err != nil { + e.logger.Warnf("cancel active orders error: %v", err) + } + } + + orderForm, err := e.generateOrder() + if err != nil { + return err + } else if orderForm == nil { + return nil + } + + createdOrder, err := e.exchange.SubmitOrder(ctx, *orderForm) + if err != nil { + return err + } + + if createdOrder != nil { + e.orderStore.Add(*createdOrder) + e.activeMakerOrders.Add(*createdOrder) + e.tradeCollector.Process() + } + + return nil +} + +func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) { + newPrice := fixedpoint.Zero + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return newPrice, fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + newPrice = first.Price + spread, ok := book.Spread() + if !ok { + return newPrice, errors.New("can not calculate spread, neither bid price or ask price exists") + } + + tickSize := e.market.TickSize + tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks))) + if spread.Compare(tickSize) > 0 { + // there is a gap in the spread + tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize)) + switch e.side { + case types.SideTypeSell: + newPrice = newPrice.Sub(tickSpread) + case types.SideTypeBuy: + newPrice = newPrice.Add(tickSpread) + } + } + + if e.stopPrice.Sign() > 0 { + switch e.side { + case types.SideTypeSell: + if newPrice.Compare(e.stopPrice) < 0 { + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + + case types.SideTypeBuy: + if newPrice.Compare(e.stopPrice) > 0 { + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + } + } + + return newPrice, nil +} + +func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, err error) { + newPrice, err := e.getNewPrice() + if err != nil { + return nil, err + } + + minQuantity := e.market.MinQuantity + base := e.position.GetBase() + + restQuantity := e.targetQuantity.Sub(base.Abs()) + + if restQuantity.Sign() <= 0 { + if e.cancelContextIfTargetQuantityFilled() { + return nil, nil + } + } + + if restQuantity.Compare(minQuantity) < 0 { + return nil, fmt.Errorf("can not continue placing orders, rest quantity %s is less than the min quantity %s", restQuantity.String(), minQuantity.String()) + } + + // when slice = 1000, if we only have 998, we should adjust our quantity to 998 + orderQuantity := fixedpoint.Min(e.sliceQuantity, restQuantity) + + // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round + // if there are rest slices + nextRestQuantity := restQuantity.Sub(e.sliceQuantity) + if nextRestQuantity.Sign() > 0 && nextRestQuantity.Compare(minQuantity) < 0 { + orderQuantity = restQuantity + } + + minNotional := e.market.MinNotional + orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + + balances, err := e.exchange.QueryAccountBalances(e.executionCtx) + if err != nil { + return nil, err + } + + switch e.side { + case types.SideTypeSell: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.BaseCurrency]; ok { + orderQuantity = fixedpoint.Min(b.Available, orderQuantity) + } + + case types.SideTypeBuy: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.QuoteCurrency]; ok { + orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + } + } + + if e.deadlineTime != nil && !e.deadlineTime.IsZero() { + now := time.Now() + if now.After(*e.deadlineTime) { + orderForm = &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeMarket, + Quantity: restQuantity, + Market: e.market, + } + return orderForm, nil + } + } + + orderForm = &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeLimitMaker, + Quantity: orderQuantity, + Price: newPrice, + Market: e.market, + TimeInForce: "GTC", + } + + return orderForm, err +} + func (e *FixedQuantityExecutor) Start(ctx context.Context) error { if e.marketDataStream != nil { return errors.New("market data stream is not nil, you can't start the executor twice") @@ -248,11 +494,16 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { e.orderBook = types.NewStreamBook(e.symbol) e.orderBook.BindStream(e.marketDataStream) - e.orderStore = core.NewOrderStore(e.symbol) + // private channels + e.userDataStream = e.exchange.NewStream() e.orderStore.BindStream(e.userDataStream) e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol) e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) + e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { + e.logger.Info(trade.String()) + }) + e.tradeCollector.BindStream(e.userDataStream) go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx)