backtest: pull out market data feeding to a function and call it in the main thread

This commit is contained in:
c9s 2021-12-25 22:57:28 +08:00
parent 60853bee23
commit 442afe8eb9
3 changed files with 95 additions and 97 deletions

View File

@ -34,6 +34,7 @@ import (
"time" "time"
"github.com/c9s/bbgo/pkg/exchange/ftx" "github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/exchange/kucoin"
"github.com/c9s/bbgo/pkg/exchange/okex" "github.com/c9s/bbgo/pkg/exchange/okex"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -55,7 +56,7 @@ type Exchange struct {
account *types.Account account *types.Account
config *bbgo.Backtest config *bbgo.Backtest
userDataStream *Stream userDataStream, marketDataStream *Stream
trades map[string][]types.Trade trades map[string][]types.Trade
tradesMutex sync.Mutex tradesMutex sync.Mutex
@ -67,7 +68,6 @@ type Exchange struct {
matchingBooksMutex sync.Mutex matchingBooksMutex sync.Mutex
markets types.MarketMap markets types.MarketMap
doneC chan struct{}
} }
func NewExchange(sourceName types.ExchangeName, srv *service.BacktestService, config *bbgo.Backtest) (*Exchange, error) { func NewExchange(sourceName types.ExchangeName, srv *service.BacktestService, config *bbgo.Backtest) (*Exchange, error) {
@ -115,7 +115,6 @@ func NewExchange(sourceName types.ExchangeName, srv *service.BacktestService, co
endTime: endTime, endTime: endTime,
closedOrders: make(map[string][]types.Order), closedOrders: make(map[string][]types.Order),
trades: make(map[string][]types.Trade), trades: make(map[string][]types.Trade),
doneC: make(chan struct{}),
} }
e.resetMatchingBooks() e.resetMatchingBooks()
@ -157,10 +156,6 @@ func (e *Exchange) _addMatchingBook(symbol string, market types.Market) {
} }
} }
func (e *Exchange) Done() chan struct{} {
return e.doneC
}
func (e *Exchange) NewStream() types.Stream { func (e *Exchange) NewStream() types.Stream {
return &Stream{exchange: e} return &Stream{exchange: e}
} }
@ -319,7 +314,91 @@ func newPublicExchange(sourceExchange types.ExchangeName) (types.Exchange, error
return ftx.NewExchange("", "", ""), nil return ftx.NewExchange("", "", ""), nil
case types.ExchangeOKEx: case types.ExchangeOKEx:
return okex.New("", "", ""), nil return okex.New("", "", ""), nil
case types.ExchangeKucoin:
return kucoin.New("", "", ""), nil
} }
return nil, fmt.Errorf("public data from exchange %s is not supported", sourceExchange) return nil, fmt.Errorf("public data from exchange %s is not supported", sourceExchange)
} }
func (e *Exchange) FeedMarketData() error {
e.userDataStream.OnTradeUpdate(func(trade types.Trade) {
e.addTrade(trade)
})
e.matchingBooksMutex.Lock()
for _, matching := range e.matchingBooks {
matching.OnTradeUpdate(e.userDataStream.EmitTradeUpdate)
matching.OnOrderUpdate(e.userDataStream.EmitOrderUpdate)
matching.OnBalanceUpdate(e.userDataStream.EmitBalanceUpdate)
}
e.matchingBooksMutex.Unlock()
marketDataStream := e.marketDataStream
log.Infof("collecting backtest configurations...")
loadedSymbols := map[string]struct{}{}
loadedIntervals := map[types.Interval]struct{}{
// 1m interval is required for the backtest matching engine
types.Interval1m: {},
types.Interval1d: {},
}
for _, sub := range marketDataStream.Subscriptions {
loadedSymbols[sub.Symbol] = struct{}{}
switch sub.Channel {
case types.KLineChannel:
loadedIntervals[types.Interval(sub.Options.Interval)] = struct{}{}
default:
return fmt.Errorf("stream channel %marketDataStream is not supported in backtest", sub.Channel)
}
}
var symbols []string
for symbol := range loadedSymbols {
symbols = append(symbols, symbol)
}
var intervals []types.Interval
for interval := range loadedIntervals {
intervals = append(intervals, interval)
}
log.Infof("using symbols: %v and intervals: %v for back-testing", symbols, intervals)
log.Infof("querying klines from database...")
klineC, errC := e.srv.QueryKLinesCh(e.startTime, e.endTime, e, symbols, intervals)
numKlines := 0
for k := range klineC {
if k.Interval == types.Interval1m {
matching, ok := e.matchingBook(k.Symbol)
if !ok {
log.Errorf("matching book of %s is not initialized", k.Symbol)
continue
}
// here we generate trades and order updates
matching.processKLine(k)
numKlines++
}
marketDataStream.EmitKLineClosed(k)
}
if err := <-errC; err != nil {
log.WithError(err).Error("backtest data feed error")
}
if numKlines == 0 {
log.Error("kline data is empty, make sure you have sync the exchange market data")
}
if err := marketDataStream.Close(); err != nil {
log.WithError(err).Error("stream close error")
return err
}
return nil
}

View File

@ -2,8 +2,6 @@ package backtest
import ( import (
"context" "context"
"fmt"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -19,105 +17,26 @@ type Stream struct {
} }
func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) Connect(ctx context.Context) error {
log.Infof("collecting backtest configurations...") if s.PublicOnly {
if s.exchange.marketDataStream != nil {
loadedSymbols := map[string]struct{}{} panic("you should not set up more than 1 market data stream in back-test")
loadedIntervals := map[types.Interval]struct{}{
// 1m interval is required for the backtest matching engine
types.Interval1m: {},
types.Interval1d: {},
} }
s.exchange.marketDataStream = s
for _, sub := range s.Subscriptions { } else {
loadedSymbols[sub.Symbol] = struct{}{}
switch sub.Channel {
case types.KLineChannel:
loadedIntervals[types.Interval(sub.Options.Interval)] = struct{}{}
default:
return fmt.Errorf("stream channel %s is not supported in backtest", sub.Channel)
}
}
var symbols []string
for symbol := range loadedSymbols {
symbols = append(symbols, symbol)
}
var intervals []types.Interval
for interval := range loadedIntervals {
intervals = append(intervals, interval)
}
log.Infof("used symbols: %v and intervals: %v", symbols, intervals)
if !s.PublicOnly {
// user data stream
s.OnTradeUpdate(func(trade types.Trade) {
s.exchange.addTrade(trade)
})
// FIXME: here if we created two user data stream, since the callbacks are not de-registered we might have problem
s.exchange.matchingBooksMutex.Lock()
for _, matching := range s.exchange.matchingBooks {
matching.OnTradeUpdate(s.EmitTradeUpdate)
matching.OnOrderUpdate(s.EmitOrderUpdate)
matching.OnBalanceUpdate(s.EmitBalanceUpdate)
}
s.exchange.matchingBooksMutex.Unlock()
// assign user data stream back // assign user data stream back
if s.exchange.userDataStream != nil {
panic("you should not set up more than 1 user data stream in back-test")
}
s.exchange.userDataStream = s s.exchange.userDataStream = s
} }
s.EmitConnect() s.EmitConnect()
s.EmitStart() s.EmitStart()
if s.PublicOnly {
go func() {
FeedMarketData(s, s.exchange, s.exchange.startTime, s.exchange.endTime, symbols, intervals)
}()
}
return nil return nil
} }
func FeedMarketData(s *Stream, ex *Exchange, startTime, endTime time.Time, symbols []string, intervals []types.Interval) {
log.Infof("querying klines from database...")
klineC, errC := ex.srv.QueryKLinesCh(startTime, endTime, ex, symbols, intervals)
numKlines := 0
for k := range klineC {
if k.Interval == types.Interval1m {
matching, ok := ex.matchingBook(k.Symbol)
if !ok {
log.Errorf("matching book of %s is not initialized", k.Symbol)
continue
}
// here we generate trades and order updates
matching.processKLine(k)
numKlines++
}
s.EmitKLineClosed(k)
}
if err := <-errC; err != nil {
log.WithError(err).Error("backtest data feed error")
}
if numKlines == 0 {
log.Error("kline data is empty, make sure you have sync the exchange market data")
}
if err := s.Close(); err != nil {
log.WithError(err).Error("stream close error")
}
}
func (s *Stream) Close() error { func (s *Stream) Close() error {
close(s.exchange.doneC)
return nil return nil
} }

View File

@ -300,7 +300,7 @@ var BacktestCmd = &cobra.Command{
return err return err
} }
<-backtestExchange.Done() backtestExchange.FeedMarketData()
log.Infof("shutting down trader...") log.Infof("shutting down trader...")
shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Second))