xmaker: fix trade handling

This commit is contained in:
c9s 2021-12-26 12:10:10 +08:00
parent 92b3a9b0fe
commit 1c54e59d55
2 changed files with 50 additions and 108 deletions

View File

@ -8,14 +8,15 @@ import (
"sync"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var defaultMargin = fixedpoint.NewFromFloat(0.003)
@ -141,11 +142,9 @@ type Strategy struct {
// --------------------------------
// private field
makerSession *bbgo.ExchangeSession
sourceSession *bbgo.ExchangeSession
makerSession, sourceSession *bbgo.ExchangeSession
sourceMarket types.Market
makerMarket types.Market
makerMarket, sourceMarket types.Market
// boll is the BOLLINGER indicator we used for predicting the price.
boll *indicator.BOLL
@ -155,9 +154,8 @@ type Strategy struct {
book *types.StreamOrderBook
activeMakerOrders *bbgo.LocalActiveOrderBook
orderStore *bbgo.OrderStore
tradeStore *bbgo.TradeStore
tradeC chan types.Trade
orderStore *bbgo.OrderStore
tradeCollector *bbgo.TradeCollector
lastPrice float64
groupID uint32
@ -236,6 +234,9 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
return
}
// use mid-price for the last price
s.lastPrice = (bestBid.Price + bestAsk.Price).Float64() / 2
sourceBook := s.book.CopyDepth(20)
if valid, err := sourceBook.IsValid(); !valid {
log.WithError(err).Errorf("%s invalid copied order book, skip quoting: %v", s.Symbol, err)
@ -576,78 +577,6 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
s.orderStore.Add(returnOrders...)
}
func (s *Strategy) handleTrade(trade types.Trade) {
s.tradeC <- trade
}
func (s *Strategy) processTrade(trade types.Trade) {
log.Infof("processing trade %+v", trade)
if trade.Symbol != s.Symbol {
return
}
if !s.orderStore.Exists(trade.OrderID) {
return
}
if s.NotifyTrade {
s.Notifiability.Notify(trade)
}
log.Infof("identified %s trade %d with an existing order: %d", trade.Symbol, trade.ID, trade.OrderID)
q := fixedpoint.NewFromFloat(trade.Quantity)
switch trade.Side {
case types.SideTypeSell:
q = -q
case types.SideTypeBuy:
case types.SideTypeSelf:
// ignore self trades
log.Warnf("ignore self trade")
return
default:
log.Infof("ignore non sell/buy side trades, got: %v", trade.Side)
return
}
s.state.HedgePosition.AtomicAdd(q)
if trade.Exchange == s.sourceSession.ExchangeName {
s.state.CoveredPosition.AtomicAdd(q)
}
s.state.ProfitStats.AddTrade(trade)
if profit, netProfit, madeProfit := s.state.Position.AddTrade(trade); madeProfit {
p := bbgo.Profit{
Symbol: s.Symbol,
Profit: profit,
NetProfit: netProfit,
TradeAmount: fixedpoint.NewFromFloat(trade.QuoteQuantity),
ProfitMargin: profit.DivFloat64(trade.QuoteQuantity),
NetProfitMargin: netProfit.DivFloat64(trade.QuoteQuantity),
QuoteCurrency: s.state.Position.QuoteCurrency,
BaseCurrency: s.state.Position.BaseCurrency,
Time: trade.Time.Time(),
}
s.state.ProfitStats.AddProfit(p)
s.Notify(&p)
} else {
log.Infof("position changed: %s", s.state.Position)
s.Notify(s.state.Position)
}
s.lastPrice = trade.Price
if err := s.SaveState(); err != nil {
log.WithError(err).Error("save state error")
}
}
func (s *Strategy) Validate() error {
if s.Quantity == 0 || s.QuantityScale == nil {
return errors.New("quantity or quantityScale can not be empty")
@ -704,9 +633,6 @@ func (s *Strategy) SaveState() error {
}
func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
// buffer 100 trades in the channel
s.tradeC = make(chan types.Trade, 100)
if s.BollBandInterval == "" {
s.BollBandInterval = types.Interval1m
}
@ -810,18 +736,37 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.book = types.NewStreamBook(s.Symbol)
s.book.BindStream(s.sourceSession.MarketDataStream)
s.sourceSession.UserDataStream.OnTradeUpdate(s.handleTrade)
s.makerSession.UserDataStream.OnTradeUpdate(s.handleTrade)
s.activeMakerOrders = bbgo.NewLocalActiveOrderBook()
s.activeMakerOrders.BindStream(s.makerSession.UserDataStream)
s.tradeStore = bbgo.NewTradeStore(s.Symbol)
s.orderStore = bbgo.NewOrderStore(s.Symbol)
s.orderStore.BindStream(s.sourceSession.UserDataStream)
s.orderStore.BindStream(s.makerSession.UserDataStream)
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore)
s.tradeCollector.OnTrade(func(trade types.Trade) {
c := trade.PositionChange()
s.state.HedgePosition.AtomicAdd(c)
if trade.Exchange == s.sourceSession.ExchangeName {
s.state.CoveredPosition.AtomicAdd(c)
}
if s.NotifyTrade {
s.Notifiability.Notify(trade)
}
s.state.ProfitStats.AddTrade(trade)
if err := s.SaveState(); err != nil {
log.WithError(err).Error("save state error")
}
})
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
s.Notifiability.Notify(position)
})
s.tradeCollector.BindStreamForBackground(s.sourceSession.UserDataStream)
s.tradeCollector.BindStreamForBackground(s.makerSession.UserDataStream)
go s.tradeCollector.Run(ctx)
s.stopC = make(chan struct{})
go func() {
@ -857,26 +802,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
case <-reportTicker.C:
s.Notifiability.Notify(&s.state.ProfitStats)
case trade := <-s.tradeC:
log.Infof("recieved trade %+v", trade)
if s.orderStore.Exists(trade.OrderID) {
s.processTrade(trade)
} else {
// buffer this trade to the next tick of hedge
s.tradeStore.Add(trade)
}
case <-posTicker.C:
// process pending trades
if s.tradeStore.Num() > 0 {
for _, trade := range s.tradeStore.Trades() {
if s.orderStore.Exists(trade.OrderID) {
s.processTrade(trade)
}
}
s.tradeStore.Clear()
}
// for positive position:
// uncover position = 5 - 3 (covered position) = 2
// for negative position:

View File

@ -9,6 +9,7 @@ import (
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/util"
)
@ -73,6 +74,21 @@ type Trade struct {
PnL sql.NullFloat64 `json:"pnl" db:"pnl"`
}
func (trade Trade) PositionChange() fixedpoint.Value {
q := fixedpoint.NewFromFloat(trade.Quantity)
switch trade.Side {
case SideTypeSell:
return -q
case SideTypeBuy:
return q
case SideTypeSelf:
return 0
}
return 0
}
func (trade Trade) String() string {
return fmt.Sprintf("TRADE %s %s %4s %f @ %f orderID %d %s amount %f",
trade.Exchange.String(),