From db4fbbc30c73263580424ab75955ab86b793673b Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 24 Jun 2021 15:38:44 +0800 Subject: [PATCH] bbgo: add trade collector --- pkg/bbgo/tradecollector.go | 69 ++++++++++++++++++++++++++++ pkg/bbgo/tradecollector_callbacks.go | 27 +++++++++++ 2 files changed, 96 insertions(+) create mode 100644 pkg/bbgo/tradecollector.go create mode 100644 pkg/bbgo/tradecollector_callbacks.go diff --git a/pkg/bbgo/tradecollector.go b/pkg/bbgo/tradecollector.go new file mode 100644 index 000000000..e4327e5c9 --- /dev/null +++ b/pkg/bbgo/tradecollector.go @@ -0,0 +1,69 @@ +package bbgo + +import ( + "context" + + "github.com/c9s/bbgo/pkg/sigchan" + "github.com/c9s/bbgo/pkg/types" +) + +//go:generate callbackgen -type TradeCollector +type TradeCollector struct { + Symbol string + orderSig sigchan.Chan + + tradeStore *TradeStore + tradeC chan types.Trade + position *Position + orderStore *OrderStore + + tradeCallbacks []func(trade types.Trade) + positionUpdateCallbacks []func(position *Position) +} + +func NewTradeCollector(symbol string, position *Position, orderStore *OrderStore) *TradeCollector { + return &TradeCollector{ + Symbol: symbol, + orderSig: sigchan.New(1), + + tradeC: make(chan types.Trade, 100), + tradeStore: NewTradeStore(symbol), + position: position, + orderStore: orderStore, + } +} + +func (c *TradeCollector) handleTradeUpdate(trade types.Trade) { + c.tradeC <- trade +} + +func (c *TradeCollector) BindStream(stream types.Stream) { + stream.OnTradeUpdate(c.handleTradeUpdate) +} + +func (c *TradeCollector) Emit() { + c.orderSig.Emit() +} + +func (c *TradeCollector) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + + case <-c.orderSig: + trades := c.tradeStore.GetAndClear() + for _, trade := range trades { + if c.orderStore.Exists(trade.OrderID) { + c.position.AddTrade(trade) + c.EmitTrade(trade) + } + } + c.EmitPositionUpdate(c.position) + + case trade := <-c.tradeC: + c.tradeStore.Add(trade) + + } + } +} diff --git a/pkg/bbgo/tradecollector_callbacks.go b/pkg/bbgo/tradecollector_callbacks.go new file mode 100644 index 000000000..73e5863c0 --- /dev/null +++ b/pkg/bbgo/tradecollector_callbacks.go @@ -0,0 +1,27 @@ +// Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT. + +package bbgo + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (c *TradeCollector) OnTrade(cb func(trade types.Trade)) { + c.tradeCallbacks = append(c.tradeCallbacks, cb) +} + +func (c *TradeCollector) EmitTrade(trade types.Trade) { + for _, cb := range c.tradeCallbacks { + cb(trade) + } +} + +func (c *TradeCollector) OnPositionUpdate(cb func(position *Position)) { + c.positionUpdateCallbacks = append(c.positionUpdateCallbacks, cb) +} + +func (c *TradeCollector) EmitPositionUpdate(position *Position) { + for _, cb := range c.positionUpdateCallbacks { + cb(position) + } +}