diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index bf5fa7ec9..042a1fa94 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -8,7 +8,6 @@ import ( "time" "github.com/sirupsen/logrus" - "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" @@ -238,7 +237,7 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { } func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { - updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) + // updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2) defer func() { if err := e.cancelActiveOrders(ctx); err != nil { @@ -266,9 +265,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { } // orderBook.C sends a signal when any price or quantity changes in the order book - if !updateLimiter.Allow() { - break - } + /* + if !updateLimiter.Allow() { + break + } + */ if e.cancelContextIfTargetQuantityFilled() { return @@ -286,9 +287,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { continue } - if !updateLimiter.Allow() { - break - } + /* + if !updateLimiter.Allow() { + break + } + */ if e.cancelContextIfTargetQuantityFilled() { return @@ -321,7 +324,6 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { tickSpread := tickSize.Mul(numOfTicks) // check and see if we need to cancel the existing active orders - for e.activeMakerOrders.NumOfOrders() > 0 { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { @@ -371,6 +373,8 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { } } + e.tradeCollector.Process() + orderForm, err := e.generateOrder() if err != nil { return err @@ -560,14 +564,10 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { go e.connectUserData(e.userDataStreamCtx) e.logger.Infof("waiting for connections ready...") - if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { - e.cancelExecution() - return fmt.Errorf("market data stream connection timeout") - } - if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + if err := e.WaitForConnection(ctx); err != nil { e.cancelExecution() - return fmt.Errorf("user data stream connection timeout") + return err } e.logger.Infof("connections ready, starting order updater...") @@ -576,6 +576,18 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { return nil } +func (e *FixedQuantityExecutor) WaitForConnection(ctx context.Context) error { + if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("market data stream connection timeout") + } + + if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("user data stream connection timeout") + } + + return nil +} + // Done returns a channel that emits a signal when the execution is done. func (e *FixedQuantityExecutor) Done() <-chan struct{} { return e.done.Chan() diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index 842ef21c3..9ed443e27 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -2,6 +2,7 @@ package twap import ( "context" + "fmt" "testing" "time" @@ -28,6 +29,29 @@ func getTestMarket() types.Market { return market } +type OrderMatcher struct { + Order types.Order +} + +func MatchOrder(o types.Order) *OrderMatcher { + return &OrderMatcher{ + Order: o, + } +} + +func (m *OrderMatcher) Matches(x interface{}) bool { + order, ok := x.(types.Order) + if !ok { + return false + } + + return m.Order.OrderID == order.OrderID && m.Order.Price.Compare(m.Order.Price) == 0 +} + +func (m *OrderMatcher) String() string { + return fmt.Sprintf("OrderMatcher expects %+v", m.Order) +} + type CatchMatcher struct { f func(x any) } @@ -47,10 +71,44 @@ func (m *CatchMatcher) String() string { return "CatchMatcher" } +func bindMockMarketDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnBookSnapshot(Catch(func(x any) { + stream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnBookUpdate(Catch(func(x any) { + stream.OnBookUpdate(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() +} + +func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnOrderUpdate(Catch(func(x any) { + stream.OnOrderUpdate(x.(func(order types.Order))) + })).AnyTimes() + mockStream.EXPECT().OnTradeUpdate(Catch(func(x any) { + stream.OnTradeUpdate(x.(func(order types.Trade))) + })).AnyTimes() + mockStream.EXPECT().OnBalanceUpdate(Catch(func(x any) { + stream.OnBalanceUpdate(x.(func(m types.BalanceMap))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() + mockStream.EXPECT().OnAuth(Catch(func(x any) { + stream.OnAuth(x.(func())) + })) +} + func TestNewStreamExecutor(t *testing.T) { + exchangeName := types.ExchangeBinance symbol := "BTCUSDT" market := getTestMarket() + targetQuantity := Number(100) + sliceQuantity := Number(1) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -67,37 +125,53 @@ func TestNewStreamExecutor(t *testing.T) { mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ Depth: types.DepthLevelMedium, }) - mockMarketDataStream.EXPECT().OnBookSnapshot(Catch(func(x any) { - marketDataStream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) - })).AnyTimes() - mockMarketDataStream.EXPECT().OnBookUpdate(Catch(func(x any) { - marketDataStream.OnBookUpdate(x.(func(book types.SliceOrderBook))) - })).AnyTimes() - mockMarketDataStream.EXPECT().OnConnect(Catch(func(x any) { - marketDataStream.OnConnect(x.(func())) - })).AnyTimes() + + bindMockMarketDataStream(mockMarketDataStream, marketDataStream) + mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) mockUserDataStream := mocks.NewMockStream(mockCtrl) - mockUserDataStream.EXPECT().OnOrderUpdate(Catch(func(x any) { - userDataStream.OnOrderUpdate(x.(func(order types.Order))) - })).AnyTimes() - mockUserDataStream.EXPECT().OnTradeUpdate(Catch(func(x any) { - userDataStream.OnTradeUpdate(x.(func(order types.Trade))) - })).AnyTimes() - mockUserDataStream.EXPECT().OnConnect(Catch(func(x any) { - userDataStream.OnConnect(x.(func())) - })).AnyTimes() - mockUserDataStream.EXPECT().OnAuth(Catch(func(x any) { - userDataStream.OnAuth(x.(func())) - })) + bindMockUserDataStream(mockUserDataStream, userDataStream) mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + initialBalances := types.BalanceMap{ + "BTC": types.Balance{ + Available: Number(2), + }, + "USDT": types.Balance{ + Available: Number(20_000), + }, + } + mockEx.EXPECT().NewStream().Return(mockMarketDataStream) mockEx.EXPECT().NewStream().Return(mockUserDataStream) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) - executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, Number(100), Number(1)) - executor.SetUpdateInterval(time.Second) + // first order + firstSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19400), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + firstSubmitOrderTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + firstOrder := types.Order{ + SubmitOrder: firstSubmitOrder, + Exchange: exchangeName, + OrderID: 1, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(firstSubmitOrderTime), + UpdateTime: types.Time(firstSubmitOrderTime), + } + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil) + + executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity) + executor.SetUpdateInterval(200 * time.Millisecond) go func() { err := executor.Start(ctx) @@ -111,6 +185,128 @@ func TestNewStreamExecutor(t *testing.T) { userDataStream.EmitAuth() }() + err := executor.WaitForConnection(ctx) + assert.NoError(t, err) + + t.Logf("sending book snapshot...") + snapshotTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + marketDataStream.EmitBookSnapshot(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19400), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + time.Sleep(500 * time.Millisecond) + + t.Logf("sending book update...") + + // we expect the second order will be placed when the order update is received + secondSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19420), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + secondSubmitOrderTime := time.Date(2021, 1, 1, 0, 1, 0, 0, time.UTC) + secondOrder := types.Order{ + SubmitOrder: secondSubmitOrder, + Exchange: exchangeName, + OrderID: 2, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(secondSubmitOrderTime), + UpdateTime: types.Time(secondSubmitOrderTime), + } + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(firstOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := firstOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order update: %+v", orderUpdate) + return nil + }) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), secondSubmitOrder).Return(&secondOrder, nil) + + t.Logf("waiting for the order update...") + time.Sleep(500 * time.Millisecond) + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + marketDataStream.EmitBookUpdate(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19420), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + t.Logf("waiting for the next order update...") + time.Sleep(500 * time.Millisecond) + + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + t.Logf("emitting trade update...") + userDataStream.EmitTradeUpdate(types.Trade{ + ID: 1, + OrderID: 2, + Exchange: exchangeName, + Price: Number(19420.0), + Quantity: Number(100.0), + QuoteQuantity: Number(100.0 * 19420.0), + Symbol: symbol, + Side: types.SideTypeBuy, + IsBuyer: true, + IsMaker: true, + Time: types.Time(secondSubmitOrderTime), + }) + + t.Logf("waiting for the trade callbacks...") + time.Sleep(500 * time.Millisecond) + + executor.tradeCollector.Process() + assert.Equal(t, Number(100), executor.position.GetBase()) + + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(secondOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := secondOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order #2 update: %+v", orderUpdate) + return nil + }) + assert.True(t, executor.cancelContextIfTargetQuantityFilled(), "target quantity should be filled") + + // finalizing and stop the executor select { case <-ctx.Done(): case <-time.After(10 * time.Second):