Merge pull request #1398 from c9s/edwin/bybit/rm-retry-and-add-fee-recover

FIX: [bybit] rm retry and add fee recover
This commit is contained in:
bailantaotao 2023-11-07 17:58:35 +08:00 committed by GitHub
commit 989132d4c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 53 additions and 24 deletions

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/util"
)
const (
@ -46,14 +45,13 @@ func (p *feeRatePoller) Start(ctx context.Context) {
}
func (p *feeRatePoller) startLoop(ctx context.Context) {
err := p.poll(ctx)
if err != nil {
log.WithError(err).Warn("failed to initialize the fee rate, the ticker is scheduled to update it subsequently")
}
ticker := time.NewTicker(feeRatePollingPeriod)
defer ticker.Stop()
// Make sure the first poll should succeed by retrying with a shorter period.
_ = util.Retry(ctx, util.InfiniteRetry, 30*time.Second,
func() error { return p.poll(ctx) },
func(e error) { log.WithError(e).Warn("failed to update fee rate") })
for {
select {
case <-ctx.Done():
@ -83,15 +81,12 @@ func (p *feeRatePoller) poll(ctx context.Context) error {
return nil
}
func (p *feeRatePoller) Get(symbol string) (symbolFeeDetail, error) {
func (p *feeRatePoller) Get(symbol string) (symbolFeeDetail, bool) {
p.mu.Lock()
defer p.mu.Unlock()
fee, ok := p.symbolFeeDetail[symbol]
if !ok {
return symbolFeeDetail{}, fmt.Errorf("%s fee rate not found", symbol)
}
return fee, nil
fee, found := p.symbolFeeDetail[symbol]
return fee, found
}
func (e *feeRatePoller) getAllFeeRates(ctx context.Context) (map[string]symbolFeeDetail, error) {

View File

@ -137,7 +137,7 @@ func Test_feeRatePoller_Get(t *testing.T) {
defer mockCtrl.Finish()
mockMarketProvider := mocks.NewMockStreamDataProvider(mockCtrl)
t.Run("succeeds", func(t *testing.T) {
t.Run("found", func(t *testing.T) {
symbol := "BTCUSDT"
expFeeDetail := symbolFeeDetail{
FeeRate: bybitapi.FeeRate{
@ -156,18 +156,18 @@ func Test_feeRatePoller_Get(t *testing.T) {
},
}
res, err := s.Get(symbol)
assert.NoError(t, err)
res, found := s.Get(symbol)
assert.True(t, found)
assert.Equal(t, expFeeDetail, res)
})
t.Run("succeeds", func(t *testing.T) {
t.Run("not found", func(t *testing.T) {
symbol := "BTCUSDT"
s := &feeRatePoller{
client: mockMarketProvider,
symbolFeeDetail: map[string]symbolFeeDetail{},
}
_, err := s.Get(symbol)
assert.ErrorContains(t, err, symbol)
_, found := s.Get(symbol)
assert.False(t, found)
})
}

View File

@ -10,6 +10,7 @@ import (
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
@ -22,6 +23,11 @@ const (
var (
// wsAuthRequest specifies the duration for which a websocket request's authentication is valid.
wsAuthRequest = 10 * time.Second
// The default taker/maker fees can help us in estimating trading fees in the SPOT market, because trade fees are not
// provided for traditional accounts on Bybit.
// https://www.bybit.com/en-US/help-center/article/Trading-Fee-Structure
defaultTakerFee = fixedpoint.NewFromFloat(0.001)
defaultMakerFee = fixedpoint.NewFromFloat(0.001)
)
// MarketInfoProvider calculates trade fees since trading fees are not supported by streaming.
@ -48,6 +54,7 @@ type Stream struct {
key, secret string
streamDataProvider StreamDataProvider
feeRateProvider *feeRatePoller
marketsInfo types.MarketMap
bookEventCallbacks []func(e BookEvent)
marketTradeEventCallbacks []func(e []MarketTradeEvent)
@ -71,8 +78,14 @@ func NewStream(key, secret string, userDataProvider StreamDataProvider) *Stream
stream.SetParser(stream.parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.SetHeartBeat(stream.ping)
stream.SetBeforeConnect(func(ctx context.Context) error {
stream.SetBeforeConnect(func(ctx context.Context) (err error) {
go stream.feeRateProvider.Start(ctx)
stream.marketsInfo, err = stream.streamDataProvider.QueryMarkets(ctx)
if err != nil {
log.WithError(err).Error("failed to query market info before to connect stream")
return err
}
return nil
})
stream.OnConnect(stream.handlerConnect)
@ -406,10 +419,31 @@ func (s *Stream) handleKLineEvent(klineEvent KLineEvent) {
func (s *Stream) handleTradeEvent(events []TradeEvent) {
for _, event := range events {
feeRate, err := s.feeRateProvider.Get(event.Symbol)
if err != nil {
log.Warnf("failed to get fee rate by symbol: %s", event.Symbol)
continue
feeRate, found := s.feeRateProvider.Get(event.Symbol)
if !found {
feeRate = symbolFeeDetail{
FeeRate: bybitapi.FeeRate{
Symbol: event.Symbol,
TakerFeeRate: defaultTakerFee,
MakerFeeRate: defaultMakerFee,
},
BaseCoin: "",
QuoteCoin: "",
}
if market, ok := s.marketsInfo[event.Symbol]; ok {
feeRate.BaseCoin = market.BaseCurrency
feeRate.QuoteCoin = market.QuoteCurrency
}
// The error log level was utilized due to a detected discrepancy in the fee calculations.
log.Errorf("failed to get %s fee rate, use default taker fee %f, maker fee %f, base coin: %s, quote coin: %s",
event.Symbol,
feeRate.TakerFeeRate.Float64(),
feeRate.MakerFeeRate.Float64(),
feeRate.BaseCoin,
feeRate.QuoteCoin,
)
}
gTrade, err := event.toGlobalTrade(feeRate)