bbgo: add types.ExchangeOrderQueryService support for checking canceled orders

This commit is contained in:
c9s 2024-09-25 14:13:23 +08:00
parent 768428a7eb
commit b3d58a9e05
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -3,17 +3,21 @@ package bbgo
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"strconv"
"sync" "sync"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/sigchan" "github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
const DefaultCancelOrderWaitTime = 20 * time.Millisecond const DefaultCancelOrderWaitTime = 20 * time.Millisecond
const DefaultOrderCancelTimeout = 5 * time.Second
// ActiveOrderBook manages the local active order books. // ActiveOrderBook manages the local active order books.
// //
@ -35,6 +39,7 @@ type ActiveOrderBook struct {
mu sync.Mutex mu sync.Mutex
cancelOrderWaitTime time.Duration cancelOrderWaitTime time.Duration
cancelOrderTimeout time.Duration
} }
func NewActiveOrderBook(symbol string) *ActiveOrderBook { func NewActiveOrderBook(symbol string) *ActiveOrderBook {
@ -44,6 +49,7 @@ func NewActiveOrderBook(symbol string) *ActiveOrderBook {
pendingOrderUpdates: types.NewSyncOrderMap(), pendingOrderUpdates: types.NewSyncOrderMap(),
C: sigchan.New(1), C: sigchan.New(1),
cancelOrderWaitTime: DefaultCancelOrderWaitTime, cancelOrderWaitTime: DefaultCancelOrderWaitTime,
cancelOrderTimeout: DefaultOrderCancelTimeout,
} }
} }
@ -174,7 +180,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
hasSymbol := b.Symbol != "" hasSymbol := b.Symbol != ""
for _, o := range orders { for _, o := range orders {
if hasSymbol && o.Symbol != b.Symbol { if hasSymbol && o.Symbol != b.Symbol {
return errors.New("[ActiveOrderBook] cancel " + b.Symbol + " orderbook with different symbol: " + o.Symbol) return fmt.Errorf("[ActiveOrderBook] canceling %s orderbook with different symbol: %s", b.Symbol, o.Symbol)
} }
} }
} }
@ -186,7 +192,6 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
log.Debugf("[ActiveOrderBook] gracefully cancelling %s orders...", b.Symbol) log.Debugf("[ActiveOrderBook] gracefully cancelling %s orders...", b.Symbol)
waitTime := b.cancelOrderWaitTime waitTime := b.cancelOrderWaitTime
orderCancelTimeout := 5 * time.Second
startTime := time.Now() startTime := time.Now()
// ensure every order is canceled // ensure every order is canceled
@ -204,7 +209,7 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
log.Debugf("[ActiveOrderBook] waiting %s for %d %s orders to be cancelled...", waitTime, len(orders), b.Symbol) log.Debugf("[ActiveOrderBook] waiting %s for %d %s orders to be cancelled...", waitTime, len(orders), b.Symbol)
if cancelAll { if cancelAll {
clear, err := b.waitAllClear(ctx, waitTime, orderCancelTimeout) clear, err := b.waitAllClear(ctx, waitTime, b.cancelOrderTimeout)
if err != nil { if err != nil {
if !errors.Is(err, context.Canceled) { if !errors.Is(err, context.Canceled) {
log.WithError(err).Errorf("order cancel error") log.WithError(err).Errorf("order cancel error")
@ -230,36 +235,64 @@ func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange,
} }
// verify the current open orders via the RESTful API // verify the current open orders via the RESTful API
log.Warnf("[ActiveOrderBook] using open orders API to verify the active orders...") if orderQueryService, ok := ex.(types.ExchangeOrderQueryService); ok {
for idx, o := range orders {
retOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: o.Symbol,
OrderID: strconv.FormatUint(o.OrderID, 10),
})
var symbolOrdersMap = categorizeOrderBySymbol(orders) if err != nil {
var errOccurred bool log.WithError(err).Errorf("unable to update order #%d", o.OrderID)
var leftOrders types.OrderSlice continue
for symbol, symbolOrders := range symbolOrdersMap { } else if retOrder != nil {
openOrders, err := ex.QueryOpenOrders(ctx, symbol) b.Update(*retOrder)
if err != nil {
errOccurred = true
log.WithError(err).Errorf("can not query %s open orders", symbol)
break
}
openOrderMap := types.NewOrderMap(openOrders...) orders[idx] = *retOrder
for _, o := range symbolOrders {
// if it's not on the order book (open orders),
// we should remove it from our local side
if !openOrderMap.Exists(o.OrderID) {
b.Remove(o)
} else {
leftOrders.Add(o)
} }
} }
if cancelAll {
orders = b.Orders()
} else {
// partial cancel
orders = filterCanceledOrders(orders)
}
} else {
log.Warnf("[ActiveOrderBook] using open orders API to verify the active orders...")
var symbolOrdersMap = categorizeOrderBySymbol(orders)
var errOccurred bool
var leftOrders types.OrderSlice
for symbol, symbolOrders := range symbolOrdersMap {
openOrders, err := ex.QueryOpenOrders(ctx, symbol)
if err != nil {
errOccurred = true
log.WithError(err).Errorf("can not query %s open orders", symbol)
break
}
openOrderMap := types.NewOrderMap(openOrders...)
for _, o := range symbolOrders {
// if it's not on the order book (open orders),
// we should remove it from our local side
if !openOrderMap.Exists(o.OrderID) {
b.Remove(o)
} else {
leftOrders.Add(o)
}
}
}
// if an error occurs, we cannot update the orders because it will result in an empty order slice.
if !errOccurred {
// update order slice for the next try
orders = leftOrders
}
} }
// if an error occurs, we cannot update the orders because it will result in an empty order slice.
if !errOccurred {
// update order slice for the next try
orders = leftOrders
}
} }
log.Debugf("[ActiveOrderBook] all %s orders are cancelled successfully in %s", b.Symbol, time.Since(startTime)) log.Debugf("[ActiveOrderBook] all %s orders are cancelled successfully in %s", b.Symbol, time.Since(startTime))
@ -489,3 +522,15 @@ func categorizeOrderBySymbol(orders types.OrderSlice) map[string]types.OrderSlic
return orderMap return orderMap
} }
func filterCanceledOrders(orders types.OrderSlice) (ret types.OrderSlice) {
for _, o := range orders {
if o.Status == types.OrderStatusCanceled {
continue
}
ret = append(ret, o)
}
return ret
}