bbgo: add trade collector

This commit is contained in:
c9s 2021-06-24 15:38:44 +08:00
parent 65629a77f4
commit db4fbbc30c
2 changed files with 96 additions and 0 deletions

View File

@ -0,0 +1,69 @@
package bbgo
import (
"context"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)
//go:generate callbackgen -type TradeCollector
type TradeCollector struct {
Symbol string
orderSig sigchan.Chan
tradeStore *TradeStore
tradeC chan types.Trade
position *Position
orderStore *OrderStore
tradeCallbacks []func(trade types.Trade)
positionUpdateCallbacks []func(position *Position)
}
func NewTradeCollector(symbol string, position *Position, orderStore *OrderStore) *TradeCollector {
return &TradeCollector{
Symbol: symbol,
orderSig: sigchan.New(1),
tradeC: make(chan types.Trade, 100),
tradeStore: NewTradeStore(symbol),
position: position,
orderStore: orderStore,
}
}
func (c *TradeCollector) handleTradeUpdate(trade types.Trade) {
c.tradeC <- trade
}
func (c *TradeCollector) BindStream(stream types.Stream) {
stream.OnTradeUpdate(c.handleTradeUpdate)
}
func (c *TradeCollector) Emit() {
c.orderSig.Emit()
}
func (c *TradeCollector) Run(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-c.orderSig:
trades := c.tradeStore.GetAndClear()
for _, trade := range trades {
if c.orderStore.Exists(trade.OrderID) {
c.position.AddTrade(trade)
c.EmitTrade(trade)
}
}
c.EmitPositionUpdate(c.position)
case trade := <-c.tradeC:
c.tradeStore.Add(trade)
}
}
}

View File

@ -0,0 +1,27 @@
// Code generated by "callbackgen -type TradeCollector"; DO NOT EDIT.
package bbgo
import (
"github.com/c9s/bbgo/pkg/types"
)
func (c *TradeCollector) OnTrade(cb func(trade types.Trade)) {
c.tradeCallbacks = append(c.tradeCallbacks, cb)
}
func (c *TradeCollector) EmitTrade(trade types.Trade) {
for _, cb := range c.tradeCallbacks {
cb(trade)
}
}
func (c *TradeCollector) OnPositionUpdate(cb func(position *Position)) {
c.positionUpdateCallbacks = append(c.positionUpdateCallbacks, cb)
}
func (c *TradeCollector) EmitPositionUpdate(position *Position) {
for _, cb := range c.positionUpdateCallbacks {
cb(position)
}
}