From 1ad10a93605c3e0cccaa62c940be763e16291af1 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 5 Jul 2023 15:26:36 +0800 Subject: [PATCH] all: move trade collector to pkg/core --- .../mocks/mock_order_executor_extended.go | 15 +- pkg/bbgo/order_execution.go | 3 +- pkg/bbgo/order_executor_general.go | 6 +- pkg/bbgo/tradecollector.go | 237 ------------------ pkg/bbgo/tradecollector_test.go | 65 ----- pkg/cmd/backtest.go | 4 +- pkg/core/tradecollector.go | 237 ++++++++++++++++++ .../tradecollector_callbacks.go | 2 +- pkg/core/tradecollector_test.go | 65 +++++ pkg/risk/riskcontrol/position_test.go | 3 +- pkg/strategy/fmaker/strategy.go | 4 +- pkg/strategy/grid/strategy.go | 4 +- pkg/strategy/wall/strategy.go | 4 +- pkg/strategy/xmaker/strategy.go | 4 +- 14 files changed, 328 insertions(+), 325 deletions(-) create mode 100644 pkg/core/tradecollector.go rename pkg/{bbgo => core}/tradecollector_callbacks.go (99%) create mode 100644 pkg/core/tradecollector_test.go diff --git a/pkg/bbgo/mocks/mock_order_executor_extended.go b/pkg/bbgo/mocks/mock_order_executor_extended.go index 4244fe969..32e5cb63b 100644 --- a/pkg/bbgo/mocks/mock_order_executor_extended.go +++ b/pkg/bbgo/mocks/mock_order_executor_extended.go @@ -5,12 +5,13 @@ package mocks import ( - context "context" - reflect "reflect" + "context" + "reflect" - bbgo "github.com/c9s/bbgo/pkg/bbgo" - types "github.com/c9s/bbgo/pkg/types" - gomock "github.com/golang/mock/gomock" + "github.com/golang/mock/gomock" + + "github.com/c9s/bbgo/pkg/core" + "github.com/c9s/bbgo/pkg/types" ) // MockOrderExecutorExtended is a mock of OrderExecutorExtended interface. @@ -90,10 +91,10 @@ func (mr *MockOrderExecutorExtendedMockRecorder) SubmitOrders(arg0 interface{}, } // TradeCollector mocks base method. -func (m *MockOrderExecutorExtended) TradeCollector() *bbgo.TradeCollector { +func (m *MockOrderExecutorExtended) TradeCollector() *core.TradeCollector { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TradeCollector") - ret0, _ := ret[0].(*bbgo.TradeCollector) + ret0, _ := ret[0].(*core.TradeCollector) return ret0 } diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index 29cbf8ab0..fed6ae51e 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -10,6 +10,7 @@ import ( log "github.com/sirupsen/logrus" "go.uber.org/multierr" + "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -32,7 +33,7 @@ type OrderExecutor interface { type OrderExecutorExtended interface { SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) CancelOrders(ctx context.Context, orders ...types.Order) error - TradeCollector() *TradeCollector + TradeCollector() *core.TradeCollector Position() *types.Position } diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index fc27fea3a..cee2aa573 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -35,7 +35,7 @@ type GeneralOrderExecutor struct { position *types.Position activeMakerOrders *ActiveOrderBook orderStore *core.OrderStore - tradeCollector *TradeCollector + tradeCollector *core.TradeCollector logger log.FieldLogger @@ -60,7 +60,7 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg position: position, activeMakerOrders: NewActiveOrderBook(symbol), orderStore: orderStore, - tradeCollector: NewTradeCollector(symbol, position, orderStore), + tradeCollector: core.NewTradeCollector(symbol, position, orderStore), } if session != nil && session.Margin { @@ -517,7 +517,7 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix return nil } -func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector { +func (e *GeneralOrderExecutor) TradeCollector() *core.TradeCollector { return e.tradeCollector } diff --git a/pkg/bbgo/tradecollector.go b/pkg/bbgo/tradecollector.go index 1f0bcea91..f30d11b65 100644 --- a/pkg/bbgo/tradecollector.go +++ b/pkg/bbgo/tradecollector.go @@ -1,238 +1 @@ package bbgo - -import ( - "context" - "sync" - "time" - - log "github.com/sirupsen/logrus" - - "github.com/c9s/bbgo/pkg/core" - "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/sigchan" - "github.com/c9s/bbgo/pkg/types" -) - -//go:generate callbackgen -type TradeCollector -type TradeCollector struct { - Symbol string - orderSig sigchan.Chan - - tradeStore *core.TradeStore - tradeC chan types.Trade - position *types.Position - orderStore *core.OrderStore - doneTrades map[types.TradeKey]struct{} - - mu sync.Mutex - - recoverCallbacks []func(trade types.Trade) - - tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) - - positionUpdateCallbacks []func(position *types.Position) - profitCallbacks []func(trade types.Trade, profit *types.Profit) -} - -func NewTradeCollector(symbol string, position *types.Position, orderStore *core.OrderStore) *TradeCollector { - return &TradeCollector{ - Symbol: symbol, - orderSig: sigchan.New(1), - - tradeC: make(chan types.Trade, 100), - tradeStore: core.NewTradeStore(), - doneTrades: make(map[types.TradeKey]struct{}), - position: position, - orderStore: orderStore, - } -} - -// OrderStore returns the order store used by the trade collector -func (c *TradeCollector) OrderStore() *core.OrderStore { - return c.orderStore -} - -// Position returns the position used by the trade collector -func (c *TradeCollector) Position() *types.Position { - return c.position -} - -func (c *TradeCollector) TradeStore() *core.TradeStore { - return c.tradeStore -} - -func (c *TradeCollector) SetPosition(position *types.Position) { - c.position = position -} - -// QueueTrade sends the trade object to the trade channel, -// so that the goroutine can receive the trade and process in the background. -func (c *TradeCollector) QueueTrade(trade types.Trade) { - c.tradeC <- trade -} - -// BindStreamForBackground bind the stream callback for background processing -func (c *TradeCollector) BindStreamForBackground(stream types.Stream) { - stream.OnTradeUpdate(c.QueueTrade) -} - -func (c *TradeCollector) BindStream(stream types.Stream) { - stream.OnTradeUpdate(func(trade types.Trade) { - c.ProcessTrade(trade) - }) -} - -// Emit triggers the trade processing (position update) -// If you sent order, and the order store is updated, you can call this method -// so that trades will be processed in the next round of the goroutine loop -func (c *TradeCollector) Emit() { - c.orderSig.Emit() -} - -func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { - trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ - StartTime: &from, - }) - - if err != nil { - return err - } - - for _, td := range trades { - log.Debugf("processing trade: %s", td.String()) - if c.ProcessTrade(td) { - log.Infof("recovered trade: %s", td.String()) - c.EmitRecover(td) - } - } - return nil -} - -func (c *TradeCollector) setDone(key types.TradeKey) { - c.mu.Lock() - c.doneTrades[key] = struct{}{} - c.mu.Unlock() -} - -// Process filters the received trades and see if there are orders matching the trades -// if we have the order in the order store, then the trade will be considered for the position. -// profit will also be calculated. -func (c *TradeCollector) Process() bool { - positionChanged := false - - c.tradeStore.Filter(func(trade types.Trade) bool { - key := trade.Key() - - c.mu.Lock() - defer c.mu.Unlock() - - // if it's already done, remove the trade from the trade store - if _, done := c.doneTrades[key]; done { - return true - } - - if c.orderStore.Exists(trade.OrderID) { - if c.position != nil { - profit, netProfit, madeProfit := c.position.AddTrade(trade) - if madeProfit { - p := c.position.NewProfit(trade, profit, netProfit) - c.EmitTrade(trade, profit, netProfit) - c.EmitProfit(trade, &p) - } else { - c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) - c.EmitProfit(trade, nil) - } - positionChanged = true - } else { - c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) - } - - c.doneTrades[key] = struct{}{} - return true - } - return false - }) - - if positionChanged && c.position != nil { - c.EmitPositionUpdate(c.position) - } - - return positionChanged -} - -// processTrade takes a trade and see if there is a matched order -// if the order is found, then we add the trade to the position -// return true when the given trade is added -// return false when the given trade is not added -func (c *TradeCollector) processTrade(trade types.Trade) bool { - c.mu.Lock() - defer c.mu.Unlock() - - key := trade.Key() - - // if it's already done, remove the trade from the trade store - if _, done := c.doneTrades[key]; done { - return false - } - - if c.orderStore.Exists(trade.OrderID) { - if c.position != nil { - profit, netProfit, madeProfit := c.position.AddTrade(trade) - if madeProfit { - p := c.position.NewProfit(trade, profit, netProfit) - c.EmitTrade(trade, profit, netProfit) - c.EmitProfit(trade, &p) - } else { - c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) - c.EmitProfit(trade, nil) - } - c.EmitPositionUpdate(c.position) - } else { - c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) - } - - c.doneTrades[key] = struct{}{} - return true - } - return false -} - -// return true when the given trade is added -// return false when the given trade is not added -func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { - key := trade.Key() - // if it's already done, remove the trade from the trade store - c.mu.Lock() - if _, done := c.doneTrades[key]; done { - return false - } - c.mu.Unlock() - - if c.processTrade(trade) { - return true - } - - c.tradeStore.Add(trade) - return false -} - -// Run is a goroutine executed in the background -// Do not use this function if you need back-testing -func (c *TradeCollector) Run(ctx context.Context) { - var ticker = time.NewTicker(3 * time.Second) - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - c.Process() - - case <-c.orderSig: - c.Process() - - case trade := <-c.tradeC: - c.ProcessTrade(trade) - } - } -} diff --git a/pkg/bbgo/tradecollector_test.go b/pkg/bbgo/tradecollector_test.go index f627db035..f30d11b65 100644 --- a/pkg/bbgo/tradecollector_test.go +++ b/pkg/bbgo/tradecollector_test.go @@ -1,66 +1 @@ package bbgo - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/c9s/bbgo/pkg/core" - "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/types" -) - -func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) { - symbol := "BTCUSDT" - position := types.NewPosition(symbol, "BTC", "USDT") - orderStore := core.NewOrderStore(symbol) - collector := NewTradeCollector(symbol, position, orderStore) - assert.NotNil(t, collector) - - matched := collector.ProcessTrade(types.Trade{ - ID: 1, - OrderID: 399, - Exchange: types.ExchangeBinance, - Price: fixedpoint.NewFromInt(40000), - Quantity: fixedpoint.One, - QuoteQuantity: fixedpoint.NewFromInt(40000), - Symbol: "BTCUSDT", - Side: types.SideTypeBuy, - IsBuyer: true, - }) - assert.False(t, matched, "should be added to the trade store") - assert.Equal(t, 1, len(collector.tradeStore.Trades()), "should have one trade in the trade store") - - orderStore.Add(types.Order{ - SubmitOrder: types.SubmitOrder{ - Symbol: "BTCUSDT", - Side: types.SideTypeBuy, - Type: types.OrderTypeLimit, - Quantity: fixedpoint.One, - Price: fixedpoint.NewFromInt(40000), - }, - Exchange: types.ExchangeBinance, - OrderID: 399, - Status: types.OrderStatusFilled, - ExecutedQuantity: fixedpoint.One, - IsWorking: false, - }) - - matched = collector.Process() - assert.True(t, matched) - assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the found trade should be removed from the trade store") - - matched = collector.ProcessTrade(types.Trade{ - ID: 1, - OrderID: 399, - Exchange: types.ExchangeBinance, - Price: fixedpoint.NewFromInt(40000), - Quantity: fixedpoint.One, - QuoteQuantity: fixedpoint.NewFromInt(40000), - Symbol: "BTCUSDT", - Side: types.SideTypeBuy, - IsBuyer: true, - }) - assert.False(t, matched, "the same trade should not match") - assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the same trade should not be added to the trade store") -} diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 587cc1f8f..025bec020 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -308,7 +308,7 @@ var BacktestCmd = &cobra.Command{ var reportDir = outputDirectory var sessionTradeStats = make(map[string]map[string]*types.TradeStats) - var tradeCollectorList []*bbgo.TradeCollector + var tradeCollectorList []*core.TradeCollector for _, exSource := range exchangeSources { sessionName := exSource.Session.Name tradeStatsMap := make(map[string]*types.TradeStats) @@ -317,7 +317,7 @@ var BacktestCmd = &cobra.Command{ position := types.NewPositionFromMarket(market) orderStore := core.NewOrderStore(usedSymbol) orderStore.AddOrderUpdate = true - tradeCollector := bbgo.NewTradeCollector(usedSymbol, position, orderStore) + tradeCollector := core.NewTradeCollector(usedSymbol, position, orderStore) tradeStats := types.NewTradeStats(usedSymbol) tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime)) diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go new file mode 100644 index 000000000..e981fff56 --- /dev/null +++ b/pkg/core/tradecollector.go @@ -0,0 +1,237 @@ +package core + +import ( + "context" + "sync" + "time" + + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/sigchan" + "github.com/c9s/bbgo/pkg/types" +) + +//go:generate callbackgen -type TradeCollector +type TradeCollector struct { + Symbol string + orderSig sigchan.Chan + + tradeStore *TradeStore + tradeC chan types.Trade + position *types.Position + orderStore *OrderStore + doneTrades map[types.TradeKey]struct{} + + mu sync.Mutex + + recoverCallbacks []func(trade types.Trade) + + tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) + + positionUpdateCallbacks []func(position *types.Position) + profitCallbacks []func(trade types.Trade, profit *types.Profit) +} + +func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { + return &TradeCollector{ + Symbol: symbol, + orderSig: sigchan.New(1), + + tradeC: make(chan types.Trade, 100), + tradeStore: NewTradeStore(), + doneTrades: make(map[types.TradeKey]struct{}), + position: position, + orderStore: orderStore, + } +} + +// OrderStore returns the order store used by the trade collector +func (c *TradeCollector) OrderStore() *OrderStore { + return c.orderStore +} + +// Position returns the position used by the trade collector +func (c *TradeCollector) Position() *types.Position { + return c.position +} + +func (c *TradeCollector) TradeStore() *TradeStore { + return c.tradeStore +} + +func (c *TradeCollector) SetPosition(position *types.Position) { + c.position = position +} + +// QueueTrade sends the trade object to the trade channel, +// so that the goroutine can receive the trade and process in the background. +func (c *TradeCollector) QueueTrade(trade types.Trade) { + c.tradeC <- trade +} + +// BindStreamForBackground bind the stream callback for background processing +func (c *TradeCollector) BindStreamForBackground(stream types.Stream) { + stream.OnTradeUpdate(c.QueueTrade) +} + +func (c *TradeCollector) BindStream(stream types.Stream) { + stream.OnTradeUpdate(func(trade types.Trade) { + c.ProcessTrade(trade) + }) +} + +// Emit triggers the trade processing (position update) +// If you sent order, and the order store is updated, you can call this method +// so that trades will be processed in the next round of the goroutine loop +func (c *TradeCollector) Emit() { + c.orderSig.Emit() +} + +func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { + trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &from, + }) + + if err != nil { + return err + } + + for _, td := range trades { + logrus.Debugf("processing trade: %s", td.String()) + if c.ProcessTrade(td) { + logrus.Infof("recovered trade: %s", td.String()) + c.EmitRecover(td) + } + } + return nil +} + +func (c *TradeCollector) setDone(key types.TradeKey) { + c.mu.Lock() + c.doneTrades[key] = struct{}{} + c.mu.Unlock() +} + +// Process filters the received trades and see if there are orders matching the trades +// if we have the order in the order store, then the trade will be considered for the position. +// profit will also be calculated. +func (c *TradeCollector) Process() bool { + positionChanged := false + + c.tradeStore.Filter(func(trade types.Trade) bool { + key := trade.Key() + + c.mu.Lock() + defer c.mu.Unlock() + + // if it's already done, remove the trade from the trade store + if _, done := c.doneTrades[key]; done { + return true + } + + if c.orderStore.Exists(trade.OrderID) { + if c.position != nil { + profit, netProfit, madeProfit := c.position.AddTrade(trade) + if madeProfit { + p := c.position.NewProfit(trade, profit, netProfit) + c.EmitTrade(trade, profit, netProfit) + c.EmitProfit(trade, &p) + } else { + c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + c.EmitProfit(trade, nil) + } + positionChanged = true + } else { + c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + } + + c.doneTrades[key] = struct{}{} + return true + } + return false + }) + + if positionChanged && c.position != nil { + c.EmitPositionUpdate(c.position) + } + + return positionChanged +} + +// processTrade takes a trade and see if there is a matched order +// if the order is found, then we add the trade to the position +// return true when the given trade is added +// return false when the given trade is not added +func (c *TradeCollector) processTrade(trade types.Trade) bool { + c.mu.Lock() + defer c.mu.Unlock() + + key := trade.Key() + + // if it's already done, remove the trade from the trade store + if _, done := c.doneTrades[key]; done { + return false + } + + if c.orderStore.Exists(trade.OrderID) { + if c.position != nil { + profit, netProfit, madeProfit := c.position.AddTrade(trade) + if madeProfit { + p := c.position.NewProfit(trade, profit, netProfit) + c.EmitTrade(trade, profit, netProfit) + c.EmitProfit(trade, &p) + } else { + c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + c.EmitProfit(trade, nil) + } + c.EmitPositionUpdate(c.position) + } else { + c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + } + + c.doneTrades[key] = struct{}{} + return true + } + return false +} + +// return true when the given trade is added +// return false when the given trade is not added +func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { + key := trade.Key() + // if it's already done, remove the trade from the trade store + c.mu.Lock() + if _, done := c.doneTrades[key]; done { + return false + } + c.mu.Unlock() + + if c.processTrade(trade) { + return true + } + + c.tradeStore.Add(trade) + return false +} + +// Run is a goroutine executed in the background +// Do not use this function if you need back-testing +func (c *TradeCollector) Run(ctx context.Context) { + var ticker = time.NewTicker(3 * time.Second) + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + c.Process() + + case <-c.orderSig: + c.Process() + + case trade := <-c.tradeC: + c.ProcessTrade(trade) + } + } +} diff --git a/pkg/bbgo/tradecollector_callbacks.go b/pkg/core/tradecollector_callbacks.go similarity index 99% rename from pkg/bbgo/tradecollector_callbacks.go rename to pkg/core/tradecollector_callbacks.go index 44756224f..e44c0f509 100644 --- a/pkg/bbgo/tradecollector_callbacks.go +++ b/pkg/core/tradecollector_callbacks.go @@ -1,6 +1,6 @@ // Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT. -package bbgo +package core import ( "github.com/c9s/bbgo/pkg/fixedpoint" diff --git a/pkg/core/tradecollector_test.go b/pkg/core/tradecollector_test.go new file mode 100644 index 000000000..ec29a2d63 --- /dev/null +++ b/pkg/core/tradecollector_test.go @@ -0,0 +1,65 @@ +package core + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) { + symbol := "BTCUSDT" + position := types.NewPosition(symbol, "BTC", "USDT") + orderStore := NewOrderStore(symbol) + collector := NewTradeCollector(symbol, position, orderStore) + assert.NotNil(t, collector) + + matched := collector.ProcessTrade(types.Trade{ + ID: 1, + OrderID: 399, + Exchange: types.ExchangeBinance, + Price: fixedpoint.NewFromInt(40000), + Quantity: fixedpoint.One, + QuoteQuantity: fixedpoint.NewFromInt(40000), + Symbol: "BTCUSDT", + Side: types.SideTypeBuy, + IsBuyer: true, + }) + assert.False(t, matched, "should be added to the trade store") + assert.Equal(t, 1, len(collector.tradeStore.Trades()), "should have one trade in the trade store") + + orderStore.Add(types.Order{ + SubmitOrder: types.SubmitOrder{ + Symbol: "BTCUSDT", + Side: types.SideTypeBuy, + Type: types.OrderTypeLimit, + Quantity: fixedpoint.One, + Price: fixedpoint.NewFromInt(40000), + }, + Exchange: types.ExchangeBinance, + OrderID: 399, + Status: types.OrderStatusFilled, + ExecutedQuantity: fixedpoint.One, + IsWorking: false, + }) + + matched = collector.Process() + assert.True(t, matched) + assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the found trade should be removed from the trade store") + + matched = collector.ProcessTrade(types.Trade{ + ID: 1, + OrderID: 399, + Exchange: types.ExchangeBinance, + Price: fixedpoint.NewFromInt(40000), + Quantity: fixedpoint.One, + QuoteQuantity: fixedpoint.NewFromInt(40000), + Symbol: "BTCUSDT", + Side: types.SideTypeBuy, + IsBuyer: true, + }) + assert.False(t, matched, "the same trade should not match") + assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the same trade should not be added to the trade store") +} diff --git a/pkg/risk/riskcontrol/position_test.go b/pkg/risk/riskcontrol/position_test.go index 21bb7f2ea..d1fc10652 100644 --- a/pkg/risk/riskcontrol/position_test.go +++ b/pkg/risk/riskcontrol/position_test.go @@ -8,6 +8,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo/mocks" + "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) @@ -93,7 +94,7 @@ func TestReleasePositionCallbacks(t *testing.T) { }, } - tradeCollector := &bbgo.TradeCollector{} + tradeCollector := &core.TradeCollector{} mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() orderExecutor := mocks.NewMockOrderExecutorExtended(mockCtrl) diff --git a/pkg/strategy/fmaker/strategy.go b/pkg/strategy/fmaker/strategy.go index 4578857d3..b62d790cd 100644 --- a/pkg/strategy/fmaker/strategy.go +++ b/pkg/strategy/fmaker/strategy.go @@ -49,7 +49,7 @@ type Strategy struct { // closePositionOrders *bbgo.LocalActiveOrderBook orderStore *core.OrderStore - tradeCollector *bbgo.TradeCollector + tradeCollector *core.TradeCollector session *bbgo.ExchangeSession @@ -174,7 +174,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.Position.Strategy = ID s.Position.StrategyInstanceID = instanceID - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) + s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { // StrategyController if s.Status != types.StrategyStatusRunning { diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 66cef8610..a64642ce5 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -95,7 +95,7 @@ type Strategy struct { // activeOrders is the locally maintained active order book of the maker orders. activeOrders *bbgo.ActiveOrderBook - tradeCollector *bbgo.TradeCollector + tradeCollector *core.TradeCollector // groupID is the group ID used for the strategy instance for canceling orders groupID uint32 @@ -571,7 +571,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.activeOrders.OnFilled(s.handleFilledOrder) s.activeOrders.BindStream(session.UserDataStream) - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore) + s.tradeCollector = core.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore) s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { bbgo.Notify(trade) diff --git a/pkg/strategy/wall/strategy.go b/pkg/strategy/wall/strategy.go index 1b3318197..5cbb4294f 100644 --- a/pkg/strategy/wall/strategy.go +++ b/pkg/strategy/wall/strategy.go @@ -67,7 +67,7 @@ type Strategy struct { activeAdjustmentOrders *bbgo.ActiveOrderBook activeWallOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore - tradeCollector *bbgo.TradeCollector + tradeCollector *core.TradeCollector groupID uint32 @@ -277,7 +277,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.orderStore = core.NewOrderStore(s.Symbol) s.orderStore.BindStream(session.UserDataStream) - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) + s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore) s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { bbgo.Notify(trade) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index faff3615b..2a2db7230 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -105,7 +105,7 @@ type Strategy struct { hedgeErrorRateReservation *rate.Reservation orderStore *core.OrderStore - tradeCollector *bbgo.TradeCollector + tradeCollector *core.TradeCollector askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat @@ -737,7 +737,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order s.orderStore.BindStream(s.sourceSession.UserDataStream) s.orderStore.BindStream(s.makerSession.UserDataStream) - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) + s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore) if s.NotifyTrade { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {