diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index da3e19445..6069d2c26 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -12,7 +12,9 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type TradeConverter func(trade types.Trade) (types.Trade, error) +type TradeConverter interface { + Convert(trade types.Trade) (types.Trade, error) +} //go:generate callbackgen -type TradeCollector type TradeCollector struct { @@ -58,10 +60,14 @@ func (c *TradeCollector) AddTradeConverter(converter TradeConverter) { } func (c *TradeCollector) convertTrade(trade types.Trade) types.Trade { + if len(c.tradeConverters) == 0 { + return trade + } + for _, converter := range c.tradeConverters { - convTrade, err := converter(trade) + convTrade, err := converter.Convert(trade) if err != nil { - logrus.WithError(err).Errorf("trade converter error, trade: %s", trade.String()) + logrus.WithError(err).Errorf("trade %+v converter error, trade: %s", converter, trade.String()) continue } @@ -138,6 +144,8 @@ func (c *TradeCollector) Recover( } func (c *TradeCollector) RecoverTrade(td types.Trade) bool { + td = c.convertTrade(td) + logrus.Debugf("checking trade: %s", td.String()) if c.processTrade(td) { logrus.Infof("recovered trade: %s", td.String()) @@ -252,7 +260,7 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool { // return true when the given trade is added // return false when the given trade is not added func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { - return c.processTrade(trade) + return c.processTrade(c.convertTrade(trade)) } // Run is a goroutine executed in the background @@ -271,7 +279,8 @@ func (c *TradeCollector) Run(ctx context.Context) { c.Process() case trade := <-c.tradeC: - c.processTrade(trade) + c.processTrade(c.convertTrade(trade)) + } } }