Merge pull request #1466 from c9s/c9s/fix-xdepthmaker

FIX: [xdepthmaker] remove shared trade collector and fix hedge
This commit is contained in:
c9s 2023-12-20 22:37:11 +08:00 committed by GitHub
commit 26f7d869ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -12,7 +12,6 @@ import (
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
@ -48,14 +47,9 @@ type CrossExchangeMarketMakingStrategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
mu sync.Mutex
MakerOrderExecutor, HedgeOrderExecutor *bbgo.GeneralOrderExecutor
// orderStore is a shared order store between the maker session and the hedge session
orderStore *core.OrderStore
// tradeCollector is a shared trade collector between the maker session and the hedge session
tradeCollector *core.TradeCollector
}
func (s *CrossExchangeMarketMakingStrategy) Initialize(
@ -129,14 +123,7 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// bbgo.Sync(ctx, s)
})
// global order store
s.orderStore = core.NewOrderStore(s.Position.Symbol)
s.orderStore.BindStream(hedgeSession.UserDataStream)
s.orderStore.BindStream(makerSession.UserDataStream)
// global trade collector
s.tradeCollector = core.NewTradeCollector(symbol, s.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
s.HedgeOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()
// sync covered position
@ -146,13 +133,12 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize(
// buy trade -> positive delta ->
// 1) short position -> reduce short position
// 2) short position -> increase short position
if trade.Exchange == s.hedgeSession.ExchangeName {
// TODO: make this atomic
s.CoveredPosition = s.CoveredPosition.Add(c)
}
// TODO: make this atomic
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(c)
s.mu.Unlock()
})
s.tradeCollector.BindStream(s.hedgeSession.UserDataStream)
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
return nil
}
@ -229,7 +215,7 @@ func (s *Strategy) ID() string {
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
return fmt.Sprintf("%s:%s:%s-%s", ID, s.Symbol, s.MakerExchange, s.HedgeExchange)
}
func (s *Strategy) Initialize() error {
@ -339,11 +325,7 @@ func (s *Strategy) CrossRun(
s.stopC = make(chan struct{})
if s.RecoverTrade {
s.tradeCollector.OnRecover(func(trade types.Trade) {
bbgo.Notify("Recovered trade", trade)
})
go s.runTradeRecover(ctx)
// go s.runTradeRecover(ctx)
}
s.authedC = make(chan struct{}, 2)
@ -373,7 +355,7 @@ func (s *Strategy) CrossRun(
defer fullReplenishTicker.Stop()
// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx); err != nil {
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Errorf("error cleaning up open orders")
}
@ -426,10 +408,10 @@ func (s *Strategy) CrossRun(
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()
position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
@ -550,7 +532,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
createdOrders, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
_, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.hedgeMarket,
Symbol: s.Symbol,
Type: types.OrderTypeMarket,
@ -564,14 +546,16 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
return
}
s.orderStore.Add(createdOrders...)
// if the hedge is on sell side, then we should add positive position
switch side {
case types.SideTypeSell:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity)
s.mu.Unlock()
case types.SideTypeBuy:
s.mu.Lock()
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
s.mu.Unlock()
}
}
@ -597,11 +581,11 @@ func (s *Strategy) runTradeRecover(ctx context.Context) {
if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod)
if err := s.tradeCollector.Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
if err := s.MakerOrderExecutor.TradeCollector().Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
@ -853,17 +837,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return
}
createdOrders, err := s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
return
}
s.orderStore.Add(createdOrders...)
}
func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.makerSession.Exchange, s.Symbol)
func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
@ -875,7 +857,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context) error {
log.Infof("found existing open orders:")
types.OrderSlice(openOrders).Print()
if err := s.makerSession.Exchange.CancelOrders(ctx, openOrders...); err != nil {
if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil {
return err
}