From 345c92c295836eb9ac75b214d412c84a3cf962e3 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 23 Sep 2024 18:26:23 +0800 Subject: [PATCH] all: improve UniversalCancelAllOrders and add mutex to covered position --- pkg/exchange/bitget/stream.go | 8 ++++-- pkg/fixedpoint/mutex.go | 37 +++++++++++++++++++++++++ pkg/strategy/dca2/strategy.go | 6 ++-- pkg/strategy/liquiditymaker/strategy.go | 2 +- pkg/strategy/xdepthmaker/strategy.go | 33 ++++++++++++---------- pkg/util/logerr.go | 6 ++++ pkg/util/tradingutil/cancel.go | 26 +++++++++-------- 7 files changed, 87 insertions(+), 31 deletions(-) create mode 100644 pkg/fixedpoint/mutex.go diff --git a/pkg/exchange/bitget/stream.go b/pkg/exchange/bitget/stream.go index bc62bb920..415618b4e 100644 --- a/pkg/exchange/bitget/stream.go +++ b/pkg/exchange/bitget/stream.go @@ -435,10 +435,14 @@ func (s *Stream) handleOrderTradeEvent(m OrderTradeEvent) { return } - debugf("received OrderTradeEvent: %+v", m) + debugf("received %s (%s) OrderTradeEvent: %+v", m.instId, m.actionType, m) for _, order := range m.Orders { - debugf("received Order: %+v", order) + if order.TradeId == 0 { + debugf("%s order update #%d: %+v", m.instId, order.OrderId, order) + } else { + debugf("%s order update #%d and trade update #%d: %+v", m.instId, order.OrderId, order, order.TradeId) + } globalOrder, err := order.toGlobalOrder() if err != nil { diff --git a/pkg/fixedpoint/mutex.go b/pkg/fixedpoint/mutex.go new file mode 100644 index 000000000..8ce71e826 --- /dev/null +++ b/pkg/fixedpoint/mutex.go @@ -0,0 +1,37 @@ +package fixedpoint + +import "sync" + +type MutexValue struct { + value Value + mu sync.Mutex +} + +func (f *MutexValue) Add(v Value) Value { + f.mu.Lock() + f.value = f.value.Add(v) + ret := f.value + f.mu.Unlock() + return ret +} + +func (f *MutexValue) Sub(v Value) Value { + f.mu.Lock() + f.value = f.value.Sub(v) + ret := f.value + f.mu.Unlock() + return ret +} + +func (f *MutexValue) Set(v Value) { + f.mu.Lock() + f.value = v + f.mu.Unlock() +} + +func (f *MutexValue) Get() Value { + f.mu.Lock() + v := f.value + f.mu.Unlock() + return v +} diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index a43830700..529df9328 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -375,7 +375,7 @@ func (s *Strategy) Close(ctx context.Context) error { var err error if s.UniversalCancelAllOrdersWhenClose { - err = tradingutil.UniversalCancelAllOrders(ctx, s.ExchangeSession.Exchange, nil) + err = tradingutil.UniversalCancelAllOrders(ctx, s.ExchangeSession.Exchange, s.Symbol, nil) } else { err = s.OrderExecutor.GracefulCancel(ctx) } @@ -398,7 +398,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error { } // ignore the first cancel error, this skips one open-orders query request - if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, nil); err == nil { + if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, nil); err == nil { return nil } @@ -418,7 +418,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error { break } - if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, openOrders); err != nil { + if err := tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, openOrders); err != nil { s.logger.WithError(err).Errorf("unable to cancel all orders") werr = multierr.Append(werr, err) } diff --git a/pkg/strategy/liquiditymaker/strategy.go b/pkg/strategy/liquiditymaker/strategy.go index d26d4e757..e8afa4725 100644 --- a/pkg/strategy/liquiditymaker/strategy.go +++ b/pkg/strategy/liquiditymaker/strategy.go @@ -159,7 +159,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. util.LogErr(err, "unable to cancel adjustment orders") } - if err := tradingutil.UniversalCancelAllOrders(ctx, s.Session.Exchange, nil); err != nil { + if err := tradingutil.UniversalCancelAllOrders(ctx, s.Session.Exchange, s.Symbol, nil); err != nil { util.LogErr(err, "unable to cancel all orders") } diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 8863a98d1..59571d5d8 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -18,6 +18,7 @@ import ( "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/tradingutil" ) var lastPriceModifier = fixedpoint.NewFromFloat(1.001) @@ -46,9 +47,10 @@ type CrossExchangeMarketMakingStrategy struct { makerMarket, hedgeMarket types.Market // persistence fields - Position *types.Position `json:"position,omitempty" persistence:"position"` - ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` - CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"` + Position *types.Position `json:"position,omitempty" persistence:"position"` + ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` + + CoveredPosition fixedpoint.MutexValue core.ConverterManager @@ -152,9 +154,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( // 2) short position -> increase short position // TODO: make this atomic - s.mu.Lock() - s.CoveredPosition = s.CoveredPosition.Add(c) - s.mu.Unlock() + s.CoveredPosition.Add(c) }) return nil } @@ -486,13 +486,18 @@ func (s *Strategy) CrossRun( s.MakerOrderExecutor.TradeCollector().Process() position := s.Position.GetBase() - uncoverPosition := position.Sub(s.CoveredPosition) + + s.mu.Lock() + coveredPosition := s.CoveredPosition.Get() + uncoverPosition := position.Sub(coveredPosition) + s.mu.Unlock() + absPos := uncoverPosition.Abs() if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 { log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", s.Symbol, position, - s.CoveredPosition, + coveredPosition, uncoverPosition, ) @@ -520,6 +525,10 @@ func (s *Strategy) CrossRun( log.WithError(err).Errorf("graceful cancel %s order error", s.HedgeSymbol) } + if err := tradingutil.UniversalCancelAllOrders(ctx, s.makerSession.Exchange, s.Symbol, s.MakerOrderExecutor.ActiveMakerOrders().Orders()); err != nil { + log.WithError(err).Errorf("unable to cancel all orders") + } + bbgo.Sync(ctx, s) bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position) }) @@ -625,13 +634,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { // if the hedge is on sell side, then we should add positive position switch side { case types.SideTypeSell: - s.mu.Lock() - s.CoveredPosition = s.CoveredPosition.Add(quantity) - s.mu.Unlock() + s.CoveredPosition.Add(quantity) case types.SideTypeBuy: - s.mu.Lock() - s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg()) - s.mu.Unlock() + s.CoveredPosition.Add(quantity.Neg()) } } diff --git a/pkg/util/logerr.go b/pkg/util/logerr.go index 187416edc..09c24bcb4 100644 --- a/pkg/util/logerr.go +++ b/pkg/util/logerr.go @@ -4,6 +4,12 @@ import ( log "github.com/sirupsen/logrus" ) +// LogErr logs the error with the message and arguments if the error is not nil. +// It returns true if the error is not nil. +// Examples: +// LogErr(err) +// LogErr(err, "error message") +// LogErr(err, "error message %s", "with argument") func LogErr(err error, msgAndArgs ...interface{}) bool { if err == nil { return false diff --git a/pkg/util/tradingutil/cancel.go b/pkg/util/tradingutil/cancel.go index 437fefa9e..2c24a71ec 100644 --- a/pkg/util/tradingutil/cancel.go +++ b/pkg/util/tradingutil/cancel.go @@ -2,7 +2,6 @@ package tradingutil import ( "context" - "errors" "fmt" log "github.com/sirupsen/logrus" @@ -28,7 +27,7 @@ type CancelAllOrdersByGroupIDService interface { // // if CancelAllOrdersService is not supported, then it tries CancelAllOrdersBySymbolService which needs at least one symbol // for the cancel api request. -func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, openOrders []types.Order) error { +func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, symbol string, openOrders []types.Order) error { if service, ok := exchange.(CancelAllOrdersService); ok { if _, err := service.CancelAllOrders(ctx); err == nil { return nil @@ -37,17 +36,17 @@ func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, open } } - if len(openOrders) == 0 { - return errors.New("to cancel all orders, openOrders can not be empty") - } - var anyErr error if service, ok := exchange.(CancelAllOrdersBySymbolService); ok { - var symbols = CollectOrderSymbols(openOrders) - for _, symbol := range symbols { - _, err := service.CancelOrdersBySymbol(ctx, symbol) - if err != nil { - anyErr = err + if len(symbol) > 0 { + _, anyErr = service.CancelOrdersBySymbol(ctx, symbol) + } else if len(openOrders) > 0 { + var orderSymbols = CollectOrderSymbols(openOrders) + for _, orderSymbol := range orderSymbols { + _, err := service.CancelOrdersBySymbol(ctx, orderSymbol) + if err != nil { + anyErr = err + } } } @@ -56,6 +55,11 @@ func UniversalCancelAllOrders(ctx context.Context, exchange types.Exchange, open } } + if len(openOrders) == 0 { + log.Warnf("empty open orders, unable to call specific cancel all orders api, skip") + return nil + } + if service, ok := exchange.(CancelAllOrdersByGroupIDService); ok { var groupIds = CollectOrderGroupIds(openOrders) for _, groupId := range groupIds {