core: add ConverterManager

This commit is contained in:
c9s 2024-08-08 17:00:45 +08:00
parent f228ca7962
commit f277b191d2
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 88 additions and 31 deletions

43
pkg/core/converter.go Normal file
View File

@ -0,0 +1,43 @@
package core
import "github.com/c9s/bbgo/pkg/types"
type Converter interface {
OrderConverter
TradeConverter
}
// OrderConverter converts the order to another order
type OrderConverter interface {
ConvertOrder(order types.Order) (types.Order, error)
}
// TradeConverter converts the trade to another trade
type TradeConverter interface {
ConvertTrade(trade types.Trade) (types.Trade, error)
}
// SymbolConverter converts the symbol to another symbol
type SymbolConverter struct {
fromSymbol, toSymbol string
}
func NewSymbolConverter(fromSymbol, toSymbol string) *SymbolConverter {
return &SymbolConverter{fromSymbol: fromSymbol, toSymbol: toSymbol}
}
func (c *SymbolConverter) ConvertOrder(order types.Order) (types.Order, error) {
if order.Symbol == c.fromSymbol {
order.Symbol = c.toSymbol
}
return order, nil
}
func (c *SymbolConverter) ConvertTrade(trade types.Trade) (types.Trade, error) {
if trade.Symbol == c.fromSymbol {
trade.Symbol = c.toSymbol
}
return trade, nil
}

View File

@ -12,12 +12,48 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
type OrderConverter interface { type ConverterManager struct {
ConvertOrder(order types.Order) (types.Order, error) converters []Converter
} }
type TradeConverter interface { func (c *ConverterManager) AddConverter(converter Converter) {
ConvertTrade(trade types.Trade) (types.Trade, error) c.converters = append(c.converters, converter)
}
func (c *ConverterManager) ConvertOrder(order types.Order) types.Order {
if len(c.converters) == 0 {
return order
}
for _, converter := range c.converters {
convOrder, err := converter.ConvertOrder(order)
if err != nil {
logrus.WithError(err).Errorf("converter %+v error, order: %s", converter, order.String())
continue
}
order = convOrder
}
return order
}
func (c *ConverterManager) ConvertTrade(trade types.Trade) types.Trade {
if len(c.converters) == 0 {
return trade
}
for _, converter := range c.converters {
convTrade, err := converter.ConvertTrade(trade)
if err != nil {
logrus.WithError(err).Errorf("converter %+v error, trade: %s", converter, trade.String())
continue
}
trade = convTrade
}
return trade
} }
//go:generate callbackgen -type TradeCollector //go:generate callbackgen -type TradeCollector
@ -33,14 +69,14 @@ type TradeCollector struct {
mu sync.Mutex mu sync.Mutex
tradeConverters []TradeConverter
recoverCallbacks []func(trade types.Trade) recoverCallbacks []func(trade types.Trade)
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
positionUpdateCallbacks []func(position *types.Position) positionUpdateCallbacks []func(position *types.Position)
profitCallbacks []func(trade types.Trade, profit *types.Profit) profitCallbacks []func(trade types.Trade, profit *types.Profit)
ConverterManager
} }
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
@ -59,28 +95,6 @@ func NewTradeCollector(symbol string, position *types.Position, orderStore *Orde
} }
} }
func (c *TradeCollector) AddTradeConverter(converter TradeConverter) {
c.tradeConverters = append(c.tradeConverters, converter)
}
func (c *TradeCollector) convertTrade(trade types.Trade) types.Trade {
if len(c.tradeConverters) == 0 {
return trade
}
for _, converter := range c.tradeConverters {
convTrade, err := converter.ConvertTrade(trade)
if err != nil {
logrus.WithError(err).Errorf("trade %+v converter error, trade: %s", converter, trade.String())
continue
}
trade = convTrade
}
return trade
}
// OrderStore returns the order store used by the trade collector // OrderStore returns the order store used by the trade collector
func (c *TradeCollector) OrderStore() *OrderStore { func (c *TradeCollector) OrderStore() *OrderStore {
return c.orderStore return c.orderStore
@ -148,7 +162,7 @@ func (c *TradeCollector) Recover(
} }
func (c *TradeCollector) RecoverTrade(td types.Trade) bool { func (c *TradeCollector) RecoverTrade(td types.Trade) bool {
td = c.convertTrade(td) td = c.ConvertTrade(td)
logrus.Debugf("checking trade: %s", td.String()) logrus.Debugf("checking trade: %s", td.String())
if c.processTrade(td) { if c.processTrade(td) {
@ -264,7 +278,7 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
// return true when the given trade is added // return true when the given trade is added
// return false when the given trade is not added // return false when the given trade is not added
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
return c.processTrade(c.convertTrade(trade)) return c.processTrade(c.ConvertTrade(trade))
} }
// Run is a goroutine executed in the background // Run is a goroutine executed in the background
@ -283,7 +297,7 @@ func (c *TradeCollector) Run(ctx context.Context) {
c.Process() c.Process()
case trade := <-c.tradeC: case trade := <-c.tradeC:
c.processTrade(c.convertTrade(trade)) c.processTrade(c.ConvertTrade(trade))
} }
} }