Merge pull request #41 from c9s/feature/indicators

feature: support standard indicator sets
This commit is contained in:
Yo-An Lin 2020-10-28 17:47:57 +08:00 committed by GitHub
commit 90ca829915
17 changed files with 286 additions and 102 deletions

View File

@ -41,6 +41,5 @@ exchangeStrategies:
buyandhold:
symbol: "BTCUSDT"
interval: "1m"
# quantity = change percentage * baseQuantity
baseQuantity: 1.0
minDropPercentage: -0.0001
baseQuantity: 0.001
minDropPercentage: -0.001

View File

@ -139,8 +139,37 @@ func (environ *Environment) Connect(ctx context.Context) error {
marketDataStore := store.NewMarketDataStore(symbol)
marketDataStore.BindStream(session.Stream)
session.marketDataStores[symbol] = marketDataStore
standardIndicatorSet := NewStandardIndicatorSet(symbol)
standardIndicatorSet.BindMarketDataStore(marketDataStore)
session.standardIndicatorSets[symbol] = standardIndicatorSet
}
// move market data store dispatch to here, use one callback to dispatch the market data
// session.Stream.OnKLineClosed(func(kline types.KLine) { })
now := time.Now()
for symbol := range loadedSymbols {
marketDataStore, ok := session.marketDataStores[symbol]
if !ok {
return errors.Errorf("symbol %s is not defined", symbol)
}
for interval := range types.SupportedIntervals {
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval.String(), types.KLineQueryOptions{
EndTime: &now,
Limit: 100,
})
if err != nil {
return err
}
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
}
}
}
log.Infof("querying balances...")
@ -181,3 +210,18 @@ func (environ *Environment) Connect(ctx context.Context) error {
return nil
}
func BatchQueryKLineWindows(ctx context.Context, e types.Exchange, symbol string, intervals []string, startTime, endTime time.Time) (map[string]types.KLineWindow, error) {
batch := &types.ExchangeBatchProcessor{Exchange: e}
klineWindows := map[string]types.KLineWindow{}
for _, interval := range intervals {
kLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime)
if err != nil {
return klineWindows, err
}
klineWindows[interval] = kLines
}
return klineWindows, nil
}

37
pkg/bbgo/injection.go Normal file
View File

@ -0,0 +1,37 @@
package bbgo
import (
"reflect"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func injectStrategyField(strategy SingleExchangeStrategy, rs reflect.Value, fieldName string, obj interface{}) error {
field := rs.FieldByName(fieldName)
if !field.IsValid() {
return nil
}
logrus.Infof("found %s in strategy %T, injecting %T...", fieldName, strategy, obj)
if !field.CanSet() {
return errors.Errorf("field %s of strategy %T can not be set", fieldName, strategy)
}
rv := reflect.ValueOf(obj)
if field.Kind() == reflect.Ptr {
if field.Type() != rv.Type() {
return errors.Errorf("field type mismatches: %s != %s", field.Type(), rv.Type())
}
field.Set(rv)
} else if field.Kind() == reflect.Interface {
field.Set(rv)
} else {
// set as value
field.Set(rv.Elem())
}
return nil
}

View File

@ -1,10 +1,61 @@
package bbgo
import (
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
type IntervalWindow struct {
// The interval of kline
Interval types.Interval
// The windows size of EWMA and SMA
Window int
}
type StandardIndicatorSet struct {
Symbol string
// Standard indicators
// interval -> window
SMA map[IntervalWindow]*indicator.SMA
EWMA map[IntervalWindow]*indicator.EWMA
}
func NewStandardIndicatorSet(symbol string) *StandardIndicatorSet {
set := &StandardIndicatorSet{
Symbol: symbol,
SMA: make(map[IntervalWindow]*indicator.SMA),
EWMA: make(map[IntervalWindow]*indicator.EWMA),
}
// let us pre-defined commonly used intervals
for interval := range types.SupportedIntervals {
for _, window := range []int{7, 25, 99} {
set.SMA[IntervalWindow{interval, window}] = &indicator.SMA{
Interval: interval,
Window: window,
}
set.EWMA[IntervalWindow{interval, window}] = &indicator.EWMA{
Interval: interval,
Window: window,
}
}
}
return set
}
func (set *StandardIndicatorSet) BindMarketDataStore(store *store.MarketDataStore) {
for _, inc := range set.SMA {
inc.BindMarketDataStore(store)
}
for _, inc := range set.EWMA {
inc.BindMarketDataStore(store)
}
}
// ExchangeSession presents the exchange connection session
// It also maintains and collects the data returned from the stream.
type ExchangeSession struct {
@ -30,8 +81,12 @@ type ExchangeSession struct {
// map: symbol -> []trade
Trades map[string][]types.Trade
// marketDataStores contains the market data store of each market
marketDataStores map[string]*store.MarketDataStore
// standard indicators of each market
standardIndicatorSets map[string]*StandardIndicatorSet
tradeReporter *TradeReporter
}
@ -49,6 +104,11 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
}
}
func (session *ExchangeSession) StandardIndicatorSet(symbol string) (*StandardIndicatorSet, bool) {
set, ok := session.standardIndicatorSets[symbol]
return set, ok
}
// MarketDataStore returns the market data store of a symbol
func (session *ExchangeSession) MarketDataStore(symbol string) (s *store.MarketDataStore, ok bool) {
s, ok = session.marketDataStores[symbol]

View File

@ -4,7 +4,6 @@ import (
"context"
"reflect"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
@ -165,35 +164,6 @@ func (trader *Trader) Run(ctx context.Context) error {
return trader.environment.Connect(ctx)
}
func injectStrategyField(strategy SingleExchangeStrategy, rs reflect.Value, fieldName string, obj interface{}) error {
field := rs.FieldByName(fieldName)
if !field.IsValid() {
return nil
}
log.Infof("found %s in strategy %T, injecting %T...", fieldName, strategy, obj)
if !field.CanSet() {
return errors.Errorf("field %s of strategy %T can not be set", fieldName, strategy)
}
rv := reflect.ValueOf(obj)
if field.Kind() == reflect.Ptr {
if field.Type() != rv.Type() {
return errors.Errorf("field type mismatches: %s != %s", field.Type(), rv.Type())
}
field.Set(rv)
} else if field.Kind() == reflect.Interface {
field.Set(rv)
} else {
// set as value
field.Set(rv.Elem())
}
return nil
}
/*
func (trader *OrderExecutor) RunStrategyWithHotReload(ctx context.Context, strategy SingleExchangeStrategy, configFile string) (chan struct{}, error) {
var done = make(chan struct{})

View File

@ -414,7 +414,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, opt
for _, k := range resp {
kLines = append(kLines, types.KLine{
Symbol: symbol,
Interval: interval,
Interval: types.Interval(interval),
StartTime: time.Unix(0, k.OpenTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.CloseTime*int64(time.Millisecond)),
Open: util.MustParseFloat(k.Open),

View File

@ -414,7 +414,7 @@ type KLineEvent struct {
func (k *KLine) KLine() types.KLine {
return types.KLine{
Symbol: k.Symbol,
Interval: k.Interval,
Interval: types.Interval(k.Interval),
StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)),
Open: util.MustParseFloat(k.Open),

View File

@ -205,7 +205,7 @@ type KLine struct {
func (k KLine) KLine() types.KLine {
return types.KLine{
Symbol: k.Symbol,
Interval: k.Interval,
Interval: types.Interval(k.Interval),
StartTime: k.StartTime,
EndTime: k.EndTime,
Open: k.Open,

View File

@ -120,7 +120,7 @@ func (k KLinePayload) KLine() types.KLine {
StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)),
Symbol: k.Market,
Interval: k.Resolution,
Interval: types.Interval(k.Resolution),
Open: util.MustParseFloat(k.Open),
Close: util.MustParseFloat(k.Close),
High: util.MustParseFloat(k.High),

60
pkg/indicator/ema.go Normal file
View File

@ -0,0 +1,60 @@
package indicator
import (
"time"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
type EWMA struct {
Interval types.Interval
Window int
Values Float64Slice
EndTime time.Time
}
func (inc *EWMA) calculateAndUpdate(kLines []types.KLine) {
if len(kLines) < inc.Window {
// we can't calculate
return
}
var index = len(kLines) - 1
var lastK = kLines[index]
var multiplier = 2.0 / float64(inc.Window+1)
if inc.EndTime != zeroTime && lastK.EndTime.Before(inc.EndTime) {
return
}
var recentK = kLines[index-(inc.Window-1) : index+1]
if len(inc.Values) > 0 {
var previousEWMA = inc.Values[len(inc.Values)-1]
var ewma = lastK.Close * multiplier + previousEWMA * (1 - multiplier)
inc.Values.Push(ewma)
} else {
// The first EWMA is actually SMA
var sma = calculateSMA(recentK)
inc.Values.Push(sma)
}
inc.EndTime = kLines[index].EndTime
}
func (inc *EWMA) BindMarketDataStore(store *store.MarketDataStore) {
store.OnKLineUpdate(func(kline types.KLine) {
// kline guard
if inc.Interval != kline.Interval {
return
}
if inc.EndTime != zeroTime && inc.EndTime.Before(inc.EndTime) {
return
}
if kLines, ok := store.KLinesOfInterval(types.Interval(kline.Interval)); ok {
inc.calculateAndUpdate(kLines)
}
})
}

View File

@ -1,57 +0,0 @@
package ma
import (
"math"
"time"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
type SMA struct {
store *store.MarketDataStore
Window int
}
func NewSMA(window int) *SMA {
return &SMA{
Window: window,
}
}
func (i *SMA) handleUpdate(kline types.KLine) {
klines, ok := i.store.KLineWindows[types.Interval(kline.Interval)]
if !ok {
return
}
if len(klines) < i.Window {
return
}
// calculate ma
}
type IndicatorValue struct {
Value float64
Time time.Time
}
func calculateMovingAverage(klines types.KLineWindow, period int) (values []IndicatorValue) {
for idx := range klines[period:] {
offset := idx + period
sum := klines[offset-period : offset].ReduceClose()
values = append(values, IndicatorValue{
Time: klines[offset].GetEndTime(),
Value: math.Round(sum / float64(period)),
})
}
return values
}
func (i *SMA) SubscribeStore(store *store.MarketDataStore) {
i.store = store
// register kline update callback
// store.OnUpdate(i.handleUpdate)
}

65
pkg/indicator/sma.go Normal file
View File

@ -0,0 +1,65 @@
package indicator
import (
"time"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
type Float64Slice []float64
func (s *Float64Slice) Push(v float64) {
*s = append(*s, v)
}
var zeroTime time.Time
type SMA struct {
Interval types.Interval
Window int
Values Float64Slice
EndTime time.Time
}
func (inc *SMA) calculateAndUpdate(kLines []types.KLine) {
if len(kLines) < inc.Window {
return
}
var index = len(kLines) - 1
var kline = kLines[index]
if inc.EndTime != zeroTime && kline.EndTime.Before(inc.EndTime) {
return
}
var recentK = kLines[index-(inc.Window-1) : index+1]
var sma = calculateSMA(recentK)
inc.Values.Push(sma)
inc.EndTime = kLines[index].EndTime
}
func (inc *SMA) BindMarketDataStore(store *store.MarketDataStore) {
store.OnKLineUpdate(func(kline types.KLine) {
// kline guard
if inc.Interval != kline.Interval {
return
}
if kLines, ok := store.KLinesOfInterval(kline.Interval); ok {
inc.calculateAndUpdate(kLines)
}
})
}
func calculateSMA(kLines []types.KLine) float64 {
sum := 0.0
length := len(kLines)
for _, k := range kLines {
sum += k.Close
}
avg := sum / float64(length)
return avg
}

View File

@ -80,8 +80,9 @@ func (store *MarketDataStore) handleKLineClosed(kline types.KLine) {
}
func (store *MarketDataStore) AddKLine(kline types.KLine) {
window := store.KLineWindows[types.Interval(kline.Interval)]
window := store.KLineWindows[kline.Interval]
window.Add(kline)
store.KLineWindows[kline.Interval] = window
store.LastKLine = kline

View File

@ -43,12 +43,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return
}
quantity := s.BaseQuantity * (1.0 + math.Abs(changePercentage))
_, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: kline.Symbol,
Market: market,
Side: types.SideTypeBuy,
Type: types.OrderTypeMarket,
Quantity: s.BaseQuantity * math.Abs(changePercentage),
Quantity: quantity,
})
if err != nil {
log.WithError(err).Error("submit order error")

View File

@ -74,7 +74,7 @@ type ExchangeBatchProcessor struct {
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, interval string, startTime, endTime time.Time) (allKLines []KLine, err error) {
for startTime.Before(endTime) {
klines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
@ -83,7 +83,7 @@ func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, in
return nil, err
}
for _, kline := range klines {
for _, kline := range kLines {
if kline.EndTime.After(endTime) {
return allKLines, nil
}

View File

@ -2,6 +2,10 @@ package types
type Interval string
func (i Interval) String() string {
return string(i)
}
var Interval1m = Interval("1m")
var Interval5m = Interval("5m")
var Interval15m = Interval("15m")

View File

@ -43,7 +43,7 @@ type KLine struct {
EndTime time.Time
Symbol string
Interval string
Interval Interval
Open float64
Close float64
@ -65,7 +65,7 @@ func (k KLine) GetEndTime() time.Time {
return k.EndTime
}
func (k KLine) GetInterval() string {
func (k KLine) GetInterval() Interval {
return k.Interval
}
@ -229,7 +229,7 @@ func (k KLineWindow) Last() KLine {
return k[len(k)-1]
}
func (k KLineWindow) GetInterval() string {
func (k KLineWindow) GetInterval() Interval {
return k.First().Interval
}