bbgo: refactor SubmitOrders method for retry

This commit is contained in:
c9s 2023-02-23 23:34:26 +08:00
parent ed61f70d74
commit d89d0cf0ff
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
4 changed files with 44 additions and 45 deletions

View File

@ -3,6 +3,7 @@ package bbgo
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -305,8 +306,10 @@ func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, submitOrders
return createdOrders, errIndexes, err
}
type OrderCallback func(order types.Order)
// BatchRetryPlaceOrder places the orders and retries the failed orders
func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
var createdOrders types.OrderSlice
var err error
@ -320,6 +323,11 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
} else if createdOrder != nil {
// if the order is successfully created, than we should copy the order tag
createdOrder.Tag = submitOrder.Tag
if orderCallback != nil {
orderCallback(*createdOrder)
}
createdOrders = append(createdOrders, *createdOrder)
}
}
@ -327,6 +335,8 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
// if we got any error, we should re-iterate the errored orders
for len(errIdx) > 0 {
time.Sleep(200 * time.Millisecond)
// allocate a variable for new error index
var errIdxNext []int
@ -340,6 +350,11 @@ func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx [
} else if createdOrder != nil {
// if the order is successfully created, than we should copy the order tag
createdOrder.Tag = submitOrder.Tag
if orderCallback != nil {
orderCallback(*createdOrder)
}
createdOrders = append(createdOrders, *createdOrder)
}
}

View File

@ -217,24 +217,12 @@ func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ..
return nil, err
}
createdOrders, errIdx, err := BatchPlaceOrder(ctx, e.session.Exchange, formattedOrders...)
if err != nil {
log.WithError(err).Errorf("place order error, will retry orders: %v", errIdx)
orderCreateCallback := func(createdOrder types.Order) {
e.orderStore.Add(createdOrder)
e.activeMakerOrders.Add(createdOrder)
}
if len(errIdx) > 0 {
time.Sleep(200 * time.Millisecond)
createdOrders2, err2 := BatchRetryPlaceOrder(ctx, e.session.Exchange, errIdx, formattedOrders...)
if err2 != nil {
err = multierr.Append(err, err2)
} else {
createdOrders = append(createdOrders, createdOrders2...)
}
}
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
createdOrders, err := BatchRetryPlaceOrder(ctx, e.session.Exchange, nil, orderCreateCallback, formattedOrders...)
e.tradeCollector.Process()
return createdOrders, err
}

View File

@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/multierr"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
@ -47,15 +46,8 @@ func (s *TradingService) SubmitOrder(ctx context.Context, request *pb.SubmitOrde
}
}
createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, session.Exchange, submitOrders...)
if len(errIdx) > 0 {
createdOrders2, err2 := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, errIdx, submitOrders...)
if err2 != nil {
err = multierr.Append(err, err2)
} else {
createdOrders = append(createdOrders, createdOrders2...)
}
}
// we will return this error later because some orders could be succeeded
createdOrders, err := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, nil, nil, submitOrders...)
// convert response
resp := &pb.SubmitOrderResponse{

View File

@ -1546,24 +1546,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
metricsGridProfit.With(labels).Set(stats.TotalQuoteProfit.Float64())
})
// detect if there are previous grid orders on the order book
if s.ClearOpenOrdersWhenStart {
if err := s.clearOpenOrders(ctx, session); err != nil {
return err
}
}
if s.ClearOpenOrdersIfMismatch {
mismatch, err := s.openOrdersMismatches(ctx, session)
if err != nil {
s.logger.WithError(err).Errorf("clearOpenOrdersIfMismatch error")
} else if mismatch {
if err2 := s.clearOpenOrders(ctx, session); err2 != nil {
s.logger.WithError(err2).Errorf("clearOpenOrders error")
}
}
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
@ -1589,6 +1571,28 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
session.MarketDataStream.OnKLineClosed(s.newTakeProfitHandler(ctx, session))
}
// detect if there are previous grid orders on the order book
session.UserDataStream.OnStart(func() {
if s.ClearOpenOrdersWhenStart {
s.logger.Infof("clearOpenOrdersWhenStart is set, clearing open orders...")
if err := s.clearOpenOrders(ctx, session); err != nil {
s.logger.WithError(err).Errorf("clearOpenOrdersWhenStart error")
}
}
if s.ClearOpenOrdersIfMismatch {
s.logger.Infof("clearOpenOrdersIfMismatch is set, checking mismatched orders...")
mismatch, err := s.openOrdersMismatches(ctx, session)
if err != nil {
s.logger.WithError(err).Errorf("clearOpenOrdersIfMismatch error")
} else if mismatch {
if err2 := s.clearOpenOrders(ctx, session); err2 != nil {
s.logger.WithError(err2).Errorf("clearOpenOrders error")
}
}
}
})
// if TriggerPrice is zero, that means we need to open the grid when start up
if s.TriggerPrice.IsZero() {
session.UserDataStream.OnStart(func() {