Merge pull request #989 from austin362667/austin362667/irr

strategy:irr: a mean reversion based on box of klines in same direction
This commit is contained in:
Yo-An Lin 2022-11-02 12:59:23 +08:00 committed by GitHub
commit 335b90a97c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 192 additions and 103 deletions

View File

@ -10,28 +10,23 @@ sessions:
binance:
exchange: binance
envVarPrefix: binance
max:
exchange: max
envVarPrefix: max
ftx:
exchange: ftx
envVarPrefix: ftx
exchangeStrategies:
- on: binance
irr:
symbol: BTCBUSD
interval: 1m
window: 120
amount: 5_000.0
# in milliseconds(ms)
# must > 10 ms
hftInterval: 1000
# qty per trade
quantity: 0.001
# Draw pnl
drawGraph: true
graphPNLPath: "./pnl.png"
graphCumPNLPath: "./cumpnl.png"
backtest:
sessions:
- binance
startTime: "2022-09-01"
endTime: "2022-10-04"
symbols:
- BTCBUSD
accounts:
binance:
takerFeeRate: 0.0
balances:
BUSD: 5_000.0

View File

@ -11,23 +11,36 @@ import (
"github.com/wcharczuk/go-chart/v2"
)
func (s *Strategy) InitDrawCommands(profit, cumProfit types.Series) {
bbgo.RegisterCommand("/pnl", "Draw PNL(%) per trade", func(reply interact.Reply) {
func (s *Strategy) InitDrawCommands(profit, cumProfit, cumProfitDollar types.Series) {
bbgo.RegisterCommand("/rt", "Draw Return Rate(%) Per Trade", func(reply interact.Reply) {
canvas := DrawPNL(s.InstanceID(), profit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render pnl in drift")
reply.Message(fmt.Sprintf("[error] cannot render pnl in ewo: %v", err))
log.WithError(err).Errorf("cannot render return in irr")
reply.Message(fmt.Sprintf("[error] cannot render return in irr: %v", err))
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("/cumpnl", "Draw Cummulative PNL(Quote)", func(reply interact.Reply) {
bbgo.RegisterCommand("/nav", "Draw Net Assets Value", func(reply interact.Reply) {
canvas := DrawCumPNL(s.InstanceID(), cumProfit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render cumpnl in drift")
reply.Message(fmt.Sprintf("[error] canot render cumpnl in drift: %v", err))
log.WithError(err).Errorf("cannot render nav in irr")
reply.Message(fmt.Sprintf("[error] canot render nav in irr: %v", err))
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("/pnl", "Draw Cumulative Profit & Loss", func(reply interact.Reply) {
canvas := DrawCumPNL(s.InstanceID(), cumProfitDollar)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render pnl in irr")
reply.Message(fmt.Sprintf("[error] canot render pnl in irr: %v", err))
return
}
bbgo.SendPhoto(&buffer)
@ -77,7 +90,7 @@ func DrawPNL(instanceID string, profit types.Series) *types.Canvas {
func DrawCumPNL(instanceID string, cumProfit types.Series) *types.Canvas {
canvas := types.NewCanvas(instanceID)
canvas.PlotRaw("cummulative pnl", cumProfit, cumProfit.Length())
canvas.PlotRaw("cumulative pnl", cumProfit, cumProfit.Length())
canvas.YAxis = chart.YAxis{
ValueFormatter: func(v interface{}) string {
if vf, isFloat := v.(float64); isFloat {

View File

@ -30,18 +30,21 @@ type NRR struct {
var _ types.SeriesExtend = &NRR{}
func (inc *NRR) Update(price float64) {
func (inc *NRR) Update(openPrice, closePrice float64) {
if inc.SeriesBase.Series == nil {
inc.SeriesBase.Series = inc
inc.Prices = types.NewQueue(inc.Window)
}
inc.Prices.Update(price)
inc.Prices.Update(closePrice)
if inc.Prices.Length() < inc.Window {
return
}
irr := (inc.Prices.Last() / inc.Prices.Index(inc.Window-1)) - 1
// D0
irr := openPrice - closePrice
// D1
// -1*((inc.Prices.Last() / inc.Prices.Index(inc.Window-1)) - 1)
inc.Values.Push(-irr) // neg ret here
inc.Values.Push(irr) // neg ret here
inc.RankedValues.Push(inc.Rank(inc.RankingWindow).Last() / float64(inc.RankingWindow)) // ranked neg ret here
}
@ -75,7 +78,7 @@ func (inc *NRR) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineClosePriceMapper(k))
inc.Update(indicator.KLineOpenPriceMapper(k), indicator.KLineClosePriceMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last())
}
@ -86,14 +89,3 @@ func (inc *NRR) LoadK(allKLines []types.KLine) {
}
inc.EmitUpdate(inc.Last())
}
//func calculateReturn(klines []types.KLine, window int, val KLineValueMapper) (float64, error) {
// length := len(klines)
// if length == 0 || length < window {
// return 0.0, fmt.Errorf("insufficient elements for calculating VOL with window = %d", window)
// }
//
// rate := val(klines[length-1])/val(klines[length-2]) - 1
//
// return rate, nil
//}

View File

@ -3,15 +3,17 @@ package irr
import (
"context"
"fmt"
"os"
"sync"
"sync/atomic"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/data/tsv"
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
"os"
"sync"
"github.com/sirupsen/logrus"
)
@ -19,7 +21,6 @@ const ID = "irr"
var one = fixedpoint.One
var zero = fixedpoint.Zero
var Fee = 0.0008 // taker fee % * 2, for upper bound
var log = logrus.WithField("strategy", ID)
@ -47,7 +48,19 @@ type Strategy struct {
orderExecutor *bbgo.GeneralOrderExecutor
bbgo.QuantityOrAmount
nrr *NRR
Interval int `json:"hftInterval"`
// realtime book ticker to submit order
obBuyPrice uint64
obSellPrice uint64
// for getting close price
currentTradePrice uint64
// for negative return rate
openPrice float64
closePrice float64
stopC chan struct{}
// StrategyController
bbgo.StrategyController
@ -194,13 +207,11 @@ func (r *AccumulatedProfitReport) Output(symbol string) {
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
if !bbgo.IsBackTesting {
session.Subscribe(types.MarketTradeChannel, s.Symbol, types.SubscribeOptions{})
session.Subscribe(types.AggTradeChannel, s.Symbol, types.SubscribeOptions{})
session.Subscribe(types.BookTickerChannel, s.Symbol, types.SubscribeOptions{})
}
s.ExitMethods.SetAndSubscribe(session, s)
//session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
func (s *Strategy) ID() string {
@ -238,7 +249,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel active orders
_ = s.orderExecutor.GracefulCancel(ctx)
// Close 100% position
// _ = s.ClosePosition(ctx, fixedpoint.One)
_ = s.orderExecutor.ClosePosition(ctx, fixedpoint.One)
})
// initial required information
@ -273,9 +284,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.AccumulatedProfitReport.RecordProfit(profit.Profit)
})
// s.orderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
// s.AccumulatedProfitReport.RecordTrade(trade.Fee)
// })
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1d, func(kline types.KLine) {
s.AccumulatedProfitReport.DailyUpdate(s.TradeStats)
}))
@ -286,6 +294,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
price, _ := session.LastPrice(s.Symbol)
initAsset := s.CalcAssetValue(price).Float64()
cumProfitSlice := floats.Slice{initAsset, initAsset}
profitDollarSlice := floats.Slice{0, 0}
cumProfitDollarSlice := floats.Slice{0, 0}
s.orderExecutor.TradeCollector().OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) {
if bbgo.IsBackTesting {
@ -301,6 +311,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
profitSlice.Update(s.sellPrice / price)
cumProfitSlice.Update(s.CalcAssetValue(trade.Price).Float64())
}
profitDollarSlice.Update(profit.Float64())
cumProfitDollarSlice.Update(profitDollarSlice.Sum())
if s.Position.IsDust(trade.Price) {
s.buyPrice = 0
s.sellPrice = 0
@ -319,63 +331,140 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
s.InitDrawCommands(&profitSlice, &cumProfitSlice, &cumProfitDollarSlice)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(ctx, s)
})
s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)
for _, method := range s.ExitMethods {
method.Bind(session, s.orderExecutor)
atomic.SwapUint64(&s.currentTradePrice, 0.)
s.closePrice = 0.
s.openPrice = 0.
klinDirections := types.NewQueue(100)
started := false
boxOpenPrice := 0.
boxClosePrice := 0.
boxCounter := 0
if !bbgo.IsBackTesting {
s.session.MarketDataStream.OnBookTickerUpdate(func(bt types.BookTicker) {
// quote order book price
newBid := uint64(bt.Buy.Float64())
newAsk := uint64(bt.Sell.Float64())
atomic.SwapUint64(&s.obBuyPrice, newBid)
atomic.SwapUint64(&s.obSellPrice, newAsk)
})
s.session.MarketDataStream.OnAggTrade(func(trade types.Trade) {
tradePrice := uint64(trade.Price.Float64())
atomic.SwapUint64(&s.currentTradePrice, tradePrice)
})
closeTime := <-time.After(time.Duration(s.Interval-int(time.Now().UnixMilli())%s.Interval) * time.Millisecond)
log.Infof("kline close timing synced @ %s", closeTime.Format("2006-01-02 15:04:05.000000"))
go func() {
intervalCloseTicker := time.NewTicker(time.Duration(s.Interval) * time.Millisecond)
defer intervalCloseTicker.Stop()
for {
select {
case <-intervalCloseTicker.C:
log.Infof("kline close time @ %s", time.Now().Format("2006-01-02 15:04:05.000000"))
s.orderExecutor.CancelNoWait(context.Background())
if s.currentTradePrice > 0 {
s.closePrice = float64(s.currentTradePrice)
log.Infof("Close Price: %f", s.closePrice)
if s.closePrice > 0 && s.openPrice > 0 {
direction := s.closePrice - s.openPrice
klinDirections.Update(direction)
regimeShift := klinDirections.Index(0)*klinDirections.Index(1) < 0
if regimeShift && !started {
boxOpenPrice = s.openPrice
started = true
boxCounter = 0
log.Infof("box started at price: %f", boxOpenPrice)
} else if regimeShift && started {
boxClosePrice = s.closePrice
started = false
log.Infof("box ended at price: %f with time length: %d", boxClosePrice, boxCounter)
// box ending, should re-balance position
nirr := fixedpoint.NewFromFloat(((boxOpenPrice - boxClosePrice) / boxOpenPrice) / (float64(boxCounter) + 1))
log.Infof("Alpha: %f", nirr.Float64())
if nirr.Float64() < 0 {
_, err := s.orderExecutor.SubmitOrders(context.Background(), types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Quantity: s.Quantity,
Type: types.OrderTypeLimitMaker,
Price: fixedpoint.NewFromFloat(float64(s.obSellPrice)),
Tag: "irrSell",
})
if err != nil {
log.WithError(err)
}
} else if nirr.Float64() > 0 {
_, err := s.orderExecutor.SubmitOrders(context.Background(), types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Quantity: s.Quantity,
Type: types.OrderTypeLimitMaker,
Price: fixedpoint.NewFromFloat(float64(s.obBuyPrice)),
Tag: "irrBuy",
})
if err != nil {
log.WithError(err)
}
}
} else {
boxCounter++
}
}
}
case <-s.stopC:
log.Warnf("%s goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s goroutine stopped, due to the cancelled context", s.Symbol)
return
}
}
}()
openTime := <-time.After(time.Duration(s.Interval-int(time.Now().UnixMilli())%s.Interval) * time.Millisecond)
log.Infof("kline open timing synced @ %s", openTime.Format("2006-01-02 15:04:05.000000"))
go func() {
intervalOpenTicker := time.NewTicker(time.Duration(s.Interval) * time.Millisecond)
defer intervalOpenTicker.Stop()
for {
select {
case <-intervalOpenTicker.C:
time.Sleep(10 * time.Millisecond)
log.Infof("kline open time @ %s", time.Now().Format("2006-01-02 15:04:05.000000"))
if s.currentTradePrice > 0 && s.closePrice > 0 {
s.openPrice = float64(s.currentTradePrice)
log.Infof("Open Price: %f", s.openPrice)
}
case <-s.stopC:
log.Warnf("%s goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s goroutine stopped, due to the cancelled context", s.Symbol)
return
}
}
}()
}
kLineStore, _ := s.session.MarketDataStore(s.Symbol)
s.nrr = &NRR{IntervalWindow: types.IntervalWindow{Window: 2, Interval: s.Interval}, RankingWindow: s.Window}
s.nrr.BindK(s.session.MarketDataStream, s.Symbol, s.Interval)
if klines, ok := kLineStore.KLinesOfInterval(s.nrr.Interval); ok {
s.nrr.LoadK((*klines)[0:])
}
// startTime := s.Environment.StartTime()
// s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1h, startTime))
s.session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
// ts_rank(): transformed to [0~1] which divided equally
// queued first signal as its initial process
// important: delayed signal in order to submit order at current kline close (a.k.a. next open while in production)
// instead of right in current kline open
// alpha-weighted assets (inventory and capital)
targetBase := s.QuantityOrAmount.CalculateQuantity(kline.Close).Mul(fixedpoint.NewFromFloat(s.nrr.RankedValues.Index(1)))
diffQty := targetBase.Sub(s.Position.Base)
log.Infof("decision alpah: %f, ranked negative return: %f, current position: %f, target position diff: %f", s.nrr.RankedValues.Index(1), s.nrr.RankedValues.Last(), s.Position.Base.Float64(), diffQty.Float64())
// use kline direction to prevent reversing position too soon
if diffQty.Sign() > 0 { // && kline.Direction() >= 0
_, _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Quantity: diffQty.Abs(),
Type: types.OrderTypeMarket,
Tag: "irr buy more",
})
} else if diffQty.Sign() < 0 { // && kline.Direction() <= 0
_, _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Quantity: diffQty.Abs(),
Type: types.OrderTypeMarket,
Tag: "irr sell more",
})
}
}))
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// Output accumulated profit report
if bbgo.IsBackTesting {
defer s.AccumulatedProfitReport.Output(s.Symbol)
@ -385,12 +474,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
log.WithError(err).Errorf("cannot draw graph")
}
}
} else {
close(s.stopC)
}
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())
_ = s.orderExecutor.GracefulCancel(ctx)
})
return nil
}