Merge pull request #1004 from austin362667/austin362667/irr

strategy:irr rollback to original nirr and consume kline
This commit is contained in:
Yo-An Lin 2022-11-03 13:48:22 +08:00 committed by GitHub
commit b80ac89486
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 163 deletions

View File

@ -10,23 +10,31 @@ sessions:
binance:
exchange: binance
envVarPrefix: binance
max:
exchange: max
envVarPrefix: max
ftx:
exchange: ftx
envVarPrefix: ftx
exchangeStrategies:
- on: binance
irr:
symbol: BTCBUSD
# in milliseconds(ms)
# must > 10 ms
hftInterval: 1000
# qty per trade
quantity: 0.001
symbol: BTCUSDT
interval: 1m
window: 10
amount: 5000
# Draw pnl
drawGraph: true
graphPNLPath: "./pnl.png"
graphCumPNLPath: "./cumpnl.png"
backtest:
startTime: "2022-01-01"
endTime: "2022-11-01"
symbols:
- BTCUSDT
sessions: [binance]
# syncSecKLines: true
accounts:
binance:
makerFeeRate: 0.0000
takerFeeRate: 0.0000
balances:
BTC: 0.0
USDT: 5000

View File

@ -18,10 +18,12 @@ type NRR struct {
types.SeriesBase
RankingWindow int
delay bool
prices *types.Queue
Prices *types.Queue
Values floats.Slice
RankedValues floats.Slice
ReturnValues floats.Slice
EndTime time.Time
@ -33,20 +35,22 @@ var _ types.SeriesExtend = &NRR{}
func (inc *NRR) Update(openPrice, closePrice float64) {
if inc.SeriesBase.Series == nil {
inc.SeriesBase.Series = inc
inc.Prices = types.NewQueue(inc.Window)
}
inc.Prices.Update(closePrice)
if inc.Prices.Length() < inc.Window {
return
inc.prices = types.NewQueue(inc.Window)
}
inc.prices.Update(closePrice)
// D0
irr := openPrice - closePrice
// D1
// -1*((inc.Prices.Last() / inc.Prices.Index(inc.Window-1)) - 1)
nirr := (openPrice - closePrice) / openPrice
irr := (closePrice - openPrice) / openPrice
if inc.prices.Length() >= inc.Window && inc.delay {
// D1
nirr = -1 * ((inc.prices.Last() / inc.prices.Index(inc.Window-1)) - 1)
irr = (inc.prices.Last() / inc.prices.Index(inc.Window-1)) - 1
}
inc.Values.Push(irr) // neg ret here
inc.Values.Push(nirr) // neg ret here
inc.RankedValues.Push(inc.Rank(inc.RankingWindow).Last() / float64(inc.RankingWindow)) // ranked neg ret here
inc.ReturnValues.Push(irr)
}
func (inc *NRR) Last() float64 {

View File

@ -5,8 +5,6 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/data/tsv"
@ -14,6 +12,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)
@ -42,23 +41,14 @@ type Strategy struct {
activeOrders *bbgo.ActiveOrderBook
ExitMethods bbgo.ExitMethodSet `json:"exits"`
ExitMethods bbgo.ExitMethodSet `json:"exits"`
session *bbgo.ExchangeSession
orderExecutor *bbgo.GeneralOrderExecutor
bbgo.QuantityOrAmount
Interval int `json:"hftInterval"`
// realtime book ticker to submit order
obBuyPrice uint64
obSellPrice uint64
// for getting close price
currentTradePrice uint64
// for negative return rate
openPrice float64
closePrice float64
nrr *NRR
stopC chan struct{}
@ -207,11 +197,7 @@ func (r *AccumulatedProfitReport) Output(symbol string) {
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
if !bbgo.IsBackTesting {
session.Subscribe(types.AggTradeChannel, s.Symbol, types.SubscribeOptions{})
session.Subscribe(types.BookTickerChannel, s.Symbol, types.SubscribeOptions{})
}
//session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
func (s *Strategy) ID() string {
@ -339,130 +325,44 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)
atomic.SwapUint64(&s.currentTradePrice, 0.)
s.closePrice = 0.
s.openPrice = 0.
klinDirections := types.NewQueue(100)
started := false
boxOpenPrice := 0.
boxClosePrice := 0.
boxCounter := 0
if !bbgo.IsBackTesting {
s.session.MarketDataStream.OnBookTickerUpdate(func(bt types.BookTicker) {
// quote order book price
newBid := uint64(bt.Buy.Float64())
newAsk := uint64(bt.Sell.Float64())
atomic.SwapUint64(&s.obBuyPrice, newBid)
atomic.SwapUint64(&s.obSellPrice, newAsk)
})
s.session.MarketDataStream.OnAggTrade(func(trade types.Trade) {
tradePrice := uint64(trade.Price.Float64())
atomic.SwapUint64(&s.currentTradePrice, tradePrice)
})
closeTime := <-time.After(time.Duration(s.Interval-int(time.Now().UnixMilli())%s.Interval) * time.Millisecond)
log.Infof("kline close timing synced @ %s", closeTime.Format("2006-01-02 15:04:05.000000"))
go func() {
intervalCloseTicker := time.NewTicker(time.Duration(s.Interval) * time.Millisecond)
defer intervalCloseTicker.Stop()
for {
select {
case <-intervalCloseTicker.C:
log.Infof("kline close time @ %s", time.Now().Format("2006-01-02 15:04:05.000000"))
s.orderExecutor.CancelNoWait(context.Background())
if s.currentTradePrice > 0 {
s.closePrice = float64(s.currentTradePrice)
log.Infof("Close Price: %f", s.closePrice)
if s.closePrice > 0 && s.openPrice > 0 {
direction := s.closePrice - s.openPrice
klinDirections.Update(direction)
regimeShift := klinDirections.Index(0)*klinDirections.Index(1) < 0
if regimeShift && !started {
boxOpenPrice = s.openPrice
started = true
boxCounter = 0
log.Infof("box started at price: %f", boxOpenPrice)
} else if regimeShift && started {
boxClosePrice = s.closePrice
started = false
log.Infof("box ended at price: %f with time length: %d", boxClosePrice, boxCounter)
// box ending, should re-balance position
nirr := fixedpoint.NewFromFloat(((boxOpenPrice - boxClosePrice) / boxOpenPrice) / (float64(boxCounter) + 1))
log.Infof("Alpha: %f", nirr.Float64())
if nirr.Float64() < 0 {
_, err := s.orderExecutor.SubmitOrders(context.Background(), types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Quantity: s.Quantity,
Type: types.OrderTypeLimitMaker,
Price: fixedpoint.NewFromFloat(float64(s.obSellPrice)),
Tag: "irrSell",
})
if err != nil {
log.WithError(err)
}
} else if nirr.Float64() > 0 {
_, err := s.orderExecutor.SubmitOrders(context.Background(), types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Quantity: s.Quantity,
Type: types.OrderTypeLimitMaker,
Price: fixedpoint.NewFromFloat(float64(s.obBuyPrice)),
Tag: "irrBuy",
})
if err != nil {
log.WithError(err)
}
}
} else {
boxCounter++
}
}
}
case <-s.stopC:
log.Warnf("%s goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s goroutine stopped, due to the cancelled context", s.Symbol)
return
}
}
}()
openTime := <-time.After(time.Duration(s.Interval-int(time.Now().UnixMilli())%s.Interval) * time.Millisecond)
log.Infof("kline open timing synced @ %s", openTime.Format("2006-01-02 15:04:05.000000"))
go func() {
intervalOpenTicker := time.NewTicker(time.Duration(s.Interval) * time.Millisecond)
defer intervalOpenTicker.Stop()
for {
select {
case <-intervalOpenTicker.C:
time.Sleep(10 * time.Millisecond)
log.Infof("kline open time @ %s", time.Now().Format("2006-01-02 15:04:05.000000"))
if s.currentTradePrice > 0 && s.closePrice > 0 {
s.openPrice = float64(s.currentTradePrice)
log.Infof("Open Price: %f", s.openPrice)
}
case <-s.stopC:
log.Warnf("%s goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s goroutine stopped, due to the cancelled context", s.Symbol)
return
}
}
}()
kLineStore, _ := s.session.MarketDataStore(s.Symbol)
// window = 2 means day-to-day return, previousClose/currentClose -1
// delay = false means use open/close-1 as D0 return (default)
// delay = true means use open/close-1 as 10 return
s.nrr = &NRR{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: 2}, RankingWindow: s.Window, delay: true}
s.nrr.BindK(s.session.MarketDataStream, s.Symbol, s.nrr.Interval)
if klines, ok := kLineStore.KLinesOfInterval(s.nrr.Interval); ok {
s.nrr.LoadK((*klines)[0:])
}
s.session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
alphaNrr := fixedpoint.NewFromFloat(s.nrr.RankedValues.Index(1))
// alpha-weighted inventory and cash
targetBase := s.QuantityOrAmount.CalculateQuantity(kline.Close).Mul(alphaNrr)
diffQty := targetBase.Sub(s.Position.Base)
log.Info(alphaNrr.Float64(), s.Position.Base, diffQty.Float64())
s.orderExecutor.FastCancel(ctx)
if diffQty.Sign() > 0 {
_, _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Quantity: diffQty.Abs(),
Type: types.OrderTypeMarket,
Tag: "irrBuy",
})
} else if diffQty.Sign() < 0 {
_, _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Quantity: diffQty.Abs(),
Type: types.OrderTypeMarket,
Tag: "irrSell",
})
}
}))
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// Output accumulated profit report