Merge branch 'feature/backtest' into main

This commit is contained in:
c9s 2020-11-07 20:34:55 +08:00
commit 4b0bab31fb
11 changed files with 615 additions and 37 deletions

View File

@ -48,19 +48,21 @@ backtest:
# see here for more details
# https://www.investopedia.com/terms/m/maximum-drawdown-mdd.asp
startTime: "2020-01-01"
symbols:
- BTCUSDT
account:
makerCommission: 15
takerCommission: 15
buyerCommission: 0
sellerCommission: 0
balances:
BTC: 1.0
USDT: 5000.0
BTC: 0.0
USDT: 10000.0
exchangeStrategies:
- on: max
buyandhold:
symbol: "BTCUSDT"
interval: "1m"
interval: "1h"
minDropPercentage: -0.01
baseQuantity: 0.01
minDropPercentage: -0.02

View File

@ -21,10 +21,13 @@ type Exchange struct {
account *types.Account
config *bbgo.Backtest
closedOrders []types.SubmitOrder
openOrders []types.SubmitOrder
stream *Stream
trades map[string][]types.Trade
closedOrders map[string][]types.Order
matchingBooks map[string]*SimplePriceMatching
doneC chan struct{}
}
func NewExchange(sourceExchange types.ExchangeName, srv *service.BacktestService, config *bbgo.Backtest) *Exchange {
@ -51,39 +54,112 @@ func NewExchange(sourceExchange types.ExchangeName, srv *service.BacktestService
}
account.UpdateBalances(balances)
return &Exchange{
e := &Exchange{
sourceExchange: sourceExchange,
publicExchange: ex,
srv: srv,
config: config,
account: account,
startTime: startTime,
matchingBooks: make(map[string]*SimplePriceMatching),
closedOrders: make(map[string][]types.Order),
trades: make(map[string][]types.Trade),
doneC: make(chan struct{}),
}
return e
}
func (e *Exchange) Done() chan struct{} {
return e.doneC
}
func (e *Exchange) NewStream() types.Stream {
if e.stream != nil {
panic("backtest stream is already allocated, please check if there are extra NewStream calls")
panic("backtest stream can not be allocated twice")
}
e.stream = &Stream{exchange: e}
e.stream.OnTradeUpdate(func(trade types.Trade) {
e.trades[trade.Symbol] = append(e.trades[trade.Symbol], trade)
})
for _, symbol := range e.config.Symbols {
matching := &SimplePriceMatching{
Symbol: symbol,
CurrentTime: e.startTime,
}
matching.BindStream(e.stream)
e.matchingBooks[symbol] = matching
}
return e.stream
}
func (e Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
panic("implement me")
for _, order := range orders {
symbol := order.Symbol
matching, ok := e.matchingBooks[symbol]
if !ok {
return nil, errors.Errorf("matching engine is not initialized for symbol %s", symbol)
}
createdOrder, trade, err := matching.PlaceOrder(order)
if err != nil {
return nil, err
}
if createdOrder != nil {
createdOrders = append(createdOrders, *createdOrder)
// market order can be closed immediately.
switch createdOrder.Status {
case types.OrderStatusFilled, types.OrderStatusCanceled, types.OrderStatusRejected:
e.closedOrders[symbol] = append(e.closedOrders[symbol], *createdOrder)
}
e.stream.EmitOrderUpdate(*createdOrder)
}
if trade != nil {
e.stream.EmitTradeUpdate(*trade)
}
}
return createdOrders, nil
}
func (e Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
panic("implement me")
matching, ok := e.matchingBooks[symbol]
if !ok {
return nil, errors.Errorf("matching engine is not initialized for symbol %s", symbol)
}
return append(matching.bidOrders, matching.askOrders...), nil
}
func (e Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
panic("implement me")
orders, ok := e.closedOrders[symbol]
if !ok {
return orders, errors.Errorf("matching engine is not initialized for symbol %s", symbol)
}
return orders, nil
}
func (e Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
panic("implement me")
for _, order := range orders {
matching, ok := e.matchingBooks[order.Symbol]
if !ok {
return errors.Errorf("matching engine is not initialized for symbol %s", order.Symbol)
}
if err := matching.CancelOrder(order); err != nil {
return err
}
}
return nil
}
func (e Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {

305
pkg/backtest/matching.go Normal file
View File

@ -0,0 +1,305 @@
package backtest
import (
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
var orderID uint64 = 1
func incOrderID() uint64 {
return atomic.AddUint64(&orderID, 1)
}
// SimplePriceMatching implements a simple kline data driven matching engine for backtest
type SimplePriceMatching struct {
Symbol string
bidOrders []types.Order
askOrders []types.Order
LastPrice fixedpoint.Value
CurrentTime time.Time
}
func (m *SimplePriceMatching) CancelOrder(o types.Order) error {
found := false
switch o.Side {
case types.SideTypeBuy:
var orders []types.Order
for _, order := range m.bidOrders {
if o.OrderID == order.OrderID {
found = true
continue
}
orders = append(orders, order)
}
m.bidOrders = orders
case types.SideTypeSell:
var orders []types.Order
for _, order := range m.bidOrders {
if o.OrderID == order.OrderID {
found = true
continue
}
orders = append(orders, order)
}
m.bidOrders = orders
}
if !found {
return errors.Errorf("cancel order failed, order not found")
}
return nil
}
func (m *SimplePriceMatching) PlaceOrder(o types.SubmitOrder) (closedOrders *types.Order, trades *types.Trade, err error) {
// start from one
orderID := incOrderID()
if o.Type == types.OrderTypeMarket {
order := newOrder(o, orderID, m.CurrentTime)
order.Status = types.OrderStatusFilled
order.ExecutedQuantity = order.Quantity
order.Price = m.LastPrice.Float64()
trade := m.newTradeFromOrder(order, false)
return &order, &trade, nil
}
order := newOrder(o, orderID, m.CurrentTime)
switch o.Side {
case types.SideTypeBuy:
m.bidOrders = append(m.bidOrders, order)
case types.SideTypeSell:
m.askOrders = append(m.askOrders, order)
}
return &order, nil, nil
}
func (m *SimplePriceMatching) newTradeFromOrder(order types.Order, isMaker bool) types.Trade {
return types.Trade{
ID: 0,
OrderID: order.OrderID,
Exchange: "backtest",
Price: order.Price,
Quantity: order.Quantity,
QuoteQuantity: order.Quantity * order.Price,
Symbol: order.Symbol,
Side: order.Side,
IsBuyer: order.Side == types.SideTypeBuy,
IsMaker: isMaker,
Time: m.CurrentTime,
Fee: order.Quantity * order.Price * 0.0015,
FeeCurrency: "USDT",
}
}
func (m *SimplePriceMatching) BuyToPrice(price fixedpoint.Value) (closedOrders []types.Order, trades []types.Trade) {
var priceF = price.Float64()
var askOrders []types.Order
for _, o := range m.askOrders {
switch o.Type {
case types.OrderTypeStopMarket:
// should we trigger the order
if priceF >= o.StopPrice {
o.ExecutedQuantity = o.Quantity
o.Price = priceF
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, false)
trades = append(trades, trade)
} else {
askOrders = append(askOrders, o)
}
case types.OrderTypeStopLimit:
// should we trigger the order
if priceF >= o.StopPrice {
o.Type = types.OrderTypeLimit
if priceF >= o.Price {
o.ExecutedQuantity = o.Quantity
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, false)
trades = append(trades, trade)
} else {
askOrders = append(askOrders, o)
}
} else {
askOrders = append(askOrders, o)
}
case types.OrderTypeLimit:
if priceF >= o.Price {
o.ExecutedQuantity = o.Quantity
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, true)
trades = append(trades, trade)
} else {
askOrders = append(askOrders, o)
}
default:
askOrders = append(askOrders, o)
}
}
m.askOrders = askOrders
m.LastPrice = price
return closedOrders, trades
}
func (m *SimplePriceMatching) SellToPrice(price fixedpoint.Value) (closedOrders []types.Order, trades []types.Trade) {
var sellPrice = price.Float64()
var bidOrders []types.Order
for _, o := range m.bidOrders {
switch o.Type {
case types.OrderTypeStopMarket:
// should we trigger the order
if sellPrice <= o.StopPrice {
o.ExecutedQuantity = o.Quantity
o.Price = sellPrice
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, false)
trades = append(trades, trade)
} else {
bidOrders = append(bidOrders, o)
}
case types.OrderTypeStopLimit:
// should we trigger the order
if sellPrice <= o.StopPrice {
o.Type = types.OrderTypeLimit
if sellPrice <= o.Price {
o.ExecutedQuantity = o.Quantity
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, false)
trades = append(trades, trade)
} else {
bidOrders = append(bidOrders, o)
}
} else {
bidOrders = append(bidOrders, o)
}
case types.OrderTypeLimit:
if sellPrice <= o.Price {
o.ExecutedQuantity = o.Quantity
o.Status = types.OrderStatusFilled
closedOrders = append(closedOrders, o)
trade := m.newTradeFromOrder(o, true)
trades = append(trades, trade)
} else {
bidOrders = append(bidOrders, o)
}
default:
bidOrders = append(bidOrders, o)
}
}
m.bidOrders = bidOrders
m.LastPrice = price
return closedOrders, trades
}
func (m *SimplePriceMatching) BindStream(stream types.Stream) {
stream.OnKLineClosed(func(kline types.KLine) {
if kline.Interval != types.Interval1m {
return
}
if kline.Symbol != m.Symbol {
return
}
m.CurrentTime = kline.EndTime
switch kline.GetTrend() {
case types.TrendDown:
if kline.High > kline.Open {
m.BuyToPrice(fixedpoint.NewFromFloat(kline.High))
}
if kline.Low > kline.Close {
m.SellToPrice(fixedpoint.NewFromFloat(kline.Low))
}
m.SellToPrice(fixedpoint.NewFromFloat(kline.Close))
case types.TrendUp:
if kline.Low < kline.Open {
m.SellToPrice(fixedpoint.NewFromFloat(kline.Low))
}
if kline.High > kline.Close {
m.BuyToPrice(fixedpoint.NewFromFloat(kline.High))
}
m.BuyToPrice(fixedpoint.NewFromFloat(kline.Close))
}
})
}
type Matching struct {
Symbol string
Asks PriceOrderSlice
Bids PriceOrderSlice
OrderID uint64
CurrentTime time.Time
}
func (m *Matching) PlaceOrder(o types.SubmitOrder) {
var order = types.Order{
SubmitOrder: o,
Exchange: "backtest",
OrderID: m.OrderID,
Status: types.OrderStatusNew,
ExecutedQuantity: 0,
IsWorking: false,
CreationTime: m.CurrentTime,
UpdateTime: m.CurrentTime,
}
_ = order
}
func newOrder(o types.SubmitOrder, orderID uint64, creationTime time.Time) types.Order {
return types.Order{
SubmitOrder: o,
Exchange: "backtest",
OrderID: orderID,
Status: types.OrderStatusNew,
ExecutedQuantity: 0,
IsWorking: false,
CreationTime: creationTime,
UpdateTime: creationTime,
}
}

View File

@ -0,0 +1,76 @@
package backtest
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func newLimitOrder(symbol string, side types.SideType, price, quantity float64) types.SubmitOrder {
return types.SubmitOrder{
Symbol: symbol,
Side: side,
Type: types.OrderTypeLimit,
Quantity: quantity,
Price: price,
TimeInForce: "GTC",
}
}
func TestSimplePriceMatching(t *testing.T) {
engine := &SimplePriceMatching{
CurrentTime: time.Now(),
}
for i := 0; i < 5; i++ {
_, _, err := engine.PlaceOrder(newLimitOrder("BTCUSDT", types.SideTypeBuy, 8000.0-float64(i), 1.0))
assert.NoError(t, err)
}
assert.Len(t, engine.bidOrders, 5)
assert.Len(t, engine.askOrders, 0)
for i := 0; i < 5; i++ {
_, _, err := engine.PlaceOrder(newLimitOrder("BTCUSDT", types.SideTypeSell, 9000.0+float64(i), 1.0))
assert.NoError(t, err)
}
assert.Len(t, engine.bidOrders, 5)
assert.Len(t, engine.askOrders, 5)
closedOrders, trades := engine.SellToPrice(fixedpoint.NewFromFloat(8100.0))
assert.Len(t, closedOrders, 0)
assert.Len(t, trades, 0)
closedOrders, trades = engine.SellToPrice(fixedpoint.NewFromFloat(8000.0))
assert.Len(t, closedOrders, 1)
assert.Len(t, trades, 1)
for _, o := range closedOrders {
assert.Equal(t, types.SideTypeBuy, o.Side)
}
closedOrders, trades = engine.SellToPrice(fixedpoint.NewFromFloat(7000.0))
assert.Len(t, closedOrders, 4)
assert.Len(t, trades, 4)
closedOrders, trades = engine.BuyToPrice(fixedpoint.NewFromFloat(8900.0))
assert.Len(t, closedOrders, 0)
assert.Len(t, trades, 0)
closedOrders, trades = engine.BuyToPrice(fixedpoint.NewFromFloat(9000.0))
assert.Len(t, closedOrders, 1)
assert.Len(t, trades, 1)
for _, o := range closedOrders {
assert.Equal(t, types.SideTypeSell, o.Side)
}
for _, trade := range trades {
assert.Equal(t, types.SideTypeSell, trade.Side)
}
closedOrders, trades = engine.BuyToPrice(fixedpoint.NewFromFloat(9500.0))
assert.Len(t, closedOrders, 4)
assert.Len(t, trades, 4)
}

View File

@ -0,0 +1,77 @@
package backtest
import (
"sort"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type PriceOrder struct {
Price fixedpoint.Value
Order types.Order
}
type PriceOrderSlice []PriceOrder
func (slice PriceOrderSlice) Len() int { return len(slice) }
func (slice PriceOrderSlice) Less(i, j int) bool { return slice[i].Price < slice[j].Price }
func (slice PriceOrderSlice) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] }
func (slice PriceOrderSlice) InsertAt(idx int, po PriceOrder) PriceOrderSlice {
rear := append([]PriceOrder{}, slice[idx:]...)
newSlice := append(slice[:idx], po)
return append(newSlice, rear...)
}
func (slice PriceOrderSlice) Remove(price fixedpoint.Value, descending bool) PriceOrderSlice {
matched, idx := slice.Find(price, descending)
if matched.Price != price {
return slice
}
return append(slice[:idx], slice[idx+1:]...)
}
func (slice PriceOrderSlice) First() (PriceOrder, bool) {
if len(slice) > 0 {
return slice[0], true
}
return PriceOrder{}, false
}
// FindPriceVolumePair finds the pair by the given price, this function is a read-only
// operation, so we use the value receiver to avoid copy value from the pointer
// If the price is not found, it will return the index where the price can be inserted at.
// true for descending (bid orders), false for ascending (ask orders)
func (slice PriceOrderSlice) Find(price fixedpoint.Value, descending bool) (pv PriceOrder, idx int) {
idx = sort.Search(len(slice), func(i int) bool {
if descending {
return slice[i].Price <= price
}
return slice[i].Price >= price
})
if idx >= len(slice) || slice[idx].Price != price {
return pv, idx
}
pv = slice[idx]
return pv, idx
}
func (slice PriceOrderSlice) Upsert(po PriceOrder, descending bool) PriceOrderSlice {
if len(slice) == 0 {
return append(slice, po)
}
price := po.Price
_, idx := slice.Find(price, descending)
if idx >= len(slice) || slice[idx].Price != price {
return slice.InsertAt(idx, po)
}
slice[idx].Order = po.Order
return slice
}

View File

@ -19,7 +19,11 @@ func (s *Stream) Connect(ctx context.Context) error {
log.Infof("collecting backtest configurations...")
loadedSymbols := map[string]struct{}{}
loadedIntervals := map[types.Interval]struct{}{}
loadedIntervals := map[types.Interval]struct{}{
// 1m interval is required for the backtest matching engine
types.Interval1m: struct{}{},
}
for _, sub := range s.Subscriptions {
loadedSymbols[sub.Symbol] = struct{}{}
@ -44,25 +48,26 @@ func (s *Stream) Connect(ctx context.Context) error {
log.Infof("used symbols: %v and intervals: %v", symbols, intervals)
// TODO: we can sync before we connect
/*
if err := backtestService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err
}
*/
go func() {
klineC, errC := s.exchange.srv.QueryKLinesCh(s.exchange.startTime, s.exchange, symbols, intervals)
for k := range klineC {
s.EmitKLineClosed(k)
}
if err := <-errC; err != nil {
return err
log.WithError(err).Error("backtest data feed error")
}
if err := s.Close(); err != nil {
log.WithError(err).Error("stream close error")
}
}()
return nil
}
func (s *Stream) Close() error {
close(s.exchange.doneC)
return nil
}

View File

@ -57,6 +57,7 @@ type Session struct {
type Backtest struct {
StartTime string `json:"startTime" yaml:"startTime"`
Account BacktestAccount `json:"account" yaml:"account"`
Symbols []string `json:"symbols" yaml:"symbols"`
}
func (t Backtest) ParseStartTime() (time.Time, error) {

View File

@ -54,6 +54,10 @@ func NewEnvironment() *Environment {
}
}
func (environ *Environment) Sessions() map[string]*ExchangeSession {
return environ.sessions
}
func (environ *Environment) SyncTrades(db *sqlx.DB) *Environment {
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.SyncService{

View File

@ -2,13 +2,13 @@ package cmd
import (
"context"
"syscall"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/backtest"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
@ -18,7 +18,7 @@ import (
func init() {
BacktestCmd.Flags().String("exchange", "", "target exchange")
BacktestCmd.Flags().Bool("sync", true, "sync backtest data")
BacktestCmd.Flags().Bool("sync", false, "sync backtest data")
BacktestCmd.Flags().String("config", "config/bbgo.yaml", "strategy config file")
RootCmd.AddCommand(BacktestCmd)
}
@ -37,10 +37,7 @@ var BacktestCmd = &cobra.Command{
return errors.New("--config option is required")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
userConfig, err := bbgo.Load(configFile)
wantSync, err := cmd.Flags().GetBool("sync")
if err != nil {
return err
}
@ -55,6 +52,14 @@ var BacktestCmd = &cobra.Command{
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
userConfig, err := bbgo.Load(configFile)
if err != nil {
return err
}
db, err := cmdutil.ConnectMySQL()
if err != nil {
return err
@ -74,6 +79,14 @@ var BacktestCmd = &cobra.Command{
exchange := backtest.NewExchange(exchangeName, backtestService, userConfig.Backtest)
if wantSync {
for _, symbol := range userConfig.Backtest.Symbols {
if err := backtestService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err
}
}
}
environ := bbgo.NewEnvironment()
environ.AddExchange(exchangeName.String(), exchange)
@ -101,7 +114,23 @@ var BacktestCmd = &cobra.Command{
return err
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
<-exchange.Done()
for _, session := range environ.Sessions() {
calculator := &pnl.AverageCostCalculator{
TradingFeeCurrency: exchange.PlatformFeeCurrency(),
}
for symbol, trades := range session.Trades {
lastPrice, ok := session.LastPrice(symbol)
if !ok {
return errors.Errorf("last price not found: %s", symbol)
}
report := calculator.Calculate(symbol, trades, lastPrice)
report.Print()
}
}
return nil
},
}

View File

@ -114,12 +114,12 @@ func (s *BacktestService) QueryKLinesCh(since time.Time, exchange types.Exchange
// scanRowsCh scan rows into channel
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) {
ch := make(chan types.KLine, 100)
ch := make(chan types.KLine, 500)
errC := make(chan error, 1)
go func() {
defer close(ch)
defer close(errC)
defer close(ch)
defer rows.Close()
for rows.Next() {
@ -136,6 +136,7 @@ func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan er
errC <- err
return
}
}()
return ch, errC

View File

@ -169,7 +169,9 @@ func (k KLine) GetChange() float64 {
}
func (k KLine) String() string {
return fmt.Sprintf("%s %s Open: %.8f Close: %.8f High: %.8f Low: %.8f Volume: %.8f Change: %.4f Max Change: %.4f", k.Symbol, k.Interval, k.Open, k.Close, k.High, k.Low, k.Volume, k.GetChange(), k.GetMaxChange())
return fmt.Sprintf("%s %s %s Open: %.8f Close: %.8f High: %.8f Low: %.8f Volume: %.8f Change: %.4f Max Change: %.4f",
k.StartTime.Format("2006-01-02 15:04"),
k.Symbol, k.Interval, k.Open, k.Close, k.High, k.Low, k.Volume, k.GetChange(), k.GetMaxChange())
}
func (k KLine) Color() string {