bbgo: add retry limit and exponential backoff to retry order

This commit is contained in:
c9s 2023-03-01 15:29:26 +08:00
parent 4f0c986709
commit c1cc008ecc
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 32 additions and 15 deletions

View File

@ -310,17 +310,19 @@ func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, submitOrders
type OrderCallback func(order types.Order) type OrderCallback func(order types.Order)
// BatchRetryPlaceOrder places the orders and retries the failed orders // BatchRetryPlaceOrder places the orders and retries the failed orders
func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) { func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) {
var createdOrders types.OrderSlice var createdOrders types.OrderSlice
var werr error var werr error
// if the errIdx is nil, then we should iterate all the submit orders // if the errIdx is nil, then we should iterate all the submit orders
// allocate a variable for new error index
var errIdxNext []int
if len(errIdx) == 0 { if len(errIdx) == 0 {
for i, submitOrder := range submitOrders { for i, submitOrder := range submitOrders {
createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder) createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder)
if err2 != nil { if err2 != nil {
werr = multierr.Append(werr, err2) werr = multierr.Append(werr, err2)
errIdx = append(errIdx, i) errIdxNext = append(errIdxNext, i)
} else if createdOrder != nil { } else if createdOrder != nil {
// if the order is successfully created, than we should copy the order tag // if the order is successfully created, than we should copy the order tag
createdOrder.Tag = submitOrder.Tag createdOrder.Tag = submitOrder.Tag
@ -332,24 +334,36 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
createdOrders = append(createdOrders, *createdOrder) createdOrders = append(createdOrders, *createdOrder)
} }
} }
errIdx = errIdxNext
} }
// if we got any error, we should re-iterate the errored orders timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Minute)
for len(errIdx) > 0 { defer cancelTimeout()
time.Sleep(200 * time.Millisecond)
// allocate a variable for new error index // if we got any error, we should re-iterate the errored orders
var errIdxNext []int coolDownTime := 200 * time.Millisecond
// set backoff max retries to 101 because https://ja.wikipedia.org/wiki/101%E5%9B%9E%E7%9B%AE%E3%81%AE%E3%83%97%E3%83%AD%E3%83%9D%E3%83%BC%E3%82%BA
backoffMaxRetries := uint64(101)
for retryRound := 0; len(errIdx) > 0 && retryRound < 10; retryRound++ {
// sleep for 200 millisecond between each retry
log.Warnf("retry round #%d, cooling down for %s", retryRound+1, coolDownTime)
time.Sleep(coolDownTime)
// reset error index since it's a new retry
errIdxNext = nil
// iterate the error index and re-submit the order // iterate the error index and re-submit the order
for _, idx := range errIdx { log.Warnf("starting retry round #%d...", retryRound+1)
for _, idx := range errIdxNext {
submitOrder := submitOrders[idx] submitOrder := submitOrders[idx]
op := func() error { op := func() error {
// can allocate permanent error backoff.Permanent(err) to stop backoff // can allocate permanent error backoff.Permanent(err) to stop backoff
createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder) createdOrder, err2 := exchange.SubmitOrder(timeoutCtx, submitOrder)
if err2 == nil && createdOrder != nil { if err2 == nil && createdOrder != nil {
// if the order is successfully created, than we should copy the order tag // if the order is successfully created, then we should copy the order tag
createdOrder.Tag = submitOrder.Tag createdOrder.Tag = submitOrder.Tag
if orderCallback != nil { if orderCallback != nil {
@ -362,8 +376,10 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
return err2 return err2
} }
// if err2 := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 99)); err2 != nil { var bo backoff.BackOff = backoff.NewExponentialBackOff()
if err2 := backoff.Retry(op, backoff.NewExponentialBackOff()); err2 != nil { bo = backoff.WithMaxRetries(bo, backoffMaxRetries)
bo = backoff.WithContext(bo, timeoutCtx)
if err2 := backoff.Retry(op, bo); err2 != nil {
werr = multierr.Append(werr, err2) werr = multierr.Append(werr, err2)
errIdxNext = append(errIdxNext, idx) errIdxNext = append(errIdxNext, idx)
} }
@ -373,5 +389,5 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
errIdx = errIdxNext errIdx = errIdxNext
} }
return createdOrders, werr return createdOrders, errIdx, werr
} }

View File

@ -223,7 +223,8 @@ func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ..
e.tradeCollector.Process() e.tradeCollector.Process()
} }
return BatchRetryPlaceOrder(ctx, e.session.Exchange, nil, orderCreateCallback, formattedOrders...) createdOrders, _, err := BatchRetryPlaceOrder(ctx, e.session.Exchange, nil, orderCreateCallback, formattedOrders...)
return createdOrders, err
} }
type OpenPositionOptions struct { type OpenPositionOptions struct {

View File

@ -47,7 +47,7 @@ func (s *TradingService) SubmitOrder(ctx context.Context, request *pb.SubmitOrde
} }
// we will return this error later because some orders could be succeeded // we will return this error later because some orders could be succeeded
createdOrders, err := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, nil, nil, submitOrders...) createdOrders, _, err := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, nil, nil, submitOrders...)
// convert response // convert response
resp := &pb.SubmitOrderResponse{ resp := &pb.SubmitOrderResponse{