From f6e58ded026e95d102676c508fed76ad5342ae64 Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 30 Sep 2024 21:26:41 +0800 Subject: [PATCH 1/5] pkg/exchange: add PollAndGet method and declare an interface --- pkg/exchange/bybit/market_info_poller.go | 24 +++++++++++++++---- pkg/exchange/bybit/market_info_poller_test.go | 10 ++++---- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/pkg/exchange/bybit/market_info_poller.go b/pkg/exchange/bybit/market_info_poller.go index e35a1e4c2..4ce13cc9c 100644 --- a/pkg/exchange/bybit/market_info_poller.go +++ b/pkg/exchange/bybit/market_info_poller.go @@ -21,6 +21,12 @@ var ( pollFeeRateRateLimiter = rate.NewLimiter(rate.Every(10*time.Minute), 1) ) +type FeeRatePoller interface { + Start(ctx context.Context) + Get(symbol string) (symbolFeeDetail, bool) + Poll(ctx context.Context) error +} + type symbolFeeDetail struct { bybitapi.FeeRate @@ -34,6 +40,9 @@ type feeRatePoller struct { once sync.Once client MarketInfoProvider + // lastSyncTime is the last time the fee rate was updated. + lastSyncTime time.Time + symbolFeeDetail map[string]symbolFeeDetail } @@ -51,7 +60,7 @@ func (p *feeRatePoller) Start(ctx context.Context) { } func (p *feeRatePoller) startLoop(ctx context.Context) { - err := p.poll(ctx) + err := p.Poll(ctx) if err != nil { log.WithError(err).Warn("failed to initialize the fee rate, the ticker is scheduled to update it subsequently") } @@ -67,22 +76,27 @@ func (p *feeRatePoller) startLoop(ctx context.Context) { return case <-ticker.C: - if err := p.poll(ctx); err != nil { + if err := p.Poll(ctx); err != nil { log.WithError(err).Warn("failed to update fee rate") } } } } -func (p *feeRatePoller) poll(ctx context.Context) error { +func (p *feeRatePoller) Poll(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + // the poll will be called frequently, so we need to check the last sync time. + if time.Since(p.lastSyncTime) < feeRatePollingPeriod { + return nil + } symbolFeeRate, err := p.getAllFeeRates(ctx) if err != nil { return err } - p.mu.Lock() p.symbolFeeDetail = symbolFeeRate - p.mu.Unlock() + p.lastSyncTime = time.Now() if pollFeeRateRateLimiter.Allow() { log.Infof("updated fee rate: %+v", p.symbolFeeDetail) diff --git a/pkg/exchange/bybit/market_info_poller_test.go b/pkg/exchange/bybit/market_info_poller_test.go index 4d7a7aa07..8770bcc2a 100644 --- a/pkg/exchange/bybit/market_info_poller_test.go +++ b/pkg/exchange/bybit/market_info_poller_test.go @@ -3,16 +3,14 @@ package bybit import ( "context" "fmt" - "testing" - - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "go.uber.org/mock/gomock" - "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi" "github.com/c9s/bbgo/pkg/exchange/bybit/mocks" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + "testing" ) func TestFeeRatePoller_getAllFeeRates(t *testing.T) { From d2c1ae064282f0a2ac60dd52d18ed59603e61824 Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 30 Sep 2024 21:27:05 +0800 Subject: [PATCH 2/5] pkg/exchange: move fee rate calculate method outside --- pkg/exchange/bybit/stream.go | 74 ++++++++++++++++++------------- pkg/exchange/bybit/stream_test.go | 2 +- pkg/exchange/bybit/types_test.go | 8 ++-- 3 files changed, 48 insertions(+), 36 deletions(-) diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index fd09ec4c8..eaccd6c62 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -59,7 +59,7 @@ type Stream struct { key, secret string streamDataProvider StreamDataProvider - feeRateProvider *feeRatePoller + feeRateProvider FeeRatePoller marketsInfo types.MarketMap bookEventCallbacks []func(e BookEvent) @@ -70,14 +70,14 @@ type Stream struct { tradeEventCallbacks []func(e []TradeEvent) } -func NewStream(key, secret string, userDataProvider StreamDataProvider) *Stream { +func NewStream(key, secret string, userDataProvider StreamDataProvider, poller FeeRatePoller) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), // pragma: allowlist nextline secret key: key, secret: secret, streamDataProvider: userDataProvider, - feeRateProvider: newFeeRatePoller(userDataProvider), + feeRateProvider: poller, } stream.SetEndpointCreator(stream.createEndpoint) @@ -439,37 +439,49 @@ func (s *Stream) handleKLineEvent(klineEvent KLineEvent) { } } -func (s *Stream) handleTradeEvent(events []TradeEvent) { - for _, event := range events { - feeRate, found := s.feeRateProvider.Get(event.Symbol) - if !found { - feeRate = symbolFeeDetail{ - FeeRate: bybitapi.FeeRate{ - Symbol: event.Symbol, - TakerFeeRate: defaultTakerFee, - MakerFeeRate: defaultMakerFee, - }, - BaseCoin: "", - QuoteCoin: "", - } +func pollAndGetFeeRate(ctx context.Context, symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) (symbolFeeDetail, error) { + err := poller.Poll(ctx) + if err != nil { + return symbolFeeDetail{}, err + } + return getFeeRate(symbol, poller, marketsInfo), nil +} - if market, ok := s.marketsInfo[event.Symbol]; ok { - feeRate.BaseCoin = market.BaseCurrency - feeRate.QuoteCoin = market.QuoteCurrency - } - - if tradeLogLimiter.Allow() { - // The error log level was utilized due to a detected discrepancy in the fee calculations. - log.Errorf("failed to get %s fee rate, use default taker fee %f, maker fee %f, base coin: %s, quote coin: %s", - event.Symbol, - feeRate.TakerFeeRate.Float64(), - feeRate.MakerFeeRate.Float64(), - feeRate.BaseCoin, - feeRate.QuoteCoin, - ) - } +func getFeeRate(symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) symbolFeeDetail { + feeRate, found := poller.Get(symbol) + if !found { + feeRate = symbolFeeDetail{ + FeeRate: bybitapi.FeeRate{ + Symbol: symbol, + TakerFeeRate: defaultTakerFee, + MakerFeeRate: defaultMakerFee, + }, + BaseCoin: "", + QuoteCoin: "", } + if market, ok := marketsInfo[symbol]; ok { + feeRate.BaseCoin = market.BaseCurrency + feeRate.QuoteCoin = market.QuoteCurrency + } + + if tradeLogLimiter.Allow() { + // The error log level was utilized due to a detected discrepancy in the fee calculations. + log.Errorf("failed to get %s fee rate, use default taker fee %f, maker fee %f, base coin: %s, quote coin: %s", + symbol, + feeRate.TakerFeeRate.Float64(), + feeRate.MakerFeeRate.Float64(), + feeRate.BaseCoin, + feeRate.QuoteCoin, + ) + } + } + return feeRate +} + +func (s *Stream) handleTradeEvent(events []TradeEvent) { + for _, event := range events { + feeRate := getFeeRate(event.Symbol, s.feeRateProvider, s.marketsInfo) gTrade, err := event.toGlobalTrade(feeRate) if err != nil { if tradeLogLimiter.Allow() { diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index 8ed651022..ba719a7c9 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -30,7 +30,7 @@ func getTestClientOrSkip(t *testing.T) *Stream { exchange, err := New(key, secret) assert.NoError(t, err) - return NewStream(key, secret, exchange) + return NewStream(key, secret, exchange, newFeeRatePoller(exchange)) } func TestStream(t *testing.T) { diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index f469e84fd..126b32c4a 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -15,7 +15,7 @@ import ( func Test_parseWebSocketEvent(t *testing.T) { t.Run("[public] PingEvent without req id", func(t *testing.T) { - s := NewStream("", "", nil) + s := NewStream("", "", nil, nil) msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -26,7 +26,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[public] PingEvent with req id", func(t *testing.T) { - s := NewStream("", "", nil) + s := NewStream("", "", nil, nil) msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","req_id":"b26704da-f5af-44c2-bdf7-935d6739e1a0","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -37,7 +37,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent without req id", func(t *testing.T) { - s := NewStream("", "", nil) + s := NewStream("", "", nil, nil) msg := `{"op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -48,7 +48,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent with req id", func(t *testing.T) { - s := NewStream("", "", nil) + s := NewStream("", "", nil, nil) msg := `{"req_id":"78d36b57-a142-47b7-9143-5843df77d44d","op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) From eb2a7421da4333273e7c15d34c183c3cafc82ebe Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 30 Sep 2024 21:27:36 +0800 Subject: [PATCH 3/5] pkg/exchange: integrate the v5 query trade api --- pkg/exchange/bybit/convert.go | 36 +++++++ pkg/exchange/bybit/exchange.go | 177 +++++++++++++++++---------------- 2 files changed, 126 insertions(+), 87 deletions(-) diff --git a/pkg/exchange/bybit/convert.go b/pkg/exchange/bybit/convert.go index 8c2d91541..0bda0cad2 100644 --- a/pkg/exchange/bybit/convert.go +++ b/pkg/exchange/bybit/convert.go @@ -325,6 +325,42 @@ func v3ToGlobalTrade(trade v3.Trade) (*types.Trade, error) { }, nil } +func toGlobalTrade(trade bybitapi.Trade, feeDetail symbolFeeDetail) (*types.Trade, error) { + side, err := toGlobalSideType(trade.Side) + if err != nil { + return nil, fmt.Errorf("unexpected side: %s, err: %w", trade.Side, err) + } + orderIdNum, err := strconv.ParseUint(trade.OrderId, 10, 64) + if err != nil { + return nil, fmt.Errorf("unexpected order id: %s, err: %w", trade.OrderId, err) + } + tradeIdNum, err := strconv.ParseUint(trade.ExecId, 10, 64) + if err != nil { + return nil, fmt.Errorf("unexpected trade id: %s, err: %w", trade.ExecId, err) + } + + fc, _ := calculateFee(trade, feeDetail) + + return &types.Trade{ + ID: tradeIdNum, + OrderID: orderIdNum, + Exchange: types.ExchangeBybit, + Price: trade.ExecPrice, + Quantity: trade.ExecQty, + QuoteQuantity: trade.ExecPrice.Mul(trade.ExecQty), + Symbol: trade.Symbol, + Side: side, + IsBuyer: side == types.SideTypeBuy, + IsMaker: trade.IsMaker, + Time: types.Time(trade.ExecTime), + Fee: trade.ExecFee, + FeeCurrency: fc, + IsMargin: false, + IsFutures: false, + IsIsolated: false, + }, nil +} + func toGlobalBalanceMap(events []bybitapi.WalletBalances) types.BalanceMap { bm := types.BalanceMap{} for _, event := range events { diff --git a/pkg/exchange/bybit/exchange.go b/pkg/exchange/bybit/exchange.go index 894f76611..ffd3265c2 100644 --- a/pkg/exchange/bybit/exchange.go +++ b/pkg/exchange/bybit/exchange.go @@ -12,17 +12,17 @@ import ( "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi" - v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) const ( - maxOrderIdLen = 36 - defaultQueryLimit = 50 - defaultKLineLimit = 1000 + maxOrderIdLen = 36 + defaultQueryLimit = 50 + defaultQueryTradeLimit = 100 + defaultKLineLimit = 1000 - halfYearDuration = 6 * 30 * 24 * time.Hour + queryTradeDurationLimit = 7 * 24 * time.Hour ) // https://bybit-exchange.github.io/docs/zh-TW/v5/rate-limit @@ -52,7 +52,13 @@ var ( type Exchange struct { key, secret string client *bybitapi.RestClient - v3client *v3.Client + marketsInfo types.MarketMap + + // feeRateProvider provides the fee rate and fee currency for each symbol. + // Because the bybit exchange does not provide a fee currency on traditional SPOT accounts, we need to query the marker + // fee rate to get the fee currency. + // https://bybit-exchange.github.io/docs/v5/enum#spot-fee-currency-instruction + feeRateProvider FeeRatePoller } func New(key, secret string) (*Exchange, error) { @@ -60,18 +66,25 @@ func New(key, secret string) (*Exchange, error) { if err != nil { return nil, err } - - if len(key) > 0 && len(secret) > 0 { - client.Auth(key, secret) - } - - return &Exchange{ + ex := &Exchange{ key: key, // pragma: allowlist nextline secret - secret: secret, - client: client, - v3client: v3.NewClient(client), - }, nil + secret: secret, + client: client, + } + if len(key) > 0 && len(secret) > 0 { + client.Auth(key, secret) + ex.feeRateProvider = newFeeRatePoller(ex) + + ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, errors.New("query markets timeout")) + defer cancel() + ex.marketsInfo, err = ex.QueryMarkets(ctx) + if err != nil { + return nil, fmt.Errorf("failed to query markets, err: %w", err) + } + } + + return ex, nil } func (e *Exchange) Name() types.ExchangeName { @@ -226,35 +239,14 @@ func (e *Exchange) QueryOrderTrades(ctx context.Context, q types.OrderQuery) (tr if len(q.OrderID) == 0 { return nil, errors.New("orderID is required parameter") } - req := e.v3client.NewGetTradesRequest().OrderId(q.OrderID) + req := e.client.NewGetExecutionListRequest().OrderId(q.OrderID) if len(q.Symbol) != 0 { req.Symbol(q.Symbol) } + req.Limit(defaultQueryTradeLimit) - if err := queryOrderTradeRateLimiter.Wait(ctx); err != nil { - return nil, fmt.Errorf("trade rate limiter wait error: %w", err) - } - response, err := req.Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to query order trades, err: %w", err) - } - - var errs error - for _, trade := range response.List { - res, err := v3ToGlobalTrade(trade) - if err != nil { - errs = multierr.Append(errs, err) - continue - } - trades = append(trades, *res) - } - - if errs != nil { - return nil, errs - } - - return trades, nil + return e.queryTrades(ctx, req) } func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) { @@ -432,32 +424,65 @@ func (e *Exchange) QueryClosedOrders( return types.SortOrdersAscending(orders), nil } -/* -QueryTrades queries trades by time range or trade id range. -If options.StartTime is not specified, you can only query for records in the last 7 days. -If you want to query for records older than 7 days, options.StartTime is required. -It supports to query records up to 180 days. +func (e *Exchange) queryTrades(ctx context.Context, req *bybitapi.GetExecutionListRequest) (trades []types.Trade, err error) { + cursor := "" + for { + if len(cursor) != 0 { + req = req.Cursor(cursor) + } -** Here includes MakerRebate. If needed, let's discuss how to modify it to return in trade. ** -** StartTime and EndTime are inclusive. ** -** StartTime and EndTime cannot exceed 180 days. ** -** StartTime, EndTime, FromTradeId can be used together. ** -** If the `FromTradeId` is passed, and `ToTradeId` is null, then the result is sorted by tradeId in `ascend`. -Otherwise, the result is sorted by tradeId in `descend`. ** + res, err := req.Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to query trades, err: %w", err) + } + + for _, trade := range res.List { + feeRate, err := pollAndGetFeeRate(ctx, trade.Symbol, e.feeRateProvider, e.marketsInfo) + if err != nil { + return nil, fmt.Errorf("failed to get fee rate, err: %v", err) + } + trade, err := toGlobalTrade(trade, feeRate) + if err != nil { + return nil, fmt.Errorf("failed to convert trade, err: %v", err) + } + + trades = append(trades, *trade) + } + + if len(res.NextPageCursor) == 0 { + break + } + cursor = res.NextPageCursor + } + + return trades, nil + +} + +/* +QueryTrades queries trades by time range. +** startTime and endTime are not passed, return 7 days by default ** +** Only startTime is passed, return range between startTime and startTime+7 days ** +** Only endTime is passed, return range between endTime-7 days and endTime ** +** If both are passed, the rule is endTime - startTime <= 7 days ** */ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) { - // using v3 client, since the v5 API does not support feeCurrency. - req := e.v3client.NewGetTradesRequest() + req := e.client.NewGetExecutionListRequest() req.Symbol(symbol) - // If `lastTradeId` is given and greater than 0, the query will use it as a condition and the retrieved result will be - // in `ascending` order. We can use `lastTradeId` to retrieve all the data. So we hack it to '1' if `lastTradeID` is '0'. - // If 0 is given, it will not be used as a condition and the result will be in `descending` order. The FromTradeId - // option cannot be used to retrieve more data. - req.FromTradeId(strconv.FormatUint(options.LastTradeID, 10)) - if options.LastTradeID == 0 { - req.FromTradeId("1") + if options.StartTime != nil && options.EndTime != nil { + if options.EndTime.Before(*options.StartTime) { + return nil, fmt.Errorf("end time is before start time, start time: %s, end time: %s", options.StartTime.String(), options.EndTime.String()) + } + + if options.EndTime.Sub(*options.StartTime) > queryTradeDurationLimit { + newStartTime := options.EndTime.Add(-queryTradeDurationLimit) + + log.Warnf("!!!BYBIT EXCHANGE API NOTICE!!! The time range exceeds the server boundary: %s, start time: %s, end time: %s, updated start time %s -> %s", queryTradeDurationLimit, options.StartTime.String(), options.EndTime.String(), options.StartTime.String(), newStartTime.String()) + options.StartTime = &newStartTime + } } + if options.StartTime != nil { req.StartTime(options.StartTime.UTC()) } @@ -466,35 +491,13 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } limit := uint64(options.Limit) - if limit > defaultQueryLimit || limit <= 0 { - log.Debugf("the parameter limit exceeds the server boundary or is set to zero. changed to %d, original value: %d", defaultQueryLimit, options.Limit) - limit = defaultQueryLimit + if limit > defaultQueryTradeLimit || limit <= 0 { + log.Debugf("the parameter limit exceeds the server boundary or is set to zero. changed to %d, original value: %d", defaultQueryTradeLimit, options.Limit) + limit = defaultQueryTradeLimit } req.Limit(limit) - if err := queryOrderTradeRateLimiter.Wait(ctx); err != nil { - return nil, fmt.Errorf("trade rate limiter wait error: %w", err) - } - response, err := req.Do(ctx) - if err != nil { - return nil, fmt.Errorf("failed to query trades, err: %w", err) - } - - var errs error - for _, trade := range response.List { - res, err := v3ToGlobalTrade(trade) - if err != nil { - errs = multierr.Append(errs, err) - continue - } - trades = append(trades, *res) - } - - if errs != nil { - return nil, errs - } - - return trades, nil + return e.queryTrades(ctx, req) } func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { @@ -604,5 +607,5 @@ func (e *Exchange) GetAllFeeRates(ctx context.Context) (bybitapi.FeeRates, error } func (e *Exchange) NewStream() types.Stream { - return NewStream(e.key, e.secret, e) + return NewStream(e.key, e.secret, e, e.feeRateProvider) } From 7f1e1a3a51d2095100be6466ce06eee262768d0f Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 30 Sep 2024 21:28:21 +0800 Subject: [PATCH 4/5] pkg/exchange: export SymbolFeeDetail struct --- pkg/exchange/bybit/convert.go | 2 +- pkg/exchange/bybit/market_info_poller.go | 18 +++++------ pkg/exchange/bybit/market_info_poller_test.go | 12 ++++---- pkg/exchange/bybit/stream.go | 8 ++--- pkg/exchange/bybit/types.go | 8 ++--- pkg/exchange/bybit/types_test.go | 30 +++++++++---------- 6 files changed, 39 insertions(+), 39 deletions(-) diff --git a/pkg/exchange/bybit/convert.go b/pkg/exchange/bybit/convert.go index 0bda0cad2..93704530d 100644 --- a/pkg/exchange/bybit/convert.go +++ b/pkg/exchange/bybit/convert.go @@ -325,7 +325,7 @@ func v3ToGlobalTrade(trade v3.Trade) (*types.Trade, error) { }, nil } -func toGlobalTrade(trade bybitapi.Trade, feeDetail symbolFeeDetail) (*types.Trade, error) { +func toGlobalTrade(trade bybitapi.Trade, feeDetail SymbolFeeDetail) (*types.Trade, error) { side, err := toGlobalSideType(trade.Side) if err != nil { return nil, fmt.Errorf("unexpected side: %s, err: %w", trade.Side, err) diff --git a/pkg/exchange/bybit/market_info_poller.go b/pkg/exchange/bybit/market_info_poller.go index 4ce13cc9c..af7a2e2a2 100644 --- a/pkg/exchange/bybit/market_info_poller.go +++ b/pkg/exchange/bybit/market_info_poller.go @@ -23,11 +23,11 @@ var ( type FeeRatePoller interface { Start(ctx context.Context) - Get(symbol string) (symbolFeeDetail, bool) + Get(symbol string) (SymbolFeeDetail, bool) Poll(ctx context.Context) error } -type symbolFeeDetail struct { +type SymbolFeeDetail struct { bybitapi.FeeRate BaseCoin string @@ -43,13 +43,13 @@ type feeRatePoller struct { // lastSyncTime is the last time the fee rate was updated. lastSyncTime time.Time - symbolFeeDetail map[string]symbolFeeDetail + symbolFeeDetail map[string]SymbolFeeDetail } func newFeeRatePoller(marketInfoProvider MarketInfoProvider) *feeRatePoller { return &feeRatePoller{ client: marketInfoProvider, - symbolFeeDetail: map[string]symbolFeeDetail{}, + symbolFeeDetail: map[string]SymbolFeeDetail{}, } } @@ -105,7 +105,7 @@ func (p *feeRatePoller) Poll(ctx context.Context) error { return nil } -func (p *feeRatePoller) Get(symbol string) (symbolFeeDetail, bool) { +func (p *feeRatePoller) Get(symbol string) (SymbolFeeDetail, bool) { p.mu.Lock() defer p.mu.Unlock() @@ -113,16 +113,16 @@ func (p *feeRatePoller) Get(symbol string) (symbolFeeDetail, bool) { return fee, found } -func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]symbolFeeDetail, error) { +func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]SymbolFeeDetail, error) { feeRates, err := e.client.GetAllFeeRates(ctx) if err != nil { return nil, fmt.Errorf("failed to call get fee rates: %w", err) } - symbolMap := map[string]symbolFeeDetail{} + symbolMap := map[string]SymbolFeeDetail{} for _, f := range feeRates.List { if _, found := symbolMap[f.Symbol]; !found { - symbolMap[f.Symbol] = symbolFeeDetail{FeeRate: f} + symbolMap[f.Symbol] = SymbolFeeDetail{FeeRate: f} } } @@ -131,7 +131,7 @@ func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]symbolFe return nil, fmt.Errorf("failed to get markets: %w", err) } - // update base coin, quote coin into symbolFeeDetail + // update base coin, quote coin into SymbolFeeDetail for _, mkt := range mkts { feeRate, found := symbolMap[mkt.Symbol] if !found { diff --git a/pkg/exchange/bybit/market_info_poller_test.go b/pkg/exchange/bybit/market_info_poller_test.go index 8770bcc2a..42401bec8 100644 --- a/pkg/exchange/bybit/market_info_poller_test.go +++ b/pkg/exchange/bybit/market_info_poller_test.go @@ -62,7 +62,7 @@ func TestFeeRatePoller_getAllFeeRates(t *testing.T) { mockMarketProvider.EXPECT().GetAllFeeRates(ctx).Return(feeRates, nil).Times(1) mockMarketProvider.EXPECT().QueryMarkets(ctx).Return(mkts, nil).Times(1) - expFeeRates := map[string]symbolFeeDetail{ + expFeeRates := map[string]SymbolFeeDetail{ "BTCUSDT": { FeeRate: feeRates.List[0], BaseCoin: "BTC", @@ -111,7 +111,7 @@ func TestFeeRatePoller_getAllFeeRates(t *testing.T) { symbolFeeDetails, err := s.getAllFeeRates(ctx) assert.Equal(t, fmt.Errorf("failed to get markets: %w", unknownErr), err) - assert.Equal(t, map[string]symbolFeeDetail(nil), symbolFeeDetails) + assert.Equal(t, map[string]SymbolFeeDetail(nil), symbolFeeDetails) }) t.Run("failed to get fee rates", func(t *testing.T) { @@ -126,7 +126,7 @@ func TestFeeRatePoller_getAllFeeRates(t *testing.T) { symbolFeeDetails, err := s.getAllFeeRates(ctx) assert.Equal(t, fmt.Errorf("failed to call get fee rates: %w", unknownErr), err) - assert.Equal(t, map[string]symbolFeeDetail(nil), symbolFeeDetails) + assert.Equal(t, map[string]SymbolFeeDetail(nil), symbolFeeDetails) }) } @@ -137,7 +137,7 @@ func Test_feeRatePoller_Get(t *testing.T) { mockMarketProvider := mocks.NewMockStreamDataProvider(mockCtrl) t.Run("found", func(t *testing.T) { symbol := "BTCUSDT" - expFeeDetail := symbolFeeDetail{ + expFeeDetail := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: symbol, TakerFeeRate: fixedpoint.NewFromFloat(0.1), @@ -149,7 +149,7 @@ func Test_feeRatePoller_Get(t *testing.T) { s := &feeRatePoller{ client: mockMarketProvider, - symbolFeeDetail: map[string]symbolFeeDetail{ + symbolFeeDetail: map[string]SymbolFeeDetail{ symbol: expFeeDetail, }, } @@ -162,7 +162,7 @@ func Test_feeRatePoller_Get(t *testing.T) { symbol := "BTCUSDT" s := &feeRatePoller{ client: mockMarketProvider, - symbolFeeDetail: map[string]symbolFeeDetail{}, + symbolFeeDetail: map[string]SymbolFeeDetail{}, } _, found := s.Get(symbol) diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index eaccd6c62..39b5c6805 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -439,18 +439,18 @@ func (s *Stream) handleKLineEvent(klineEvent KLineEvent) { } } -func pollAndGetFeeRate(ctx context.Context, symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) (symbolFeeDetail, error) { +func pollAndGetFeeRate(ctx context.Context, symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) (SymbolFeeDetail, error) { err := poller.Poll(ctx) if err != nil { - return symbolFeeDetail{}, err + return SymbolFeeDetail{}, err } return getFeeRate(symbol, poller, marketsInfo), nil } -func getFeeRate(symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) symbolFeeDetail { +func getFeeRate(symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) SymbolFeeDetail { feeRate, found := poller.Get(symbol) if !found { - feeRate = symbolFeeDetail{ + feeRate = SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: symbol, TakerFeeRate: defaultTakerFee, diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 48641ee74..1b387049d 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -327,7 +327,7 @@ type TradeEvent struct { TradeIv string `json:"tradeIv"` } -func (t *TradeEvent) toGlobalTrade(symbolFee symbolFeeDetail) (*types.Trade, error) { +func (t *TradeEvent) toGlobalTrade(symbolFee SymbolFeeDetail) (*types.Trade, error) { if t.Category != bybitapi.CategorySpot { return nil, fmt.Errorf("unexected category: %s", t.Category) } @@ -385,7 +385,7 @@ func (t *TradeEvent) toGlobalTrade(symbolFee symbolFeeDetail) (*types.Trade, err // IsMakerOrder = FALSE // -> Side = Buy -> base currency (BTC) // -> Side = Sell -> quote currency (USDT) -func calculateFee(t bybitapi.Trade, feeDetail symbolFeeDetail) (string, fixedpoint.Value) { +func calculateFee(t bybitapi.Trade, feeDetail SymbolFeeDetail) (string, fixedpoint.Value) { if feeDetail.MakerFeeRate.Sign() > 0 || !t.IsMaker { if t.Side == bybitapi.SideBuy { return feeDetail.BaseCoin, baseCoinAsFee(t, feeDetail) @@ -399,14 +399,14 @@ func calculateFee(t bybitapi.Trade, feeDetail symbolFeeDetail) (string, fixedpoi return feeDetail.BaseCoin, baseCoinAsFee(t, feeDetail) } -func baseCoinAsFee(t bybitapi.Trade, feeDetail symbolFeeDetail) fixedpoint.Value { +func baseCoinAsFee(t bybitapi.Trade, feeDetail SymbolFeeDetail) fixedpoint.Value { if t.IsMaker { return feeDetail.MakerFeeRate.Mul(t.ExecQty) } return feeDetail.TakerFeeRate.Mul(t.ExecQty) } -func quoteCoinAsFee(t bybitapi.Trade, feeDetail symbolFeeDetail) fixedpoint.Value { +func quoteCoinAsFee(t bybitapi.Trade, feeDetail SymbolFeeDetail) fixedpoint.Value { baseFee := t.ExecPrice.Mul(t.ExecQty) if t.IsMaker { return feeDetail.MakerFeeRate.Mul(baseFee) diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 126b32c4a..bf4d97d20 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -528,7 +528,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { } */ t.Run("succeeds", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -597,7 +597,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { Category: "test-spot", } - actualTrade, err := tradeEvent.toGlobalTrade(symbolFeeDetail{}) + actualTrade, err := tradeEvent.toGlobalTrade(SymbolFeeDetail{}) assert.Equal(t, fmt.Errorf("unexected category: %s", tradeEvent.Category), err) assert.Nil(t, actualTrade) }) @@ -610,7 +610,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { Category: "spot", } - actualTrade, err := tradeEvent.toGlobalTrade(symbolFeeDetail{}) + actualTrade, err := tradeEvent.toGlobalTrade(SymbolFeeDetail{}) assert.Equal(t, fmt.Errorf("unexpected side: BOTH"), err) assert.Nil(t, actualTrade) }) @@ -625,7 +625,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { } _, nerr := strconv.ParseUint(tradeEvent.OrderId, 10, 64) - actualTrade, err := tradeEvent.toGlobalTrade(symbolFeeDetail{}) + actualTrade, err := tradeEvent.toGlobalTrade(SymbolFeeDetail{}) assert.Equal(t, fmt.Errorf("unexpected order id: %s, err: %w", tradeEvent.OrderId, nerr), err) assert.Nil(t, actualTrade) }) @@ -641,7 +641,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { } _, nerr := strconv.ParseUint(tradeEvent.ExecId, 10, 64) - actualTrade, err := tradeEvent.toGlobalTrade(symbolFeeDetail{}) + actualTrade, err := tradeEvent.toGlobalTrade(SymbolFeeDetail{}) assert.Equal(t, fmt.Errorf("unexpected exec id: %s, err: %w", tradeEvent.ExecId, nerr), err) assert.Nil(t, actualTrade) }) @@ -649,7 +649,7 @@ func TestTradeEvent_toGlobalTrade(t *testing.T) { func TestTradeEvent_CalculateFee(t *testing.T) { t.Run("maker fee positive, maker, buyer", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -676,7 +676,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee positive, maker, seller", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -703,7 +703,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee positive, taker, buyer", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -730,7 +730,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee positive, taker, seller", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -757,7 +757,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee negative, maker, buyer", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(-0.001), @@ -784,7 +784,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee negative, maker, seller", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(-0.001), @@ -811,7 +811,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee negative, taker, buyer", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(-0.001), @@ -838,7 +838,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { }) t.Run("maker fee negative, taker, seller", func(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(-0.001), @@ -867,7 +867,7 @@ func TestTradeEvent_CalculateFee(t *testing.T) { } func TestTradeEvent_baseCoinAsFee(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), @@ -892,7 +892,7 @@ func TestTradeEvent_baseCoinAsFee(t *testing.T) { } func TestTradeEvent_quoteCoinAsFee(t *testing.T) { - symbolFee := symbolFeeDetail{ + symbolFee := SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ Symbol: "BTCUSDT", TakerFeeRate: fixedpoint.NewFromFloat(0.001), From a68764b763d89d6d870c68314b7b1aab63aea95d Mon Sep 17 00:00:00 2001 From: edwin Date: Mon, 30 Sep 2024 21:42:41 +0800 Subject: [PATCH 5/5] pkg/exchange: merge FeeRatePoller into StreamDataProvider --- pkg/exchange/bybit/exchange.go | 8 ++++---- pkg/exchange/bybit/market_info_poller.go | 16 ++++++++-------- pkg/exchange/bybit/market_info_poller_test.go | 4 ++-- pkg/exchange/bybit/stream.go | 10 +++++----- pkg/exchange/bybit/stream_test.go | 2 +- pkg/exchange/bybit/types_test.go | 8 ++++---- 6 files changed, 24 insertions(+), 24 deletions(-) diff --git a/pkg/exchange/bybit/exchange.go b/pkg/exchange/bybit/exchange.go index ffd3265c2..629f086db 100644 --- a/pkg/exchange/bybit/exchange.go +++ b/pkg/exchange/bybit/exchange.go @@ -58,7 +58,7 @@ type Exchange struct { // Because the bybit exchange does not provide a fee currency on traditional SPOT accounts, we need to query the marker // fee rate to get the fee currency. // https://bybit-exchange.github.io/docs/v5/enum#spot-fee-currency-instruction - feeRateProvider FeeRatePoller + FeeRatePoller } func New(key, secret string) (*Exchange, error) { @@ -74,7 +74,7 @@ func New(key, secret string) (*Exchange, error) { } if len(key) > 0 && len(secret) > 0 { client.Auth(key, secret) - ex.feeRateProvider = newFeeRatePoller(ex) + ex.FeeRatePoller = newFeeRatePoller(ex) ctx, cancel := context.WithTimeoutCause(context.Background(), 5*time.Second, errors.New("query markets timeout")) defer cancel() @@ -437,7 +437,7 @@ func (e *Exchange) queryTrades(ctx context.Context, req *bybitapi.GetExecutionLi } for _, trade := range res.List { - feeRate, err := pollAndGetFeeRate(ctx, trade.Symbol, e.feeRateProvider, e.marketsInfo) + feeRate, err := pollAndGetFeeRate(ctx, trade.Symbol, e.FeeRatePoller, e.marketsInfo) if err != nil { return nil, fmt.Errorf("failed to get fee rate, err: %v", err) } @@ -607,5 +607,5 @@ func (e *Exchange) GetAllFeeRates(ctx context.Context) (bybitapi.FeeRates, error } func (e *Exchange) NewStream() types.Stream { - return NewStream(e.key, e.secret, e, e.feeRateProvider) + return NewStream(e.key, e.secret, e) } diff --git a/pkg/exchange/bybit/market_info_poller.go b/pkg/exchange/bybit/market_info_poller.go index af7a2e2a2..460d4c6c4 100644 --- a/pkg/exchange/bybit/market_info_poller.go +++ b/pkg/exchange/bybit/market_info_poller.go @@ -22,9 +22,9 @@ var ( ) type FeeRatePoller interface { - Start(ctx context.Context) - Get(symbol string) (SymbolFeeDetail, bool) - Poll(ctx context.Context) error + StartFeeRatePoller(ctx context.Context) + GetFeeRate(symbol string) (SymbolFeeDetail, bool) + PollFeeRate(ctx context.Context) error } type SymbolFeeDetail struct { @@ -53,14 +53,14 @@ func newFeeRatePoller(marketInfoProvider MarketInfoProvider) *feeRatePoller { } } -func (p *feeRatePoller) Start(ctx context.Context) { +func (p *feeRatePoller) StartFeeRatePoller(ctx context.Context) { p.once.Do(func() { p.startLoop(ctx) }) } func (p *feeRatePoller) startLoop(ctx context.Context) { - err := p.Poll(ctx) + err := p.PollFeeRate(ctx) if err != nil { log.WithError(err).Warn("failed to initialize the fee rate, the ticker is scheduled to update it subsequently") } @@ -76,14 +76,14 @@ func (p *feeRatePoller) startLoop(ctx context.Context) { return case <-ticker.C: - if err := p.Poll(ctx); err != nil { + if err := p.PollFeeRate(ctx); err != nil { log.WithError(err).Warn("failed to update fee rate") } } } } -func (p *feeRatePoller) Poll(ctx context.Context) error { +func (p *feeRatePoller) PollFeeRate(ctx context.Context) error { p.mu.Lock() defer p.mu.Unlock() // the poll will be called frequently, so we need to check the last sync time. @@ -105,7 +105,7 @@ func (p *feeRatePoller) Poll(ctx context.Context) error { return nil } -func (p *feeRatePoller) Get(symbol string) (SymbolFeeDetail, bool) { +func (p *feeRatePoller) GetFeeRate(symbol string) (SymbolFeeDetail, bool) { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/exchange/bybit/market_info_poller_test.go b/pkg/exchange/bybit/market_info_poller_test.go index 42401bec8..295577f50 100644 --- a/pkg/exchange/bybit/market_info_poller_test.go +++ b/pkg/exchange/bybit/market_info_poller_test.go @@ -154,7 +154,7 @@ func Test_feeRatePoller_Get(t *testing.T) { }, } - res, found := s.Get(symbol) + res, found := s.GetFeeRate(symbol) assert.True(t, found) assert.Equal(t, expFeeDetail, res) }) @@ -165,7 +165,7 @@ func Test_feeRatePoller_Get(t *testing.T) { symbolFeeDetail: map[string]SymbolFeeDetail{}, } - _, found := s.Get(symbol) + _, found := s.GetFeeRate(symbol) assert.False(t, found) }) } diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 39b5c6805..2a7124dfa 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -51,6 +51,7 @@ type AccountBalanceProvider interface { type StreamDataProvider interface { MarketInfoProvider AccountBalanceProvider + FeeRatePoller } //go:generate callbackgen -type Stream @@ -70,14 +71,13 @@ type Stream struct { tradeEventCallbacks []func(e []TradeEvent) } -func NewStream(key, secret string, userDataProvider StreamDataProvider, poller FeeRatePoller) *Stream { +func NewStream(key, secret string, userDataProvider StreamDataProvider) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), // pragma: allowlist nextline secret key: key, secret: secret, streamDataProvider: userDataProvider, - feeRateProvider: poller, } stream.SetEndpointCreator(stream.createEndpoint) @@ -91,7 +91,7 @@ func NewStream(key, secret string, userDataProvider StreamDataProvider, poller F } // get account fee rate - go stream.feeRateProvider.Start(ctx) + go stream.streamDataProvider.StartFeeRatePoller(ctx) stream.marketsInfo, err = stream.streamDataProvider.QueryMarkets(ctx) if err != nil { @@ -440,7 +440,7 @@ func (s *Stream) handleKLineEvent(klineEvent KLineEvent) { } func pollAndGetFeeRate(ctx context.Context, symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) (SymbolFeeDetail, error) { - err := poller.Poll(ctx) + err := poller.PollFeeRate(ctx) if err != nil { return SymbolFeeDetail{}, err } @@ -448,7 +448,7 @@ func pollAndGetFeeRate(ctx context.Context, symbol string, poller FeeRatePoller, } func getFeeRate(symbol string, poller FeeRatePoller, marketsInfo types.MarketMap) SymbolFeeDetail { - feeRate, found := poller.Get(symbol) + feeRate, found := poller.GetFeeRate(symbol) if !found { feeRate = SymbolFeeDetail{ FeeRate: bybitapi.FeeRate{ diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index ba719a7c9..8ed651022 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -30,7 +30,7 @@ func getTestClientOrSkip(t *testing.T) *Stream { exchange, err := New(key, secret) assert.NoError(t, err) - return NewStream(key, secret, exchange, newFeeRatePoller(exchange)) + return NewStream(key, secret, exchange) } func TestStream(t *testing.T) { diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index bf4d97d20..0fa457cae 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -15,7 +15,7 @@ import ( func Test_parseWebSocketEvent(t *testing.T) { t.Run("[public] PingEvent without req id", func(t *testing.T) { - s := NewStream("", "", nil, nil) + s := NewStream("", "", nil) msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -26,7 +26,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[public] PingEvent with req id", func(t *testing.T) { - s := NewStream("", "", nil, nil) + s := NewStream("", "", nil) msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","req_id":"b26704da-f5af-44c2-bdf7-935d6739e1a0","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -37,7 +37,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent without req id", func(t *testing.T) { - s := NewStream("", "", nil, nil) + s := NewStream("", "", nil) msg := `{"op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -48,7 +48,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent with req id", func(t *testing.T) { - s := NewStream("", "", nil, nil) + s := NewStream("", "", nil) msg := `{"req_id":"78d36b57-a142-47b7-9143-5843df77d44d","op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err)