From fad7ef219e1736f81ab5d6586ac13d0c7ad2bd40 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 7 Aug 2024 16:01:56 +0800 Subject: [PATCH 1/4] xdepthmaker: separate hedge symbol --- pkg/strategy/xdepthmaker/strategy.go | 58 ++++++++++++++++------------ 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index a989cf8ab..4a2623c4b 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -56,7 +56,8 @@ type CrossExchangeMarketMakingStrategy struct { func (s *CrossExchangeMarketMakingStrategy) Initialize( ctx context.Context, environ *bbgo.Environment, makerSession, hedgeSession *bbgo.ExchangeSession, - symbol, strategyID, instanceID string, + symbol, hedgeSymbol, + strategyID, instanceID string, ) error { s.parent = ctx s.ctx, s.cancel = context.WithCancel(ctx) @@ -67,9 +68,9 @@ func (s *CrossExchangeMarketMakingStrategy) Initialize( s.hedgeSession = hedgeSession var ok bool - s.hedgeMarket, ok = s.hedgeSession.Market(symbol) + s.hedgeMarket, ok = s.hedgeSession.Market(hedgeSymbol) if !ok { - return fmt.Errorf("source session market %s is not defined", symbol) + return fmt.Errorf("hedge session market %s is not defined", hedgeSymbol) } s.makerMarket, ok = s.makerSession.Market(symbol) @@ -150,14 +151,19 @@ type Strategy struct { Symbol string `json:"symbol"` - // HedgeExchange session name - HedgeExchange string `json:"hedgeExchange"` + // HedgeSymbol is the symbol for the hedge exchange + // symbol could be different from the maker exchange + HedgeSymbol string `json:"hedgeSymbol"` // MakerExchange session name MakerExchange string `json:"makerExchange"` + // HedgeExchange session name + HedgeExchange string `json:"hedgeExchange"` + UpdateInterval types.Duration `json:"updateInterval"` - HedgeInterval types.Duration `json:"hedgeInterval"` + + HedgeInterval types.Duration `json:"hedgeInterval"` FullReplenishInterval types.Duration `json:"fullReplenishInterval"` @@ -239,12 +245,12 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) { panic(err) } - hedgeSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{ + hedgeSession.Subscribe(types.BookChannel, s.HedgeSymbol, types.SubscribeOptions{ Depth: types.DepthLevelMedium, Speed: types.SpeedLow, }) - hedgeSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) + hedgeSession.Subscribe(types.KLineChannel, s.HedgeSymbol, types.SubscribeOptions{Interval: "1m"}) makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) } @@ -281,6 +287,10 @@ func (s *Strategy) Defaults() error { s.HedgeInterval = types.Duration(3 * time.Second) } + if s.HedgeSymbol == "" { + s.HedgeSymbol = s.Symbol + } + if s.NumLayers == 0 { s.NumLayers = 1 } @@ -358,13 +368,13 @@ func (s *Strategy) CrossRun( if err := s.CrossExchangeMarketMakingStrategy.Initialize(ctx, s.Environment, - makerSession, - hedgeSession, - s.Symbol, ID, s.InstanceID()); err != nil { + makerSession, hedgeSession, + s.Symbol, s.HedgeSymbol, + ID, s.InstanceID()); err != nil { return err } - s.pricingBook = types.NewStreamBook(s.Symbol) + s.pricingBook = types.NewStreamBook(s.HedgeSymbol) s.pricingBook.BindStream(s.hedgeSession.MarketDataStream) s.stopC = make(chan struct{}) @@ -488,7 +498,7 @@ func (s *Strategy) CrossRun( } if err := s.HedgeOrderExecutor.GracefulCancel(ctx); err != nil { - log.WithError(err).Errorf("graceful cancel %s order error", s.Symbol) + log.WithError(err).Errorf("graceful cancel %s order error", s.HedgeSymbol) } bbgo.Sync(ctx, s) @@ -576,12 +586,12 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { s.hedgeErrorRateReservation = nil } - log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) - bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) + log.Infof("submitting %s hedge order %s %v", s.HedgeSymbol, side.String(), quantity) + bbgo.Notify("Submitting %s hedge order %s %v", s.HedgeSymbol, side.String(), quantity) _, err := s.HedgeOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Market: s.hedgeMarket, - Symbol: s.Symbol, + Symbol: s.hedgeMarket.Symbol, Type: types.OrderTypeMarket, Side: side, Quantity: quantity, @@ -627,7 +637,7 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { startTime := time.Now().Add(-tradeScanInterval).Add(-tradeScanOverlapBufferPeriod) - if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + if err := s.HedgeOrderExecutor.TradeCollector().Recover(ctx, s.hedgeSession.Exchange.(types.ExchangeTradeHistoryService), s.HedgeSymbol, startTime); err != nil { log.WithError(err).Errorf("query trades error") } @@ -639,7 +649,9 @@ func (s *Strategy) runTradeRecover(ctx context.Context) { } func (s *Strategy) generateMakerOrders( - pricingBook *types.StreamOrderBook, maxLayer int, availableBase fixedpoint.Value, availableQuote fixedpoint.Value, + pricingBook *types.StreamOrderBook, + maxLayer int, + availableBase, availableQuote fixedpoint.Value, ) ([]types.SubmitOrder, error) { _, _, hasPrice := pricingBook.BestBidAndAsk() if !hasPrice { @@ -776,7 +788,7 @@ func (s *Strategy) generateMakerOrders( } submitOrders = append(submitOrders, types.SubmitOrder{ - Symbol: s.Symbol, + Symbol: s.makerMarket.Symbol, Type: types.OrderTypeLimitMaker, Market: s.makerMarket, Side: side, @@ -829,7 +841,7 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) { bestBidPrice := bestBid.Price bestAskPrice := bestAsk.Price - log.Infof("%s book ticker: best ask / best bid = %v / %v", s.Symbol, bestAskPrice, bestBidPrice) + log.Infof("%s book ticker: best ask / best bid = %v / %v", s.HedgeSymbol, bestAskPrice, bestBidPrice) s.lastPrice = bestBidPrice.Add(bestAskPrice).Div(Two) @@ -898,11 +910,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.Exchange log.Infof("found existing open orders:") types.OrderSlice(openOrders).Print() - if err := session.Exchange.CancelOrders(ctx, openOrders...); err != nil { - return err - } - - return nil + return session.Exchange.CancelOrders(ctx, openOrders...) } func selectSessions2( From ffb2c14f1d43764f612f297bc42d5e392a292762 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 7 Aug 2024 17:07:31 +0800 Subject: [PATCH 2/4] core: add TradeConverter to the trade collector --- pkg/core/tradecollector.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index e6e5d515f..da3e19445 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -12,6 +12,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +type TradeConverter func(trade types.Trade) (types.Trade, error) + //go:generate callbackgen -type TradeCollector type TradeCollector struct { Symbol string @@ -25,6 +27,8 @@ type TradeCollector struct { mu sync.Mutex + tradeConverters []TradeConverter + recoverCallbacks []func(trade types.Trade) tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) @@ -49,6 +53,24 @@ func NewTradeCollector(symbol string, position *types.Position, orderStore *Orde } } +func (c *TradeCollector) AddTradeConverter(converter TradeConverter) { + c.tradeConverters = append(c.tradeConverters, converter) +} + +func (c *TradeCollector) convertTrade(trade types.Trade) types.Trade { + for _, converter := range c.tradeConverters { + convTrade, err := converter(trade) + if err != nil { + logrus.WithError(err).Errorf("trade converter error, trade: %s", trade.String()) + continue + } + + trade = convTrade + } + + return trade +} + // OrderStore returns the order store used by the trade collector func (c *TradeCollector) OrderStore() *OrderStore { return c.orderStore From 25b0b5ded5eceb32414ded77e26d8ddd6da9de70 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 7 Aug 2024 17:12:55 +0800 Subject: [PATCH 3/4] max: fix withdraw state convert by calling convertWithdrawStatusV2 v3 api does not return status field --- pkg/exchange/max/convert.go | 2 +- pkg/exchange/max/exchange.go | 3 +-- pkg/exchange/max/maxapi/account.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/exchange/max/convert.go b/pkg/exchange/max/convert.go index c1fe3dfb8..20b80e397 100644 --- a/pkg/exchange/max/convert.go +++ b/pkg/exchange/max/convert.go @@ -361,7 +361,7 @@ func convertWithdrawStatusV3(status max.WithdrawStatus) types.WithdrawStatus { return types.WithdrawStatus(status) } -func convertWithdrawStatus(state max.WithdrawState) types.WithdrawStatus { +func convertWithdrawStatusV2(state max.WithdrawState) types.WithdrawStatus { switch state { case max.WithdrawStateSent, max.WithdrawStateSubmitting, max.WithdrawStatePending, "accepted", "approved": diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 3852aadf5..873eed3ef 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -865,8 +865,7 @@ func (e *Exchange) QueryWithdrawHistory( continue } - // we can convert this later - status := convertWithdrawStatusV3(d.Status) + status := convertWithdrawStatusV2(d.State) txIDs[d.TxID] = struct{}{} withdraw := types.Withdraw{ diff --git a/pkg/exchange/max/maxapi/account.go b/pkg/exchange/max/maxapi/account.go index cc0ae7925..be176f923 100644 --- a/pkg/exchange/max/maxapi/account.go +++ b/pkg/exchange/max/maxapi/account.go @@ -194,7 +194,7 @@ type Withdraw struct { // "sygna_verifying" State WithdrawState `json:"state"` - Status WithdrawStatus `json:"status,omitempty"` + // Status WithdrawStatus `json:"status,omitempty"` CreatedAt types.MillisecondTimestamp `json:"created_at"` UpdatedAt types.MillisecondTimestamp `json:"updated_at"` From 813684fc77a029e5bbb30cab63ea36168f6e92bc Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 7 Aug 2024 17:29:03 +0800 Subject: [PATCH 4/4] core: change TradeConverter to interface and integrate trade converter --- pkg/core/tradecollector.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index da3e19445..6069d2c26 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -12,7 +12,9 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type TradeConverter func(trade types.Trade) (types.Trade, error) +type TradeConverter interface { + Convert(trade types.Trade) (types.Trade, error) +} //go:generate callbackgen -type TradeCollector type TradeCollector struct { @@ -58,10 +60,14 @@ func (c *TradeCollector) AddTradeConverter(converter TradeConverter) { } func (c *TradeCollector) convertTrade(trade types.Trade) types.Trade { + if len(c.tradeConverters) == 0 { + return trade + } + for _, converter := range c.tradeConverters { - convTrade, err := converter(trade) + convTrade, err := converter.Convert(trade) if err != nil { - logrus.WithError(err).Errorf("trade converter error, trade: %s", trade.String()) + logrus.WithError(err).Errorf("trade %+v converter error, trade: %s", converter, trade.String()) continue } @@ -138,6 +144,8 @@ func (c *TradeCollector) Recover( } func (c *TradeCollector) RecoverTrade(td types.Trade) bool { + td = c.convertTrade(td) + logrus.Debugf("checking trade: %s", td.String()) if c.processTrade(td) { logrus.Infof("recovered trade: %s", td.String()) @@ -252,7 +260,7 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool { // return true when the given trade is added // return false when the given trade is not added func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { - return c.processTrade(trade) + return c.processTrade(c.convertTrade(trade)) } // Run is a goroutine executed in the background @@ -271,7 +279,8 @@ func (c *TradeCollector) Run(ctx context.Context) { c.Process() case trade := <-c.tradeC: - c.processTrade(trade) + c.processTrade(c.convertTrade(trade)) + } } }