fix: SerialMarketDataStore together with backtests

This commit is contained in:
zenix 2022-10-19 13:17:44 +09:00
parent 7dd951e39c
commit 675f84dccf
9 changed files with 282 additions and 87 deletions

View File

@ -26,12 +26,13 @@ exchangeStrategies:
- on: binance
drift:
minInterval: 1s
limitOrder: true
#quantity: 0.0012
canvasPath: "./output.png"
symbol: BTCUSDT
# kline interval for indicators
interval: 1m
interval: 1s
window: 6
useAtr: true
useStopLoss: true
@ -125,11 +126,12 @@ sync:
- BTCUSDT
backtest:
startTime: "2022-09-25"
endTime: "2022-10-30"
startTime: "2022-10-18"
endTime: "2022-10-19"
symbols:
- BTCUSDT
sessions: [binance]
syncSecKLines: true
accounts:
binance:
makerFeeRate: 0.000

View File

@ -23,6 +23,7 @@ exchangeStrategies:
- on: binance
elliottwave:
minInterval: 1s
symbol: BNBBUSD
limitOrder: true
quantity: 0.16
@ -115,6 +116,7 @@ backtest:
symbols:
- BNBBUSD
sessions: [binance]
syncSecKLines: true
accounts:
binance:
makerFeeRate: 0.000

View File

@ -22,7 +22,7 @@ func NewMarketDataStore(symbol string) *MarketDataStore {
Symbol: symbol,
// KLineWindows stores all loaded klines per interval
KLineWindows: make(map[types.Interval]*types.KLineWindow, len(types.SupportedIntervals)), // 12 interval, 1m,5m,15m,30m,1h,2h,4h,6h,12h,1d,3d,1w
KLineWindows: make(map[types.Interval]*types.KLineWindow, len(types.SupportedIntervals)), // 13 interval, 1s,1m,5m,15m,30m,1h,2h,4h,6h,12h,1d,3d,1w
}
}

View File

@ -1,22 +1,34 @@
package bbgo
import (
"context"
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type SerialMarketDataStore struct {
*MarketDataStore
KLines map[types.Interval]*types.KLine
Subscription []types.Interval
UseAggTrade bool
KLines map[types.Interval]*types.KLine
MinInterval types.Interval
Subscription []types.Interval
o, h, l, c, v, qv, price fixedpoint.Value
lock sync.Mutex
}
func NewSerialMarketDataStore(symbol string) *SerialMarketDataStore {
// @param symbol: symbol to trace on
// @param minInterval: unit interval, related to your signal timeframe
// @param useAggTrade: if not assigned, default to false. if assigned to true, will use AggTrade signal to generate klines
func NewSerialMarketDataStore(symbol string, minInterval types.Interval, useAggTrade ...bool) *SerialMarketDataStore {
return &SerialMarketDataStore{
MarketDataStore: NewMarketDataStore(symbol),
KLines: make(map[types.Interval]*types.KLine),
UseAggTrade: len(useAggTrade) > 0 && useAggTrade[0],
Subscription: []types.Interval{},
MinInterval: minInterval,
}
}
@ -30,24 +42,98 @@ func (store *SerialMarketDataStore) Subscribe(interval types.Interval) {
store.Subscription = append(store.Subscription, interval)
}
func (store *SerialMarketDataStore) BindStream(stream types.Stream) {
stream.OnKLineClosed(store.handleKLineClosed)
func (store *SerialMarketDataStore) BindStream(ctx context.Context, stream types.Stream) {
if store.UseAggTrade {
go store.tickerProcessor(ctx)
stream.OnAggTrade(store.handleMarketTrade)
} else {
stream.OnKLineClosed(store.handleKLineClosed)
}
}
func (store *SerialMarketDataStore) handleKLineClosed(kline types.KLine) {
store.AddKLine(kline)
}
func (store *SerialMarketDataStore) AddKLine(kline types.KLine) {
func (store *SerialMarketDataStore) handleMarketTrade(trade types.Trade) {
store.lock.Lock()
store.price = trade.Price
store.c = store.price
if store.price.Compare(store.h) > 0 {
store.h = store.price
}
if !store.l.IsZero() {
if store.price.Compare(store.l) < 0 {
store.l = store.price
}
} else {
store.l = store.price
}
if store.o.IsZero() {
store.o = store.price
}
store.v.Add(trade.Quantity)
store.qv.Add(trade.QuoteQuantity)
store.lock.Unlock()
}
func (store *SerialMarketDataStore) tickerProcessor(ctx context.Context) {
duration := store.MinInterval.Duration()
intervalCloseTicker := time.NewTicker(duration)
defer intervalCloseTicker.Stop()
for {
select {
case time := <-intervalCloseTicker.C:
kline := types.KLine{
Symbol: store.Symbol,
StartTime: types.Time(time.Add(-1 * duration)),
EndTime: types.Time(time),
Interval: store.MinInterval,
Closed: true,
}
store.lock.Lock()
if store.c.IsZero() {
kline.Open = store.price
kline.Close = store.price
kline.High = store.price
kline.Low = store.price
kline.Volume = fixedpoint.Zero
kline.QuoteVolume = fixedpoint.Zero
} else {
kline.Open = store.o
kline.Close = store.c
kline.High = store.h
kline.Low = store.l
kline.Volume = store.v
kline.QuoteVolume = store.qv
store.o = fixedpoint.Zero
store.c = fixedpoint.Zero
store.h = fixedpoint.Zero
store.l = fixedpoint.Zero
store.v = fixedpoint.Zero
store.qv = fixedpoint.Zero
}
store.lock.Unlock()
store.AddKLine(kline, true)
case <-ctx.Done():
return
}
}
}
func (store *SerialMarketDataStore) AddKLine(kline types.KLine, async ...bool) {
if kline.Symbol != store.Symbol {
return
}
// only consumes kline1m
if kline.Interval != types.Interval1m {
// only consumes MinInterval
if kline.Interval != store.MinInterval {
return
}
// endtime in minutes
timestamp := kline.StartTime.Time().Add(time.Minute)
// endtime
duration := store.MinInterval.Duration()
timestamp := kline.StartTime.Time().Round(duration).Add(duration)
for _, val := range store.Subscription {
k, ok := store.KLines[val]
if !ok {
@ -60,9 +146,13 @@ func (store *SerialMarketDataStore) AddKLine(kline types.KLine) {
k.Merge(&kline)
k.Closed = false
}
if timestamp.Truncate(val.Duration()) == timestamp {
if timestamp.Round(val.Duration()) == timestamp {
k.Closed = true
store.MarketDataStore.AddKLine(*k)
if len(async) > 0 && async[0] {
go store.MarketDataStore.AddKLine(*k)
} else {
store.MarketDataStore.AddKLine(*k)
}
delete(store.KLines, val)
}
}

View File

@ -395,8 +395,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
// used kline intervals by the given symbol
var klineSubscriptions = map[types.Interval]struct{}{}
// always subscribe the 1m kline so we can make sure the connection persists.
klineSubscriptions[types.Interval1m] = struct{}{}
minInterval := types.Interval1m
// Aggregate the intervals that we are using in the subscriptions.
for _, sub := range session.Subscriptions {
@ -411,12 +410,20 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
continue
}
if minInterval.Seconds() > sub.Options.Interval.Seconds() {
minInterval = sub.Options.Interval
}
if sub.Symbol == symbol {
klineSubscriptions[types.Interval(sub.Options.Interval)] = struct{}{}
}
}
}
// always subscribe the 1m kline so we can make sure the connection persists.
klineSubscriptions[minInterval] = struct{}{}
log.Warnf("sub: %v", klineSubscriptions)
for interval := range klineSubscriptions {
// avoid querying the last unclosed kline
endTime := environ.startTime
@ -440,10 +447,12 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
// update last prices by the given kline
lastKLine := kLines[len(kLines)-1]
if interval == types.Interval1m {
if interval == minInterval {
session.lastPrices[symbol] = lastKLine.Close
}
log.Warnf("load %s", interval)
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
@ -497,6 +506,7 @@ func (session *ExchangeSession) Positions() map[string]*types.Position {
// MarketDataStore returns the market data store of a symbol
func (session *ExchangeSession) MarketDataStore(symbol string) (s *MarketDataStore, ok bool) {
s, ok = session.marketDataStores[symbol]
// FIXME: the returned MarketDataStore when !ok will be empty
if !ok {
s = NewMarketDataStore(symbol)
s.BindStream(session.MarketDataStream)
@ -507,15 +517,21 @@ func (session *ExchangeSession) MarketDataStore(symbol string) (s *MarketDataSto
}
// KLine updates will be received in the order listend in intervals array
func (session *ExchangeSession) SerialMarketDataStore(symbol string, intervals []types.Interval) (store *SerialMarketDataStore, ok bool) {
func (session *ExchangeSession) SerialMarketDataStore(ctx context.Context, symbol string, intervals []types.Interval, useAggTrade ...bool) (store *SerialMarketDataStore, ok bool) {
st, ok := session.MarketDataStore(symbol)
if !ok {
return nil, false
}
store = NewSerialMarketDataStore(symbol)
klines, ok := st.KLinesOfInterval(types.Interval1m)
minInterval := types.Interval1m
for _, i := range intervals {
if minInterval.Seconds() > i.Seconds() {
minInterval = i
}
}
store = NewSerialMarketDataStore(symbol, minInterval, useAggTrade...)
klines, ok := st.KLinesOfInterval(minInterval)
if !ok {
log.Errorf("SerialMarketDataStore: cannot get 1m history")
log.Errorf("SerialMarketDataStore: cannot get %s history", minInterval)
return nil, false
}
for _, interval := range intervals {
@ -524,7 +540,7 @@ func (session *ExchangeSession) SerialMarketDataStore(symbol string, intervals [
for _, kline := range *klines {
store.AddKLine(kline)
}
store.BindStream(session.MarketDataStream)
store.BindStream(ctx, session.MarketDataStream)
return store, true
}

View File

@ -262,7 +262,7 @@ func (trader *Trader) RunAllSingleExchangeStrategy(ctx context.Context) error {
return nil
}
func (trader *Trader) injectFields() error {
func (trader *Trader) injectFields(ctx context.Context) error {
// load and run Session strategies
for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName]
@ -285,8 +285,30 @@ func (trader *Trader) injectFields() error {
return errors.Wrapf(err, "failed to inject OrderExecutor on %T", strategy)
}
if defaulter, ok := strategy.(StrategyDefaulter); ok {
if err := defaulter.Defaults(); err != nil {
panic(err)
}
}
if initializer, ok := strategy.(StrategyInitializer); ok {
if err := initializer.Initialize(); err != nil {
panic(err)
}
}
if subscriber, ok := strategy.(ExchangeSessionSubscriber); ok {
subscriber.Subscribe(session)
} else {
log.Errorf("strategy %s does not implement ExchangeSessionSubscriber", strategy.ID())
}
if symbol, ok := dynamic.LookupSymbolField(rs); ok {
log.Infof("found symbol based strategy from %s", rs.Type())
log.Infof("found symbol %s based strategy from %s", symbol, rs.Type())
if err := session.initSymbol(ctx, trader.environment, symbol); err != nil {
return errors.Wrapf(err, "failed to inject object into %T when initSymbol", strategy)
}
market, ok := session.Market(symbol)
if !ok {
@ -339,12 +361,10 @@ func (trader *Trader) Run(ctx context.Context) error {
// trader.environment.Connect will call interact.Start
interact.AddCustomInteraction(NewCoreInteraction(trader.environment, trader))
if err := trader.injectFields(); err != nil {
if err := trader.injectFields(ctx); err != nil {
return err
}
trader.Subscribe()
if err := trader.environment.Start(ctx); err != nil {
return err
}

View File

@ -65,7 +65,8 @@ type Strategy struct {
*types.ProfitStats `persistence:"profit_stats"`
*types.TradeStats `persistence:"trade_stats"`
p *types.Position
p *types.Position
MinInterval types.Interval `json:"MinInterval"`
priceLines *types.Queue
trendLine types.UpdatableSeriesExtend
@ -78,10 +79,10 @@ type Strategy struct {
lock sync.RWMutex `ignore:"true"`
positionLock sync.RWMutex `ignore:"true"`
startTime time.Time
minutesCounter int
counter int
orderPendingCounter map[uint64]int
frameKLine *types.KLine
kline1m *types.KLine
klineMin *types.KLine
beta float64
@ -135,15 +136,33 @@ func (s *Strategy) InstanceID() string {
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
// by default, bbgo only pre-subscribe 1000 klines.
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
maxWindow := (s.Window + s.SmootherWindow + s.FisherTransformWindow) * s.Interval.Minutes()
bbgo.KLinePreloadLimit = int64((maxWindow/1000 + 1) * 1000)
log.Errorf("set kLinePreloadLimit to %d, %d %d", bbgo.KLinePreloadLimit, s.Interval.Minutes(), maxWindow)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: types.Interval1m,
})
if !bbgo.IsBackTesting {
session.Subscribe(types.BookTickerChannel, s.Symbol, types.SubscribeOptions{})
session.Subscribe(types.AggTradeChannel, s.Symbol, types.SubscribeOptions{})
// able to preload
if s.MinInterval.Milliseconds() >= types.Interval1s.Milliseconds() && s.MinInterval.Milliseconds()%types.Interval1s.Milliseconds() == 0 {
maxWindow := (s.Window + s.SmootherWindow + s.FisherTransformWindow) * (s.Interval.Milliseconds() / s.MinInterval.Milliseconds())
bbgo.KLinePreloadLimit = int64((maxWindow/1000 + 1) * 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: s.MinInterval,
})
} else {
bbgo.KLinePreloadLimit = 0
}
} else {
maxWindow := (s.Window + s.SmootherWindow + s.FisherTransformWindow) * (s.Interval.Milliseconds() / s.MinInterval.Milliseconds())
bbgo.KLinePreloadLimit = int64((maxWindow/1000 + 1) * 1000)
// gave up preload
if s.Interval.Milliseconds() < s.MinInterval.Milliseconds() {
bbgo.KLinePreloadLimit = 0
}
log.Errorf("set kLinePreloadLimit to %d, %d %d", bbgo.KLinePreloadLimit, s.Interval.Milliseconds()/s.MinInterval.Milliseconds(), maxWindow)
if bbgo.KLinePreloadLimit > 0 {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: s.MinInterval,
})
}
}
s.ExitMethods.SetAndSubscribe(session, s)
}
@ -208,6 +227,9 @@ func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
s.atr = &indicator.ATR{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.ATRWindow}}
s.trendLine = &indicator.EWMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.TrendWindow}}
if bbgo.KLinePreloadLimit == 0 {
return nil
}
klines, ok := store.KLinesOfInterval(s.Interval)
klinesLength := len(*klines)
if !ok || klinesLength == 0 {
@ -229,16 +251,15 @@ func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
if s.frameKLine != nil && klines != nil {
s.frameKLine.Set(&(*klines)[len(*klines)-1])
}
klines, ok = store.KLinesOfInterval(types.Interval1m)
klines, ok = store.KLinesOfInterval(s.MinInterval)
klinesLength = len(*klines)
if !ok || klinesLength == 0 {
return errors.New("klines not exists")
}
log.Infof("loaded %d klines1m", klinesLength)
if s.kline1m != nil && klines != nil {
s.kline1m.Set(&(*klines)[len(*klines)-1])
log.Infof("loaded %d klines%s", klinesLength, s.MinInterval)
if s.klineMin != nil && klines != nil {
s.klineMin.Set(&(*klines)[len(*klines)-1])
}
s.startTime = s.kline1m.StartTime.Time().Add(s.kline1m.Interval.Duration())
return nil
}
@ -254,8 +275,8 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64) (int, e
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
continue
}
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
if s.minutesCounter-s.orderPendingCounter[order.OrderID] > s.PendingMinutes {
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.counter)
if s.counter-s.orderPendingCounter[order.OrderID] > s.PendingMinutes {
toCancel = true
} else if order.Side == types.SideTypeBuy {
// 75% of the probability
@ -272,7 +293,7 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64) (int, e
}
}
if toCancel {
err := s.GeneralOrderExecutor.GracefulCancel(ctx)
err := s.GeneralOrderExecutor.CancelNoWait(ctx)
// TODO: clean orderPendingCounter on cancel/trade
for _, order := range nonTraded {
delete(s.orderPendingCounter, order.OrderID)
@ -545,8 +566,8 @@ func (s *Strategy) CalcAssetValue(price fixedpoint.Value) fixedpoint.Value {
return balances[s.Market.BaseCurrency].Total().Mul(price).Add(balances[s.Market.QuoteCurrency].Total())
}
func (s *Strategy) klineHandler1m(ctx context.Context, kline types.KLine) {
s.kline1m.Set(&kline)
func (s *Strategy) klineHandlerMin(ctx context.Context, kline types.KLine) {
s.klineMin.Set(&kline)
if s.Status != types.StrategyStatusRunning {
return
}
@ -667,7 +688,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
if exitCondition {
s.positionLock.Unlock()
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
if err := s.GeneralOrderExecutor.CancelNoWait(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
@ -680,7 +701,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
}
if longCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
if err := s.GeneralOrderExecutor.CancelNoWait(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
@ -712,12 +733,12 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
s.orderPendingCounter[createdOrders[0].OrderID] = s.counter
}
return
}
if shortCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
if err := s.GeneralOrderExecutor.CancelNoWait(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
@ -750,7 +771,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
s.orderPendingCounter[createdOrders[0].OrderID] = s.counter
}
return
}
@ -800,7 +821,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.GeneralOrderExecutor.Bind()
s.orderPendingCounter = make(map[uint64]int)
s.minutesCounter = 0
s.counter = 0
// Exit methods from config
for _, method := range s.ExitMethods {
@ -862,13 +883,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
s.frameKLine = &types.KLine{}
s.kline1m = &types.KLine{}
s.klineMin = &types.KLine{}
s.priceLines = types.NewQueue(300)
s.initTickerFunctions(ctx)
startTime := s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, startTime))
s.startTime = s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, s.startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, s.startTime))
// default value: use 1m kline
if !s.NoTrailingStopLoss && s.IsBackTesting() || s.TrailingStopLossType == "" {
@ -933,21 +954,22 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
bbgo.RegisterModifier(s)
// event trigger order: s.Interval => Interval1m
store, ok := session.SerialMarketDataStore(s.Symbol, []types.Interval{s.Interval, types.Interval1m})
// event trigger order: s.Interval => s.MinInterval
store, ok := session.SerialMarketDataStore(ctx, s.Symbol, []types.Interval{s.Interval, s.MinInterval}, !bbgo.IsBackTesting)
if !ok {
panic("cannot get 1m history")
panic("cannot get " + s.MinInterval + " history")
}
if err := s.initIndicators(store); err != nil {
log.WithError(err).Errorf("initIndicator failed")
return nil
}
store.OnKLineClosed(func(kline types.KLine) {
s.minutesCounter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Minutes())
s.counter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Milliseconds())
if kline.Interval == s.Interval {
s.klineHandler(ctx, kline)
} else if kline.Interval == types.Interval1m {
s.klineHandler1m(ctx, kline)
} else if kline.Interval == s.MinInterval {
s.klineHandlerMin(ctx, kline)
}
})

View File

@ -44,6 +44,7 @@ type Strategy struct {
Session *bbgo.ExchangeSession
Interval types.Interval `json:"interval"`
MinInterval types.Interval `json:"minInterval"`
Stoploss fixedpoint.Value `json:"stoploss" modifiable:"true"`
WindowATR int `json:"windowATR"`
WindowQuick int `json:"windowQuick"`
@ -74,7 +75,7 @@ type Strategy struct {
// for smart cancel
orderPendingCounter map[uint64]int
startTime time.Time
minutesCounter int
counter int
// for position
buyPrice float64 `persistence:"buy_price"`
@ -101,12 +102,22 @@ func (s *Strategy) InstanceID() string {
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
// by default, bbgo only pre-subscribe 1000 klines.
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
bbgo.KLinePreloadLimit = int64((s.Interval.Minutes()*s.WindowSlow/1000 + 1) + 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: types.Interval1m,
})
if !bbgo.IsBackTesting {
session.Subscribe(types.BookTickerChannel, s.Symbol, types.SubscribeOptions{})
session.Subscribe(types.AggTradeChannel, s.Symbol, types.SubscribeOptions{})
if s.MinInterval.Milliseconds() >= types.Interval1s.Milliseconds() && s.MinInterval.Milliseconds()%types.Interval1s.Milliseconds() == 0 {
bbgo.KLinePreloadLimit = int64(((s.Interval.Milliseconds()/s.MinInterval.Milliseconds())*s.WindowSlow/1000 + 1) + 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: s.MinInterval,
})
} else {
bbgo.KLinePreloadLimit = 0
}
} else {
bbgo.KLinePreloadLimit = int64((s.Interval.Milliseconds()/s.MinInterval.Milliseconds()*s.WindowSlow/1000 + 1) + 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: s.MinInterval,
})
}
s.ExitMethods.SetAndSubscribe(session, s)
}
@ -162,8 +173,6 @@ func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
if !ok || klineLength == 0 {
return errors.New("klines not exists")
}
tmpK := (*klines)[klineLength-1]
s.startTime = tmpK.StartTime.Time().Add(tmpK.Interval.Duration())
s.heikinAshi = NewHeikinAshi(500)
for _, kline := range *klines {
@ -185,9 +194,9 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef float64) int {
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
continue
}
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.counter)
toCancel := false
if s.minutesCounter-s.orderPendingCounter[order.OrderID] >= s.PendingMinutes {
if s.counter-s.orderPendingCounter[order.OrderID] >= s.PendingMinutes {
toCancel = true
} else if order.Side == types.SideTypeBuy {
if order.Price.Float64()+s.atr.Last()*2 <= pricef {
@ -315,7 +324,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.GeneralOrderExecutor.Bind()
s.orderPendingCounter = make(map[uint64]int)
s.minutesCounter = 0
s.counter = 0
for _, method := range s.ExitMethods {
method.Bind(session, s.GeneralOrderExecutor)
@ -355,16 +364,16 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
s.initTickerFunctions()
startTime := s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, startTime))
s.startTime = s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, s.startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, s.startTime))
s.initOutputCommands()
// event trigger order: s.Interval => Interval1m
store, ok := session.SerialMarketDataStore(s.Symbol, []types.Interval{s.Interval, types.Interval1m})
// event trigger order: s.Interval => minInterval
store, ok := session.SerialMarketDataStore(ctx, s.Symbol, []types.Interval{s.Interval, s.MinInterval}, !bbgo.IsBackTesting)
if !ok {
panic("cannot get 1m history")
panic("cannot get " + s.MinInterval + " history")
}
if err := s.initIndicators(store); err != nil {
log.WithError(err).Errorf("initIndicator failed")
@ -372,11 +381,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
s.InitDrawCommands(store, &profit, &cumProfit)
store.OnKLineClosed(func(kline types.KLine) {
s.minutesCounter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Minutes())
s.counter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Milliseconds())
if kline.Interval == s.Interval {
s.klineHandler(ctx, kline)
} else if kline.Interval == types.Interval1m {
s.klineHandler1m(ctx, kline)
} else if kline.Interval == s.MinInterval {
s.klineHandlerMin(ctx, kline)
}
})
@ -401,7 +410,7 @@ func (s *Strategy) CalcAssetValue(price fixedpoint.Value) fixedpoint.Value {
return balances[s.Market.BaseCurrency].Total().Mul(price).Add(balances[s.Market.QuoteCurrency].Total())
}
func (s *Strategy) klineHandler1m(ctx context.Context, kline types.KLine) {
func (s *Strategy) klineHandlerMin(ctx context.Context, kline types.KLine) {
if s.Status != types.StrategyStatusRunning {
return
}
@ -513,7 +522,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
return
}
if createdOrders != nil {
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
s.orderPendingCounter[createdOrders[0].OrderID] = s.counter
}
return
}
@ -539,7 +548,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
return
}
if createdOrders != nil {
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
s.orderPendingCounter[createdOrders[0].OrderID] = s.counter
}
return
}

View File

@ -25,8 +25,41 @@ func (i Interval) Seconds() int {
return m
}
// specially handled, for better precision
func (i Interval) Milliseconds() int {
t := 0
index := 0
for i, rn := range string(i) {
if rn >= '0' && rn <= '9' {
t = t*10 + int(rn-'0')
} else {
index = i
break
}
}
switch strings.ToLower(string(i[index:])) {
case "ms":
return t
case "s":
return t * 1000
case "m":
t *= 60
case "h":
t *= 60 * 60
case "d":
t *= 60 * 60 * 24
case "w":
t *= 60 * 60 * 24 * 7
case "mo":
t *= 60 * 60 * 24 * 30
default:
panic("unknown interval input: " + i)
}
return t * 1000
}
func (i Interval) Duration() time.Duration {
return time.Duration(i.Seconds()) * time.Second
return time.Duration(i.Milliseconds()) * time.Millisecond
}
func (i *Interval) UnmarshalJSON(b []byte) (err error) {
@ -53,6 +86,7 @@ func (s IntervalSlice) StringSlice() (slice []string) {
return slice
}
var Interval1ms = Interval("1ms")
var Interval1s = Interval("1s")
var Interval1m = Interval("1m")
var Interval3m = Interval("3m")