mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-25 00:05:15 +00:00
all: move trade collector to pkg/core
This commit is contained in:
parent
ff727ae495
commit
1ad10a9360
|
@ -5,12 +5,13 @@
|
||||||
package mocks
|
package mocks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
context "context"
|
"context"
|
||||||
reflect "reflect"
|
"reflect"
|
||||||
|
|
||||||
bbgo "github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/golang/mock/gomock"
|
||||||
types "github.com/c9s/bbgo/pkg/types"
|
|
||||||
gomock "github.com/golang/mock/gomock"
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MockOrderExecutorExtended is a mock of OrderExecutorExtended interface.
|
// MockOrderExecutorExtended is a mock of OrderExecutorExtended interface.
|
||||||
|
@ -90,10 +91,10 @@ func (mr *MockOrderExecutorExtendedMockRecorder) SubmitOrders(arg0 interface{},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TradeCollector mocks base method.
|
// TradeCollector mocks base method.
|
||||||
func (m *MockOrderExecutorExtended) TradeCollector() *bbgo.TradeCollector {
|
func (m *MockOrderExecutorExtended) TradeCollector() *core.TradeCollector {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "TradeCollector")
|
ret := m.ctrl.Call(m, "TradeCollector")
|
||||||
ret0, _ := ret[0].(*bbgo.TradeCollector)
|
ret0, _ := ret[0].(*core.TradeCollector)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"go.uber.org/multierr"
|
"go.uber.org/multierr"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
"github.com/c9s/bbgo/pkg/util"
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
|
@ -32,7 +33,7 @@ type OrderExecutor interface {
|
||||||
type OrderExecutorExtended interface {
|
type OrderExecutorExtended interface {
|
||||||
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
|
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
|
||||||
CancelOrders(ctx context.Context, orders ...types.Order) error
|
CancelOrders(ctx context.Context, orders ...types.Order) error
|
||||||
TradeCollector() *TradeCollector
|
TradeCollector() *core.TradeCollector
|
||||||
Position() *types.Position
|
Position() *types.Position
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,7 +35,7 @@ type GeneralOrderExecutor struct {
|
||||||
position *types.Position
|
position *types.Position
|
||||||
activeMakerOrders *ActiveOrderBook
|
activeMakerOrders *ActiveOrderBook
|
||||||
orderStore *core.OrderStore
|
orderStore *core.OrderStore
|
||||||
tradeCollector *TradeCollector
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
logger log.FieldLogger
|
logger log.FieldLogger
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg
|
||||||
position: position,
|
position: position,
|
||||||
activeMakerOrders: NewActiveOrderBook(symbol),
|
activeMakerOrders: NewActiveOrderBook(symbol),
|
||||||
orderStore: orderStore,
|
orderStore: orderStore,
|
||||||
tradeCollector: NewTradeCollector(symbol, position, orderStore),
|
tradeCollector: core.NewTradeCollector(symbol, position, orderStore),
|
||||||
}
|
}
|
||||||
|
|
||||||
if session != nil && session.Margin {
|
if session != nil && session.Margin {
|
||||||
|
@ -517,7 +517,7 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector {
|
func (e *GeneralOrderExecutor) TradeCollector() *core.TradeCollector {
|
||||||
return e.tradeCollector
|
return e.tradeCollector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,238 +1 @@
|
||||||
package bbgo
|
package bbgo
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/core"
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
|
||||||
"github.com/c9s/bbgo/pkg/sigchan"
|
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
//go:generate callbackgen -type TradeCollector
|
|
||||||
type TradeCollector struct {
|
|
||||||
Symbol string
|
|
||||||
orderSig sigchan.Chan
|
|
||||||
|
|
||||||
tradeStore *core.TradeStore
|
|
||||||
tradeC chan types.Trade
|
|
||||||
position *types.Position
|
|
||||||
orderStore *core.OrderStore
|
|
||||||
doneTrades map[types.TradeKey]struct{}
|
|
||||||
|
|
||||||
mu sync.Mutex
|
|
||||||
|
|
||||||
recoverCallbacks []func(trade types.Trade)
|
|
||||||
|
|
||||||
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
|
|
||||||
|
|
||||||
positionUpdateCallbacks []func(position *types.Position)
|
|
||||||
profitCallbacks []func(trade types.Trade, profit *types.Profit)
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTradeCollector(symbol string, position *types.Position, orderStore *core.OrderStore) *TradeCollector {
|
|
||||||
return &TradeCollector{
|
|
||||||
Symbol: symbol,
|
|
||||||
orderSig: sigchan.New(1),
|
|
||||||
|
|
||||||
tradeC: make(chan types.Trade, 100),
|
|
||||||
tradeStore: core.NewTradeStore(),
|
|
||||||
doneTrades: make(map[types.TradeKey]struct{}),
|
|
||||||
position: position,
|
|
||||||
orderStore: orderStore,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// OrderStore returns the order store used by the trade collector
|
|
||||||
func (c *TradeCollector) OrderStore() *core.OrderStore {
|
|
||||||
return c.orderStore
|
|
||||||
}
|
|
||||||
|
|
||||||
// Position returns the position used by the trade collector
|
|
||||||
func (c *TradeCollector) Position() *types.Position {
|
|
||||||
return c.position
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TradeCollector) TradeStore() *core.TradeStore {
|
|
||||||
return c.tradeStore
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TradeCollector) SetPosition(position *types.Position) {
|
|
||||||
c.position = position
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueueTrade sends the trade object to the trade channel,
|
|
||||||
// so that the goroutine can receive the trade and process in the background.
|
|
||||||
func (c *TradeCollector) QueueTrade(trade types.Trade) {
|
|
||||||
c.tradeC <- trade
|
|
||||||
}
|
|
||||||
|
|
||||||
// BindStreamForBackground bind the stream callback for background processing
|
|
||||||
func (c *TradeCollector) BindStreamForBackground(stream types.Stream) {
|
|
||||||
stream.OnTradeUpdate(c.QueueTrade)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TradeCollector) BindStream(stream types.Stream) {
|
|
||||||
stream.OnTradeUpdate(func(trade types.Trade) {
|
|
||||||
c.ProcessTrade(trade)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Emit triggers the trade processing (position update)
|
|
||||||
// If you sent order, and the order store is updated, you can call this method
|
|
||||||
// so that trades will be processed in the next round of the goroutine loop
|
|
||||||
func (c *TradeCollector) Emit() {
|
|
||||||
c.orderSig.Emit()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error {
|
|
||||||
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
|
|
||||||
StartTime: &from,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, td := range trades {
|
|
||||||
log.Debugf("processing trade: %s", td.String())
|
|
||||||
if c.ProcessTrade(td) {
|
|
||||||
log.Infof("recovered trade: %s", td.String())
|
|
||||||
c.EmitRecover(td)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *TradeCollector) setDone(key types.TradeKey) {
|
|
||||||
c.mu.Lock()
|
|
||||||
c.doneTrades[key] = struct{}{}
|
|
||||||
c.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process filters the received trades and see if there are orders matching the trades
|
|
||||||
// if we have the order in the order store, then the trade will be considered for the position.
|
|
||||||
// profit will also be calculated.
|
|
||||||
func (c *TradeCollector) Process() bool {
|
|
||||||
positionChanged := false
|
|
||||||
|
|
||||||
c.tradeStore.Filter(func(trade types.Trade) bool {
|
|
||||||
key := trade.Key()
|
|
||||||
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
// if it's already done, remove the trade from the trade store
|
|
||||||
if _, done := c.doneTrades[key]; done {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.orderStore.Exists(trade.OrderID) {
|
|
||||||
if c.position != nil {
|
|
||||||
profit, netProfit, madeProfit := c.position.AddTrade(trade)
|
|
||||||
if madeProfit {
|
|
||||||
p := c.position.NewProfit(trade, profit, netProfit)
|
|
||||||
c.EmitTrade(trade, profit, netProfit)
|
|
||||||
c.EmitProfit(trade, &p)
|
|
||||||
} else {
|
|
||||||
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
|
||||||
c.EmitProfit(trade, nil)
|
|
||||||
}
|
|
||||||
positionChanged = true
|
|
||||||
} else {
|
|
||||||
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.doneTrades[key] = struct{}{}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
|
|
||||||
if positionChanged && c.position != nil {
|
|
||||||
c.EmitPositionUpdate(c.position)
|
|
||||||
}
|
|
||||||
|
|
||||||
return positionChanged
|
|
||||||
}
|
|
||||||
|
|
||||||
// processTrade takes a trade and see if there is a matched order
|
|
||||||
// if the order is found, then we add the trade to the position
|
|
||||||
// return true when the given trade is added
|
|
||||||
// return false when the given trade is not added
|
|
||||||
func (c *TradeCollector) processTrade(trade types.Trade) bool {
|
|
||||||
c.mu.Lock()
|
|
||||||
defer c.mu.Unlock()
|
|
||||||
|
|
||||||
key := trade.Key()
|
|
||||||
|
|
||||||
// if it's already done, remove the trade from the trade store
|
|
||||||
if _, done := c.doneTrades[key]; done {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.orderStore.Exists(trade.OrderID) {
|
|
||||||
if c.position != nil {
|
|
||||||
profit, netProfit, madeProfit := c.position.AddTrade(trade)
|
|
||||||
if madeProfit {
|
|
||||||
p := c.position.NewProfit(trade, profit, netProfit)
|
|
||||||
c.EmitTrade(trade, profit, netProfit)
|
|
||||||
c.EmitProfit(trade, &p)
|
|
||||||
} else {
|
|
||||||
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
|
||||||
c.EmitProfit(trade, nil)
|
|
||||||
}
|
|
||||||
c.EmitPositionUpdate(c.position)
|
|
||||||
} else {
|
|
||||||
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
|
||||||
}
|
|
||||||
|
|
||||||
c.doneTrades[key] = struct{}{}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// return true when the given trade is added
|
|
||||||
// return false when the given trade is not added
|
|
||||||
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
|
|
||||||
key := trade.Key()
|
|
||||||
// if it's already done, remove the trade from the trade store
|
|
||||||
c.mu.Lock()
|
|
||||||
if _, done := c.doneTrades[key]; done {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
c.mu.Unlock()
|
|
||||||
|
|
||||||
if c.processTrade(trade) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
c.tradeStore.Add(trade)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Run is a goroutine executed in the background
|
|
||||||
// Do not use this function if you need back-testing
|
|
||||||
func (c *TradeCollector) Run(ctx context.Context) {
|
|
||||||
var ticker = time.NewTicker(3 * time.Second)
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-ticker.C:
|
|
||||||
c.Process()
|
|
||||||
|
|
||||||
case <-c.orderSig:
|
|
||||||
c.Process()
|
|
||||||
|
|
||||||
case trade := <-c.tradeC:
|
|
||||||
c.ProcessTrade(trade)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,66 +1 @@
|
||||||
package bbgo
|
package bbgo
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/core"
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) {
|
|
||||||
symbol := "BTCUSDT"
|
|
||||||
position := types.NewPosition(symbol, "BTC", "USDT")
|
|
||||||
orderStore := core.NewOrderStore(symbol)
|
|
||||||
collector := NewTradeCollector(symbol, position, orderStore)
|
|
||||||
assert.NotNil(t, collector)
|
|
||||||
|
|
||||||
matched := collector.ProcessTrade(types.Trade{
|
|
||||||
ID: 1,
|
|
||||||
OrderID: 399,
|
|
||||||
Exchange: types.ExchangeBinance,
|
|
||||||
Price: fixedpoint.NewFromInt(40000),
|
|
||||||
Quantity: fixedpoint.One,
|
|
||||||
QuoteQuantity: fixedpoint.NewFromInt(40000),
|
|
||||||
Symbol: "BTCUSDT",
|
|
||||||
Side: types.SideTypeBuy,
|
|
||||||
IsBuyer: true,
|
|
||||||
})
|
|
||||||
assert.False(t, matched, "should be added to the trade store")
|
|
||||||
assert.Equal(t, 1, len(collector.tradeStore.Trades()), "should have one trade in the trade store")
|
|
||||||
|
|
||||||
orderStore.Add(types.Order{
|
|
||||||
SubmitOrder: types.SubmitOrder{
|
|
||||||
Symbol: "BTCUSDT",
|
|
||||||
Side: types.SideTypeBuy,
|
|
||||||
Type: types.OrderTypeLimit,
|
|
||||||
Quantity: fixedpoint.One,
|
|
||||||
Price: fixedpoint.NewFromInt(40000),
|
|
||||||
},
|
|
||||||
Exchange: types.ExchangeBinance,
|
|
||||||
OrderID: 399,
|
|
||||||
Status: types.OrderStatusFilled,
|
|
||||||
ExecutedQuantity: fixedpoint.One,
|
|
||||||
IsWorking: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
matched = collector.Process()
|
|
||||||
assert.True(t, matched)
|
|
||||||
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the found trade should be removed from the trade store")
|
|
||||||
|
|
||||||
matched = collector.ProcessTrade(types.Trade{
|
|
||||||
ID: 1,
|
|
||||||
OrderID: 399,
|
|
||||||
Exchange: types.ExchangeBinance,
|
|
||||||
Price: fixedpoint.NewFromInt(40000),
|
|
||||||
Quantity: fixedpoint.One,
|
|
||||||
QuoteQuantity: fixedpoint.NewFromInt(40000),
|
|
||||||
Symbol: "BTCUSDT",
|
|
||||||
Side: types.SideTypeBuy,
|
|
||||||
IsBuyer: true,
|
|
||||||
})
|
|
||||||
assert.False(t, matched, "the same trade should not match")
|
|
||||||
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the same trade should not be added to the trade store")
|
|
||||||
}
|
|
||||||
|
|
|
@ -308,7 +308,7 @@ var BacktestCmd = &cobra.Command{
|
||||||
var reportDir = outputDirectory
|
var reportDir = outputDirectory
|
||||||
var sessionTradeStats = make(map[string]map[string]*types.TradeStats)
|
var sessionTradeStats = make(map[string]map[string]*types.TradeStats)
|
||||||
|
|
||||||
var tradeCollectorList []*bbgo.TradeCollector
|
var tradeCollectorList []*core.TradeCollector
|
||||||
for _, exSource := range exchangeSources {
|
for _, exSource := range exchangeSources {
|
||||||
sessionName := exSource.Session.Name
|
sessionName := exSource.Session.Name
|
||||||
tradeStatsMap := make(map[string]*types.TradeStats)
|
tradeStatsMap := make(map[string]*types.TradeStats)
|
||||||
|
@ -317,7 +317,7 @@ var BacktestCmd = &cobra.Command{
|
||||||
position := types.NewPositionFromMarket(market)
|
position := types.NewPositionFromMarket(market)
|
||||||
orderStore := core.NewOrderStore(usedSymbol)
|
orderStore := core.NewOrderStore(usedSymbol)
|
||||||
orderStore.AddOrderUpdate = true
|
orderStore.AddOrderUpdate = true
|
||||||
tradeCollector := bbgo.NewTradeCollector(usedSymbol, position, orderStore)
|
tradeCollector := core.NewTradeCollector(usedSymbol, position, orderStore)
|
||||||
|
|
||||||
tradeStats := types.NewTradeStats(usedSymbol)
|
tradeStats := types.NewTradeStats(usedSymbol)
|
||||||
tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
|
tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
|
||||||
|
|
237
pkg/core/tradecollector.go
Normal file
237
pkg/core/tradecollector.go
Normal file
|
@ -0,0 +1,237 @@
|
||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
"github.com/c9s/bbgo/pkg/sigchan"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:generate callbackgen -type TradeCollector
|
||||||
|
type TradeCollector struct {
|
||||||
|
Symbol string
|
||||||
|
orderSig sigchan.Chan
|
||||||
|
|
||||||
|
tradeStore *TradeStore
|
||||||
|
tradeC chan types.Trade
|
||||||
|
position *types.Position
|
||||||
|
orderStore *OrderStore
|
||||||
|
doneTrades map[types.TradeKey]struct{}
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
|
||||||
|
recoverCallbacks []func(trade types.Trade)
|
||||||
|
|
||||||
|
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
|
||||||
|
|
||||||
|
positionUpdateCallbacks []func(position *types.Position)
|
||||||
|
profitCallbacks []func(trade types.Trade, profit *types.Profit)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
|
||||||
|
return &TradeCollector{
|
||||||
|
Symbol: symbol,
|
||||||
|
orderSig: sigchan.New(1),
|
||||||
|
|
||||||
|
tradeC: make(chan types.Trade, 100),
|
||||||
|
tradeStore: NewTradeStore(),
|
||||||
|
doneTrades: make(map[types.TradeKey]struct{}),
|
||||||
|
position: position,
|
||||||
|
orderStore: orderStore,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OrderStore returns the order store used by the trade collector
|
||||||
|
func (c *TradeCollector) OrderStore() *OrderStore {
|
||||||
|
return c.orderStore
|
||||||
|
}
|
||||||
|
|
||||||
|
// Position returns the position used by the trade collector
|
||||||
|
func (c *TradeCollector) Position() *types.Position {
|
||||||
|
return c.position
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TradeCollector) TradeStore() *TradeStore {
|
||||||
|
return c.tradeStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TradeCollector) SetPosition(position *types.Position) {
|
||||||
|
c.position = position
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueueTrade sends the trade object to the trade channel,
|
||||||
|
// so that the goroutine can receive the trade and process in the background.
|
||||||
|
func (c *TradeCollector) QueueTrade(trade types.Trade) {
|
||||||
|
c.tradeC <- trade
|
||||||
|
}
|
||||||
|
|
||||||
|
// BindStreamForBackground bind the stream callback for background processing
|
||||||
|
func (c *TradeCollector) BindStreamForBackground(stream types.Stream) {
|
||||||
|
stream.OnTradeUpdate(c.QueueTrade)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TradeCollector) BindStream(stream types.Stream) {
|
||||||
|
stream.OnTradeUpdate(func(trade types.Trade) {
|
||||||
|
c.ProcessTrade(trade)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit triggers the trade processing (position update)
|
||||||
|
// If you sent order, and the order store is updated, you can call this method
|
||||||
|
// so that trades will be processed in the next round of the goroutine loop
|
||||||
|
func (c *TradeCollector) Emit() {
|
||||||
|
c.orderSig.Emit()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error {
|
||||||
|
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
|
||||||
|
StartTime: &from,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, td := range trades {
|
||||||
|
logrus.Debugf("processing trade: %s", td.String())
|
||||||
|
if c.ProcessTrade(td) {
|
||||||
|
logrus.Infof("recovered trade: %s", td.String())
|
||||||
|
c.EmitRecover(td)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *TradeCollector) setDone(key types.TradeKey) {
|
||||||
|
c.mu.Lock()
|
||||||
|
c.doneTrades[key] = struct{}{}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process filters the received trades and see if there are orders matching the trades
|
||||||
|
// if we have the order in the order store, then the trade will be considered for the position.
|
||||||
|
// profit will also be calculated.
|
||||||
|
func (c *TradeCollector) Process() bool {
|
||||||
|
positionChanged := false
|
||||||
|
|
||||||
|
c.tradeStore.Filter(func(trade types.Trade) bool {
|
||||||
|
key := trade.Key()
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
// if it's already done, remove the trade from the trade store
|
||||||
|
if _, done := c.doneTrades[key]; done {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.orderStore.Exists(trade.OrderID) {
|
||||||
|
if c.position != nil {
|
||||||
|
profit, netProfit, madeProfit := c.position.AddTrade(trade)
|
||||||
|
if madeProfit {
|
||||||
|
p := c.position.NewProfit(trade, profit, netProfit)
|
||||||
|
c.EmitTrade(trade, profit, netProfit)
|
||||||
|
c.EmitProfit(trade, &p)
|
||||||
|
} else {
|
||||||
|
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
||||||
|
c.EmitProfit(trade, nil)
|
||||||
|
}
|
||||||
|
positionChanged = true
|
||||||
|
} else {
|
||||||
|
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.doneTrades[key] = struct{}{}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
|
||||||
|
if positionChanged && c.position != nil {
|
||||||
|
c.EmitPositionUpdate(c.position)
|
||||||
|
}
|
||||||
|
|
||||||
|
return positionChanged
|
||||||
|
}
|
||||||
|
|
||||||
|
// processTrade takes a trade and see if there is a matched order
|
||||||
|
// if the order is found, then we add the trade to the position
|
||||||
|
// return true when the given trade is added
|
||||||
|
// return false when the given trade is not added
|
||||||
|
func (c *TradeCollector) processTrade(trade types.Trade) bool {
|
||||||
|
c.mu.Lock()
|
||||||
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
|
key := trade.Key()
|
||||||
|
|
||||||
|
// if it's already done, remove the trade from the trade store
|
||||||
|
if _, done := c.doneTrades[key]; done {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.orderStore.Exists(trade.OrderID) {
|
||||||
|
if c.position != nil {
|
||||||
|
profit, netProfit, madeProfit := c.position.AddTrade(trade)
|
||||||
|
if madeProfit {
|
||||||
|
p := c.position.NewProfit(trade, profit, netProfit)
|
||||||
|
c.EmitTrade(trade, profit, netProfit)
|
||||||
|
c.EmitProfit(trade, &p)
|
||||||
|
} else {
|
||||||
|
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
||||||
|
c.EmitProfit(trade, nil)
|
||||||
|
}
|
||||||
|
c.EmitPositionUpdate(c.position)
|
||||||
|
} else {
|
||||||
|
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
|
||||||
|
}
|
||||||
|
|
||||||
|
c.doneTrades[key] = struct{}{}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// return true when the given trade is added
|
||||||
|
// return false when the given trade is not added
|
||||||
|
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
|
||||||
|
key := trade.Key()
|
||||||
|
// if it's already done, remove the trade from the trade store
|
||||||
|
c.mu.Lock()
|
||||||
|
if _, done := c.doneTrades[key]; done {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
|
||||||
|
if c.processTrade(trade) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
c.tradeStore.Add(trade)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run is a goroutine executed in the background
|
||||||
|
// Do not use this function if you need back-testing
|
||||||
|
func (c *TradeCollector) Run(ctx context.Context) {
|
||||||
|
var ticker = time.NewTicker(3 * time.Second)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
|
||||||
|
case <-ticker.C:
|
||||||
|
c.Process()
|
||||||
|
|
||||||
|
case <-c.orderSig:
|
||||||
|
c.Process()
|
||||||
|
|
||||||
|
case trade := <-c.tradeC:
|
||||||
|
c.ProcessTrade(trade)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
// Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT.
|
// Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT.
|
||||||
|
|
||||||
package bbgo
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
65
pkg/core/tradecollector_test.go
Normal file
65
pkg/core/tradecollector_test.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package core
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) {
|
||||||
|
symbol := "BTCUSDT"
|
||||||
|
position := types.NewPosition(symbol, "BTC", "USDT")
|
||||||
|
orderStore := NewOrderStore(symbol)
|
||||||
|
collector := NewTradeCollector(symbol, position, orderStore)
|
||||||
|
assert.NotNil(t, collector)
|
||||||
|
|
||||||
|
matched := collector.ProcessTrade(types.Trade{
|
||||||
|
ID: 1,
|
||||||
|
OrderID: 399,
|
||||||
|
Exchange: types.ExchangeBinance,
|
||||||
|
Price: fixedpoint.NewFromInt(40000),
|
||||||
|
Quantity: fixedpoint.One,
|
||||||
|
QuoteQuantity: fixedpoint.NewFromInt(40000),
|
||||||
|
Symbol: "BTCUSDT",
|
||||||
|
Side: types.SideTypeBuy,
|
||||||
|
IsBuyer: true,
|
||||||
|
})
|
||||||
|
assert.False(t, matched, "should be added to the trade store")
|
||||||
|
assert.Equal(t, 1, len(collector.tradeStore.Trades()), "should have one trade in the trade store")
|
||||||
|
|
||||||
|
orderStore.Add(types.Order{
|
||||||
|
SubmitOrder: types.SubmitOrder{
|
||||||
|
Symbol: "BTCUSDT",
|
||||||
|
Side: types.SideTypeBuy,
|
||||||
|
Type: types.OrderTypeLimit,
|
||||||
|
Quantity: fixedpoint.One,
|
||||||
|
Price: fixedpoint.NewFromInt(40000),
|
||||||
|
},
|
||||||
|
Exchange: types.ExchangeBinance,
|
||||||
|
OrderID: 399,
|
||||||
|
Status: types.OrderStatusFilled,
|
||||||
|
ExecutedQuantity: fixedpoint.One,
|
||||||
|
IsWorking: false,
|
||||||
|
})
|
||||||
|
|
||||||
|
matched = collector.Process()
|
||||||
|
assert.True(t, matched)
|
||||||
|
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the found trade should be removed from the trade store")
|
||||||
|
|
||||||
|
matched = collector.ProcessTrade(types.Trade{
|
||||||
|
ID: 1,
|
||||||
|
OrderID: 399,
|
||||||
|
Exchange: types.ExchangeBinance,
|
||||||
|
Price: fixedpoint.NewFromInt(40000),
|
||||||
|
Quantity: fixedpoint.One,
|
||||||
|
QuoteQuantity: fixedpoint.NewFromInt(40000),
|
||||||
|
Symbol: "BTCUSDT",
|
||||||
|
Side: types.SideTypeBuy,
|
||||||
|
IsBuyer: true,
|
||||||
|
})
|
||||||
|
assert.False(t, matched, "the same trade should not match")
|
||||||
|
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the same trade should not be added to the trade store")
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/bbgo/mocks"
|
"github.com/c9s/bbgo/pkg/bbgo/mocks"
|
||||||
|
"github.com/c9s/bbgo/pkg/core"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
@ -93,7 +94,7 @@ func TestReleasePositionCallbacks(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
tradeCollector := &bbgo.TradeCollector{}
|
tradeCollector := &core.TradeCollector{}
|
||||||
mockCtrl := gomock.NewController(t)
|
mockCtrl := gomock.NewController(t)
|
||||||
defer mockCtrl.Finish()
|
defer mockCtrl.Finish()
|
||||||
orderExecutor := mocks.NewMockOrderExecutorExtended(mockCtrl)
|
orderExecutor := mocks.NewMockOrderExecutorExtended(mockCtrl)
|
||||||
|
|
|
@ -49,7 +49,7 @@ type Strategy struct {
|
||||||
// closePositionOrders *bbgo.LocalActiveOrderBook
|
// closePositionOrders *bbgo.LocalActiveOrderBook
|
||||||
|
|
||||||
orderStore *core.OrderStore
|
orderStore *core.OrderStore
|
||||||
tradeCollector *bbgo.TradeCollector
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
session *bbgo.ExchangeSession
|
session *bbgo.ExchangeSession
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
s.Position.Strategy = ID
|
s.Position.Strategy = ID
|
||||||
s.Position.StrategyInstanceID = instanceID
|
s.Position.StrategyInstanceID = instanceID
|
||||||
|
|
||||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
||||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||||
// StrategyController
|
// StrategyController
|
||||||
if s.Status != types.StrategyStatusRunning {
|
if s.Status != types.StrategyStatusRunning {
|
||||||
|
|
|
@ -95,7 +95,7 @@ type Strategy struct {
|
||||||
// activeOrders is the locally maintained active order book of the maker orders.
|
// activeOrders is the locally maintained active order book of the maker orders.
|
||||||
activeOrders *bbgo.ActiveOrderBook
|
activeOrders *bbgo.ActiveOrderBook
|
||||||
|
|
||||||
tradeCollector *bbgo.TradeCollector
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
// groupID is the group ID used for the strategy instance for canceling orders
|
// groupID is the group ID used for the strategy instance for canceling orders
|
||||||
groupID uint32
|
groupID uint32
|
||||||
|
@ -571,7 +571,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
s.activeOrders.OnFilled(s.handleFilledOrder)
|
s.activeOrders.OnFilled(s.handleFilledOrder)
|
||||||
s.activeOrders.BindStream(session.UserDataStream)
|
s.activeOrders.BindStream(session.UserDataStream)
|
||||||
|
|
||||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore)
|
s.tradeCollector = core.NewTradeCollector(s.Symbol, s.State.Position, s.orderStore)
|
||||||
|
|
||||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||||
bbgo.Notify(trade)
|
bbgo.Notify(trade)
|
||||||
|
|
|
@ -67,7 +67,7 @@ type Strategy struct {
|
||||||
activeAdjustmentOrders *bbgo.ActiveOrderBook
|
activeAdjustmentOrders *bbgo.ActiveOrderBook
|
||||||
activeWallOrders *bbgo.ActiveOrderBook
|
activeWallOrders *bbgo.ActiveOrderBook
|
||||||
orderStore *core.OrderStore
|
orderStore *core.OrderStore
|
||||||
tradeCollector *bbgo.TradeCollector
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
groupID uint32
|
groupID uint32
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
s.orderStore = core.NewOrderStore(s.Symbol)
|
s.orderStore = core.NewOrderStore(s.Symbol)
|
||||||
s.orderStore.BindStream(session.UserDataStream)
|
s.orderStore.BindStream(session.UserDataStream)
|
||||||
|
|
||||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
||||||
|
|
||||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||||
bbgo.Notify(trade)
|
bbgo.Notify(trade)
|
||||||
|
|
|
@ -105,7 +105,7 @@ type Strategy struct {
|
||||||
hedgeErrorRateReservation *rate.Reservation
|
hedgeErrorRateReservation *rate.Reservation
|
||||||
|
|
||||||
orderStore *core.OrderStore
|
orderStore *core.OrderStore
|
||||||
tradeCollector *bbgo.TradeCollector
|
tradeCollector *core.TradeCollector
|
||||||
|
|
||||||
askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat
|
askPriceHeartBeat, bidPriceHeartBeat types.PriceHeartBeat
|
||||||
|
|
||||||
|
@ -737,7 +737,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
||||||
s.orderStore.BindStream(s.sourceSession.UserDataStream)
|
s.orderStore.BindStream(s.sourceSession.UserDataStream)
|
||||||
s.orderStore.BindStream(s.makerSession.UserDataStream)
|
s.orderStore.BindStream(s.makerSession.UserDataStream)
|
||||||
|
|
||||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
s.tradeCollector = core.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
||||||
|
|
||||||
if s.NotifyTrade {
|
if s.NotifyTrade {
|
||||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user