twap: add stream executor test

This commit is contained in:
c9s 2024-08-19 15:37:56 +08:00
parent 648e10fd7c
commit cec078f4bf
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
5 changed files with 655 additions and 79 deletions

View File

@ -90,9 +90,11 @@ type FixedQuantityExecutor struct {
market types.Market market types.Market
marketDataStream types.Stream marketDataStream types.Stream
orderBook *types.StreamOrderBook orderBook *types.StreamOrderBook
userDataStream types.Stream userDataStream types.Stream
activeMakerOrders *bbgo.ActiveOrderBook activeMakerOrders *bbgo.ActiveOrderBook
orderStore *core.OrderStore orderStore *core.OrderStore
position *types.Position position *types.Position
@ -102,6 +104,8 @@ type FixedQuantityExecutor struct {
mu sync.Mutex mu sync.Mutex
userDataStreamConnectC chan struct{}
marketDataStreamConnectC chan struct{}
done *DoneSignal done *DoneSignal
} }
@ -112,10 +116,25 @@ func NewStreamExecutor(
side types.SideType, side types.SideType,
targetQuantity, sliceQuantity fixedpoint.Value, targetQuantity, sliceQuantity fixedpoint.Value,
) *FixedQuantityExecutor { ) *FixedQuantityExecutor {
marketDataStream := exchange.NewStream()
marketDataStream.SetPublicOnly()
marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
orderBook := types.NewStreamBook(symbol)
orderBook.BindStream(marketDataStream)
userDataStream := exchange.NewStream()
orderStore := core.NewOrderStore(symbol) orderStore := core.NewOrderStore(symbol)
position := types.NewPositionFromMarket(market) position := types.NewPositionFromMarket(market)
tradeCollector := core.NewTradeCollector(symbol, position, orderStore) tradeCollector := core.NewTradeCollector(symbol, position, orderStore)
return &FixedQuantityExecutor{ orderStore.BindStream(userDataStream)
activeMakerOrders := bbgo.NewActiveOrderBook(symbol)
e := &FixedQuantityExecutor{
exchange: exchange, exchange: exchange,
symbol: symbol, symbol: symbol,
side: side, side: side,
@ -128,11 +147,43 @@ func NewStreamExecutor(
"symbol": symbol, "symbol": symbol,
}), }),
marketDataStream: marketDataStream,
orderBook: orderBook,
userDataStream: userDataStream,
activeMakerOrders: activeMakerOrders,
orderStore: orderStore, orderStore: orderStore,
tradeCollector: tradeCollector, tradeCollector: tradeCollector,
position: position, position: position,
done: NewDoneSignal(), done: NewDoneSignal(),
userDataStreamConnectC: make(chan struct{}),
marketDataStreamConnectC: make(chan struct{}),
} }
e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
e.logger.Info(trade.String())
})
e.tradeCollector.BindStream(e.userDataStream)
activeMakerOrders.OnFilled(e.handleFilledOrder)
activeMakerOrders.BindStream(e.userDataStream)
e.marketDataStream.OnConnect(func() {
e.logger.Info("market data stream on connect")
close(e.marketDataStreamConnectC)
e.logger.Infof("marketDataStreamConnectC closed")
})
// private channels
e.userDataStream.OnAuth(func() {
e.logger.Info("user data stream on auth")
close(e.userDataStreamConnectC)
e.logger.Info("userDataStreamConnectC closed")
})
return e
} }
func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) { func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) {
@ -270,11 +321,11 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error {
tickSpread := tickSize.Mul(numOfTicks) tickSpread := tickSize.Mul(numOfTicks)
// check and see if we need to cancel the existing active orders // check and see if we need to cancel the existing active orders
for e.activeMakerOrders.NumOfOrders() > 0 { for e.activeMakerOrders.NumOfOrders() > 0 {
orders := e.activeMakerOrders.Orders() orders := e.activeMakerOrders.Orders()
if len(orders) > 1 { if len(orders) > 1 {
logrus.Warnf("more than 1 %s open orders in the strategy...", e.symbol) e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol)
} }
// get the first active order // get the first active order
@ -397,39 +448,65 @@ func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) {
return newPrice, nil return newPrice, nil
} }
func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, err error) { func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value {
base := e.position.GetBase()
return e.targetQuantity.Sub(base.Abs())
}
func (e *FixedQuantityExecutor) isDeadlineExceeded() bool {
if e.deadlineTime != nil && !e.deadlineTime.IsZero() {
return time.Since(*e.deadlineTime) > 0
}
return false
}
func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) {
minQuantity := e.market.MinQuantity
remainingQuantity := e.getRemainingQuantity()
if remainingQuantity.Sign() <= 0 {
e.cancelExecution()
return fixedpoint.Zero, nil
}
if remainingQuantity.Compare(minQuantity) < 0 {
e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String())
e.cancelExecution()
return fixedpoint.Zero, nil
}
// if deadline exceeded, we should return the remaining quantity
if e.isDeadlineExceeded() {
return remainingQuantity, nil
}
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity)
// if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round
// if there are rest slices
nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity)
if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) {
orderQuantity = remainingQuantity
}
orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price)
return orderQuantity, nil
}
func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) {
newPrice, err := e.getNewPrice() newPrice, err := e.getNewPrice()
if err != nil { if err != nil {
return nil, err return nil, err
} }
minQuantity := e.market.MinQuantity orderQuantity, err := e.calculateNewOrderQuantity(newPrice)
base := e.position.GetBase() if err != nil {
return nil, err
restQuantity := e.targetQuantity.Sub(base.Abs())
if restQuantity.Sign() <= 0 {
if e.cancelContextIfTargetQuantityFilled() {
return nil, nil
} }
}
if restQuantity.Compare(minQuantity) < 0 {
return nil, fmt.Errorf("can not continue placing orders, rest quantity %s is less than the min quantity %s", restQuantity.String(), minQuantity.String())
}
// when slice = 1000, if we only have 998, we should adjust our quantity to 998
orderQuantity := fixedpoint.Min(e.sliceQuantity, restQuantity)
// if the rest quantity in the next round is not enough, we should merge the rest quantity into this round
// if there are rest slices
nextRestQuantity := restQuantity.Sub(e.sliceQuantity)
if nextRestQuantity.Sign() > 0 && nextRestQuantity.Compare(minQuantity) < 0 {
orderQuantity = restQuantity
}
minNotional := e.market.MinNotional
orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional)
balances, err := e.exchange.QueryAccountBalances(e.executionCtx) balances, err := e.exchange.QueryAccountBalances(e.executionCtx)
if err != nil { if err != nil {
@ -446,67 +523,55 @@ func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, e
case types.SideTypeBuy: case types.SideTypeBuy:
// check base balance for sell, try to sell as more as possible // check base balance for sell, try to sell as more as possible
if b, ok := balances[e.market.QuoteCurrency]; ok { if b, ok := balances[e.market.QuoteCurrency]; ok {
orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available)
} }
} }
if e.deadlineTime != nil && !e.deadlineTime.IsZero() { if e.isDeadlineExceeded() {
now := time.Now() return &types.SubmitOrder{
if now.After(*e.deadlineTime) {
orderForm = &types.SubmitOrder{
Symbol: e.symbol, Symbol: e.symbol,
Side: e.side, Side: e.side,
Type: types.OrderTypeMarket, Type: types.OrderTypeMarket,
Quantity: restQuantity, Quantity: orderQuantity,
Market: e.market, Market: e.market,
} }, nil
return orderForm, nil
}
} }
orderForm = &types.SubmitOrder{ return &types.SubmitOrder{
Symbol: e.symbol, Symbol: e.symbol,
Side: e.side, Side: e.side,
Type: types.OrderTypeLimitMaker, Type: types.OrderTypeLimitMaker,
Quantity: orderQuantity, Quantity: orderQuantity,
Price: newPrice, Price: newPrice,
Market: e.market, Market: e.market,
TimeInForce: "GTC", TimeInForce: types.TimeInForceGTC,
} }, nil
return orderForm, err
} }
func (e *FixedQuantityExecutor) Start(ctx context.Context) error { func (e *FixedQuantityExecutor) Start(ctx context.Context) error {
if e.marketDataStream != nil { if e.executionCtx != nil {
return errors.New("market data stream is not nil, you can't start the executor twice") return errors.New("executionCtx is not nil, you can't start the executor twice")
} }
e.executionCtx, e.cancelExecution = context.WithCancel(ctx) e.executionCtx, e.cancelExecution = context.WithCancel(ctx)
e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx) e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx)
e.marketDataStream = e.exchange.NewStream()
e.marketDataStream.SetPublicOnly()
e.marketDataStream.Subscribe(types.BookChannel, e.symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
e.orderBook = types.NewStreamBook(e.symbol)
e.orderBook.BindStream(e.marketDataStream)
// private channels
e.userDataStream = e.exchange.NewStream()
e.orderStore.BindStream(e.userDataStream)
e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol)
e.activeMakerOrders.OnFilled(e.handleFilledOrder)
e.activeMakerOrders.BindStream(e.userDataStream)
e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
e.logger.Info(trade.String())
})
e.tradeCollector.BindStream(e.userDataStream)
go e.connectMarketData(e.executionCtx) go e.connectMarketData(e.executionCtx)
go e.connectUserData(e.userDataStreamCtx) go e.connectUserData(e.userDataStreamCtx)
e.logger.Infof("waiting for connections ready...")
if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) {
e.cancelExecution()
return fmt.Errorf("market data stream connection timeout")
}
if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) {
e.cancelExecution()
return fmt.Errorf("user data stream connection timeout")
}
e.logger.Infof("connections ready, starting order updater...")
go e.orderUpdater(e.executionCtx) go e.orderUpdater(e.executionCtx)
return nil return nil
} }
@ -540,3 +605,14 @@ func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) {
} }
} }
} }
func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool {
select {
case <-ctx.Done():
return false
case <-time.After(timeout):
return false
case <-c:
return true
}
}

View File

@ -1,9 +1,120 @@
package twap package twap
import ( import (
"context"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
"github.com/c9s/bbgo/pkg/fixedpoint"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
) )
func TestNewStreamExecutorV2(t *testing.T) { func getTestMarket() types.Market {
market := types.Market{
Symbol: "BTCUSDT",
PricePrecision: 8,
VolumePrecision: 8,
QuoteCurrency: "USDT",
BaseCurrency: "BTC",
MinNotional: fixedpoint.MustNewFromString("0.001"),
MinAmount: fixedpoint.MustNewFromString("10.0"),
MinQuantity: fixedpoint.MustNewFromString("0.001"),
}
return market
}
type CatchMatcher struct {
f func(x any)
}
func Catch(f func(x any)) *CatchMatcher {
return &CatchMatcher{
f: f,
}
}
func (m *CatchMatcher) Matches(x interface{}) bool {
m.f(x)
return true
}
func (m *CatchMatcher) String() string {
return "CatchMatcher"
}
func TestNewStreamExecutor(t *testing.T) {
symbol := "BTCUSDT"
market := getTestMarket()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockCtrl := gomock.NewController(t)
defer mockCtrl.Finish()
mockEx := mocks.NewMockExchange(mockCtrl)
marketDataStream := &types.StandardStream{}
userDataStream := &types.StandardStream{}
mockMarketDataStream := mocks.NewMockStream(mockCtrl)
mockMarketDataStream.EXPECT().SetPublicOnly()
mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
mockMarketDataStream.EXPECT().OnBookSnapshot(Catch(func(x any) {
marketDataStream.OnBookSnapshot(x.(func(book types.SliceOrderBook)))
})).AnyTimes()
mockMarketDataStream.EXPECT().OnBookUpdate(Catch(func(x any) {
marketDataStream.OnBookUpdate(x.(func(book types.SliceOrderBook)))
})).AnyTimes()
mockMarketDataStream.EXPECT().OnConnect(Catch(func(x any) {
marketDataStream.OnConnect(x.(func()))
})).AnyTimes()
mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx))
mockUserDataStream := mocks.NewMockStream(mockCtrl)
mockUserDataStream.EXPECT().OnOrderUpdate(Catch(func(x any) {
userDataStream.OnOrderUpdate(x.(func(order types.Order)))
})).AnyTimes()
mockUserDataStream.EXPECT().OnTradeUpdate(Catch(func(x any) {
userDataStream.OnTradeUpdate(x.(func(order types.Trade)))
})).AnyTimes()
mockUserDataStream.EXPECT().OnConnect(Catch(func(x any) {
userDataStream.OnConnect(x.(func()))
})).AnyTimes()
mockUserDataStream.EXPECT().OnAuth(Catch(func(x any) {
userDataStream.OnAuth(x.(func()))
}))
mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx))
mockEx.EXPECT().NewStream().Return(mockMarketDataStream)
mockEx.EXPECT().NewStream().Return(mockUserDataStream)
executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, Number(100), Number(1))
executor.SetUpdateInterval(time.Second)
go func() {
err := executor.Start(ctx)
assert.NoError(t, err)
}()
go func() {
time.Sleep(500 * time.Millisecond)
marketDataStream.EmitConnect()
userDataStream.EmitConnect()
userDataStream.EmitAuth()
}()
select {
case <-ctx.Done():
case <-time.After(10 * time.Second):
case <-executor.Done():
}
t.Logf("executor done")
} }

View File

@ -247,6 +247,19 @@ func (m Market) AdjustQuantityByMinNotional(quantity, currentPrice fixedpoint.Va
return quantity return quantity
} }
// AdjustQuantityByMaxAmount adjusts the quantity to make the amount less than the given maxAmount
func (m Market) AdjustQuantityByMaxAmount(quantity, currentPrice, maxAmount fixedpoint.Value) fixedpoint.Value {
// modify quantity for the min amount
amount := currentPrice.Mul(quantity)
if amount.Compare(maxAmount) < 0 {
return quantity
}
ratio := maxAmount.Div(amount)
quantity = quantity.Mul(ratio)
return m.TruncateQuantity(quantity)
}
type MarketMap map[string]Market type MarketMap map[string]Market
func (m MarketMap) Add(market Market) { func (m MarketMap) Add(market Market) {

View File

@ -0,0 +1,375 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/c9s/bbgo/pkg/types (interfaces: Stream)
//
// Generated by this command:
//
// mockgen -destination=mocks/mock_stream.go -package=mocks . Stream
//
// Package mocks is a generated GoMock package.
package mocks
import (
context "context"
reflect "reflect"
types "github.com/c9s/bbgo/pkg/types"
gomock "go.uber.org/mock/gomock"
)
// MockStream is a mock of Stream interface.
type MockStream struct {
ctrl *gomock.Controller
recorder *MockStreamMockRecorder
}
// MockStreamMockRecorder is the mock recorder for MockStream.
type MockStreamMockRecorder struct {
mock *MockStream
}
// NewMockStream creates a new mock instance.
func NewMockStream(ctrl *gomock.Controller) *MockStream {
mock := &MockStream{ctrl: ctrl}
mock.recorder = &MockStreamMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStream) EXPECT() *MockStreamMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockStream) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockStreamMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close))
}
// Connect mocks base method.
func (m *MockStream) Connect(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Connect", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Connect indicates an expected call of Connect.
func (mr *MockStreamMockRecorder) Connect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockStream)(nil).Connect), arg0)
}
// GetPublicOnly mocks base method.
func (m *MockStream) GetPublicOnly() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPublicOnly")
ret0, _ := ret[0].(bool)
return ret0
}
// GetPublicOnly indicates an expected call of GetPublicOnly.
func (mr *MockStreamMockRecorder) GetPublicOnly() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPublicOnly", reflect.TypeOf((*MockStream)(nil).GetPublicOnly))
}
// GetSubscriptions mocks base method.
func (m *MockStream) GetSubscriptions() []types.Subscription {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetSubscriptions")
ret0, _ := ret[0].([]types.Subscription)
return ret0
}
// GetSubscriptions indicates an expected call of GetSubscriptions.
func (mr *MockStreamMockRecorder) GetSubscriptions() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockStream)(nil).GetSubscriptions))
}
// OnAggTrade mocks base method.
func (m *MockStream) OnAggTrade(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnAggTrade", arg0)
}
// OnAggTrade indicates an expected call of OnAggTrade.
func (mr *MockStreamMockRecorder) OnAggTrade(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAggTrade", reflect.TypeOf((*MockStream)(nil).OnAggTrade), arg0)
}
// OnAuth mocks base method.
func (m *MockStream) OnAuth(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnAuth", arg0)
}
// OnAuth indicates an expected call of OnAuth.
func (mr *MockStreamMockRecorder) OnAuth(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAuth", reflect.TypeOf((*MockStream)(nil).OnAuth), arg0)
}
// OnBalanceSnapshot mocks base method.
func (m *MockStream) OnBalanceSnapshot(arg0 func(types.BalanceMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBalanceSnapshot", arg0)
}
// OnBalanceSnapshot indicates an expected call of OnBalanceSnapshot.
func (mr *MockStreamMockRecorder) OnBalanceSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceSnapshot", reflect.TypeOf((*MockStream)(nil).OnBalanceSnapshot), arg0)
}
// OnBalanceUpdate mocks base method.
func (m *MockStream) OnBalanceUpdate(arg0 func(types.BalanceMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBalanceUpdate", arg0)
}
// OnBalanceUpdate indicates an expected call of OnBalanceUpdate.
func (mr *MockStreamMockRecorder) OnBalanceUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceUpdate", reflect.TypeOf((*MockStream)(nil).OnBalanceUpdate), arg0)
}
// OnBookSnapshot mocks base method.
func (m *MockStream) OnBookSnapshot(arg0 func(types.SliceOrderBook)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookSnapshot", arg0)
}
// OnBookSnapshot indicates an expected call of OnBookSnapshot.
func (mr *MockStreamMockRecorder) OnBookSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookSnapshot", reflect.TypeOf((*MockStream)(nil).OnBookSnapshot), arg0)
}
// OnBookTickerUpdate mocks base method.
func (m *MockStream) OnBookTickerUpdate(arg0 func(types.BookTicker)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookTickerUpdate", arg0)
}
// OnBookTickerUpdate indicates an expected call of OnBookTickerUpdate.
func (mr *MockStreamMockRecorder) OnBookTickerUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookTickerUpdate", reflect.TypeOf((*MockStream)(nil).OnBookTickerUpdate), arg0)
}
// OnBookUpdate mocks base method.
func (m *MockStream) OnBookUpdate(arg0 func(types.SliceOrderBook)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnBookUpdate", arg0)
}
// OnBookUpdate indicates an expected call of OnBookUpdate.
func (mr *MockStreamMockRecorder) OnBookUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookUpdate", reflect.TypeOf((*MockStream)(nil).OnBookUpdate), arg0)
}
// OnConnect mocks base method.
func (m *MockStream) OnConnect(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnConnect", arg0)
}
// OnConnect indicates an expected call of OnConnect.
func (mr *MockStreamMockRecorder) OnConnect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnConnect", reflect.TypeOf((*MockStream)(nil).OnConnect), arg0)
}
// OnDisconnect mocks base method.
func (m *MockStream) OnDisconnect(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnDisconnect", arg0)
}
// OnDisconnect indicates an expected call of OnDisconnect.
func (mr *MockStreamMockRecorder) OnDisconnect(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDisconnect", reflect.TypeOf((*MockStream)(nil).OnDisconnect), arg0)
}
// OnForceOrder mocks base method.
func (m *MockStream) OnForceOrder(arg0 func(types.LiquidationInfo)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnForceOrder", arg0)
}
// OnForceOrder indicates an expected call of OnForceOrder.
func (mr *MockStreamMockRecorder) OnForceOrder(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnForceOrder", reflect.TypeOf((*MockStream)(nil).OnForceOrder), arg0)
}
// OnFuturesPositionSnapshot mocks base method.
func (m *MockStream) OnFuturesPositionSnapshot(arg0 func(types.FuturesPositionMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnFuturesPositionSnapshot", arg0)
}
// OnFuturesPositionSnapshot indicates an expected call of OnFuturesPositionSnapshot.
func (mr *MockStreamMockRecorder) OnFuturesPositionSnapshot(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionSnapshot", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionSnapshot), arg0)
}
// OnFuturesPositionUpdate mocks base method.
func (m *MockStream) OnFuturesPositionUpdate(arg0 func(types.FuturesPositionMap)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnFuturesPositionUpdate", arg0)
}
// OnFuturesPositionUpdate indicates an expected call of OnFuturesPositionUpdate.
func (mr *MockStreamMockRecorder) OnFuturesPositionUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionUpdate", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionUpdate), arg0)
}
// OnKLine mocks base method.
func (m *MockStream) OnKLine(arg0 func(types.KLine)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnKLine", arg0)
}
// OnKLine indicates an expected call of OnKLine.
func (mr *MockStreamMockRecorder) OnKLine(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLine", reflect.TypeOf((*MockStream)(nil).OnKLine), arg0)
}
// OnKLineClosed mocks base method.
func (m *MockStream) OnKLineClosed(arg0 func(types.KLine)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnKLineClosed", arg0)
}
// OnKLineClosed indicates an expected call of OnKLineClosed.
func (mr *MockStreamMockRecorder) OnKLineClosed(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLineClosed", reflect.TypeOf((*MockStream)(nil).OnKLineClosed), arg0)
}
// OnMarketTrade mocks base method.
func (m *MockStream) OnMarketTrade(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnMarketTrade", arg0)
}
// OnMarketTrade indicates an expected call of OnMarketTrade.
func (mr *MockStreamMockRecorder) OnMarketTrade(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMarketTrade", reflect.TypeOf((*MockStream)(nil).OnMarketTrade), arg0)
}
// OnOrderUpdate mocks base method.
func (m *MockStream) OnOrderUpdate(arg0 func(types.Order)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnOrderUpdate", arg0)
}
// OnOrderUpdate indicates an expected call of OnOrderUpdate.
func (mr *MockStreamMockRecorder) OnOrderUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnOrderUpdate", reflect.TypeOf((*MockStream)(nil).OnOrderUpdate), arg0)
}
// OnRawMessage mocks base method.
func (m *MockStream) OnRawMessage(arg0 func([]byte)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnRawMessage", arg0)
}
// OnRawMessage indicates an expected call of OnRawMessage.
func (mr *MockStreamMockRecorder) OnRawMessage(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRawMessage", reflect.TypeOf((*MockStream)(nil).OnRawMessage), arg0)
}
// OnStart mocks base method.
func (m *MockStream) OnStart(arg0 func()) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnStart", arg0)
}
// OnStart indicates an expected call of OnStart.
func (mr *MockStreamMockRecorder) OnStart(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnStart", reflect.TypeOf((*MockStream)(nil).OnStart), arg0)
}
// OnTradeUpdate mocks base method.
func (m *MockStream) OnTradeUpdate(arg0 func(types.Trade)) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "OnTradeUpdate", arg0)
}
// OnTradeUpdate indicates an expected call of OnTradeUpdate.
func (mr *MockStreamMockRecorder) OnTradeUpdate(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTradeUpdate", reflect.TypeOf((*MockStream)(nil).OnTradeUpdate), arg0)
}
// Reconnect mocks base method.
func (m *MockStream) Reconnect() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Reconnect")
}
// Reconnect indicates an expected call of Reconnect.
func (mr *MockStreamMockRecorder) Reconnect() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconnect", reflect.TypeOf((*MockStream)(nil).Reconnect))
}
// Resubscribe mocks base method.
func (m *MockStream) Resubscribe(arg0 func([]types.Subscription) ([]types.Subscription, error)) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Resubscribe", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Resubscribe indicates an expected call of Resubscribe.
func (mr *MockStreamMockRecorder) Resubscribe(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resubscribe", reflect.TypeOf((*MockStream)(nil).Resubscribe), arg0)
}
// SetPublicOnly mocks base method.
func (m *MockStream) SetPublicOnly() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetPublicOnly")
}
// SetPublicOnly indicates an expected call of SetPublicOnly.
func (mr *MockStreamMockRecorder) SetPublicOnly() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPublicOnly", reflect.TypeOf((*MockStream)(nil).SetPublicOnly))
}
// Subscribe mocks base method.
func (m *MockStream) Subscribe(arg0 types.Channel, arg1 string, arg2 types.SubscribeOptions) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2)
}
// Subscribe indicates an expected call of Subscribe.
func (mr *MockStreamMockRecorder) Subscribe(arg0, arg1, arg2 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockStream)(nil).Subscribe), arg0, arg1, arg2)
}

View File

@ -24,6 +24,7 @@ var defaultDialer = &websocket.Dialer{
ReadBufferSize: 4096, ReadBufferSize: 4096,
} }
//go:generate mockgen -destination=mocks/mock_stream.go -package=mocks . Stream
type Stream interface { type Stream interface {
StandardStreamEventHub StandardStreamEventHub