Merge pull request #1226 from c9s/c9s/base-strategy

REFACTOR: pull out base strategy struct
This commit is contained in:
c9s 2023-07-10 17:50:12 +08:00 committed by GitHub
commit 1da94f55e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
83 changed files with 851 additions and 556 deletions

53
config/rsicross.yaml Normal file
View File

@ -0,0 +1,53 @@
persistence:
json:
directory: var/data
redis:
host: 127.0.0.1
port: 6379
db: 0
sessions:
binance:
exchange: binance
envVarPrefix: binance
exchangeStrategies:
- on: binance
rsicross:
symbol: BTCUSDT
interval: 5m
fastWindow: 3
slowWindow: 12
quantity: 0.1
### RISK CONTROLS
## circuitBreakEMA is used for calculating the price for circuitBreak
# circuitBreakEMA:
# interval: 1m
# window: 14
## circuitBreakLossThreshold is the maximum loss threshold for realized+unrealized PnL
# circuitBreakLossThreshold: -10.0
## positionHardLimit is the maximum position limit
# positionHardLimit: 500.0
## maxPositionQuantity is the maximum quantity per order that could be controlled in positionHardLimit,
## this parameter is used with positionHardLimit togerther
# maxPositionQuantity: 10.0
backtest:
startTime: "2022-01-01"
endTime: "2022-02-01"
symbols:
- BTCUSDT
sessions: [binance]
# syncSecKLines: true
accounts:
binance:
makerFeeRate: 0.0%
takerFeeRate: 0.075%
balances:
BTC: 0.0
USDT: 10000.0

View File

@ -3,7 +3,7 @@ package bbgo
import (
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/types"
)
@ -16,8 +16,8 @@ type IndicatorSet struct {
store *MarketDataStore
// caches
kLines map[types.Interval]*indicator.KLineStream
closePrices map[types.Interval]*indicator.PriceStream
kLines map[types.Interval]*indicatorv2.KLineStream
closePrices map[types.Interval]*indicatorv2.PriceStream
}
func NewIndicatorSet(symbol string, stream types.Stream, store *MarketDataStore) *IndicatorSet {
@ -26,17 +26,17 @@ func NewIndicatorSet(symbol string, stream types.Stream, store *MarketDataStore)
store: store,
stream: stream,
kLines: make(map[types.Interval]*indicator.KLineStream),
closePrices: make(map[types.Interval]*indicator.PriceStream),
kLines: make(map[types.Interval]*indicatorv2.KLineStream),
closePrices: make(map[types.Interval]*indicatorv2.PriceStream),
}
}
func (i *IndicatorSet) KLines(interval types.Interval) *indicator.KLineStream {
func (i *IndicatorSet) KLines(interval types.Interval) *indicatorv2.KLineStream {
if kLines, ok := i.kLines[interval]; ok {
return kLines
}
kLines := indicator.KLines(i.stream, i.Symbol, interval)
kLines := indicatorv2.KLines(i.stream, i.Symbol, interval)
if kLinesWindow, ok := i.store.KLinesOfInterval(interval); ok {
kLines.BackFill(*kLinesWindow)
} else {
@ -47,60 +47,60 @@ func (i *IndicatorSet) KLines(interval types.Interval) *indicator.KLineStream {
return kLines
}
func (i *IndicatorSet) OPEN(interval types.Interval) *indicator.PriceStream {
return indicator.OpenPrices(i.KLines(interval))
func (i *IndicatorSet) OPEN(interval types.Interval) *indicatorv2.PriceStream {
return indicatorv2.OpenPrices(i.KLines(interval))
}
func (i *IndicatorSet) HIGH(interval types.Interval) *indicator.PriceStream {
return indicator.HighPrices(i.KLines(interval))
func (i *IndicatorSet) HIGH(interval types.Interval) *indicatorv2.PriceStream {
return indicatorv2.HighPrices(i.KLines(interval))
}
func (i *IndicatorSet) LOW(interval types.Interval) *indicator.PriceStream {
return indicator.LowPrices(i.KLines(interval))
func (i *IndicatorSet) LOW(interval types.Interval) *indicatorv2.PriceStream {
return indicatorv2.LowPrices(i.KLines(interval))
}
func (i *IndicatorSet) CLOSE(interval types.Interval) *indicator.PriceStream {
func (i *IndicatorSet) CLOSE(interval types.Interval) *indicatorv2.PriceStream {
if closePrices, ok := i.closePrices[interval]; ok {
return closePrices
}
closePrices := indicator.ClosePrices(i.KLines(interval))
closePrices := indicatorv2.ClosePrices(i.KLines(interval))
i.closePrices[interval] = closePrices
return closePrices
}
func (i *IndicatorSet) VOLUME(interval types.Interval) *indicator.PriceStream {
return indicator.Volumes(i.KLines(interval))
func (i *IndicatorSet) VOLUME(interval types.Interval) *indicatorv2.PriceStream {
return indicatorv2.Volumes(i.KLines(interval))
}
func (i *IndicatorSet) RSI(iw types.IntervalWindow) *indicator.RSIStream {
return indicator.RSI2(i.CLOSE(iw.Interval), iw.Window)
func (i *IndicatorSet) RSI(iw types.IntervalWindow) *indicatorv2.RSIStream {
return indicatorv2.RSI2(i.CLOSE(iw.Interval), iw.Window)
}
func (i *IndicatorSet) EMA(iw types.IntervalWindow) *indicator.EWMAStream {
func (i *IndicatorSet) EMA(iw types.IntervalWindow) *indicatorv2.EWMAStream {
return i.EWMA(iw)
}
func (i *IndicatorSet) EWMA(iw types.IntervalWindow) *indicator.EWMAStream {
return indicator.EWMA2(i.CLOSE(iw.Interval), iw.Window)
func (i *IndicatorSet) EWMA(iw types.IntervalWindow) *indicatorv2.EWMAStream {
return indicatorv2.EWMA2(i.CLOSE(iw.Interval), iw.Window)
}
func (i *IndicatorSet) STOCH(iw types.IntervalWindow, dPeriod int) *indicator.StochStream {
return indicator.Stoch2(i.KLines(iw.Interval), iw.Window, dPeriod)
func (i *IndicatorSet) STOCH(iw types.IntervalWindow, dPeriod int) *indicatorv2.StochStream {
return indicatorv2.Stoch(i.KLines(iw.Interval), iw.Window, dPeriod)
}
func (i *IndicatorSet) BOLL(iw types.IntervalWindow, k float64) *indicator.BOLLStream {
return indicator.BOLL2(i.CLOSE(iw.Interval), iw.Window, k)
func (i *IndicatorSet) BOLL(iw types.IntervalWindow, k float64) *indicatorv2.BOLLStream {
return indicatorv2.BOLL(i.CLOSE(iw.Interval), iw.Window, k)
}
func (i *IndicatorSet) MACD(interval types.Interval, shortWindow, longWindow, signalWindow int) *indicator.MACDStream {
return indicator.MACD2(i.CLOSE(interval), shortWindow, longWindow, signalWindow)
func (i *IndicatorSet) MACD(interval types.Interval, shortWindow, longWindow, signalWindow int) *indicatorv2.MACDStream {
return indicatorv2.MACD2(i.CLOSE(interval), shortWindow, longWindow, signalWindow)
}
func (i *IndicatorSet) ATR(interval types.Interval, window int) *indicator.ATRStream {
return indicator.ATR2(i.KLines(interval), window)
func (i *IndicatorSet) ATR(interval types.Interval, window int) *indicatorv2.ATRStream {
return indicatorv2.ATR2(i.KLines(interval), window)
}
func (i *IndicatorSet) ATRP(interval types.Interval, window int) *indicator.ATRPStream {
return indicator.ATRP2(i.KLines(interval), window)
func (i *IndicatorSet) ATRP(interval types.Interval, window int) *indicatorv2.ATRPStream {
return indicatorv2.ATRP2(i.KLines(interval), window)
}

View File

@ -417,6 +417,8 @@ func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPos
return createdOrder, nil
}
log.WithError(err).Errorf("unable to submit order: %v", err)
log.Infof("reduce quantity and retry order")
return e.reduceQuantityAndSubmitOrder(ctx, price, *submitOrder)
}

View File

@ -222,7 +222,6 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
// load and run Session strategies
for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName]
var orderExecutor = trader.getSessionOrderExecutor(sessionName)
for _, strategy := range strategies {
rs := reflect.ValueOf(strategy)
@ -237,10 +236,6 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
return err
}
if err := dynamic.InjectField(rs, "OrderExecutor", orderExecutor, false); err != nil {
return errors.Wrapf(err, "failed to inject OrderExecutor on %T", strategy)
}
if defaulter, ok := strategy.(StrategyDefaulter); ok {
if err := defaulter.Defaults(); err != nil {
panic(err)
@ -441,7 +436,7 @@ func (trader *Trader) injectCommonServices(ctx context.Context, s interface{}) e
return fmt.Errorf("field Persistence is not a struct element, %s given", field)
}
if err := dynamic.InjectField(elem, "Facade", ps, true); err != nil {
if err := dynamic.InjectField(elem.Interface(), "Facade", ps, true); err != nil {
return err
}

View File

@ -27,6 +27,7 @@ import (
_ "github.com/c9s/bbgo/pkg/strategy/pricealert"
_ "github.com/c9s/bbgo/pkg/strategy/pricedrop"
_ "github.com/c9s/bbgo/pkg/strategy/rebalance"
_ "github.com/c9s/bbgo/pkg/strategy/rsicross"
_ "github.com/c9s/bbgo/pkg/strategy/rsmaker"
_ "github.com/c9s/bbgo/pkg/strategy/schedule"
_ "github.com/c9s/bbgo/pkg/strategy/scmaker"

View File

@ -3,22 +3,19 @@ package dynamic
import (
"fmt"
"reflect"
"testing"
"time"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
type testEnvironment struct {
startTime time.Time
}
func InjectField(rs reflect.Value, fieldName string, obj interface{}, pointerOnly bool) error {
func InjectField(target interface{}, fieldName string, obj interface{}, pointerOnly bool) error {
rs := reflect.ValueOf(target)
field := rs.FieldByName(fieldName)
if !field.IsValid() {
return nil
}
@ -131,96 +128,3 @@ func ParseStructAndInject(f interface{}, objects ...interface{}) error {
return nil
}
func Test_injectField(t *testing.T) {
type TT struct {
TradeService *service.TradeService
}
// only pointer object can be set.
var tt = &TT{}
// get the value of the pointer, or it can not be set.
var rv = reflect.ValueOf(tt).Elem()
_, ret := HasField(rv, "TradeService")
assert.True(t, ret)
ts := &service.TradeService{}
err := InjectField(rv, "TradeService", ts, true)
assert.NoError(t, err)
}
func Test_parseStructAndInject(t *testing.T) {
t.Run("skip nil", func(t *testing.T) {
ss := struct {
a int
Env *testEnvironment
}{
a: 1,
Env: nil,
}
err := ParseStructAndInject(&ss, nil)
assert.NoError(t, err)
assert.Nil(t, ss.Env)
})
t.Run("pointer", func(t *testing.T) {
ss := struct {
a int
Env *testEnvironment
}{
a: 1,
Env: nil,
}
err := ParseStructAndInject(&ss, &testEnvironment{})
assert.NoError(t, err)
assert.NotNil(t, ss.Env)
})
t.Run("composition", func(t *testing.T) {
type TT struct {
*service.TradeService
}
ss := TT{}
err := ParseStructAndInject(&ss, &service.TradeService{})
assert.NoError(t, err)
assert.NotNil(t, ss.TradeService)
})
t.Run("struct", func(t *testing.T) {
ss := struct {
a int
Env testEnvironment
}{
a: 1,
}
err := ParseStructAndInject(&ss, testEnvironment{
startTime: time.Now(),
})
assert.NoError(t, err)
assert.NotEqual(t, time.Time{}, ss.Env.startTime)
})
t.Run("interface/any", func(t *testing.T) {
ss := struct {
Any interface{} // anything
}{
Any: nil,
}
err := ParseStructAndInject(&ss, &testEnvironment{
startTime: time.Now(),
})
assert.NoError(t, err)
assert.NotNil(t, ss.Any)
})
t.Run("interface/stringer", func(t *testing.T) {
ss := struct {
Stringer types.Stringer // stringer interface
}{
Stringer: nil,
}
err := ParseStructAndInject(&ss, &types.Trade{})
assert.NoError(t, err)
assert.NotNil(t, ss.Stringer)
})
}

105
pkg/dynamic/inject_test.go Normal file
View File

@ -0,0 +1,105 @@
package dynamic
import (
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
func Test_injectField(t *testing.T) {
type TT struct {
TradeService *service.TradeService
}
// only pointer object can be set.
var tt = &TT{}
// get the value of the pointer, or it can not be set.
var rv = reflect.ValueOf(tt).Elem()
_, ret := HasField(rv, "TradeService")
assert.True(t, ret)
ts := &service.TradeService{}
err := InjectField(rv, "TradeService", ts, true)
assert.NoError(t, err)
}
func Test_parseStructAndInject(t *testing.T) {
t.Run("skip nil", func(t *testing.T) {
ss := struct {
a int
Env *testEnvironment
}{
a: 1,
Env: nil,
}
err := ParseStructAndInject(&ss, nil)
assert.NoError(t, err)
assert.Nil(t, ss.Env)
})
t.Run("pointer", func(t *testing.T) {
ss := struct {
a int
Env *testEnvironment
}{
a: 1,
Env: nil,
}
err := ParseStructAndInject(&ss, &testEnvironment{})
assert.NoError(t, err)
assert.NotNil(t, ss.Env)
})
t.Run("composition", func(t *testing.T) {
type TT struct {
*service.TradeService
}
ss := TT{}
err := ParseStructAndInject(&ss, &service.TradeService{})
assert.NoError(t, err)
assert.NotNil(t, ss.TradeService)
})
t.Run("struct", func(t *testing.T) {
ss := struct {
a int
Env testEnvironment
}{
a: 1,
}
err := ParseStructAndInject(&ss, testEnvironment{
startTime: time.Now(),
})
assert.NoError(t, err)
assert.NotEqual(t, time.Time{}, ss.Env.startTime)
})
t.Run("interface/any", func(t *testing.T) {
ss := struct {
Any interface{} // anything
}{
Any: nil,
}
err := ParseStructAndInject(&ss, &testEnvironment{
startTime: time.Now(),
})
assert.NoError(t, err)
assert.NotNil(t, ss.Any)
})
t.Run("interface/stringer", func(t *testing.T) {
ss := struct {
Stringer types.Stringer // stringer interface
}{
Stringer: nil,
}
err := ParseStructAndInject(&ss, &types.Trade{})
assert.NoError(t, err)
assert.NotNil(t, ss.Stringer)
})
}

View File

@ -61,7 +61,7 @@ func IterateFieldsByTag(obj interface{}, tagName string, cb StructFieldIterator)
st := reflect.TypeOf(obj)
if st.Kind() != reflect.Ptr {
return fmt.Errorf("f should be a pointer of a struct, %s given", st)
return fmt.Errorf("obj should be a pointer of a struct, %s given", st)
}
// for pointer, check if it's nil

View File

@ -8,6 +8,16 @@ import (
"github.com/stretchr/testify/assert"
)
type TestEmbedded struct {
Foo int `persistence:"foo"`
Bar int `persistence:"bar"`
}
type TestA struct {
*TestEmbedded
Outer int `persistence:"outer"`
}
func TestIterateFields(t *testing.T) {
t.Run("basic", func(t *testing.T) {
@ -100,4 +110,22 @@ func TestIterateFieldsByTag(t *testing.T) {
assert.Equal(t, 2, cnt)
assert.Equal(t, []string{"a", "b"}, collectedTags)
})
t.Run("embedded", func(t *testing.T) {
a := &TestA{
TestEmbedded: &TestEmbedded{Foo: 1, Bar: 2},
Outer: 3,
}
collectedTags := []string{}
cnt := 0
err := IterateFieldsByTag(a, "persistence", func(tag string, ft reflect.StructField, fv reflect.Value) error {
cnt++
collectedTags = append(collectedTags, tag)
return nil
})
assert.NoError(t, err)
assert.Equal(t, 3, cnt)
assert.Equal(t, []string{"foo", "bar", "outer"}, collectedTags)
})
}

View File

@ -76,9 +76,9 @@ func (inc *EWMA) PushK(k types.KLine) {
inc.EmitUpdate(inc.Last(0))
}
func CalculateKLinesEMA(allKLines []types.KLine, priceF KLineValueMapper, window int) float64 {
func CalculateKLinesEMA(allKLines []types.KLine, priceF types.KLineValueMapper, window int) float64 {
var multiplier = 2.0 / (float64(window) + 1)
return ewma(MapKLinePrice(allKLines, priceF), multiplier)
return ewma(types.MapKLinePrice(allKLines, priceF), multiplier)
}
// see https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp

View File

@ -1027,7 +1027,7 @@ func buildKLines(prices []fixedpoint.Value) (klines []types.KLine) {
func Test_calculateEWMA(t *testing.T) {
type args struct {
allKLines []types.KLine
priceF KLineValueMapper
priceF types.KLineValueMapper
window int
}
var input []fixedpoint.Value
@ -1043,7 +1043,7 @@ func Test_calculateEWMA(t *testing.T) {
name: "ETHUSDT EMA 7",
args: args{
allKLines: buildKLines(input),
priceF: KLineClosePriceMapper,
priceF: types.KLineClosePriceMapper,
window: 7,
},
want: 571.72, // with open price, binance desktop returns 571.45, trading view returns 570.8957, for close price, binance mobile returns 571.72
@ -1052,7 +1052,7 @@ func Test_calculateEWMA(t *testing.T) {
name: "ETHUSDT EMA 25",
args: args{
allKLines: buildKLines(input),
priceF: KLineClosePriceMapper,
priceF: types.KLineClosePriceMapper,
window: 25,
},
want: 571.30,
@ -1061,7 +1061,7 @@ func Test_calculateEWMA(t *testing.T) {
name: "ETHUSDT EMA 99",
args: args{
allKLines: buildKLines(input),
priceF: KLineClosePriceMapper,
priceF: types.KLineClosePriceMapper,
window: 99,
},
want: 577.62, // binance mobile uses 577.58

View File

@ -1,15 +0,0 @@
// Code generated by "callbackgen -type Float64Updater"; DO NOT EDIT.
package indicator
import ()
func (f *Float64Updater) OnUpdate(cb func(v float64)) {
f.updateCallbacks = append(f.updateCallbacks, cb)
}
func (f *Float64Updater) EmitUpdate(v float64) {
for _, cb := range f.updateCallbacks {
cb(v)
}
}

View File

@ -6070,7 +6070,7 @@ func Test_GHFilter(t *testing.T) {
func Test_GHFilterEstimationAccurate(t *testing.T) {
type args struct {
allKLines []types.KLine
priceF KLineValueMapper
priceF types.KLineValueMapper
window int
}
var klines []types.KLine

View File

@ -1,33 +0,0 @@
package indicator
import "github.com/c9s/bbgo/pkg/types"
type KLineValueMapper func(k types.KLine) float64
func KLineOpenPriceMapper(k types.KLine) float64 {
return k.Open.Float64()
}
func KLineClosePriceMapper(k types.KLine) float64 {
return k.Close.Float64()
}
func KLineTypicalPriceMapper(k types.KLine) float64 {
return (k.High.Float64() + k.Low.Float64() + k.Close.Float64()) / 3.
}
func KLinePriceVolumeMapper(k types.KLine) float64 {
return k.Close.Mul(k.Volume).Float64()
}
func KLineVolumeMapper(k types.KLine) float64 {
return k.Volume.Float64()
}
func MapKLinePrice(kLines []types.KLine, f KLineValueMapper) (prices []float64) {
for _, k := range kLines {
prices = append(prices, f(k))
}
return prices
}

View File

@ -52,7 +52,7 @@ func (inc *Pivot) CalculateAndUpdate(klines []types.KLine) {
recentT := klines[end-(inc.Window-1) : end+1]
l, h, err := calculatePivot(recentT, inc.Window, KLineLowPriceMapper, KLineHighPriceMapper)
l, h, err := calculatePivot(recentT, inc.Window, types.KLineLowPriceMapper, types.KLineHighPriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate pivots")
return
@ -90,7 +90,7 @@ func (inc *Pivot) Bind(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate)
}
func calculatePivot(klines []types.KLine, window int, valLow KLineValueMapper, valHigh KLineValueMapper) (float64, float64, error) {
func calculatePivot(klines []types.KLine, window int, valLow types.KLineValueMapper, valHigh types.KLineValueMapper) (float64, float64, error) {
length := len(klines)
if length == 0 || length < window {
return 0., 0., fmt.Errorf("insufficient elements for calculating with window = %d", window)
@ -115,11 +115,3 @@ func calculatePivot(klines []types.KLine, window int, valLow KLineValueMapper, v
return pl, ph, nil
}
func KLineLowPriceMapper(k types.KLine) float64 {
return k.Low.Float64()
}
func KLineHighPriceMapper(k types.KLine) float64 {
return k.High.Float64()
}

View File

@ -1,22 +0,0 @@
package indicator
import "github.com/c9s/bbgo/pkg/types"
type Float64Calculator interface {
Calculate(x float64) float64
PushAndEmit(x float64)
}
type Float64Source interface {
types.Series
OnUpdate(f func(v float64))
}
type Float64Subscription interface {
types.Series
AddSubscriber(f func(v float64))
}
type Float64Truncator interface {
Truncate()
}

View File

@ -7,7 +7,7 @@ func max(x, y int) int {
return y
}
func min(x, y int) int {
func Min(x, y int) int {
if x < y {
return x
}

View File

@ -1,19 +0,0 @@
package indicator
/*
NEW INDICATOR DESIGN:
klines := kLines(marketDataStream)
closePrices := closePrices(klines)
macd := MACD(klines, {Fast: 12, Slow: 10})
equals to:
klines := KLines(marketDataStream)
closePrices := ClosePrice(klines)
fastEMA := EMA(closePrices, 7)
slowEMA := EMA(closePrices, 25)
macd := Subtract(fastEMA, slowEMA)
signal := EMA(macd, 16)
histogram := Subtract(macd, signal)
*/

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
type ATRStream struct {
// embedded struct

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"encoding/json"

View File

@ -1,12 +1,14 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/types"
type ATRPStream struct {
*Float64Series
*types.Float64Series
}
func ATRP2(source KLineSubscription, window int) *ATRPStream {
s := &ATRPStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
}
tr := TR2(source)
atr := RMA2(tr, window, true)

View File

@ -1,10 +1,14 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"
)
type BOLLStream struct {
// the band series
*Float64Series
*types.Float64Series
UpBand, DownBand *Float64Series
UpBand, DownBand *types.Float64Series
window int
k float64
@ -20,15 +24,15 @@ type BOLLStream struct {
//
// -> calculate SMA
// -> calculate stdDev -> calculate bandWidth -> get latest SMA -> upBand, downBand
func BOLL2(source Float64Source, window int, k float64) *BOLLStream {
func BOLL(source types.Float64Source, window int, k float64) *BOLLStream {
// bind these indicators before our main calculator
sma := SMA2(source, window)
stdDev := StdDev2(source, window)
stdDev := StdDev(source, window)
s := &BOLLStream{
Float64Series: NewFloat64Series(),
UpBand: NewFloat64Series(),
DownBand: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
UpBand: types.NewFloat64Series(),
DownBand: types.NewFloat64Series(),
window: window,
k: k,
SMA: sma,

28
pkg/indicator/v2/cma.go Normal file
View File

@ -0,0 +1,28 @@
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
)
type CMAStream struct {
*types.Float64Series
}
func CMA2(source types.Float64Source) *CMAStream {
s := &CMAStream{
Float64Series: types.NewFloat64Series(),
}
s.Bind(source, s)
return s
}
func (s *CMAStream) Calculate(x float64) float64 {
l := float64(s.Slice.Length())
cma := (s.Slice.Last(0)*l + x) / (l + 1.)
return cma
}
func (s *CMAStream) Truncate() {
s.Slice.Truncate(indicator.MaxNumOfEWMA)
}

View File

@ -1,7 +1,8 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
type CrossType float64
@ -13,7 +14,7 @@ const (
// CrossStream subscribes 2 upstreams, and calculate the cross signal
type CrossStream struct {
*Float64Series
*types.Float64Series
a, b floats.Slice
}
@ -21,9 +22,9 @@ type CrossStream struct {
// Cross creates the CrossStream object:
//
// cross := Cross(fastEWMA, slowEWMA)
func Cross(a, b Float64Source) *CrossStream {
func Cross(a, b types.Float64Source) *CrossStream {
s := &CrossStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
}
a.OnUpdate(func(v float64) {
s.a.Push(v)

View File

@ -1,15 +1,17 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/types"
type EWMAStream struct {
*Float64Series
*types.Float64Series
window int
multiplier float64
}
func EWMA2(source Float64Source, window int) *EWMAStream {
func EWMA2(source types.Float64Source, window int) *EWMAStream {
s := &EWMAStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
window: window,
multiplier: 2.0 / float64(1+window),
}
@ -18,7 +20,7 @@ func EWMA2(source Float64Source, window int) *EWMAStream {
}
func (s *EWMAStream) Calculate(v float64) float64 {
last := s.slice.Last(0)
last := s.Slice.Last(0)
if last == 0.0 {
return v
}

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/types"
@ -60,3 +60,9 @@ func KLines(source types.Stream, symbol string, interval types.Interval) *KLineS
return s
}
type KLineSubscription interface {
AddSubscriber(f func(k types.KLine))
Length() int
Last(i int) *types.KLine
}

View File

@ -1,6 +1,6 @@
// Code generated by "callbackgen -type KLineStream"; DO NOT EDIT.
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"

View File

@ -1,4 +1,8 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"
)
type MACDStream struct {
*SubtractStream
@ -9,7 +13,7 @@ type MACDStream struct {
Histogram *SubtractStream
}
func MACD2(source Float64Source, shortWindow, longWindow, signalWindow int) *MACDStream {
func MACD2(source types.Float64Source, shortWindow, longWindow, signalWindow int) *MACDStream {
// bind and calculate these first
fastEWMA := EWMA2(source, shortWindow)
slowEWMA := EWMA2(source, longWindow)

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"encoding/json"
@ -21,6 +21,14 @@ fast = s.ewm(span=12, adjust=False).mean()
print(fast - slow)
*/
func buildKLines(prices []fixedpoint.Value) (klines []types.KLine) {
for _, p := range prices {
klines = append(klines, types.KLine{Close: p})
}
return klines
}
func Test_MACD2(t *testing.T) {
var randomPrices = []byte(`[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]`)
var input []fixedpoint.Value

View File

@ -1,15 +1,18 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/datatype/floats"
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
type MultiplyStream struct {
*Float64Series
*types.Float64Series
a, b floats.Slice
}
func Multiply(a, b Float64Source) *MultiplyStream {
func Multiply(a, b types.Float64Source) *MultiplyStream {
s := &MultiplyStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
}
a.OnUpdate(func(v float64) {
@ -29,13 +32,13 @@ func (s *MultiplyStream) calculate() {
return
}
if s.a.Length() > s.slice.Length() {
var numNewElems = s.a.Length() - s.slice.Length()
if s.a.Length() > s.Slice.Length() {
var numNewElems = s.a.Length() - s.Slice.Length()
var tailA = s.a.Tail(numNewElems)
var tailB = s.b.Tail(numNewElems)
var tailC = tailA.Mul(tailB)
for _, f := range tailC {
s.slice.Push(f)
s.Slice.Push(f)
s.EmitUpdate(f)
}
}

View File

@ -0,0 +1,34 @@
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
type PivotHighStream struct {
*types.Float64Series
rawValues floats.Slice
window, rightWindow int
}
func PivotHigh2(source types.Float64Source, window, rightWindow int) *PivotHighStream {
s := &PivotHighStream{
Float64Series: types.NewFloat64Series(),
window: window,
rightWindow: rightWindow,
}
s.Subscribe(source, func(x float64) {
s.rawValues.Push(x)
if low, ok := s.calculatePivotHigh(s.rawValues, s.window, s.rightWindow); ok {
s.PushAndEmit(low)
}
})
return s
}
func (s *PivotHighStream) calculatePivotHigh(highs floats.Slice, left, right int) (float64, bool) {
return floats.FindPivot(highs, left, right, func(a, pivot float64) bool {
return a < pivot
})
}

View File

@ -0,0 +1,34 @@
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
type PivotLowStream struct {
*types.Float64Series
rawValues floats.Slice
window, rightWindow int
}
func PivotLow(source types.Float64Source, window, rightWindow int) *PivotLowStream {
s := &PivotLowStream{
Float64Series: types.NewFloat64Series(),
window: window,
rightWindow: rightWindow,
}
s.Subscribe(source, func(x float64) {
s.rawValues.Push(x)
if low, ok := s.calculatePivotLow(s.rawValues, s.window, s.rightWindow); ok {
s.PushAndEmit(low)
}
})
return s
}
func (s *PivotLowStream) calculatePivotLow(lows floats.Slice, left, right int) (float64, bool) {
return floats.FindPivot(lows, left, right, func(a, pivot float64) bool {
return a > pivot
})
}

View File

@ -1,24 +1,18 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"
)
type KLineSubscription interface {
AddSubscriber(f func(k types.KLine))
Length() int
Last(i int) *types.KLine
}
type PriceStream struct {
*Float64Series
*types.Float64Series
mapper KLineValueMapper
mapper types.KLineValueMapper
}
func Price(source KLineSubscription, mapper KLineValueMapper) *PriceStream {
func Price(source KLineSubscription, mapper types.KLineValueMapper) *PriceStream {
s := &PriceStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
mapper: mapper,
}
@ -37,37 +31,37 @@ func Price(source KLineSubscription, mapper KLineValueMapper) *PriceStream {
func (s *PriceStream) AddSubscriber(f func(v float64)) {
s.OnUpdate(f)
if len(s.slice) == 0 {
if len(s.Slice) == 0 {
return
}
// push historical value to the subscriber
for _, v := range s.slice {
for _, v := range s.Slice {
f(v)
}
}
func (s *PriceStream) PushAndEmit(v float64) {
s.slice.Push(v)
s.Slice.Push(v)
s.EmitUpdate(v)
}
func ClosePrices(source KLineSubscription) *PriceStream {
return Price(source, KLineClosePriceMapper)
return Price(source, types.KLineClosePriceMapper)
}
func LowPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineLowPriceMapper)
return Price(source, types.KLineLowPriceMapper)
}
func HighPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineHighPriceMapper)
return Price(source, types.KLineHighPriceMapper)
}
func OpenPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineOpenPriceMapper)
return Price(source, types.KLineOpenPriceMapper)
}
func Volumes(source KLineSubscription) *PriceStream {
return Price(source, KLineVolumeMapper)
return Price(source, types.KLineVolumeMapper)
}

View File

@ -1,8 +1,15 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"
)
const MaxNumOfRMA = 1000
const MaxNumOfRMATruncateSize = 500
type RMAStream struct {
// embedded structs
*Float64Series
*types.Float64Series
// config fields
Adjust bool
@ -12,9 +19,9 @@ type RMAStream struct {
sum, previous float64
}
func RMA2(source Float64Source, window int, adjust bool) *RMAStream {
func RMA2(source types.Float64Source, window int, adjust bool) *RMAStream {
s := &RMAStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
window: window,
Adjust: adjust,
}
@ -41,17 +48,17 @@ func (s *RMAStream) Calculate(x float64) float64 {
if s.counter < s.window {
// we can use x, but we need to use 0. to make the same behavior as the result from python pandas_ta
s.slice.Push(0)
s.Slice.Push(0)
}
s.slice.Push(tmp)
s.Slice.Push(tmp)
s.previous = tmp
return tmp
}
func (s *RMAStream) Truncate() {
if len(s.slice) > MaxNumOfRMA {
s.slice = s.slice[MaxNumOfRMATruncateSize-1:]
if len(s.Slice) > MaxNumOfRMA {
s.Slice = s.Slice[MaxNumOfRMATruncateSize-1:]
}
}

View File

@ -1,20 +1,24 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/types"
)
type RSIStream struct {
// embedded structs
*Float64Series
*types.Float64Series
// config fields
window int
// private states
source Float64Source
source types.Float64Source
}
func RSI2(source Float64Source, window int) *RSIStream {
func RSI2(source types.Float64Source, window int) *RSIStream {
s := &RSIStream{
source: source,
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
window: window,
}
s.Bind(source, s)
@ -42,3 +46,17 @@ func (s *RSIStream) Calculate(_ float64) float64 {
rsi := 100.0 - (100.0 / (1.0 + rs))
return rsi
}
func max(x, y int) int {
if x > y {
return x
}
return y
}
func min(x, y int) int {
if x < y {
return x
}
return y
}

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"encoding/json"
@ -75,11 +75,11 @@ func Test_RSI2(t *testing.T) {
prices.PushAndEmit(price)
}
assert.Equal(t, floats.Slice(tt.values), prices.slice)
assert.Equal(t, floats.Slice(tt.values), prices.Slice)
if assert.Equal(t, len(tt.want), len(rsi.slice)) {
if assert.Equal(t, len(tt.want), len(rsi.Slice)) {
for i, v := range tt.want {
assert.InDelta(t, v, rsi.slice[i], 0.000001, "Expected rsi.slice[%d] to be %v, but got %v", i, v, rsi.slice[i])
assert.InDelta(t, v, rsi.Slice[i], 0.000001, "Expected rsi.slice[%d] to be %v, but got %v", i, v, rsi.Slice[i])
}
}
})

View File

@ -1,16 +1,20 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/types"
import (
"github.com/c9s/bbgo/pkg/types"
)
const MaxNumOfSMA = 5_000
type SMAStream struct {
*Float64Series
*types.Float64Series
window int
rawValues *types.Queue
}
func SMA2(source Float64Source, window int) *SMAStream {
func SMA2(source types.Float64Source, window int) *SMAStream {
s := &SMAStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
window: window,
rawValues: types.NewQueue(window),
}
@ -25,5 +29,5 @@ func (s *SMAStream) Calculate(v float64) float64 {
}
func (s *SMAStream) Truncate() {
s.slice = s.slice.Truncate(MaxNumOfSMA)
s.Slice = s.Slice.Truncate(MaxNumOfSMA)
}

View File

@ -1,9 +1,9 @@
package indicator
package indicatorv2
import "github.com/c9s/bbgo/pkg/types"
type StdDevStream struct {
*Float64Series
*types.Float64Series
rawValues *types.Queue
@ -11,9 +11,9 @@ type StdDevStream struct {
multiplier float64
}
func StdDev2(source Float64Source, window int) *StdDevStream {
func StdDev(source types.Float64Source, window int) *StdDevStream {
s := &StdDevStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
rawValues: types.NewQueue(window),
window: window,
}

View File

@ -1,10 +1,12 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
const DPeriod int = 3
// Stochastic Oscillator
// - https://www.investopedia.com/terms/s/stochasticoscillator.asp
//
@ -30,7 +32,7 @@ type StochStream struct {
}
// Stochastic Oscillator
func Stoch2(source KLineSubscription, window, dPeriod int) *StochStream {
func Stoch(source KLineSubscription, window, dPeriod int) *StochStream {
highPrices := HighPrices(source)
lowPrices := LowPrices(source)
@ -42,8 +44,8 @@ func Stoch2(source KLineSubscription, window, dPeriod int) *StochStream {
}
source.AddSubscriber(func(kLine types.KLine) {
lowest := s.lowPrices.slice.Tail(s.window).Min()
highest := s.highPrices.slice.Tail(s.window).Max()
lowest := s.lowPrices.Slice.Tail(s.window).Min()
highest := s.highPrices.Slice.Tail(s.window).Max()
var k float64 = 50.0
var d float64 = 0.0

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"encoding/json"
@ -58,7 +58,7 @@ func TestSTOCH2_update(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
stream := &types.StandardStream{}
kLines := KLines(stream, "", "")
kd := Stoch2(kLines, tt.window, DPeriod)
kd := Stoch(kLines, tt.window, DPeriod)
for _, k := range tt.kLines {
stream.EmitKLineClosed(k)

View File

@ -1,6 +1,6 @@
// Code generated by "callbackgen -type StochStream"; DO NOT EDIT.
package indicator
package indicatorv2
import ()

View File

@ -1,12 +1,13 @@
package indicator
package indicatorv2
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
// SubtractStream subscribes 2 upstream data, and then subtract these 2 values
type SubtractStream struct {
*Float64Series
*types.Float64Series
a, b floats.Slice
i int
@ -14,9 +15,9 @@ type SubtractStream struct {
// Subtract creates the SubtractStream object
// subtract := Subtract(longEWMA, shortEWMA)
func Subtract(a, b Float64Source) *SubtractStream {
func Subtract(a, b types.Float64Source) *SubtractStream {
s := &SubtractStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
}
a.OnUpdate(func(v float64) {
@ -35,13 +36,13 @@ func (s *SubtractStream) calculate() {
return
}
if s.a.Length() > s.slice.Length() {
var numNewElems = s.a.Length() - s.slice.Length()
if s.a.Length() > s.Slice.Length() {
var numNewElems = s.a.Length() - s.Slice.Length()
var tailA = s.a.Tail(numNewElems)
var tailB = s.b.Tail(numNewElems)
var tailC = tailA.Sub(tailB)
for _, f := range tailC {
s.slice.Push(f)
s.Slice.Push(f)
s.EmitUpdate(f)
}
}

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"testing"
@ -21,10 +21,10 @@ func Test_v2_Subtract(t *testing.T) {
stream.EmitKLineClosed(types.KLine{Close: fixedpoint.NewFromFloat(19_000.0 + i)})
}
t.Logf("fastEMA: %+v", fastEMA.slice)
t.Logf("slowEMA: %+v", slowEMA.slice)
t.Logf("fastEMA: %+v", fastEMA.Slice)
t.Logf("slowEMA: %+v", slowEMA.Slice)
assert.Equal(t, len(subtract.a), len(subtract.b))
assert.Equal(t, len(subtract.a), len(subtract.slice))
assert.InDelta(t, subtract.slice[0], subtract.a[0]-subtract.b[0], 0.0001)
assert.Equal(t, len(subtract.a), len(subtract.Slice))
assert.InDelta(t, subtract.Slice[0], subtract.a[0]-subtract.b[0], 0.0001)
}

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"math"
@ -9,7 +9,7 @@ import (
// This TRStream calculates the ATR first
type TRStream struct {
// embedded struct
*Float64Series
*types.Float64Series
// private states
previousClose float64
@ -17,7 +17,7 @@ type TRStream struct {
func TR2(source KLineSubscription) *TRStream {
s := &TRStream{
Float64Series: NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
}
source.AddSubscriber(func(k types.KLine) {

View File

@ -1,4 +1,4 @@
package indicator
package indicatorv2
import (
"encoding/json"

View File

@ -1,23 +0,0 @@
package indicator
type CMAStream struct {
*Float64Series
}
func CMA2(source Float64Source) *CMAStream {
s := &CMAStream{
Float64Series: NewFloat64Series(),
}
s.Bind(source, s)
return s
}
func (s *CMAStream) Calculate(x float64) float64 {
l := float64(s.slice.Length())
cma := (s.slice.Last(0)*l + x) / (l + 1.)
return cma
}
func (s *CMAStream) Truncate() {
s.slice.Truncate(MaxNumOfEWMA)
}

View File

@ -1,27 +0,0 @@
package indicator
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
)
type PivotHighStream struct {
*Float64Series
rawValues floats.Slice
window, rightWindow int
}
func PivotHigh2(source Float64Source, window, rightWindow int) *PivotHighStream {
s := &PivotHighStream{
Float64Series: NewFloat64Series(),
window: window,
rightWindow: rightWindow,
}
s.Subscribe(source, func(x float64) {
s.rawValues.Push(x)
if low, ok := calculatePivotHigh(s.rawValues, s.window, s.rightWindow); ok {
s.PushAndEmit(low)
}
})
return s
}

View File

@ -1,27 +0,0 @@
package indicator
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
)
type PivotLowStream struct {
*Float64Series
rawValues floats.Slice
window, rightWindow int
}
func PivotLow2(source Float64Source, window, rightWindow int) *PivotLowStream {
s := &PivotLowStream{
Float64Series: NewFloat64Series(),
window: window,
rightWindow: rightWindow,
}
s.Subscribe(source, func(x float64) {
s.rawValues.Push(x)
if low, ok := calculatePivotLow(s.rawValues, s.window, s.rightWindow); ok {
s.PushAndEmit(low)
}
})
return s
}

View File

@ -58,7 +58,7 @@ func (inc *Volatility) CalculateAndUpdate(allKLines []types.KLine) {
var recentT = allKLines[end-(inc.Window-1) : end+1]
volatility, err := calculateVOLATILITY(recentT, inc.Window, KLineClosePriceMapper)
volatility, err := calculateVOLATILITY(recentT, inc.Window, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate volatility")
return
@ -86,7 +86,7 @@ func (inc *Volatility) Bind(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate)
}
func calculateVOLATILITY(klines []types.KLine, window int, priceF KLineValueMapper) (float64, error) {
func calculateVOLATILITY(klines []types.KLine, window int, priceF types.KLineValueMapper) (float64, error) {
length := len(klines)
if length == 0 || length < window {
return 0.0, fmt.Errorf("insufficient elements for calculating VOL with window = %d", window)

View File

@ -71,7 +71,7 @@ func (inc *VWAP) Length() int {
var _ types.SeriesExtend = &VWAP{}
func (inc *VWAP) PushK(k types.KLine) {
inc.Update(KLineTypicalPriceMapper(k), k.Volume.Float64())
inc.Update(types.KLineTypicalPriceMapper(k), k.Volume.Float64())
}
func (inc *VWAP) CalculateAndUpdate(allKLines []types.KLine) {
@ -99,7 +99,7 @@ func (inc *VWAP) Bind(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate)
}
func calculateVWAP(klines []types.KLine, priceF KLineValueMapper, window int) float64 {
func calculateVWAP(klines []types.KLine, priceF types.KLineValueMapper, window int) float64 {
vwap := VWAP{IntervalWindow: types.IntervalWindow{Window: window}}
for _, k := range klines {
vwap.Update(priceF(k), k.Volume.Float64())

View File

@ -63,7 +63,7 @@ func Test_calculateVWAP(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
priceF := KLineTypicalPriceMapper
priceF := types.KLineTypicalPriceMapper
got := calculateVWAP(tt.kLines, priceF, tt.window)
diff := math.Trunc((got-tt.want)*100) / 100
if diff != 0 {

View File

@ -4,14 +4,14 @@ import (
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/types"
)
type CircuitBreakRiskControl struct {
// Since price could be fluctuated large,
// use an EWMA to smooth it in running time
price *indicator.EWMAStream
price *indicatorv2.EWMAStream
position *types.Position
profitStats *types.ProfitStats
lossThreshold fixedpoint.Value
@ -19,7 +19,7 @@ type CircuitBreakRiskControl struct {
func NewCircuitBreakRiskControl(
position *types.Position,
price *indicator.EWMAStream,
price *indicatorv2.EWMAStream,
lossThreshold fixedpoint.Value,
profitStats *types.ProfitStats) *CircuitBreakRiskControl {

View File

@ -6,12 +6,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/types"
)
func Test_IsHalted(t *testing.T) {
var (
price = 30000.00
realizedPnL = fixedpoint.NewFromFloat(-100.0)
@ -19,7 +18,7 @@ func Test_IsHalted(t *testing.T) {
)
window := types.IntervalWindow{Window: 30, Interval: types.Interval1m}
priceEWMA := indicator.EWMA2(nil, window.Window)
priceEWMA := indicatorv2.EWMA2(nil, window.Window)
priceEWMA.PushAndEmit(price)
cases := []struct {

View File

@ -7,6 +7,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/indicator"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/types"
)
@ -28,7 +29,7 @@ type DynamicSpreadSettings struct {
}
// Initialize dynamic spreads and preload SMAs
func (ds *DynamicSpreadSettings) Initialize(symbol string, session *bbgo.ExchangeSession, neutralBoll, defaultBoll *indicator.BOLLStream) {
func (ds *DynamicSpreadSettings) Initialize(symbol string, session *bbgo.ExchangeSession, neutralBoll, defaultBoll *indicatorv2.BOLLStream) {
switch {
case ds.AmpSpreadSettings != nil:
ds.AmpSpreadSettings.initialize(symbol, session)
@ -164,10 +165,10 @@ type DynamicSpreadBollWidthRatioSettings struct {
// A positive number. The greater factor, the sharper weighting function. Default set to 1.0 .
Sensitivity float64 `json:"sensitivity"`
defaultBoll, neutralBoll *indicator.BOLLStream
defaultBoll, neutralBoll *indicatorv2.BOLLStream
}
func (ds *DynamicSpreadBollWidthRatioSettings) initialize(neutralBoll, defaultBoll *indicator.BOLLStream) {
func (ds *DynamicSpreadBollWidthRatioSettings) initialize(neutralBoll, defaultBoll *indicatorv2.BOLLStream) {
ds.neutralBoll = neutralBoll
ds.defaultBoll = defaultBoll
if ds.Sensitivity <= 0. {

View File

@ -6,7 +6,7 @@ import (
"math"
"sync"
"github.com/c9s/bbgo/pkg/indicator"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
@ -158,10 +158,10 @@ type Strategy struct {
groupID uint32
// defaultBoll is the BOLLINGER indicator we used for predicting the price.
defaultBoll *indicator.BOLLStream
defaultBoll *indicatorv2.BOLLStream
// neutralBoll is the neutral price section
neutralBoll *indicator.BOLLStream
neutralBoll *indicatorv2.BOLLStream
// StrategyController
bbgo.StrategyController

View File

@ -1,6 +1,8 @@
package bollmaker
import "github.com/c9s/bbgo/pkg/indicator"
import (
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
)
type PriceTrend string
@ -11,7 +13,7 @@ const (
UnknownTrend PriceTrend = "unknown"
)
func detectPriceTrend(inc *indicator.BOLLStream, price float64) PriceTrend {
func detectPriceTrend(inc *indicatorv2.BOLLStream, price float64) PriceTrend {
if inBetween(price, inc.DownBand.Last(0), inc.UpBand.Last(0)) {
return NeutralTrend
}

View File

@ -0,0 +1,88 @@
package common
import (
"context"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/risk/riskcontrol"
"github.com/c9s/bbgo/pkg/types"
)
type RiskController struct {
PositionHardLimit fixedpoint.Value `json:"positionHardLimit"`
MaxPositionQuantity fixedpoint.Value `json:"maxPositionQuantity"`
CircuitBreakLossThreshold fixedpoint.Value `json:"circuitBreakLossThreshold"`
CircuitBreakEMA types.IntervalWindow `json:"circuitBreakEMA"`
positionRiskControl *riskcontrol.PositionRiskControl
circuitBreakRiskControl *riskcontrol.CircuitBreakRiskControl
}
// Strategy provides the core functionality that is required by a long/short strategy.
type Strategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
parent, ctx context.Context
cancel context.CancelFunc
Environ *bbgo.Environment
Session *bbgo.ExchangeSession
OrderExecutor *bbgo.GeneralOrderExecutor
RiskController
}
func (s *Strategy) Initialize(ctx context.Context, environ *bbgo.Environment, session *bbgo.ExchangeSession, market types.Market, strategyID, instanceID string) {
s.parent = ctx
s.ctx, s.cancel = context.WithCancel(ctx)
s.Environ = environ
s.Session = session
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(market)
}
if s.Position == nil {
s.Position = types.NewPositionFromMarket(market)
}
// Always update the position fields
s.Position.Strategy = strategyID
s.Position.StrategyInstanceID = instanceID
// if anyone of the fee rate is defined, this assumes that both are defined.
// so that zero maker fee could be applied
if session.MakerFeeRate.Sign() > 0 || session.TakerFeeRate.Sign() > 0 {
s.Position.SetExchangeFeeRate(session.ExchangeName, types.ExchangeFee{
MakerFeeRate: session.MakerFeeRate,
TakerFeeRate: session.TakerFeeRate,
})
}
s.OrderExecutor = bbgo.NewGeneralOrderExecutor(session, market.Symbol, strategyID, instanceID, s.Position)
s.OrderExecutor.BindEnvironment(environ)
s.OrderExecutor.BindProfitStats(s.ProfitStats)
s.OrderExecutor.Bind()
s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
// bbgo.Sync(ctx, s)
})
if !s.PositionHardLimit.IsZero() && !s.MaxPositionQuantity.IsZero() {
log.Infof("positionHardLimit and maxPositionQuantity are configured, setting up PositionRiskControl...")
s.positionRiskControl = riskcontrol.NewPositionRiskControl(s.OrderExecutor, s.PositionHardLimit, s.MaxPositionQuantity)
}
if !s.CircuitBreakLossThreshold.IsZero() {
log.Infof("circuitBreakLossThreshold is configured, setting up CircuitBreakRiskControl...")
s.circuitBreakRiskControl = riskcontrol.NewCircuitBreakRiskControl(
s.Position,
session.Indicators(market.Symbol).EWMA(s.CircuitBreakEMA),
s.CircuitBreakLossThreshold,
s.ProfitStats)
}
}

View File

@ -82,13 +82,13 @@ func (inc *PMR) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineClosePriceMapper(k))
inc.Update(types.KLineClosePriceMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last(0))
}
func CalculateKLinesPMR(allKLines []types.KLine, window int) float64 {
return pmr(indicator.MapKLinePrice(allKLines, indicator.KLineClosePriceMapper), window)
return pmr(types.MapKLinePrice(allKLines, types.KLineClosePriceMapper), window)
}
func pmr(prices []float64, window int) float64 {

View File

@ -89,13 +89,13 @@ func (inc *PVD) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineClosePriceMapper(k), indicator.KLineVolumeMapper(k))
inc.Update(types.KLineClosePriceMapper(k), types.KLineVolumeMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last(0))
}
func CalculateKLinesPVD(allKLines []types.KLine, window int) float64 {
return pvd(indicator.MapKLinePrice(allKLines, indicator.KLineClosePriceMapper), indicator.MapKLinePrice(allKLines, indicator.KLineVolumeMapper), window)
return pvd(types.MapKLinePrice(allKLines, types.KLineClosePriceMapper), types.MapKLinePrice(allKLines, types.KLineVolumeMapper), window)
}
func pvd(prices []float64, volumes []float64, window int) float64 {

View File

@ -81,7 +81,7 @@ func (inc *RR) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineClosePriceMapper(k))
inc.Update(types.KLineClosePriceMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last(0))
}

View File

@ -42,7 +42,7 @@ func (inc *A18) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateA18(recentT, indicator.KLineClosePriceMapper)
val, err := calculateA18(recentT, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -42,7 +42,7 @@ func (inc *A2) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateA2(recentT, KLineLowPriceMapper, KLineHighPriceMapper, indicator.KLineClosePriceMapper)
val, err := calculateA2(recentT, KLineLowPriceMapper, KLineHighPriceMapper, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -43,7 +43,7 @@ func (inc *A3) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateA3(recentT, KLineLowPriceMapper, KLineHighPriceMapper, indicator.KLineClosePriceMapper)
val, err := calculateA3(recentT, KLineLowPriceMapper, KLineHighPriceMapper, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate pivots")
return

View File

@ -42,7 +42,7 @@ func (inc *A34) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateA34(recentT, indicator.KLineClosePriceMapper)
val, err := calculateA34(recentT, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate pivots")
return

View File

@ -46,7 +46,7 @@ func (inc *R) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateR(recentT, indicator.KLineOpenPriceMapper, indicator.KLineClosePriceMapper)
val, err := calculateR(recentT, types.KLineOpenPriceMapper, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate pivots")
return

View File

@ -42,7 +42,7 @@ func (inc *S0) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS0(recentT, indicator.KLineClosePriceMapper)
val, err := calculateS0(recentT, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -40,7 +40,7 @@ func (inc *S1) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
correlation, err := calculateS1(recentT, inc.Window, KLineAmplitudeMapper, indicator.KLineVolumeMapper)
correlation, err := calculateS1(recentT, inc.Window, KLineAmplitudeMapper, types.KLineVolumeMapper)
if err != nil {
log.WithError(err).Error("can not calculate correlation")
return

View File

@ -40,7 +40,7 @@ func (inc *S2) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
correlation, err := calculateS2(recentT, inc.Window, indicator.KLineOpenPriceMapper, indicator.KLineVolumeMapper)
correlation, err := calculateS2(recentT, inc.Window, types.KLineOpenPriceMapper, types.KLineVolumeMapper)
if err != nil {
log.WithError(err).Error("can not calculate correlation")
return

View File

@ -42,7 +42,7 @@ func (inc *S3) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS3(recentT, indicator.KLineClosePriceMapper, indicator.KLineOpenPriceMapper)
val, err := calculateS3(recentT, types.KLineClosePriceMapper, types.KLineOpenPriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -42,7 +42,7 @@ func (inc *S4) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS4(recentT, indicator.KLineClosePriceMapper)
val, err := calculateS4(recentT, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -42,7 +42,7 @@ func (inc *S5) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS5(recentT, indicator.KLineVolumeMapper)
val, err := calculateS5(recentT, types.KLineVolumeMapper)
if err != nil {
log.WithError(err).Error("can not calculate pivots")
return

View File

@ -42,7 +42,7 @@ func (inc *S6) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS6(recentT, indicator.KLineHighPriceMapper, indicator.KLineLowPriceMapper, indicator.KLineClosePriceMapper, indicator.KLineVolumeMapper)
val, err := calculateS6(recentT, types.KLineHighPriceMapper, types.KLineLowPriceMapper, types.KLineClosePriceMapper, types.KLineVolumeMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -42,7 +42,7 @@ func (inc *S7) CalculateAndUpdate(klines []types.KLine) {
var recentT = klines[end-(inc.Window-1) : end+1]
val, err := calculateS7(recentT, indicator.KLineOpenPriceMapper, indicator.KLineClosePriceMapper)
val, err := calculateS7(recentT, types.KLineOpenPriceMapper, types.KLineClosePriceMapper)
if err != nil {
log.WithError(err).Error("can not calculate")
return

View File

@ -72,7 +72,7 @@ func (inc *SHARK) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineHighPriceMapper(k), indicator.KLineLowPriceMapper(k), indicator.KLineClosePriceMapper(k))
inc.Update(types.KLineHighPriceMapper(k), types.KLineLowPriceMapper(k), types.KLineClosePriceMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last(0))
}

View File

@ -74,7 +74,7 @@ func (inc *NRR) PushK(k types.KLine) {
return
}
inc.Update(indicator.KLineOpenPriceMapper(k), indicator.KLineClosePriceMapper(k))
inc.Update(types.KLineOpenPriceMapper(k), types.KLineClosePriceMapper(k))
inc.EndTime = k.EndTime.Time()
inc.EmitUpdate(inc.Last(0))
}

View File

@ -3,9 +3,10 @@ package linregmaker
import (
"context"
"fmt"
"github.com/c9s/bbgo/pkg/risk/dynamicrisk"
"sync"
"github.com/c9s/bbgo/pkg/risk/dynamicrisk"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/util"
@ -221,20 +222,20 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
})
}
// Setup Exits
// Initialize Exits
s.ExitMethods.SetAndSubscribe(session, s)
// Setup dynamic spread
// Initialize dynamic spread
if s.DynamicSpread.IsEnabled() {
s.DynamicSpread.Initialize(s.Symbol, session)
}
// Setup dynamic exposure
// Initialize dynamic exposure
if s.DynamicExposure.IsEnabled() {
s.DynamicExposure.Initialize(s.Symbol, session)
}
// Setup dynamic quantities
// Initialize dynamic quantities
if len(s.DynamicQuantityIncrease) > 0 {
s.DynamicQuantityIncrease.Initialize(s.Symbol, session)
}

View File

@ -0,0 +1,103 @@
package rsicross
import (
"context"
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
)
const ID = "rsicross"
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
type Strategy struct {
*common.Strategy
Environment *bbgo.Environment
Market types.Market
Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
SlowWindow int `json:"slowWindow"`
FastWindow int `json:"fastWindow"`
bbgo.OpenPositionOptions
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s:%s:%d-%d", ID, s.Symbol, s.Interval, s.FastWindow, s.SlowWindow)
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
s.Strategy = &common.Strategy{}
s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID())
fastRsi := session.Indicators(s.Symbol).RSI(types.IntervalWindow{Interval: s.Interval, Window: s.FastWindow})
slowRsi := session.Indicators(s.Symbol).RSI(types.IntervalWindow{Interval: s.Interval, Window: s.SlowWindow})
rsiCross := indicatorv2.Cross(fastRsi, slowRsi)
rsiCross.OnUpdate(func(v float64) {
switch indicatorv2.CrossType(v) {
case indicatorv2.CrossOver:
opts := s.OpenPositionOptions
opts.Long = true
if price, ok := session.LastPrice(s.Symbol); ok {
opts.Price = price
}
// opts.Price = closePrice
opts.Tags = []string{"rsiCrossOver"}
if _, err := s.OrderExecutor.OpenPosition(ctx, opts); err != nil {
logErr(err, "unable to open position")
}
case indicatorv2.CrossUnder:
if err := s.OrderExecutor.ClosePosition(ctx, fixedpoint.One); err != nil {
logErr(err, "failed to close position")
}
}
})
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
})
return nil
}
func logErr(err error, msgAndArgs ...interface{}) bool {
if err == nil {
return false
}
if len(msgAndArgs) == 0 {
log.WithError(err).Error(err.Error())
} else if len(msgAndArgs) == 1 {
msg := msgAndArgs[0].(string)
log.WithError(err).Error(msg)
} else if len(msgAndArgs) > 1 {
msg := msgAndArgs[0].(string)
log.WithError(err).Errorf(msg, msgAndArgs[1:]...)
}
return true
}

View File

@ -2,24 +2,24 @@ package scmaker
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/types"
)
type IntensityStream struct {
*indicator.Float64Series
*types.Float64Series
Buy, Sell *indicator.RMAStream
Buy, Sell *indicatorv2.RMAStream
window int
}
func Intensity(source indicator.KLineSubscription, window int) *IntensityStream {
func Intensity(source indicatorv2.KLineSubscription, window int) *IntensityStream {
s := &IntensityStream{
Float64Series: indicator.NewFloat64Series(),
Float64Series: types.NewFloat64Series(),
window: window,
Buy: indicator.RMA2(indicator.NewFloat64Series(), window, false),
Sell: indicator.RMA2(indicator.NewFloat64Series(), window, false),
Buy: indicatorv2.RMA2(types.NewFloat64Series(), window, false),
Sell: indicatorv2.RMA2(types.NewFloat64Series(), window, false),
}
threshold := fixedpoint.NewFromFloat(100.0)

View File

@ -10,8 +10,9 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
. "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/risk/riskcontrol"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
)
@ -36,6 +37,8 @@ func init() {
// Strategy scmaker is a stable coin market maker
type Strategy struct {
*common.Strategy
Environment *bbgo.Environment
Market types.Market
@ -64,19 +67,14 @@ type Strategy struct {
CircuitBreakLossThreshold fixedpoint.Value `json:"circuitBreakLossThreshold"`
CircuitBreakEMA types.IntervalWindow `json:"circuitBreakEMA"`
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
session *bbgo.ExchangeSession
orderExecutor *bbgo.GeneralOrderExecutor
liquidityOrderBook, adjustmentOrderBook *bbgo.ActiveOrderBook
book *types.StreamOrderBook
liquidityScale bbgo.Scale
// indicators
ewma *indicator.EWMAStream
boll *indicator.BOLLStream
ewma *EWMAStream
boll *BOLLStream
intensity *IntensityStream
positionRiskControl *riskcontrol.PositionRiskControl
@ -102,9 +100,9 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
instanceID := s.InstanceID()
s.Strategy = &common.Strategy{}
s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID())
s.session = session
s.book = types.NewStreamBook(s.Symbol)
s.book.BindStream(session.UserDataStream)
@ -114,39 +112,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.adjustmentOrderBook = bbgo.NewActiveOrderBook(s.Symbol)
s.adjustmentOrderBook.BindStream(session.UserDataStream)
// If position is nil, we need to allocate a new position for calculation
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market)
}
// Always update the position fields
s.Position.Strategy = ID
s.Position.StrategyInstanceID = instanceID
// if anyone of the fee rate is defined, this assumes that both are defined.
// so that zero maker fee could be applied
if s.session.MakerFeeRate.Sign() > 0 || s.session.TakerFeeRate.Sign() > 0 {
s.Position.SetExchangeFeeRate(s.session.ExchangeName, types.ExchangeFee{
MakerFeeRate: s.session.MakerFeeRate,
TakerFeeRate: s.session.TakerFeeRate,
})
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market)
}
s.orderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.orderExecutor.BindEnvironment(s.Environment)
s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.Bind()
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(ctx, s)
})
if !s.PositionHardLimit.IsZero() && !s.MaxPositionQuantity.IsZero() {
log.Infof("positionHardLimit and maxPositionQuantity are configured, setting up PositionRiskControl...")
s.positionRiskControl = riskcontrol.NewPositionRiskControl(s.orderExecutor, s.PositionHardLimit, s.MaxPositionQuantity)
s.positionRiskControl = riskcontrol.NewPositionRiskControl(s.OrderExecutor, s.PositionHardLimit, s.MaxPositionQuantity)
}
if !s.CircuitBreakLossThreshold.IsZero() {
@ -194,17 +162,17 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
err := s.liquidityOrderBook.GracefulCancel(ctx, s.session.Exchange)
err := s.liquidityOrderBook.GracefulCancel(ctx, s.Session.Exchange)
logErr(err, "unable to cancel liquidity orders")
err = s.adjustmentOrderBook.GracefulCancel(ctx, s.session.Exchange)
err = s.adjustmentOrderBook.GracefulCancel(ctx, s.Session.Exchange)
logErr(err, "unable to cancel adjustment orders")
})
return nil
}
func (s *Strategy) preloadKLines(inc *indicator.KLineStream, session *bbgo.ExchangeSession, symbol string, interval types.Interval) {
func (s *Strategy) preloadKLines(inc *KLineStream, session *bbgo.ExchangeSession, symbol string, interval types.Interval) {
if store, ok := session.MarketDataStore(symbol); ok {
if kLinesData, ok := store.KLinesOfInterval(interval); ok {
for _, k := range *kLinesData {
@ -215,46 +183,46 @@ func (s *Strategy) preloadKLines(inc *indicator.KLineStream, session *bbgo.Excha
}
func (s *Strategy) initializeMidPriceEMA(session *bbgo.ExchangeSession) {
kLines := indicator.KLines(session.MarketDataStream, s.Symbol, s.MidPriceEMA.Interval)
s.ewma = indicator.EWMA2(indicator.ClosePrices(kLines), s.MidPriceEMA.Window)
kLines := KLines(session.MarketDataStream, s.Symbol, s.MidPriceEMA.Interval)
s.ewma = EWMA2(ClosePrices(kLines), s.MidPriceEMA.Window)
s.preloadKLines(kLines, session, s.Symbol, s.MidPriceEMA.Interval)
}
func (s *Strategy) initializeIntensityIndicator(session *bbgo.ExchangeSession) {
kLines := indicator.KLines(session.MarketDataStream, s.Symbol, s.StrengthInterval)
kLines := KLines(session.MarketDataStream, s.Symbol, s.StrengthInterval)
s.intensity = Intensity(kLines, 10)
s.preloadKLines(kLines, session, s.Symbol, s.StrengthInterval)
}
func (s *Strategy) initializePriceRangeBollinger(session *bbgo.ExchangeSession) {
kLines := indicator.KLines(session.MarketDataStream, s.Symbol, s.PriceRangeBollinger.Interval)
closePrices := indicator.ClosePrices(kLines)
s.boll = indicator.BOLL2(closePrices, s.PriceRangeBollinger.Window, s.PriceRangeBollinger.K)
kLines := KLines(session.MarketDataStream, s.Symbol, s.PriceRangeBollinger.Interval)
closePrices := ClosePrices(kLines)
s.boll = BOLL(closePrices, s.PriceRangeBollinger.Window, s.PriceRangeBollinger.K)
s.preloadKLines(kLines, session, s.Symbol, s.PriceRangeBollinger.Interval)
}
func (s *Strategy) placeAdjustmentOrders(ctx context.Context) {
_ = s.adjustmentOrderBook.GracefulCancel(ctx, s.session.Exchange)
_ = s.adjustmentOrderBook.GracefulCancel(ctx, s.Session.Exchange)
if s.Position.IsDust() {
return
}
ticker, err := s.session.Exchange.QueryTicker(ctx, s.Symbol)
ticker, err := s.Session.Exchange.QueryTicker(ctx, s.Symbol)
if logErr(err, "unable to query ticker") {
return
}
if _, err := s.session.UpdateAccount(ctx); err != nil {
if _, err := s.Session.UpdateAccount(ctx); err != nil {
logErr(err, "unable to update account")
return
}
baseBal, _ := s.session.Account.Balance(s.Market.BaseCurrency)
quoteBal, _ := s.session.Account.Balance(s.Market.QuoteCurrency)
baseBal, _ := s.Session.Account.Balance(s.Market.BaseCurrency)
quoteBal, _ := s.Session.Account.Balance(s.Market.QuoteCurrency)
var adjOrders []types.SubmitOrder
@ -262,7 +230,7 @@ func (s *Strategy) placeAdjustmentOrders(ctx context.Context) {
tickSize := s.Market.TickSize
if s.Position.IsShort() {
price := profitProtectedPrice(types.SideTypeBuy, s.Position.AverageCost, ticker.Sell.Add(tickSize.Neg()), s.session.MakerFeeRate, s.MinProfit)
price := profitProtectedPrice(types.SideTypeBuy, s.Position.AverageCost, ticker.Sell.Add(tickSize.Neg()), s.Session.MakerFeeRate, s.MinProfit)
quoteQuantity := fixedpoint.Min(price.Mul(posSize), quoteBal.Available)
bidQuantity := quoteQuantity.Div(price)
@ -280,7 +248,7 @@ func (s *Strategy) placeAdjustmentOrders(ctx context.Context) {
TimeInForce: types.TimeInForceGTC,
})
} else if s.Position.IsLong() {
price := profitProtectedPrice(types.SideTypeSell, s.Position.AverageCost, ticker.Buy.Add(tickSize), s.session.MakerFeeRate, s.MinProfit)
price := profitProtectedPrice(types.SideTypeSell, s.Position.AverageCost, ticker.Buy.Add(tickSize), s.Session.MakerFeeRate, s.MinProfit)
askQuantity := fixedpoint.Min(posSize, baseBal.Available)
if s.Market.IsDustQuantity(askQuantity, price) {
@ -298,7 +266,7 @@ func (s *Strategy) placeAdjustmentOrders(ctx context.Context) {
})
}
createdOrders, err := s.orderExecutor.SubmitOrders(ctx, adjOrders...)
createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, adjOrders...)
if logErr(err, "unable to place liquidity orders") {
return
}
@ -312,12 +280,12 @@ func (s *Strategy) placeLiquidityOrders(ctx context.Context) {
return
}
err := s.liquidityOrderBook.GracefulCancel(ctx, s.session.Exchange)
err := s.liquidityOrderBook.GracefulCancel(ctx, s.Session.Exchange)
if logErr(err, "unable to cancel orders") {
return
}
ticker, err := s.session.Exchange.QueryTicker(ctx, s.Symbol)
ticker, err := s.Session.Exchange.QueryTicker(ctx, s.Symbol)
if logErr(err, "unable to query ticker") {
return
}
@ -331,13 +299,13 @@ func (s *Strategy) placeLiquidityOrders(ctx context.Context) {
ticker.Sell = ticker.Buy.Add(s.Market.TickSize)
}
if _, err := s.session.UpdateAccount(ctx); err != nil {
if _, err := s.Session.UpdateAccount(ctx); err != nil {
logErr(err, "unable to update account")
return
}
baseBal, _ := s.session.Account.Balance(s.Market.BaseCurrency)
quoteBal, _ := s.session.Account.Balance(s.Market.QuoteCurrency)
baseBal, _ := s.Session.Account.Balance(s.Market.BaseCurrency)
quoteBal, _ := s.Session.Account.Balance(s.Market.QuoteCurrency)
spread := ticker.Sell.Sub(ticker.Buy)
tickSize := fixedpoint.Max(s.LiquidityLayerTickSize, s.Market.TickSize)
@ -497,7 +465,7 @@ func (s *Strategy) placeLiquidityOrders(ctx context.Context) {
makerQuota.Commit()
createdOrders, err := s.orderExecutor.SubmitOrders(ctx, liqOrders...)
createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, liqOrders...)
if logErr(err, "unable to place liquidity orders") {
return
}

View File

@ -1,4 +1,4 @@
package indicator
package types
//go:generate callbackgen -type Float64Updater
type Float64Updater struct {

View File

@ -0,0 +1,15 @@
// Code generated by "callbackgen -type Float64Updater"; DO NOT EDIT.
package types
import ()
func (F *Float64Updater) OnUpdate(cb func(v float64)) {
F.updateCallbacks = append(F.updateCallbacks, cb)
}
func (F *Float64Updater) EmitUpdate(v float64) {
for _, cb := range F.updateCallbacks {
cb(v)
}
}

View File

@ -645,3 +645,41 @@ func KLineWith(symbol string, interval Interval, callback KLineCallback) KLineCa
callback(k)
}
}
type KLineValueMapper func(k KLine) float64
func KLineOpenPriceMapper(k KLine) float64 {
return k.Open.Float64()
}
func KLineClosePriceMapper(k KLine) float64 {
return k.Close.Float64()
}
func KLineTypicalPriceMapper(k KLine) float64 {
return (k.High.Float64() + k.Low.Float64() + k.Close.Float64()) / 3.
}
func KLinePriceVolumeMapper(k KLine) float64 {
return k.Close.Mul(k.Volume).Float64()
}
func KLineVolumeMapper(k KLine) float64 {
return k.Volume.Float64()
}
func MapKLinePrice(kLines []KLine, f KLineValueMapper) (prices []float64) {
for _, k := range kLines {
prices = append(prices, f(k))
}
return prices
}
func KLineLowPriceMapper(k KLine) float64 {
return k.Low.Float64()
}
func KLineHighPriceMapper(k KLine) float64 {
return k.High.Float64()
}

View File

@ -1,25 +1,24 @@
package indicator
package types
import (
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
type Float64Series struct {
types.SeriesBase
SeriesBase
Float64Updater
slice floats.Slice
Slice floats.Slice
}
func NewFloat64Series(v ...float64) *Float64Series {
s := &Float64Series{}
s.slice = v
s.SeriesBase.Series = s.slice
s.Slice = v
s.SeriesBase.Series = s.Slice
return s
}
func (f *Float64Series) Last(i int) float64 {
return f.slice.Last(i)
return f.Slice.Last(i)
}
func (f *Float64Series) Index(i int) float64 {
@ -27,15 +26,11 @@ func (f *Float64Series) Index(i int) float64 {
}
func (f *Float64Series) Length() int {
return len(f.slice)
}
func (f *Float64Series) Slice() floats.Slice {
return f.slice
return len(f.Slice)
}
func (f *Float64Series) PushAndEmit(x float64) {
f.slice.Push(x)
f.Slice.Push(x)
f.EmitUpdate(x)
}
@ -71,3 +66,22 @@ func (f *Float64Series) Bind(source Float64Source, target Float64Calculator) {
f.Subscribe(source, c)
}
}
type Float64Calculator interface {
Calculate(x float64) float64
PushAndEmit(x float64)
}
type Float64Source interface {
Series
OnUpdate(f func(v float64))
}
type Float64Subscription interface {
Series
AddSubscriber(f func(v float64))
}
type Float64Truncator interface {
Truncate()
}