mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
Merge pull request #1070 from c9s/feature/submit-order-backoff
fix: add context, exponential backoff and max retry limit
This commit is contained in:
commit
dea86282b8
|
@ -310,17 +310,19 @@ func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, submitOrders
|
|||
type OrderCallback func(order types.Order)
|
||||
|
||||
// BatchRetryPlaceOrder places the orders and retries the failed orders
|
||||
func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
|
||||
func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) {
|
||||
var createdOrders types.OrderSlice
|
||||
var werr error
|
||||
|
||||
// if the errIdx is nil, then we should iterate all the submit orders
|
||||
// allocate a variable for new error index
|
||||
var errIdxNext []int
|
||||
if len(errIdx) == 0 {
|
||||
for i, submitOrder := range submitOrders {
|
||||
createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder)
|
||||
if err2 != nil {
|
||||
werr = multierr.Append(werr, err2)
|
||||
errIdx = append(errIdx, i)
|
||||
errIdxNext = append(errIdxNext, i)
|
||||
} else if createdOrder != nil {
|
||||
// if the order is successfully created, than we should copy the order tag
|
||||
createdOrder.Tag = submitOrder.Tag
|
||||
|
@ -332,24 +334,37 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
|
|||
createdOrders = append(createdOrders, *createdOrder)
|
||||
}
|
||||
}
|
||||
|
||||
errIdx = errIdxNext
|
||||
}
|
||||
|
||||
// if we got any error, we should re-iterate the errored orders
|
||||
for len(errIdx) > 0 {
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Minute)
|
||||
defer cancelTimeout()
|
||||
|
||||
// allocate a variable for new error index
|
||||
var errIdxNext []int
|
||||
// if we got any error, we should re-iterate the errored orders
|
||||
coolDownTime := 200 * time.Millisecond
|
||||
|
||||
// set backoff max retries to 101 because https://ja.wikipedia.org/wiki/101%E5%9B%9E%E7%9B%AE%E3%81%AE%E3%83%97%E3%83%AD%E3%83%9D%E3%83%BC%E3%82%BA
|
||||
backoffMaxRetries := uint64(101)
|
||||
batchRetryOrder:
|
||||
for retryRound := 0; len(errIdx) > 0 && retryRound < 10; retryRound++ {
|
||||
// sleep for 200 millisecond between each retry
|
||||
log.Warnf("retry round #%d, cooling down for %s", retryRound+1, coolDownTime)
|
||||
time.Sleep(coolDownTime)
|
||||
|
||||
// reset error index since it's a new retry
|
||||
errIdxNext = nil
|
||||
|
||||
// iterate the error index and re-submit the order
|
||||
for _, idx := range errIdx {
|
||||
log.Warnf("starting retry round #%d...", retryRound+1)
|
||||
for _, idx := range errIdxNext {
|
||||
submitOrder := submitOrders[idx]
|
||||
|
||||
op := func() error {
|
||||
// can allocate permanent error backoff.Permanent(err) to stop backoff
|
||||
createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder)
|
||||
createdOrder, err2 := exchange.SubmitOrder(timeoutCtx, submitOrder)
|
||||
if err2 == nil && createdOrder != nil {
|
||||
// if the order is successfully created, than we should copy the order tag
|
||||
// if the order is successfully created, then we should copy the order tag
|
||||
createdOrder.Tag = submitOrder.Tag
|
||||
|
||||
if orderCallback != nil {
|
||||
|
@ -362,8 +377,14 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
|
|||
return err2
|
||||
}
|
||||
|
||||
// if err2 := backoff.Retry(op, backoff.WithMaxRetries(backoff.NewExponentialBackOff(), 99)); err2 != nil {
|
||||
if err2 := backoff.Retry(op, backoff.NewExponentialBackOff()); err2 != nil {
|
||||
var bo backoff.BackOff = backoff.NewExponentialBackOff()
|
||||
bo = backoff.WithMaxRetries(bo, backoffMaxRetries)
|
||||
bo = backoff.WithContext(bo, timeoutCtx)
|
||||
if err2 := backoff.Retry(op, bo); err2 != nil {
|
||||
if err2 == context.Canceled {
|
||||
break batchRetryOrder
|
||||
}
|
||||
|
||||
werr = multierr.Append(werr, err2)
|
||||
errIdxNext = append(errIdxNext, idx)
|
||||
}
|
||||
|
@ -373,5 +394,5 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
|
|||
errIdx = errIdxNext
|
||||
}
|
||||
|
||||
return createdOrders, werr
|
||||
return createdOrders, errIdx, werr
|
||||
}
|
||||
|
|
|
@ -223,7 +223,8 @@ func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ..
|
|||
e.tradeCollector.Process()
|
||||
}
|
||||
|
||||
return BatchRetryPlaceOrder(ctx, e.session.Exchange, nil, orderCreateCallback, formattedOrders...)
|
||||
createdOrders, _, err := BatchRetryPlaceOrder(ctx, e.session.Exchange, nil, orderCreateCallback, formattedOrders...)
|
||||
return createdOrders, err
|
||||
}
|
||||
|
||||
type OpenPositionOptions struct {
|
||||
|
|
|
@ -47,7 +47,7 @@ func (s *TradingService) SubmitOrder(ctx context.Context, request *pb.SubmitOrde
|
|||
}
|
||||
|
||||
// we will return this error later because some orders could be succeeded
|
||||
createdOrders, err := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, nil, nil, submitOrders...)
|
||||
createdOrders, _, err := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, nil, nil, submitOrders...)
|
||||
|
||||
// convert response
|
||||
resp := &pb.SubmitOrderResponse{
|
||||
|
|
Loading…
Reference in New Issue
Block a user