all: improve UniversalCancelAllOrders and add mutex to covered position

This commit is contained in:
c9s 2024-09-23 18:26:23 +08:00
parent a8444e9796
commit 345c92c295
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
7 changed files with 87 additions and 31 deletions

View File

@ -435,10 +435,14 @@ func (s *Stream) handleOrderTradeEvent(m OrderTradeEvent) {
return return
} }
debugf("received OrderTradeEvent: %+v", m) debugf("received %s (%s) OrderTradeEvent: %+v", m.instId, m.actionType, m)
for _, order := range m.Orders { for _, order := range m.Orders {
debugf("received Order: %+v", order) if order.TradeId == 0 {
debugf("%s order update #%d: %+v", m.instId, order.OrderId, order)
} else {
debugf("%s order update #%d and trade update #%d: %+v", m.instId, order.OrderId, order, order.TradeId)
}
globalOrder, err := order.toGlobalOrder() globalOrder, err := order.toGlobalOrder()
if err != nil { if err != nil {

37
pkg/fixedpoint/mutex.go Normal file
View File

@ -0,0 +1,37 @@
package fixedpoint
import "sync"
type MutexValue struct {
value Value
mu sync.Mutex
}
func (f *MutexValue) Add(v Value) Value {
f.mu.Lock()
f.value = f.value.Add(v)
ret := f.value
f.mu.Unlock()
return ret
}
func (f *MutexValue) Sub(v Value) Value {
f.mu.Lock()
f.value = f.value.Sub(v)
ret := f.value
f.mu.Unlock()
return ret
}
func (f *MutexValue) Set(v Value) {
f.mu.Lock()
f.value = v
f.mu.Unlock()
}
func (f *MutexValue) Get() Value {
f.mu.Lock()
v := f.value
f.mu.Unlock()
return v
}

View File

@ -375,7 +375,7 @@ func (s *Strategy) Close(ctx context.Context) error {
var err error var err error
if s.UniversalCancelAllOrdersWhenClose { if s.UniversalCancelAllOrdersWhenClose {
err = tradingutil.UniversalCancelAllOrders(ctx, s.ExchangeSession.Exchange, nil) err = tradingutil.UniversalCancelAllOrders(ctx, s.ExchangeSession.Exchange, s.Symbol, nil)
} else { } else {
err = s.OrderExecutor.GracefulCancel(ctx) err = s.OrderExecutor.GracefulCancel(ctx)
} }
@ -398,7 +398,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
} }
// ignore the first cancel error, this skips one open-orders query request // ignore the first cancel error, this skips one open-orders query request
if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, nil); err == nil { if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, nil); err == nil {
return nil return nil
} }
@ -418,7 +418,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
break break
} }
if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, openOrders); err != nil { if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, openOrders); err != nil {
s.logger.WithError(err).Errorf("unable to cancel all orders") s.logger.WithError(err).Errorf("unable to cancel all orders")
werr = multierr.Append(werr, err) werr = multierr.Append(werr, err)
} }

View File

@ -159,7 +159,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
util.LogErr(err, "unable to cancel adjustment orders") util.LogErr(err, "unable to cancel adjustment orders")
} }
if err := tradingutil.UniversalCancelAllOrders(ctx, s.Session.Exchange, nil); err != nil { if err := tradingutil.UniversalCancelAllOrders(ctx, s.Session.Exchange, s.Symbol, nil); err != nil {
util.LogErr(err, "unable to cancel all orders") util.LogErr(err, "unable to cancel all orders")
} }

View File

@ -18,6 +18,7 @@ import (
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/tradingutil"
) )
var lastPriceModifier = fixedpoint.NewFromFloat(1.001) var lastPriceModifier = fixedpoint.NewFromFloat(1.001)
@ -46,9 +47,10 @@ type CrossExchangeMarketMakingStrategy struct {
makerMarket, hedgeMarket types.Market makerMarket, hedgeMarket types.Market
// persistence fields // persistence fields
Position *types.Position `json:"position,omitempty" persistence:"position"` Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
CoveredPosition fixedpoint.MutexValue
core.ConverterManager core.ConverterManager
@ -152,9 +154,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// 2) short position -> increase short position // 2) short position -> increase short position
// TODO: make this atomic // TODO: make this atomic
s.mu.Lock() s.CoveredPosition.Add(c)
s.CoveredPosition = s.CoveredPosition.Add(c)
s.mu.Unlock()
}) })
return nil return nil
} }
@ -486,13 +486,18 @@ func (s *Strategy) CrossRun(
s.MakerOrderExecutor.TradeCollector().Process() s.MakerOrderExecutor.TradeCollector().Process()
position := s.Position.GetBase() position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
s.mu.Lock()
coveredPosition := s.CoveredPosition.Get()
uncoverPosition := position.Sub(coveredPosition)
s.mu.Unlock()
absPos := uncoverPosition.Abs() absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 { if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol, s.Symbol,
position, position,
s.CoveredPosition, coveredPosition,
uncoverPosition, uncoverPosition,
) )
@ -520,6 +525,10 @@ func (s *Strategy) CrossRun(
log.WithError(err).Errorf("graceful cancel %s order error", s.HedgeSymbol) log.WithError(err).Errorf("graceful cancel %s order error", s.HedgeSymbol)
} }
if err := tradingutil.UniversalCancelAllOrders(ctx, s.makerSession.Exchange, s.Symbol, s.MakerOrderExecutor.ActiveMakerOrders().Orders()); err != nil {
log.WithError(err).Errorf("unable to cancel all orders")
}
bbgo.Sync(ctx, s) bbgo.Sync(ctx, s)
bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position) bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position)
}) })
@ -625,13 +634,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
// if the hedge is on sell side, then we should add positive position // if the hedge is on sell side, then we should add positive position
switch side { switch side {
case types.SideTypeSell: case types.SideTypeSell:
s.mu.Lock() s.CoveredPosition.Add(quantity)
s.CoveredPosition = s.CoveredPosition.Add(quantity)
s.mu.Unlock()
case types.SideTypeBuy: case types.SideTypeBuy:
s.mu.Lock() s.CoveredPosition.Add(quantity.Neg())
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
s.mu.Unlock()
} }
} }

View File

@ -4,6 +4,12 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
// LogErr logs the error with the message and arguments if the error is not nil.
// It returns true if the error is not nil.
// Examples:
// LogErr(err)
// LogErr(err, "error message")
// LogErr(err, "error message %s", "with argument")
func LogErr(err error, msgAndArgs ...interface{}) bool { func LogErr(err error, msgAndArgs ...interface{}) bool {
if err == nil { if err == nil {
return false return false

View File

@ -2,7 +2,6 @@ package tradingutil
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -28,7 +27,7 @@ type CancelAllOrdersByGroupIDService interface {
// //
// if CancelAllOrdersService is not supported, then it tries CancelAllOrdersBySymbolService which needs at least one symbol // if CancelAllOrdersService is not supported, then it tries CancelAllOrdersBySymbolService which needs at least one symbol
// for the cancel api request. // for the cancel api request.
func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, openOrders []types.Order) error { func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, symbol string, openOrders []types.Order) error {
if service, ok := exchange.(CancelAllOrdersService); ok { if service, ok := exchange.(CancelAllOrdersService); ok {
if _, err := service.CancelAllOrders(ctx); err == nil { if _, err := service.CancelAllOrders(ctx); err == nil {
return nil return nil
@ -37,17 +36,17 @@ func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, open
} }
} }
if len(openOrders) == 0 {
return errors.New("to cancel all orders, openOrders can not be empty")
}
var anyErr error var anyErr error
if service, ok := exchange.(CancelAllOrdersBySymbolService); ok { if service, ok := exchange.(CancelAllOrdersBySymbolService); ok {
var symbols = CollectOrderSymbols(openOrders) if len(symbol) > 0 {
for _, symbol := range symbols { _, anyErr = service.CancelOrdersBySymbol(ctx, symbol)
_, err := service.CancelOrdersBySymbol(ctx, symbol) } else if len(openOrders) > 0 {
if err != nil { var orderSymbols = CollectOrderSymbols(openOrders)
anyErr = err for _, orderSymbol := range orderSymbols {
_, err := service.CancelOrdersBySymbol(ctx, orderSymbol)
if err != nil {
anyErr = err
}
} }
} }
@ -56,6 +55,11 @@ func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, open
} }
} }
if len(openOrders) == 0 {
log.Warnf("empty open orders, unable to call specific cancel all orders api, skip")
return nil
}
if service, ok := exchange.(CancelAllOrdersByGroupIDService); ok { if service, ok := exchange.(CancelAllOrdersByGroupIDService); ok {
var groupIds = CollectOrderGroupIds(openOrders) var groupIds = CollectOrderGroupIds(openOrders)
for _, groupId := range groupIds { for _, groupId := range groupIds {