diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go index b597687ff..f4dbfa555 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/bbgo/twap_order_executor.go @@ -25,9 +25,10 @@ type TwapExecution struct { market types.Market marketDataStream types.Stream userDataStream types.Stream - orderBook *types.StreamOrderBook - currentPrice fixedpoint.Value - activePosition fixedpoint.Value + + orderBook *types.StreamOrderBook + currentPrice fixedpoint.Value + activePosition fixedpoint.Value activeMakerOrders *LocalActiveOrderBook orderStore *OrderStore @@ -91,6 +92,7 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e tickSize := fixedpoint.NewFromFloat(e.market.TickSize) if spread > tickSize { + log.Infof("spread %f is greater than the tick size %f, adding 1 tick to the price...", spread.Float64(), tickSize.Float64()) switch e.Side { case types.SideTypeSell: newPrice -= fixedpoint.NewFromFloat(e.market.TickSize) @@ -113,11 +115,6 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } func (e *TwapExecution) updateOrder(ctx context.Context) error { - book := e.orderBook.Get() - bestBid, _ := book.BestBid() - bestAsk, _ := book.BestAsk() - log.Infof("best bid %f, best ask %f", bestBid.Price.Float64(), bestAsk.Price.Float64()) - sideBook, err := e.getSideBook() if err != nil { return err @@ -154,22 +151,20 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // if there is no gap between the first price entry and the second price entry second, ok := sideBook.Second() if !ok { - return fmt.Errorf("there is no secoond price on the %s order book %s", e.Symbol, e.Side) + return fmt.Errorf("there is no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) } - log.Infof("checking second price %f - %f") // if there is no gap if fixedpoint.Abs(first.Price-second.Price) == tickSize { + log.Infof("there is no gap between the second price %f and the first price %f (tick size = %f), skip updating", + first.Price.Float64(), + second.Price.Float64(), + tickSize.Float64()) return nil } } - log.Infof("canceling orders...") - if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { - log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) - } - - time.Sleep(3 * time.Second) + e.cancelActiveOrders(ctx) } orderForm, err := e.newBestPriceMakerOrder() @@ -187,10 +182,33 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { return nil } +func (e *TwapExecution) cancelActiveOrders(ctx context.Context) { + didCancel := false + for e.activeMakerOrders.NumOfOrders() > 0 { + didCancel = true + + log.Infof("canceling open orders...") + orders := e.activeMakerOrders.Orders() + if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { + log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) + } + time.Sleep(3 * time.Second) + } + + if didCancel { + log.Infof("orders are canceled successfully") + } +} + func (e *TwapExecution) orderUpdater(ctx context.Context) { rateLimiter := rate.NewLimiter(rate.Every(time.Minute), 15) ticker := time.NewTimer(5 * time.Second) defer ticker.Stop() + + defer func() { + e.cancelActiveOrders(context.Background()) + }() + for { select { case <-ctx.Done(): @@ -218,7 +236,7 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } } -func (e TwapExecution) tradeUpdateHandler(trade types.Trade) { +func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -236,6 +254,10 @@ func (e TwapExecution) tradeUpdateHandler(trade types.Trade) { log.Infof("position updated: %+v", e.position) } +func (e *TwapExecution) handleFilledOrder(order types.Order) { + log.Infof("order is filled: %s", order.String()) +} + func (e *TwapExecution) Run(ctx context.Context) error { var ok bool e.market, ok = e.Session.Market(e.Symbol) @@ -252,7 +274,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { go e.connectMarketData(ctx) e.userDataStream = e.Session.Exchange.NewStream() - e.userDataStream.OnTradeUpdate(e.tradeUpdateHandler) + e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) e.position = &Position{ Symbol: e.Symbol, BaseCurrency: e.market.BaseCurrency, @@ -262,6 +284,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.orderStore = NewOrderStore(e.Symbol) e.orderStore.BindStream(e.userDataStream) e.activeMakerOrders = NewLocalActiveOrderBook() + e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) go e.connectUserData(ctx) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index a8512128d..e8b93fd4a 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -140,7 +140,7 @@ func (b *OrderBook) Spread() (fixedpoint.Value, bool) { return 0, false } - bestAsk, ok := b.BestBid() + bestAsk, ok := b.BestAsk() if !ok { return 0, false }