bbgo_origin/pkg/core/tradecollector.go

349 lines
8.2 KiB
Go
Raw Normal View History

2023-07-05 07:26:36 +00:00
package core
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
2024-08-12 07:56:24 +00:00
// ConverterSetting is the configuration entry for one converter.
// Each supported converter kind is exposed as an optional pointer field.
type ConverterSetting struct {
	// SymbolConverter is optional; when it is nil, getConverter reports that
	// no converter is configured for this entry.
	SymbolConverter *SymbolConverter `json:"symbolConverter" yaml:"symbolConverter"`
}
// getConverter returns the converter configured in this setting, or nil when
// no converter is set.
//
// The nil check is explicit so that the caller receives an untyped nil
// Converter instead of an interface value wrapping a nil *SymbolConverter.
func (s *ConverterSetting) getConverter() Converter {
	if s.SymbolConverter == nil {
		return nil
	}

	return s.SymbolConverter
}
func (s *ConverterSetting) InitializeConverter() (Converter, error) {
converter := s.getConverter()
if converter == nil {
return nil, nil
}
logrus.Infof("initializing converter %T ...", converter)
err := converter.Initialize()
return converter, err
2024-08-12 07:56:24 +00:00
}
2024-08-16 12:57:28 +00:00
// ConverterManager manages the converters for trade conversion
// It can be used to convert the trade symbol into the target symbol, or convert the price, volume into different units.
2024-08-08 09:00:45 +00:00
type ConverterManager struct {
2024-08-12 07:56:24 +00:00
ConverterSettings []ConverterSetting `json:"converters,omitempty" yaml:"converters,omitempty"`
converters []Converter
2024-08-08 09:18:17 +00:00
}
func (c *ConverterManager) Initialize() error {
2024-08-12 07:56:24 +00:00
for _, setting := range c.ConverterSettings {
converter, err := setting.InitializeConverter()
if err != nil {
return err
}
if converter != nil {
c.AddConverter(converter)
}
2024-08-08 09:18:17 +00:00
}
2024-08-16 12:57:28 +00:00
numConverters := len(c.converters)
logrus.Infof("%d converters loaded", numConverters)
2024-08-08 09:18:17 +00:00
return nil
2024-08-07 09:44:42 +00:00
}
2024-08-08 09:00:45 +00:00
func (c *ConverterManager) AddConverter(converter Converter) {
2024-08-12 07:56:24 +00:00
c.converters = append(c.converters, converter)
2024-08-08 09:00:45 +00:00
}
func (c *ConverterManager) ConvertOrder(order types.Order) types.Order {
2024-08-12 07:56:24 +00:00
if len(c.converters) == 0 {
2024-08-08 09:00:45 +00:00
return order
}
2024-08-12 07:56:24 +00:00
for _, converter := range c.converters {
2024-08-08 09:00:45 +00:00
convOrder, err := converter.ConvertOrder(order)
if err != nil {
logrus.WithError(err).Errorf("converter %+v error, order: %s", converter, order.String())
continue
}
order = convOrder
}
return order
}
func (c *ConverterManager) ConvertTrade(trade types.Trade) types.Trade {
2024-08-12 07:56:24 +00:00
if len(c.converters) == 0 {
2024-08-08 09:00:45 +00:00
return trade
}
2024-08-12 07:56:24 +00:00
for _, converter := range c.converters {
2024-08-08 09:00:45 +00:00
convTrade, err := converter.ConvertTrade(trade)
if err != nil {
logrus.WithError(err).Errorf("converter %+v error, trade: %s", converter, trade.String())
continue
}
trade = convTrade
}
return trade
}
2023-07-05 07:26:36 +00:00
//go:generate callbackgen -type TradeCollector
type TradeCollector struct {
Symbol string
orderSig sigchan.Chan
tradeStore *TradeStore
tradeC chan types.Trade
position *types.Position
orderStore *OrderStore
doneTrades map[types.TradeKey]struct{}
mu sync.Mutex
recoverCallbacks []func(trade types.Trade)
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
positionUpdateCallbacks []func(position *types.Position)
profitCallbacks []func(trade types.Trade, profit *types.Profit)
2024-08-08 09:00:45 +00:00
ConverterManager
2023-07-05 07:26:36 +00:00
}
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
tradeStore := NewTradeStore()
2024-10-17 04:53:51 +00:00
tradeStore.pruneEnabled = true
2023-07-05 07:26:36 +00:00
return &TradeCollector{
Symbol: symbol,
orderSig: sigchan.New(1),
tradeC: make(chan types.Trade, 100),
tradeStore: tradeStore,
2023-07-05 07:26:36 +00:00
doneTrades: make(map[types.TradeKey]struct{}),
position: position,
orderStore: orderStore,
}
}
2023-07-05 07:26:36 +00:00
// OrderStore exposes the order store that trades are matched against.
func (c *TradeCollector) OrderStore() *OrderStore {
	return c.orderStore
}
// Position exposes the position that matched trades are applied to.
// The result is nil when the collector has no position.
func (c *TradeCollector) Position() *types.Position {
	return c.position
}
// TradeStore exposes the store holding trades that are waiting for a
// matching order.
func (c *TradeCollector) TradeStore() *TradeStore {
	return c.tradeStore
}
// SetPosition replaces the position that subsequent trades are applied to.
// The assignment is not guarded by the collector's mutex.
func (c *TradeCollector) SetPosition(position *types.Position) {
	c.position = position
}
// QueueTrade hands the trade to the background loop started by Run by
// sending it on the trade channel.
//
// The send blocks when the channel buffer is full.
func (c *TradeCollector) QueueTrade(trade types.Trade) {
	c.tradeC <- trade
}
// BindStreamForBackground registers QueueTrade as the stream's trade update
// handler, so trades are processed by the Run loop rather than in the
// stream callback itself.
func (c *TradeCollector) BindStreamForBackground(stream types.Stream) {
	stream.OnTradeUpdate(c.QueueTrade)
}
func (c *TradeCollector) BindStream(stream types.Stream) {
stream.OnTradeUpdate(func(trade types.Trade) {
c.ProcessTrade(trade)
2023-07-05 07:26:36 +00:00
})
}
// Emit signals the Run loop to perform a Process round (position update).
//
// Call it after submitting an order and updating the order store, so that
// stored trades belonging to that order are picked up on the next iteration
// of the loop.
func (c *TradeCollector) Emit() {
	c.orderSig.Emit()
}
func (c *TradeCollector) Recover(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time,
) error {
logrus.Debugf("recovering %s trades...", symbol)
2023-07-05 07:26:36 +00:00
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &from,
})
if err != nil {
return err
}
cnt := 0
2023-07-05 07:26:36 +00:00
for _, td := range trades {
2023-07-22 09:57:02 +00:00
if c.RecoverTrade(td) {
cnt++
2023-07-05 07:26:36 +00:00
}
}
logrus.Infof("%d %s trades were recovered", cnt, symbol)
2023-07-05 07:26:36 +00:00
return nil
}
2023-07-22 09:57:02 +00:00
func (c *TradeCollector) RecoverTrade(td types.Trade) bool {
2024-08-08 09:00:45 +00:00
td = c.ConvertTrade(td)
2023-07-22 09:57:02 +00:00
logrus.Debugf("checking trade: %s", td.String())
if c.processTrade(td) {
logrus.Infof("recovered trade: %s", td.String())
c.EmitRecover(td)
return true
}
// add to the trade store, and then we can recover it when an order is matched
c.tradeStore.Add(td)
return false
}
2023-07-05 07:26:36 +00:00
// Process filters the received trades and see if there are orders matching the trades
// if we have the order in the order store, then the trade will be considered for the position.
// profit will also be calculated.
func (c *TradeCollector) Process() bool {
positionChanged := false
var trades []types.Trade
// if it's already done, remove the trade from the trade store
c.mu.Lock()
2023-07-05 07:26:36 +00:00
c.tradeStore.Filter(func(trade types.Trade) bool {
key := trade.Key()
// remove done trades
2023-07-05 07:26:36 +00:00
if _, done := c.doneTrades[key]; done {
return true
}
// if it's the trade we're looking for, add it to the list and mark it as done
if c.orderStore.Exists(trade.OrderID) {
trades = append(trades, trade)
c.doneTrades[key] = struct{}{}
return true
}
return false
})
c.mu.Unlock()
for _, trade := range trades {
var p types.Profit
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p = c.position.NewProfit(trade, profit, netProfit)
2023-07-05 07:26:36 +00:00
}
positionChanged = true
2023-07-05 07:26:36 +00:00
c.EmitTrade(trade, profit, netProfit)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
2023-07-05 07:26:36 +00:00
}
if !p.Profit.IsZero() {
c.EmitProfit(trade, &p)
}
}
2023-07-05 07:26:36 +00:00
if positionChanged && c.position != nil {
c.EmitPositionUpdate(c.position)
}
return positionChanged
}
// processTrade takes a trade and see if there is a matched order
// if the order is found, then we add the trade to the position
// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) processTrade(trade types.Trade) bool {
key := trade.Key()
2023-08-01 12:11:33 +00:00
c.mu.Lock()
2023-07-05 07:26:36 +00:00
// if it's already done, remove the trade from the trade store
if _, done := c.doneTrades[key]; done {
2023-08-01 12:11:33 +00:00
c.mu.Unlock()
2023-07-05 07:26:36 +00:00
return false
}
if !c.orderStore.Exists(trade.OrderID) {
// not done yet
// add this trade to the trade store for the later processing
c.tradeStore.Add(trade)
2023-08-01 12:11:33 +00:00
c.mu.Unlock()
return false
}
2023-08-01 12:11:33 +00:00
c.doneTrades[key] = struct{}{}
c.mu.Unlock()
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, &p)
2023-07-05 07:26:36 +00:00
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
2023-07-05 07:26:36 +00:00
}
c.EmitPositionUpdate(c.position)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
2023-07-05 07:26:36 +00:00
}
return true
2023-07-05 07:26:36 +00:00
}
// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
2024-08-08 09:00:45 +00:00
return c.processTrade(c.ConvertTrade(trade))
2023-07-05 07:26:36 +00:00
}
// Run is a goroutine executed in the background
// Do not use this function if you need back-testing
func (c *TradeCollector) Run(ctx context.Context) {
var ticker = time.NewTicker(3 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
c.Process()
case <-c.orderSig:
c.Process()
case trade := <-c.tradeC:
2024-08-08 09:00:45 +00:00
c.processTrade(c.ConvertTrade(trade))
2023-07-05 07:26:36 +00:00
}
}
}