Merge pull request #1427 from c9s/edwin/add-rate-limiter-for-log

CHORE: add log rate limiter to stream event and use backoff retry on bybit
This commit is contained in:
bailantaotao 2023-11-22 11:01:31 +08:00 committed by GitHub
commit cec8e7d712
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 48 additions and 195 deletions

View File

@ -10,6 +10,7 @@ import (
"time"
"github.com/gorilla/websocket"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi"
v2 "github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi/v2"
@ -19,6 +20,11 @@ import (
var (
pingBytes = []byte("ping")
pongBytes = []byte("pong")
marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
orderLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
kLineLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
)
//go:generate callbackgen -type Stream
@ -361,7 +367,9 @@ func (s *Stream) handleMaretTradeEvent(m MarketTradeEvent) {
for _, trade := range m.Events {
globalTrade, err := trade.ToGlobal(m.instId)
if err != nil {
log.WithError(err).Error("failed to convert to market trade")
if marketTradeLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to market trade")
}
return
}
@ -377,7 +385,9 @@ func (s *Stream) handleKLineEvent(k KLineEvent) {
interval, found := toGlobalInterval[string(k.channel)]
if !found {
log.Errorf("unexpected interval %s on KLine subscription", k.channel)
if kLineLogLimiter.Allow() {
log.Errorf("unexpected interval %s on KLine subscription", k.channel)
}
return
}
@ -415,7 +425,9 @@ func (s *Stream) handleOrderTradeEvent(m OrderTradeEvent) {
for _, order := range m.Orders {
globalOrder, err := order.toGlobalOrder()
if err != nil {
log.Errorf("failed to convert order to global: %s", err)
if orderLogLimiter.Allow() {
log.Errorf("failed to convert order to global: %s", err)
}
continue
}
// The bitget support only snapshot on orders channel, so we use snapshot as update to emit data.
@ -427,7 +439,9 @@ func (s *Stream) handleOrderTradeEvent(m OrderTradeEvent) {
if globalOrder.Status == types.OrderStatusPartiallyFilled {
trade, err := order.toGlobalTrade()
if err != nil {
log.Errorf("failed to convert trade to global: %s", err)
if tradeLogLimiter.Allow() {
log.Errorf("failed to convert trade to global: %s", err)
}
continue
}
s.StandardStream.EmitTradeUpdate(trade)

View File

@ -8,11 +8,12 @@ import (
"time"
"github.com/gorilla/websocket"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
const (
@ -28,6 +29,11 @@ var (
// https://www.bybit.com/en-US/help-center/article/Trading-Fee-Structure
defaultTakerFee = fixedpoint.NewFromFloat(0.001)
defaultMakerFee = fixedpoint.NewFromFloat(0.001)
marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
orderLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
kLineLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
)
// MarketInfoProvider calculates trade fees since trading fees are not supported by streaming.
@ -345,11 +351,9 @@ func (s *Stream) handleAuthEvent() {
var balnacesMap types.BalanceMap
var err error
err = util.Retry(ctx, 10, 300*time.Millisecond, func() error {
err = retry.GeneralBackoff(ctx, func() error {
balnacesMap, err = s.streamDataProvider.QueryAccountBalances(ctx)
return err
}, func(err error) {
log.WithError(err).Error("failed to call query account balances")
})
if err != nil {
log.WithError(err).Error("no more attempts to retrieve balances")
@ -376,7 +380,9 @@ func (s *Stream) handleMarketTradeEvent(events []MarketTradeEvent) {
for _, event := range events {
trade, err := event.toGlobalTrade()
if err != nil {
log.WithError(err).Error("failed to convert to market trade")
if marketTradeLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to market trade")
}
continue
}
@ -396,7 +402,9 @@ func (s *Stream) handleOrderEvent(events []OrderEvent) {
gOrder, err := toGlobalOrder(event.Order)
if err != nil {
log.WithError(err).Error("failed to convert to global order")
if orderLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to global order")
}
continue
}
s.StandardStream.EmitOrderUpdate(*gOrder)
@ -411,7 +419,9 @@ func (s *Stream) handleKLineEvent(klineEvent KLineEvent) {
for _, event := range klineEvent.KLines {
kline, err := event.toGlobalKLine(klineEvent.Symbol)
if err != nil {
log.WithError(err).Error("failed to convert to global k line")
if kLineLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to global k line")
}
continue
}
@ -442,19 +452,23 @@ func (s *Stream) handleTradeEvent(events []TradeEvent) {
feeRate.QuoteCoin = market.QuoteCurrency
}
// The error log level was utilized due to a detected discrepancy in the fee calculations.
log.Errorf("failed to get %s fee rate, use default taker fee %f, maker fee %f, base coin: %s, quote coin: %s",
event.Symbol,
feeRate.TakerFeeRate.Float64(),
feeRate.MakerFeeRate.Float64(),
feeRate.BaseCoin,
feeRate.QuoteCoin,
)
if tradeLogLimiter.Allow() {
// The error log level was utilized due to a detected discrepancy in the fee calculations.
log.Errorf("failed to get %s fee rate, use default taker fee %f, maker fee %f, base coin: %s, quote coin: %s",
event.Symbol,
feeRate.TakerFeeRate.Float64(),
feeRate.MakerFeeRate.Float64(),
feeRate.BaseCoin,
feeRate.QuoteCoin,
)
}
}
gTrade, err := event.toGlobalTrade(feeRate)
if err != nil {
log.WithError(err).Errorf("unable to convert: %+v", event)
if tradeLogLimiter.Allow() {
log.WithError(err).Errorf("unable to convert: %+v", event)
}
continue
}
s.StandardStream.EmitTradeUpdate(*gTrade)

View File

@ -1,69 +0,0 @@
package util
import (
"context"
"time"
"github.com/pkg/errors"
)
const (
InfiniteRetry = 0
)
type RetryPredicator func(e error) bool
// Retry retrys the passed function for "attempts" times, if passed function return error. Setting attempts to zero means keep retrying.
func Retry(ctx context.Context, attempts int, duration time.Duration, fnToRetry func() error, errHandler func(error), predicators ...RetryPredicator) (err error) {
infinite := false
if attempts == InfiniteRetry {
infinite = true
}
for attempts > 0 || infinite {
select {
case <-ctx.Done():
errMsg := "return for context done"
if err != nil {
return errors.Wrap(err, errMsg)
} else {
return errors.New(errMsg)
}
default:
if err = fnToRetry(); err == nil {
return nil
}
if !needRetry(err, predicators) {
return err
}
err = errors.Wrapf(err, "failed in retry: countdown: %v", attempts)
if errHandler != nil {
errHandler(err)
}
if !infinite {
attempts--
}
time.Sleep(duration)
}
}
return err
}
func needRetry(err error, predicators []RetryPredicator) bool {
if err == nil {
return false
}
// If no predicators specified, we will retry for all errors
if len(predicators) == 0 {
return true
}
return predicators[0](err)
}

View File

@ -1,106 +0,0 @@
package util
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
func addAndCheck(a *int, target int) error {
if *a++; *a == target {
return nil
} else {
return fmt.Errorf("a is not %v. It is %v\n", target, *a)
}
}
func TestRetry(t *testing.T) {
type test struct {
input int
targetNum int
ans int
ansErr error
}
tests := []test{
{input: 0, targetNum: 3, ans: 3, ansErr: nil},
{input: 0, targetNum: 10, ans: 3, ansErr: errors.New("failed in retry")},
}
for _, tc := range tests {
errHandled := false
err := Retry(context.Background(), 3, 1*time.Second, func() error {
return addAndCheck(&tc.input, tc.targetNum)
}, func(e error) { errHandled = true })
assert.Equal(t, true, errHandled)
if tc.ansErr == nil {
assert.NoError(t, err)
} else {
assert.Contains(t, err.Error(), tc.ansErr.Error())
}
assert.Equal(t, tc.ans, tc.input)
}
}
func TestRetryWithPredicator(t *testing.T) {
type test struct {
count int
f func() error
errHandler func(error)
predicator RetryPredicator
ansCount int
ansErr error
}
knownErr := errors.New("Duplicate entry '1-389837488-1' for key 'UNI_Trade'")
unknownErr := errors.New("Some Error")
tests := []test{
{
predicator: func(err error) bool {
return !strings.Contains(err.Error(), "Duplicate entry")
},
f: func() error { return knownErr },
ansCount: 1,
ansErr: knownErr,
},
{
predicator: func(err error) bool {
return !strings.Contains(err.Error(), "Duplicate entry")
},
f: func() error { return unknownErr },
ansCount: 3,
ansErr: unknownErr,
},
}
attempts := 3
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, tc := range tests {
err := Retry(ctx, attempts, 100*time.Millisecond, func() error {
tc.count++
return tc.f()
}, tc.errHandler, tc.predicator)
assert.Equal(t, tc.ansCount, tc.count)
assert.EqualError(t, errors.Cause(err), tc.ansErr.Error(), "should be equal")
}
}
func TestRetryCtxCancel(t *testing.T) {
result := int(0)
target := int(3)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := Retry(ctx, 5, 1*time.Second, func() error { return addAndCheck(&result, target) }, func(error) {})
assert.Error(t, err)
fmt.Println("Error:", err.Error())
assert.Equal(t, int(0), result)
}