From 3aa36b59897cb8ec70559aa182a578281cb5eda5 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 30 May 2021 15:08:11 +0800 Subject: [PATCH] refactor and fix backtest for user data stream and market data stream --- pkg/backtest/exchange.go | 34 +++------------- pkg/backtest/stream.go | 84 ++++++++++++++++++++++++++-------------- pkg/bbgo/environment.go | 7 +--- pkg/server/routes.go | 4 +- 4 files changed, 64 insertions(+), 65 deletions(-) diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index db24d44a9..12b214156 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -25,7 +25,7 @@ type Exchange struct { account *types.Account config *bbgo.Backtest - stream *Stream + userDataStream *Stream trades map[string][]types.Trade closedOrders map[string][]types.Order @@ -91,31 +91,7 @@ func (e *Exchange) Done() chan struct{} { } func (e *Exchange) NewStream() types.Stream { - if e.stream != nil { - panic("backtest stream can not be allocated twice") - } - - e.stream = &Stream{exchange: e} - - e.stream.OnTradeUpdate(func(trade types.Trade) { - e.trades[trade.Symbol] = append(e.trades[trade.Symbol], trade) - }) - - for symbol, market := range e.markets { - matching := &SimplePriceMatching{ - CurrentTime: e.startTime, - Account: e.account, - Market: market, - MakerCommission: e.config.Account.MakerCommission, - TakerCommission: e.config.Account.TakerCommission, - } - matching.OnTradeUpdate(e.stream.EmitTradeUpdate) - matching.OnOrderUpdate(e.stream.EmitOrderUpdate) - matching.OnBalanceUpdate(e.stream.EmitBalanceUpdate) - e.matchingBooks[symbol] = matching - } - - return e.stream + return &Stream{exchange: e} } func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { @@ -140,11 +116,11 @@ func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) e.closedOrders[symbol] = append(e.closedOrders[symbol], *createdOrder) } - e.stream.EmitOrderUpdate(*createdOrder) + e.userDataStream.EmitOrderUpdate(*createdOrder) } if trade != nil { - e.stream.EmitTradeUpdate(*trade) + e.userDataStream.EmitTradeUpdate(*trade) } } @@ -180,7 +156,7 @@ func (e Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error return err } - e.stream.EmitOrderUpdate(canceledOrder) + e.userDataStream.EmitOrderUpdate(canceledOrder) } return nil diff --git a/pkg/backtest/stream.go b/pkg/backtest/stream.go index f920fb1de..01169a4b5 100644 --- a/pkg/backtest/stream.go +++ b/pkg/backtest/stream.go @@ -15,6 +15,8 @@ type Stream struct { types.StandardStream exchange *Exchange + + publicOnly bool } func (s *Stream) Connect(ctx context.Context) error { @@ -51,48 +53,72 @@ func (s *Stream) Connect(ctx context.Context) error { log.Infof("used symbols: %v and intervals: %v", symbols, intervals) - go func() { - log.Infof("emitting connect callbacks...") - s.EmitConnect() + if !s.publicOnly { + // user data stream + s.OnTradeUpdate(func(trade types.Trade) { + s.exchange.trades[trade.Symbol] = append(s.exchange.trades[trade.Symbol], trade) + }) - log.Infof("emitting start callbacks...") - s.EmitStart() + for symbol, market := range s.exchange.markets { + matching := &SimplePriceMatching{ + CurrentTime: s.exchange.startTime, + Account: s.exchange.account, + Market: market, + MakerCommission: s.exchange.config.Account.MakerCommission, + TakerCommission: s.exchange.config.Account.TakerCommission, + } + matching.OnTradeUpdate(s.EmitTradeUpdate) + matching.OnOrderUpdate(s.EmitOrderUpdate) + matching.OnBalanceUpdate(s.EmitBalanceUpdate) + s.exchange.matchingBooks[symbol] = matching + } - log.Infof("querying klines from database...") - klineC, errC := s.exchange.srv.QueryKLinesCh(s.exchange.startTime, s.exchange.endTime, s.exchange, symbols, intervals) - numKlines := 0 - for k := range klineC { - if k.Interval == types.Interval1m { - matching, ok := s.exchange.matchingBooks[k.Symbol] - if !ok { - log.Errorf("matching book of %s is not initialized", k.Symbol) - continue + // assign user data stream back + s.exchange.userDataStream = s + } + + s.EmitConnect() + s.EmitStart() + + if s.publicOnly { + go func() { + log.Infof("querying klines from database...") + klineC, errC := s.exchange.srv.QueryKLinesCh(s.exchange.startTime, s.exchange.endTime, s.exchange, symbols, intervals) + numKlines := 0 + for k := range klineC { + if k.Interval == types.Interval1m { + matching, ok := s.exchange.matchingBooks[k.Symbol] + if !ok { + log.Errorf("matching book of %s is not initialized", k.Symbol) + continue + } + + matching.processKLine(k) + numKlines++ } - matching.processKLine(k) - numKlines++ + s.EmitKLineClosed(k) } - s.EmitKLineClosed(k) - } + if err := <-errC; err != nil { + log.WithError(err).Error("backtest data feed error") + } - if err := <-errC; err != nil { - log.WithError(err).Error("backtest data feed error") - } + if numKlines == 0 { + log.Error("kline data is empty, make sure you have sync the exchange market data") + } - if numKlines == 0 { - log.Error("kline data is empty, make sure you have sync the exchange market data") - } - - if err := s.Close(); err != nil { - log.WithError(err).Error("stream close error") - } - }() + if err := s.Close(); err != nil { + log.WithError(err).Error("stream close error") + } + }() + } return nil } func (s *Stream) SetPublicOnly() { + s.publicOnly = true return } diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index ef4848c47..37784ca2e 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -211,11 +211,8 @@ func (environ *Environment) AddExchangesByViperKeys() error { } func InitExchangeSession(name string, session *ExchangeSession) error { - exchangeName, err := types.ValidExchangeName(session.ExchangeName) - if err != nil { - return err - } - + var err error + var exchangeName = session.ExchangeName var exchange types.Exchange if session.Key != "" && session.Secret != "" { if !session.PublicOnly { diff --git a/pkg/server/routes.go b/pkg/server/routes.go index 878a0cc79..d428f0270 100644 --- a/pkg/server/routes.go +++ b/pkg/server/routes.go @@ -133,7 +133,7 @@ func (s *Server) newEngine() *gin.Engine { return } - err := bbgo.InitExchangeSession(session.ExchangeName, &session) + err := bbgo.InitExchangeSession(session.ExchangeName.String(), &session) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": err.Error(), @@ -182,7 +182,7 @@ func (s *Server) newEngine() *gin.Engine { return } - if err := bbgo.InitExchangeSession(session.ExchangeName, &session); err != nil { + if err := bbgo.InitExchangeSession(session.ExchangeName.String(), &session); err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "error": err.Error(), })