diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index b05d7e410..bf5fa7ec9 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -90,9 +90,11 @@ type FixedQuantityExecutor struct { market types.Market marketDataStream types.Stream - orderBook *types.StreamOrderBook - userDataStream types.Stream + orderBook *types.StreamOrderBook + + userDataStream types.Stream + activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position @@ -102,7 +104,9 @@ type FixedQuantityExecutor struct { mu sync.Mutex - done *DoneSignal + userDataStreamConnectC chan struct{} + marketDataStreamConnectC chan struct{} + done *DoneSignal } func NewStreamExecutor( @@ -112,10 +116,25 @@ func NewStreamExecutor( side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value, ) *FixedQuantityExecutor { + + marketDataStream := exchange.NewStream() + marketDataStream.SetPublicOnly() + marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + + orderBook := types.NewStreamBook(symbol) + orderBook.BindStream(marketDataStream) + + userDataStream := exchange.NewStream() orderStore := core.NewOrderStore(symbol) position := types.NewPositionFromMarket(market) tradeCollector := core.NewTradeCollector(symbol, position, orderStore) - return &FixedQuantityExecutor{ + orderStore.BindStream(userDataStream) + + activeMakerOrders := bbgo.NewActiveOrderBook(symbol) + + e := &FixedQuantityExecutor{ exchange: exchange, symbol: symbol, side: side, @@ -128,11 +147,43 @@ func NewStreamExecutor( "symbol": symbol, }), - orderStore: orderStore, - tradeCollector: tradeCollector, - position: position, - done: NewDoneSignal(), + marketDataStream: marketDataStream, + orderBook: orderBook, + + userDataStream: userDataStream, + + activeMakerOrders: activeMakerOrders, + orderStore: orderStore, + tradeCollector: tradeCollector, + position: position, + done: NewDoneSignal(), + + userDataStreamConnectC: make(chan struct{}), + marketDataStreamConnectC: make(chan struct{}), } + + e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { + e.logger.Info(trade.String()) + }) + e.tradeCollector.BindStream(e.userDataStream) + + activeMakerOrders.OnFilled(e.handleFilledOrder) + activeMakerOrders.BindStream(e.userDataStream) + + e.marketDataStream.OnConnect(func() { + e.logger.Info("market data stream on connect") + close(e.marketDataStreamConnectC) + e.logger.Infof("marketDataStreamConnectC closed") + }) + + // private channels + e.userDataStream.OnAuth(func() { + e.logger.Info("user data stream on auth") + close(e.userDataStreamConnectC) + e.logger.Info("userDataStreamConnectC closed") + }) + + return e } func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) { @@ -270,11 +321,11 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { tickSpread := tickSize.Mul(numOfTicks) // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { orders := e.activeMakerOrders.Orders() - if len(orders) > 1 { - logrus.Warnf("more than 1 %s open orders in the strategy...", e.symbol) + e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol) } // get the first active order @@ -397,40 +448,66 @@ func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) { return newPrice, nil } -func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, err error) { +func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value { + base := e.position.GetBase() + return e.targetQuantity.Sub(base.Abs()) +} + +func (e *FixedQuantityExecutor) isDeadlineExceeded() bool { + if e.deadlineTime != nil && !e.deadlineTime.IsZero() { + return time.Since(*e.deadlineTime) > 0 + } + + return false +} + +func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) { + minQuantity := e.market.MinQuantity + remainingQuantity := e.getRemainingQuantity() + + if remainingQuantity.Sign() <= 0 { + e.cancelExecution() + return fixedpoint.Zero, nil + } + + if remainingQuantity.Compare(minQuantity) < 0 { + e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String()) + + e.cancelExecution() + return fixedpoint.Zero, nil + } + + // if deadline exceeded, we should return the remaining quantity + if e.isDeadlineExceeded() { + return remainingQuantity, nil + } + + // when slice = 1000, if we only have 998, we should adjust our quantity to 998 + orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity) + + // if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round + // if there are rest slices + nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity) + + if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) { + orderQuantity = remainingQuantity + } + + orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price) + return orderQuantity, nil +} + +func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) { newPrice, err := e.getNewPrice() if err != nil { return nil, err } - minQuantity := e.market.MinQuantity - base := e.position.GetBase() - - restQuantity := e.targetQuantity.Sub(base.Abs()) - - if restQuantity.Sign() <= 0 { - if e.cancelContextIfTargetQuantityFilled() { - return nil, nil - } + orderQuantity, err := e.calculateNewOrderQuantity(newPrice) + if err != nil { + return nil, err } - if restQuantity.Compare(minQuantity) < 0 { - return nil, fmt.Errorf("can not continue placing orders, rest quantity %s is less than the min quantity %s", restQuantity.String(), minQuantity.String()) - } - - // when slice = 1000, if we only have 998, we should adjust our quantity to 998 - orderQuantity := fixedpoint.Min(e.sliceQuantity, restQuantity) - - // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round - // if there are rest slices - nextRestQuantity := restQuantity.Sub(e.sliceQuantity) - if nextRestQuantity.Sign() > 0 && nextRestQuantity.Compare(minQuantity) < 0 { - orderQuantity = restQuantity - } - - minNotional := e.market.MinNotional - orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) - balances, err := e.exchange.QueryAccountBalances(e.executionCtx) if err != nil { return nil, err @@ -446,67 +523,55 @@ func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, e case types.SideTypeBuy: // check base balance for sell, try to sell as more as possible if b, ok := balances[e.market.QuoteCurrency]; ok { - orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) } } - if e.deadlineTime != nil && !e.deadlineTime.IsZero() { - now := time.Now() - if now.After(*e.deadlineTime) { - orderForm = &types.SubmitOrder{ - Symbol: e.symbol, - Side: e.side, - Type: types.OrderTypeMarket, - Quantity: restQuantity, - Market: e.market, - } - return orderForm, nil - } + if e.isDeadlineExceeded() { + return &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeMarket, + Quantity: orderQuantity, + Market: e.market, + }, nil } - orderForm = &types.SubmitOrder{ + return &types.SubmitOrder{ Symbol: e.symbol, Side: e.side, Type: types.OrderTypeLimitMaker, Quantity: orderQuantity, Price: newPrice, Market: e.market, - TimeInForce: "GTC", - } - - return orderForm, err + TimeInForce: types.TimeInForceGTC, + }, nil } func (e *FixedQuantityExecutor) Start(ctx context.Context) error { - if e.marketDataStream != nil { - return errors.New("market data stream is not nil, you can't start the executor twice") + if e.executionCtx != nil { + return errors.New("executionCtx is not nil, you can't start the executor twice") } e.executionCtx, e.cancelExecution = context.WithCancel(ctx) e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx) - e.marketDataStream = e.exchange.NewStream() - e.marketDataStream.SetPublicOnly() - e.marketDataStream.Subscribe(types.BookChannel, e.symbol, types.SubscribeOptions{ - Depth: types.DepthLevelMedium, - }) - - e.orderBook = types.NewStreamBook(e.symbol) - e.orderBook.BindStream(e.marketDataStream) - - // private channels - e.userDataStream = e.exchange.NewStream() - e.orderStore.BindStream(e.userDataStream) - e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol) - e.activeMakerOrders.OnFilled(e.handleFilledOrder) - e.activeMakerOrders.BindStream(e.userDataStream) - e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { - e.logger.Info(trade.String()) - }) - e.tradeCollector.BindStream(e.userDataStream) - go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx) + + e.logger.Infof("waiting for connections ready...") + if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { + e.cancelExecution() + return fmt.Errorf("market data stream connection timeout") + } + + if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + e.cancelExecution() + return fmt.Errorf("user data stream connection timeout") + } + + e.logger.Infof("connections ready, starting order updater...") + go e.orderUpdater(e.executionCtx) return nil } @@ -540,3 +605,14 @@ func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) { } } } + +func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return false + case <-c: + return true + } +} diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index 4b6c2eaf8..842ef21c3 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -1,9 +1,120 @@ package twap import ( + "context" "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/c9s/bbgo/pkg/fixedpoint" + . "github.com/c9s/bbgo/pkg/testing/testhelper" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/types/mocks" ) -func TestNewStreamExecutorV2(t *testing.T) { - +func getTestMarket() types.Market { + market := types.Market{ + Symbol: "BTCUSDT", + PricePrecision: 8, + VolumePrecision: 8, + QuoteCurrency: "USDT", + BaseCurrency: "BTC", + MinNotional: fixedpoint.MustNewFromString("0.001"), + MinAmount: fixedpoint.MustNewFromString("10.0"), + MinQuantity: fixedpoint.MustNewFromString("0.001"), + } + return market +} + +type CatchMatcher struct { + f func(x any) +} + +func Catch(f func(x any)) *CatchMatcher { + return &CatchMatcher{ + f: f, + } +} + +func (m *CatchMatcher) Matches(x interface{}) bool { + m.f(x) + return true +} + +func (m *CatchMatcher) String() string { + return "CatchMatcher" +} + +func TestNewStreamExecutor(t *testing.T) { + symbol := "BTCUSDT" + market := getTestMarket() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockEx := mocks.NewMockExchange(mockCtrl) + + marketDataStream := &types.StandardStream{} + userDataStream := &types.StandardStream{} + + mockMarketDataStream := mocks.NewMockStream(mockCtrl) + mockMarketDataStream.EXPECT().SetPublicOnly() + mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + mockMarketDataStream.EXPECT().OnBookSnapshot(Catch(func(x any) { + marketDataStream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockMarketDataStream.EXPECT().OnBookUpdate(Catch(func(x any) { + marketDataStream.OnBookUpdate(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockMarketDataStream.EXPECT().OnConnect(Catch(func(x any) { + marketDataStream.OnConnect(x.(func())) + })).AnyTimes() + mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + mockUserDataStream := mocks.NewMockStream(mockCtrl) + mockUserDataStream.EXPECT().OnOrderUpdate(Catch(func(x any) { + userDataStream.OnOrderUpdate(x.(func(order types.Order))) + })).AnyTimes() + mockUserDataStream.EXPECT().OnTradeUpdate(Catch(func(x any) { + userDataStream.OnTradeUpdate(x.(func(order types.Trade))) + })).AnyTimes() + mockUserDataStream.EXPECT().OnConnect(Catch(func(x any) { + userDataStream.OnConnect(x.(func())) + })).AnyTimes() + mockUserDataStream.EXPECT().OnAuth(Catch(func(x any) { + userDataStream.OnAuth(x.(func())) + })) + mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + mockEx.EXPECT().NewStream().Return(mockMarketDataStream) + mockEx.EXPECT().NewStream().Return(mockUserDataStream) + + executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, Number(100), Number(1)) + executor.SetUpdateInterval(time.Second) + + go func() { + err := executor.Start(ctx) + assert.NoError(t, err) + }() + + go func() { + time.Sleep(500 * time.Millisecond) + marketDataStream.EmitConnect() + userDataStream.EmitConnect() + userDataStream.EmitAuth() + }() + + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + case <-executor.Done(): + } + t.Logf("executor done") } diff --git a/pkg/types/market.go b/pkg/types/market.go index c7135aff3..aae71bd1f 100644 --- a/pkg/types/market.go +++ b/pkg/types/market.go @@ -247,6 +247,19 @@ func (m Market) AdjustQuantityByMinNotional(quantity, currentPrice fixedpoint.Va return quantity } +// AdjustQuantityByMaxAmount adjusts the quantity to make the amount less than the given maxAmount +func (m Market) AdjustQuantityByMaxAmount(quantity, currentPrice, maxAmount fixedpoint.Value) fixedpoint.Value { + // modify quantity for the min amount + amount := currentPrice.Mul(quantity) + if amount.Compare(maxAmount) < 0 { + return quantity + } + + ratio := maxAmount.Div(amount) + quantity = quantity.Mul(ratio) + return m.TruncateQuantity(quantity) +} + type MarketMap map[string]Market func (m MarketMap) Add(market Market) { diff --git a/pkg/types/mocks/mock_stream.go b/pkg/types/mocks/mock_stream.go new file mode 100644 index 000000000..176484aea --- /dev/null +++ b/pkg/types/mocks/mock_stream.go @@ -0,0 +1,375 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/c9s/bbgo/pkg/types (interfaces: Stream) +// +// Generated by this command: +// +// mockgen -destination=mocks/mock_stream.go -package=mocks . Stream +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/c9s/bbgo/pkg/types" + gomock "go.uber.org/mock/gomock" +) + +// MockStream is a mock of Stream interface. +type MockStream struct { + ctrl *gomock.Controller + recorder *MockStreamMockRecorder +} + +// MockStreamMockRecorder is the mock recorder for MockStream. +type MockStreamMockRecorder struct { + mock *MockStream +} + +// NewMockStream creates a new mock instance. +func NewMockStream(ctrl *gomock.Controller) *MockStream { + mock := &MockStream{ctrl: ctrl} + mock.recorder = &MockStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStream) EXPECT() *MockStreamMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockStream) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStreamMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close)) +} + +// Connect mocks base method. +func (m *MockStream) Connect(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Connect", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Connect indicates an expected call of Connect. +func (mr *MockStreamMockRecorder) Connect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockStream)(nil).Connect), arg0) +} + +// GetPublicOnly mocks base method. +func (m *MockStream) GetPublicOnly() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPublicOnly") + ret0, _ := ret[0].(bool) + return ret0 +} + +// GetPublicOnly indicates an expected call of GetPublicOnly. +func (mr *MockStreamMockRecorder) GetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPublicOnly", reflect.TypeOf((*MockStream)(nil).GetPublicOnly)) +} + +// GetSubscriptions mocks base method. +func (m *MockStream) GetSubscriptions() []types.Subscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscriptions") + ret0, _ := ret[0].([]types.Subscription) + return ret0 +} + +// GetSubscriptions indicates an expected call of GetSubscriptions. +func (mr *MockStreamMockRecorder) GetSubscriptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockStream)(nil).GetSubscriptions)) +} + +// OnAggTrade mocks base method. +func (m *MockStream) OnAggTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAggTrade", arg0) +} + +// OnAggTrade indicates an expected call of OnAggTrade. +func (mr *MockStreamMockRecorder) OnAggTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAggTrade", reflect.TypeOf((*MockStream)(nil).OnAggTrade), arg0) +} + +// OnAuth mocks base method. +func (m *MockStream) OnAuth(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAuth", arg0) +} + +// OnAuth indicates an expected call of OnAuth. +func (mr *MockStreamMockRecorder) OnAuth(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAuth", reflect.TypeOf((*MockStream)(nil).OnAuth), arg0) +} + +// OnBalanceSnapshot mocks base method. +func (m *MockStream) OnBalanceSnapshot(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceSnapshot", arg0) +} + +// OnBalanceSnapshot indicates an expected call of OnBalanceSnapshot. +func (mr *MockStreamMockRecorder) OnBalanceSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceSnapshot", reflect.TypeOf((*MockStream)(nil).OnBalanceSnapshot), arg0) +} + +// OnBalanceUpdate mocks base method. +func (m *MockStream) OnBalanceUpdate(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceUpdate", arg0) +} + +// OnBalanceUpdate indicates an expected call of OnBalanceUpdate. +func (mr *MockStreamMockRecorder) OnBalanceUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceUpdate", reflect.TypeOf((*MockStream)(nil).OnBalanceUpdate), arg0) +} + +// OnBookSnapshot mocks base method. +func (m *MockStream) OnBookSnapshot(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookSnapshot", arg0) +} + +// OnBookSnapshot indicates an expected call of OnBookSnapshot. +func (mr *MockStreamMockRecorder) OnBookSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookSnapshot", reflect.TypeOf((*MockStream)(nil).OnBookSnapshot), arg0) +} + +// OnBookTickerUpdate mocks base method. +func (m *MockStream) OnBookTickerUpdate(arg0 func(types.BookTicker)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookTickerUpdate", arg0) +} + +// OnBookTickerUpdate indicates an expected call of OnBookTickerUpdate. +func (mr *MockStreamMockRecorder) OnBookTickerUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookTickerUpdate", reflect.TypeOf((*MockStream)(nil).OnBookTickerUpdate), arg0) +} + +// OnBookUpdate mocks base method. +func (m *MockStream) OnBookUpdate(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookUpdate", arg0) +} + +// OnBookUpdate indicates an expected call of OnBookUpdate. +func (mr *MockStreamMockRecorder) OnBookUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookUpdate", reflect.TypeOf((*MockStream)(nil).OnBookUpdate), arg0) +} + +// OnConnect mocks base method. +func (m *MockStream) OnConnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnConnect", arg0) +} + +// OnConnect indicates an expected call of OnConnect. +func (mr *MockStreamMockRecorder) OnConnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnConnect", reflect.TypeOf((*MockStream)(nil).OnConnect), arg0) +} + +// OnDisconnect mocks base method. +func (m *MockStream) OnDisconnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnDisconnect", arg0) +} + +// OnDisconnect indicates an expected call of OnDisconnect. +func (mr *MockStreamMockRecorder) OnDisconnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDisconnect", reflect.TypeOf((*MockStream)(nil).OnDisconnect), arg0) +} + +// OnForceOrder mocks base method. +func (m *MockStream) OnForceOrder(arg0 func(types.LiquidationInfo)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnForceOrder", arg0) +} + +// OnForceOrder indicates an expected call of OnForceOrder. +func (mr *MockStreamMockRecorder) OnForceOrder(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnForceOrder", reflect.TypeOf((*MockStream)(nil).OnForceOrder), arg0) +} + +// OnFuturesPositionSnapshot mocks base method. +func (m *MockStream) OnFuturesPositionSnapshot(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionSnapshot", arg0) +} + +// OnFuturesPositionSnapshot indicates an expected call of OnFuturesPositionSnapshot. +func (mr *MockStreamMockRecorder) OnFuturesPositionSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionSnapshot", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionSnapshot), arg0) +} + +// OnFuturesPositionUpdate mocks base method. +func (m *MockStream) OnFuturesPositionUpdate(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionUpdate", arg0) +} + +// OnFuturesPositionUpdate indicates an expected call of OnFuturesPositionUpdate. +func (mr *MockStreamMockRecorder) OnFuturesPositionUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionUpdate", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionUpdate), arg0) +} + +// OnKLine mocks base method. +func (m *MockStream) OnKLine(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLine", arg0) +} + +// OnKLine indicates an expected call of OnKLine. +func (mr *MockStreamMockRecorder) OnKLine(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLine", reflect.TypeOf((*MockStream)(nil).OnKLine), arg0) +} + +// OnKLineClosed mocks base method. +func (m *MockStream) OnKLineClosed(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLineClosed", arg0) +} + +// OnKLineClosed indicates an expected call of OnKLineClosed. +func (mr *MockStreamMockRecorder) OnKLineClosed(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLineClosed", reflect.TypeOf((*MockStream)(nil).OnKLineClosed), arg0) +} + +// OnMarketTrade mocks base method. +func (m *MockStream) OnMarketTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnMarketTrade", arg0) +} + +// OnMarketTrade indicates an expected call of OnMarketTrade. +func (mr *MockStreamMockRecorder) OnMarketTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMarketTrade", reflect.TypeOf((*MockStream)(nil).OnMarketTrade), arg0) +} + +// OnOrderUpdate mocks base method. +func (m *MockStream) OnOrderUpdate(arg0 func(types.Order)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnOrderUpdate", arg0) +} + +// OnOrderUpdate indicates an expected call of OnOrderUpdate. +func (mr *MockStreamMockRecorder) OnOrderUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnOrderUpdate", reflect.TypeOf((*MockStream)(nil).OnOrderUpdate), arg0) +} + +// OnRawMessage mocks base method. +func (m *MockStream) OnRawMessage(arg0 func([]byte)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnRawMessage", arg0) +} + +// OnRawMessage indicates an expected call of OnRawMessage. +func (mr *MockStreamMockRecorder) OnRawMessage(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRawMessage", reflect.TypeOf((*MockStream)(nil).OnRawMessage), arg0) +} + +// OnStart mocks base method. +func (m *MockStream) OnStart(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnStart", arg0) +} + +// OnStart indicates an expected call of OnStart. +func (mr *MockStreamMockRecorder) OnStart(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnStart", reflect.TypeOf((*MockStream)(nil).OnStart), arg0) +} + +// OnTradeUpdate mocks base method. +func (m *MockStream) OnTradeUpdate(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnTradeUpdate", arg0) +} + +// OnTradeUpdate indicates an expected call of OnTradeUpdate. +func (mr *MockStreamMockRecorder) OnTradeUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTradeUpdate", reflect.TypeOf((*MockStream)(nil).OnTradeUpdate), arg0) +} + +// Reconnect mocks base method. +func (m *MockStream) Reconnect() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reconnect") +} + +// Reconnect indicates an expected call of Reconnect. +func (mr *MockStreamMockRecorder) Reconnect() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconnect", reflect.TypeOf((*MockStream)(nil).Reconnect)) +} + +// Resubscribe mocks base method. +func (m *MockStream) Resubscribe(arg0 func([]types.Subscription) ([]types.Subscription, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Resubscribe", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Resubscribe indicates an expected call of Resubscribe. +func (mr *MockStreamMockRecorder) Resubscribe(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resubscribe", reflect.TypeOf((*MockStream)(nil).Resubscribe), arg0) +} + +// SetPublicOnly mocks base method. +func (m *MockStream) SetPublicOnly() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetPublicOnly") +} + +// SetPublicOnly indicates an expected call of SetPublicOnly. +func (mr *MockStreamMockRecorder) SetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPublicOnly", reflect.TypeOf((*MockStream)(nil).SetPublicOnly)) +} + +// Subscribe mocks base method. +func (m *MockStream) Subscribe(arg0 types.Channel, arg1 string, arg2 types.SubscribeOptions) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2) +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockStreamMockRecorder) Subscribe(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockStream)(nil).Subscribe), arg0, arg1, arg2) +} diff --git a/pkg/types/stream.go b/pkg/types/stream.go index a2246ed68..2da4df7dd 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -24,6 +24,7 @@ var defaultDialer = &websocket.Dialer{ ReadBufferSize: 4096, } +//go:generate mockgen -destination=mocks/mock_stream.go -package=mocks . Stream type Stream interface { StandardStreamEventHub