xmaker: use trade channel to buffer trades

This commit is contained in:
c9s 2021-05-29 01:03:43 +08:00
parent 33db0b5c6f
commit 6e0bc7c1e2

View File

@ -112,6 +112,7 @@ type Strategy struct {
orderStore *bbgo.OrderStore
tradeStore *bbgo.TradeStore
tradeC chan types.Trade
lastPrice float64
groupID uint32
@ -498,6 +499,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
log.Infof("submitting %s hedge order %s %f", s.Symbol, side.String(), quantity.Float64())
s.Notifiability.Notify("Submitting %s hedge order %s %f", s.Symbol, side.String(), quantity.Float64())
orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession}
returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
@ -514,17 +516,21 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
s.orderStore.Add(returnOrders...)
}
func (s *Strategy) handleTradeUpdate(trade types.Trade) {
func (s *Strategy) handleTrade(trade types.Trade) {
s.tradeC <- trade
}
func (s *Strategy) processTrade(trade types.Trade) {
log.Infof("processing trade %+v", trade)
if trade.Symbol != s.Symbol {
return
}
if !s.orderStore.Exists(trade.OrderID) {
s.tradeStore.Add(trade)
return
}
log.Infof("received trade %+v", trade)
log.Infof("identified %s trade %d with an existing order: %d", trade.Symbol, trade.ID, trade.OrderID)
q := fixedpoint.NewFromFloat(trade.Quantity)
switch trade.Side {
@ -544,8 +550,6 @@ func (s *Strategy) handleTradeUpdate(trade types.Trade) {
}
log.Infof("identified %s trade %d with an existing order: %d", trade.Symbol, trade.ID, trade.OrderID)
s.state.HedgePosition.AtomicAdd(q)
s.state.AccumulatedVolume.AtomicAdd(fixedpoint.NewFromFloat(trade.Quantity))
@ -578,8 +582,8 @@ func (s *Strategy) handleTradeUpdate(trade types.Trade) {
s.state.AccumulatedLoss.Float64(), s.state.Position.QuoteCurrency)
} else {
log.Infof("position changed: %s", s.state.Position)
s.Notify(s.state.Position)
log.Info("position changed: %s", s.state.Position)
}
s.lastPrice = trade.Price
@ -633,6 +637,9 @@ func (s *Strategy) SaveState() error {
}
func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
// buffer 100 trades in the channel
s.tradeC = make(chan types.Trade, 100)
if s.BollBandInterval == "" {
s.BollBandInterval = types.Interval1m
}
@ -749,8 +756,8 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.book = types.NewStreamBook(s.Symbol)
s.book.BindStream(s.sourceSession.MarketDataStream)
s.sourceSession.UserDataStream.OnTradeUpdate(s.handleTradeUpdate)
s.makerSession.UserDataStream.OnTradeUpdate(s.handleTradeUpdate)
s.sourceSession.UserDataStream.OnTradeUpdate(s.handleTrade)
s.makerSession.UserDataStream.OnTradeUpdate(s.handleTrade)
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook()
s.activeMakerOrders.BindStream(s.makerSession.UserDataStream)
@ -790,10 +797,20 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
case <-quoteTicker.C:
s.updateQuote(ctx, orderExecutionRouter)
case trade := <-s.tradeC:
log.Infof("recieved trade %+v", trade)
if s.orderStore.Exists(trade.OrderID) {
s.processTrade(trade)
} else {
// buffer this trade to the next tick of hedge
s.tradeStore.Add(trade)
}
case <-posTicker.C:
// process pending trades
if s.tradeStore.Num() > 0 {
for _, trade := range s.tradeStore.Trades() {
s.handleTradeUpdate(trade)
s.processTrade(trade)
}
s.tradeStore.Clear()
}