Merge pull request #910 from zenixls2/feature/ewo_renew

SerialMarketDataStore, elliottwave renewal
This commit is contained in:
Zenix 2022-09-07 18:54:01 +09:00 committed by GitHub
commit afad9cca47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1619 additions and 316 deletions

View File

@ -56,12 +56,12 @@ exchangeStrategies:
# when farest price from entry goes over that ratio, start using the callback ratio accordingly to do trailingstop
#trailingActivationRatio: [0.01, 0.016, 0.05]
#trailingActivationRatio: [0.001, 0.0081, 0.022]
trailingActivationRatio: [0.0029, 0.028]
trailingActivationRatio: [0.0012, 0.01]
#trailingActivationRatio: []
#trailingCallbackRate: []
#trailingCallbackRate: [0.002, 0.01, 0.1]
#trailingCallbackRate: [0.0004, 0.0009, 0.018]
trailingCallbackRate: [0.0005, 0.0149]
trailingCallbackRate: [0.0006, 0.0049]
generateGraph: true
graphPNLDeductFee: false
@ -124,15 +124,15 @@ sync:
- BTCUSDT
backtest:
startTime: "2022-08-01"
endTime: "2022-08-31"
startTime: "2022-09-01"
endTime: "2022-09-15"
symbols:
- BTCUSDT
sessions: [binance]
accounts:
binance:
makerFeeRate: 0.000
#takerFeeRate: 0.000
takerFeeRate: 0.000
balances:
BTC: 0
USDT: 21

122
config/elliottwave.yaml Normal file
View File

@ -0,0 +1,122 @@
---
persistence:
redis:
host: 127.0.0.1
port: 6379
db: 0
sessions:
binance:
exchange: binance
futures: false
envVarPrefix: binance
heikinAshi: false
# Drift strategy intends to place buy/sell orders as much value mas it could be. To exchanges that requires to
# calculate fees before placing limit orders (e.g. FTX Pro), make sure the fee rate is configured correctly and
# enable modifyOrderAmountForFee to prevent order rejection.
makerFeeRate: 0.0002
takerFeeRate: 0.0007
modifyOrderAmountForFee: false
exchangeStrategies:
- on: binance
elliottwave:
symbol: BNBBUSD
# kline interval for indicators
interval: 3m
stoploss: 0.2%
windowATR: 14
windowQuick: 5
windowSlow: 19
source: hl2
pendingMinutes: 10
useHeikinAshi: true
drawGraph: true
graphIndicatorPath: "./indicator.png"
graphPNLPath: "./pnl.png"
graphCumPNLPath: "./cumpnl.png"
# ActivationRatio should be increasing order
# when farest price from entry goes over that ratio, start using the callback ratio accordingly to do trailingstop
#trailingActivationRatio: [0.01, 0.016, 0.05]
#trailingActivationRatio: [0.001, 0.0081, 0.022]
trailingActivationRatio: [0.0017, 0.01, 0.015]
#trailingActivationRatio: []
#trailingCallbackRate: []
#trailingCallbackRate: [0.002, 0.01, 0.1]
#trailingCallbackRate: [0.0004, 0.0009, 0.018]
trailingCallbackRate: [0.0006, 0.0049, 0.006]
#exits:
# - roiStopLoss:
# percentage: 0.35%
#- roiTakeProfit:
# percentage: 0.7%
#- protectiveStopLoss:
# activationRatio: 0.5%
# stopLossRatio: 0.2%
# placeStopOrder: false
#- trailingStop:
# callbackRate: 0.3%
# activationRatio is relative to the average cost,
# when side is buy, 1% means lower 1% than the average cost.
# when side is sell, 1% means higher 1% than the average cost.
# activationRatio: 0.7%
# minProfit uses the position ROI to calculate the profit ratio
# minProfit: 1.5%
# interval: 1m
# side: sell
# closePosition: 100%
#- trailingStop:
# callbackRate: 0.3%
# activationRatio is relative to the average cost,
# when side is buy, 1% means lower 1% than the average cost.
# when side is sell, 1% means higher 1% than the average cost.
# activationRatio: 0.7%
# minProfit uses the position ROI to calculate the profit ratio
# minProfit: 1.5%
# interval: 1m
# side: buy
# closePosition: 100%
#- protectiveStopLoss:
# activationRatio: 5%
# stopLossRatio: 1%
# placeStopOrder: false
#- cumulatedVolumeTakeProfit:
# interval: 5m
# window: 2
# minQuoteVolume: 200_000_000
#- protectiveStopLoss:
# activationRatio: 2%
# stopLossRatio: 1%
# placeStopOrder: false
sync:
userDataStream:
trades: true
filledOrders: true
sessions:
- binance
symbols:
- BNBBUSD
backtest:
startTime: "2022-09-01"
endTime: "2022-09-30"
symbols:
- BNBBUSD
sessions: [binance]
accounts:
binance:
makerFeeRate: 0.000
takerFeeRate: 0.000
balances:
BNB: 0
BUSD: 20

View File

@ -3,6 +3,7 @@ package bbgo
import (
"context"
"encoding/json"
"fmt"
"time"
log "github.com/sirupsen/logrus"
@ -40,6 +41,30 @@ func (b *ActiveOrderBook) BindStream(stream types.Stream) {
stream.OnOrderUpdate(b.orderUpdateHandler)
}
func (b *ActiveOrderBook) waitClear(ctx context.Context, order types.Order, waitTime, timeout time.Duration) (bool, error) {
if !b.Exists(order) {
return true, nil
}
timeoutC := time.After(timeout)
for {
time.Sleep(waitTime)
clear := !b.Exists(order)
select {
case <-timeoutC:
return clear, nil
case <-ctx.Done():
return clear, ctx.Err()
default:
if clear {
return clear, nil
}
}
}
}
func (b *ActiveOrderBook) waitAllClear(ctx context.Context, waitTime, timeout time.Duration) (bool, error) {
numOfOrders := b.NumOfOrders()
clear := numOfOrders == 0
@ -67,6 +92,55 @@ func (b *ActiveOrderBook) waitAllClear(ctx context.Context, waitTime, timeout ti
}
}
// Cancel cancels the given order from activeOrderBook gracefully
func (b *ActiveOrderBook) Cancel(ctx context.Context, ex types.Exchange, order types.Order) error {
if !b.Exists(order) {
return fmt.Errorf("cannot find %v in orderbook", order)
}
// optimize order cancel for back-testing
if IsBackTesting {
return ex.CancelOrders(context.Background(), order)
}
log.Debugf("[ActiveOrderBook] gracefully cancelling %v order...", order.OrderID)
waitTime := CancelOrderWaitTime
startTime := time.Now()
// ensure order is cancelled
for {
// Some orders in the variable are not created on the server side yet,
// If we cancel these orders directly, we will get an unsent order error
// We wait here for a while for server to create these orders.
// time.Sleep(SentOrderWaitTime)
// since ctx might be canceled, we should use background context here
if err := ex.CancelOrders(context.Background(), order); err != nil {
log.WithError(err).Errorf("[ActiveORderBook] can not cancel %v order", order.OrderID)
}
log.Debugf("[ActiveOrderBook] waiting %s for %v order to be cancelled...", waitTime, order.OrderID)
clear, err := b.waitClear(ctx, order, waitTime, 5*time.Second)
if clear || err != nil {
break
}
b.Print()
openOrders, err := ex.QueryOpenOrders(ctx, order.Symbol)
if err != nil {
log.WithError(err).Errorf("can not query %s open orders", order.Symbol)
continue
}
openOrderStore := NewOrderStore(order.Symbol)
openOrderStore.Add(openOrders...)
// if it's not on the order book (open orders), we should remove it from our local side
if !openOrderStore.Exists(order.OrderID) {
b.Remove(order)
}
}
log.Debugf("[ActiveOrderBook] %v(%s) order is cancelled successfully in %s", order.OrderID, b.Symbol, time.Since(startTime))
return nil
}
// GracefulCancel cancels the active orders gracefully
func (b *ActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exchange) error {
// optimize order cancel for back-testing

View File

@ -144,6 +144,20 @@ func (e *GeneralOrderExecutor) GracefulCancelActiveOrderBook(ctx context.Context
return nil
}
func (e *GeneralOrderExecutor) GracefulCancelOrder(ctx context.Context, order types.Order) error {
if e.activeMakerOrders.NumOfOrders() == 0 {
return nil
}
if err := e.activeMakerOrders.Cancel(ctx, e.session.Exchange, order); err != nil {
// Retry once
if err = e.activeMakerOrders.Cancel(ctx, e.session.Exchange, order); err != nil {
return fmt.Errorf("cancel order error: %w", err)
}
}
e.tradeCollector.Process()
return nil
}
// GracefulCancel cancels all active maker orders
func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context) error {
return e.GracefulCancelActiveOrderBook(ctx, e.activeMakerOrders)

View File

@ -0,0 +1,69 @@
package bbgo
import (
"time"
"github.com/c9s/bbgo/pkg/types"
)
type SerialMarketDataStore struct {
*MarketDataStore
KLines map[types.Interval]*types.KLine
Subscription []types.Interval
}
func NewSerialMarketDataStore(symbol string) *SerialMarketDataStore {
return &SerialMarketDataStore{
MarketDataStore: NewMarketDataStore(symbol),
KLines: make(map[types.Interval]*types.KLine),
Subscription: []types.Interval{},
}
}
func (store *SerialMarketDataStore) Subscribe(interval types.Interval) {
// dedup
for _, i := range store.Subscription {
if i == interval {
return
}
}
store.Subscription = append(store.Subscription, interval)
}
func (store *SerialMarketDataStore) BindStream(stream types.Stream) {
stream.OnKLineClosed(store.handleKLineClosed)
}
func (store *SerialMarketDataStore) handleKLineClosed(kline types.KLine) {
store.AddKLine(kline)
}
func (store *SerialMarketDataStore) AddKLine(kline types.KLine) {
if kline.Symbol != store.Symbol {
return
}
// only consumes kline1m
if kline.Interval != types.Interval1m {
return
}
// endtime in minutes
timestamp := kline.StartTime.Time().Add(time.Minute)
for _, val := range store.Subscription {
k, ok := store.KLines[val]
if !ok {
k = &types.KLine{}
k.Set(&kline)
k.Interval = val
k.Closed = false
store.KLines[val] = k
} else {
k.Merge(&kline)
k.Closed = false
}
if timestamp.Truncate(val.Duration()) == timestamp {
k.Closed = true
store.MarketDataStore.AddKLine(*k)
delete(store.KLines, val)
}
}
}

View File

@ -23,6 +23,8 @@ import (
"github.com/c9s/bbgo/pkg/util"
)
var KLinePreloadLimit int64 = 1000
// ExchangeSession presents the exchange connection Session
// It also maintains and collects the data returned from the stream.
type ExchangeSession struct {
@ -418,29 +420,34 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
for interval := range klineSubscriptions {
// avoid querying the last unclosed kline
endTime := environ.startTime
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
EndTime: &endTime,
Limit: 1000, // indicators need at least 100
})
if err != nil {
return err
}
var i int64
for i = 0; i < KLinePreloadLimit; i += 1000 {
var duration time.Duration = time.Duration(-i * int64(interval.Duration()))
e := endTime.Add(duration)
if len(kLines) == 0 {
log.Warnf("no kline data for %s %s (end time <= %s)", symbol, interval, environ.startTime)
continue
}
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
EndTime: &e,
Limit: 1000, // indicators need at least 100
})
if err != nil {
return err
}
// update last prices by the given kline
lastKLine := kLines[len(kLines)-1]
if interval == types.Interval1m {
log.Infof("last kline %+v", lastKLine)
session.lastPrices[symbol] = lastKLine.Close
}
if len(kLines) == 0 {
log.Warnf("no kline data for %s %s (end time <= %s)", symbol, interval, e)
continue
}
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
// update last prices by the given kline
lastKLine := kLines[len(kLines)-1]
if interval == types.Interval1m {
session.lastPrices[symbol] = lastKLine.Close
}
for _, k := range kLines {
// let market data store trigger the update, so that the indicator could be updated too.
marketDataStore.AddKLine(k)
}
}
}
@ -499,6 +506,28 @@ func (session *ExchangeSession) MarketDataStore(symbol string) (s *MarketDataSto
return s, ok
}
// KLine updates will be received in the order listend in intervals array
func (session *ExchangeSession) SerialMarketDataStore(symbol string, intervals []types.Interval) (store *SerialMarketDataStore, ok bool) {
st, ok := session.MarketDataStore(symbol)
if !ok {
return nil, false
}
store = NewSerialMarketDataStore(symbol)
klines, ok := st.KLinesOfInterval(types.Interval1m)
if !ok {
log.Errorf("SerialMarketDataStore: cannot get 1m history")
return nil, false
}
for _, interval := range intervals {
store.Subscribe(interval)
}
for _, kline := range *klines {
store.AddKLine(kline)
}
store.BindStream(session.MarketDataStream)
return store, true
}
// OrderBook returns the personal orderbook of a symbol
func (session *ExchangeSession) OrderBook(symbol string) (s *types.StreamOrderBook, ok bool) {
s, ok = session.orderBooks[symbol]

82
pkg/bbgo/source.go Normal file
View File

@ -0,0 +1,82 @@
package bbgo
import (
"encoding/json"
"strings"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
log "github.com/sirupsen/logrus"
)
type SourceFunc func(*types.KLine) fixedpoint.Value
type selectorInternal struct {
Source string
sourceGetter SourceFunc
}
func (s *selectorInternal) UnmarshalJSON(d []byte) error {
if err := json.Unmarshal(d, &s.Source); err != nil {
return err
}
s.init()
return nil
}
func (s selectorInternal) MarshalJSON() ([]byte, error) {
if s.Source == "" {
s.Source = "close"
s.init()
}
return []byte("\"" + s.Source + "\""), nil
}
type SourceSelector struct {
Source selectorInternal `json:"source,omitempty"`
}
func (s *selectorInternal) init() {
switch strings.ToLower(s.Source) {
case "close":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.Close }
case "high":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.High }
case "low":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.Low }
case "hl2":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(fixedpoint.Two) }
case "hlc3":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value {
return kline.High.Add(kline.Low).Add(kline.Close).Div(fixedpoint.Three)
}
case "ohlc4":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value {
return kline.High.Add(kline.Low).Add(kline.Close).Add(kline.Open).Div(fixedpoint.Four)
}
case "open":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.Open }
case "oc2":
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.Open.Add(kline.Close).Div(fixedpoint.Two) }
default:
log.Infof("source not set: %s, use hl2 by default", s.Source)
s.sourceGetter = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(fixedpoint.Two) }
}
}
func (s *selectorInternal) String() string {
if s.Source == "" {
s.Source = "close"
s.init()
}
return s.Source
}
// lazy init if empty struct is passed in
func (s *SourceSelector) GetSource(kline *types.KLine) fixedpoint.Value {
if s.Source.Source == "" {
s.Source.Source = "close"
s.Source.init()
}
return s.Source.sourceGetter(kline)
}

34
pkg/bbgo/source_test.go Normal file
View File

@ -0,0 +1,34 @@
package bbgo
import (
"encoding/json"
"testing"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)
func TestSource(t *testing.T) {
input := "{\"source\":\"high\"}"
type Strategy struct {
SourceSelector
}
s := Strategy{}
assert.NoError(t, json.Unmarshal([]byte(input), &s))
assert.Equal(t, s.Source.Source, "high")
assert.NotNil(t, s.Source.sourceGetter)
e, err := json.Marshal(&s)
assert.NoError(t, err)
assert.Equal(t, input, string(e))
input = "{}"
s = Strategy{}
assert.NoError(t, json.Unmarshal([]byte(input), &s))
assert.Equal(t, fixedpoint.Zero, s.GetSource(&types.KLine{}))
e, err = json.Marshal(&Strategy{})
assert.NoError(t, err)
assert.Equal(t, "{\"source\":\"close\"}", string(e))
}

View File

@ -8,6 +8,7 @@ import (
_ "github.com/c9s/bbgo/pkg/strategy/bollmaker"
_ "github.com/c9s/bbgo/pkg/strategy/dca"
_ "github.com/c9s/bbgo/pkg/strategy/drift"
_ "github.com/c9s/bbgo/pkg/strategy/elliottwave"
_ "github.com/c9s/bbgo/pkg/strategy/emastop"
_ "github.com/c9s/bbgo/pkg/strategy/etf"
_ "github.com/c9s/bbgo/pkg/strategy/ewoDgtrd"

View File

@ -1,6 +1,7 @@
package dynamic
import (
"encoding/json"
"fmt"
"io"
"reflect"
@ -16,7 +17,7 @@ import (
)
func DefaultWhiteList() []string {
return []string{"Window", "Interval", "Symbol"}
return []string{"Window", "RightWindow", "Interval", "Symbol", "Source"}
}
// @param s: strategy object
@ -96,6 +97,9 @@ func PrintConfig(s interface{}, f io.Writer, style *table.Style, withColor bool,
}
redundantSet[name] = struct{}{}
value := field.Field(j).Interface()
if e, err := json.Marshal(value); err == nil {
value = string(e)
}
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: tt.Type.String(), Value: value})
}
}
@ -106,7 +110,11 @@ func PrintConfig(s interface{}, f io.Writer, style *table.Style, withColor bool,
continue
}
redundantSet[name] = struct{}{}
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: t.Type.String(), Value: val.Field(i).Interface()})
value := val.Field(i).Interface()
if e, err := json.Marshal(value); err == nil {
value = string(e)
}
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: t.Type.String(), Value: value})
}
}
sort.Sort(values)

7
pkg/fixedpoint/const.go Normal file
View File

@ -0,0 +1,7 @@
package fixedpoint
var (
Two Value = NewFromInt(2)
Three Value = NewFromInt(3)
Four Value = NewFromInt(3)
)

View File

@ -8,8 +8,8 @@ import (
"math"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/wcharczuk/go-chart/v2"
@ -36,14 +36,13 @@ func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
type SourceFunc func(*types.KLine) fixedpoint.Value
type Strategy struct {
Symbol string `json:"symbol"`
bbgo.StrategyController
types.Market
types.IntervalWindow
bbgo.SourceSelector
*bbgo.Environment
*types.Position `persistence:"position"`
@ -52,6 +51,7 @@ type Strategy struct {
p *types.Position
priceLines *types.Queue
trendLine types.UpdatableSeriesExtend
ma types.UpdatableSeriesExtend
stdevHigh *indicator.StdDev
@ -62,6 +62,7 @@ type Strategy struct {
midPrice fixedpoint.Value
lock sync.RWMutex `ignore:"true"`
positionLock sync.RWMutex `ignore:"true"`
startTime time.Time
minutesCounter int
orderPendingCounter map[uint64]int
frameKLine *types.KLine
@ -69,7 +70,6 @@ type Strategy struct {
beta float64
Source string `json:"source,omitempty"`
StopLoss fixedpoint.Value `json:"stoploss"`
CanvasPath string `json:"canvasPath"`
PredictOffset int `json:"predictOffset"`
@ -113,7 +113,6 @@ type Strategy struct {
*bbgo.GeneralOrderExecutor
getLastPrice func() fixedpoint.Value
getSource SourceFunc
}
func (s *Strategy) ID() string {
@ -125,9 +124,9 @@ func (s *Strategy) InstanceID() string {
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: s.Interval,
})
// by default, bbgo only pre-subscribe 1000 klines.
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
bbgo.KLinePreloadLimit = int64((s.Interval.Minutes()*s.Window/1000 + 1) * 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: types.Interval1m,
})
@ -173,39 +172,7 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu
}
}
func (s *Strategy) SourceFuncGenerator() SourceFunc {
switch strings.ToLower(s.Source) {
case "close":
return func(kline *types.KLine) fixedpoint.Value { return kline.Close }
case "high":
return func(kline *types.KLine) fixedpoint.Value { return kline.High }
case "low":
return func(kline *types.KLine) fixedpoint.Value { return kline.Low }
case "hl2":
return func(kline *types.KLine) fixedpoint.Value {
return kline.High.Add(kline.Low).Div(Two)
}
case "hlc3":
return func(kline *types.KLine) fixedpoint.Value {
return kline.High.Add(kline.Low).Add(kline.Close).Div(Three)
}
case "ohlc4":
return func(kline *types.KLine) fixedpoint.Value {
return kline.Open.Add(kline.High).Add(kline.Low).Add(kline.Close).Div(Four)
}
case "open":
return func(kline *types.KLine) fixedpoint.Value { return kline.Open }
case "":
log.Infof("source not set, use hl2 by default")
return func(kline *types.KLine) fixedpoint.Value {
return kline.High.Add(kline.Low).Div(Two)
}
default:
panic(fmt.Sprintf("Unable to parse: %s", s.Source))
}
}
func (s *Strategy) initIndicators(priceLines *types.Queue) error {
func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
s.ma = &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevHigh = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevLow = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
@ -239,14 +206,13 @@ func (s *Strategy) initIndicators(priceLines *types.Queue) error {
s.atr = &indicator.ATR{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.ATRWindow}}
s.trendLine = &indicator.EWMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.TrendWindow}}
store, _ := s.Session.MarketDataStore(s.Symbol)
klines, ok := store.KLinesOfInterval(s.Interval)
if !ok {
klinesLength := len(*klines)
if !ok || klinesLength == 0 {
return errors.New("klines not exists")
}
for _, kline := range *klines {
source := s.getSource(&kline).Float64()
source := s.GetSource(&kline).Float64()
high := kline.High.Float64()
low := kline.Low.Float64()
s.ma.Update(source)
@ -255,17 +221,18 @@ func (s *Strategy) initIndicators(priceLines *types.Queue) error {
s.drift.Update(source, kline.Volume.Float64())
s.trendLine.Update(source)
s.atr.PushK(kline)
priceLines.Update(source)
s.priceLines.Update(source)
}
if s.frameKLine != nil && klines != nil {
s.frameKLine.Set(&(*klines)[len(*klines)-1])
}
klines, ok = store.KLinesOfInterval(types.Interval1m)
if !ok {
klinesLength = len(*klines)
if !ok || klinesLength == 0 {
return errors.New("klines not exists")
}
for _, kline := range *klines {
source := s.getSource(&kline).Float64()
source := s.GetSource(&kline).Float64()
s.drift1m.Update(source, kline.Volume.Float64())
if s.drift1m.Last() != s.drift1m.Last() {
panic(fmt.Sprintf("%f %v %f %f", source, s.drift1m.drift.Values.Index(1), s.drift1m.ma2.Last(), s.drift1m.drift.LastValue))
@ -274,6 +241,7 @@ func (s *Strategy) initIndicators(priceLines *types.Queue) error {
if s.kline1m != nil && klines != nil {
s.kline1m.Set(&(*klines)[len(*klines)-1])
}
s.startTime = s.kline1m.StartTime.Time().Add(s.kline1m.Interval.Duration())
return nil
}
@ -287,7 +255,10 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64) (int, e
drift := s.drift1m.Array(2)
for _, order := range nonTraded {
log.Warnf("%v", order)
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
continue
}
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
if s.minutesCounter-s.orderPendingCounter[order.OrderID] > s.PendingMinutes {
if order.Side == types.SideTypeBuy && drift[1] < drift[0] {
continue
@ -438,15 +409,15 @@ func (s *Strategy) initTickerFunctions(ctx context.Context) {
}
func (s *Strategy) DrawIndicators(time types.Time, priceLine types.SeriesExtend) *types.Canvas {
func (s *Strategy) DrawIndicators(time types.Time) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID(), s.Interval)
Length := priceLine.Length()
Length := s.priceLines.Length()
if Length > 300 {
Length = 300
}
log.Infof("draw indicators with %d data", Length)
mean := priceLine.Mean(Length)
highestPrice := priceLine.Minus(mean).Abs().Highest(Length)
mean := s.priceLines.Mean(Length)
highestPrice := s.priceLines.Minus(mean).Abs().Highest(Length)
highestDrift := s.drift.Abs().Highest(Length)
hi := s.drift.drift.Abs().Highest(Length)
h1m := s.drift1m.Abs().Highest(Length * s.Interval.Minutes())
@ -459,7 +430,7 @@ func (s *Strategy) DrawIndicators(time types.Time, priceLine types.SeriesExtend)
canvas.Plot("driftOrig", s.drift.drift.Mul(highestPrice/hi).Add(mean), time, Length)
canvas.Plot("drift1m", s.drift1m.Mul(highestPrice/h1m).Add(mean), time, Length*s.Interval.Minutes(), types.Interval1m)
canvas.Plot("zero", types.NumberSeries(mean), time, Length)
canvas.Plot("price", priceLine, time, Length)
canvas.Plot("price", s.priceLines, time, Length)
return canvas
}
@ -498,8 +469,8 @@ func (s *Strategy) DrawCumPNL(cumProfit types.Series) *types.Canvas {
return canvas
}
func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series) {
canvas := s.DrawIndicators(time, priceLine)
func (s *Strategy) Draw(time types.Time, profit types.Series, cumProfit types.Series) {
canvas := s.DrawIndicators(time)
f, err := os.Create(s.CanvasPath)
if err != nil {
log.WithError(err).Errorf("cannot create on %s", s.CanvasPath)
@ -595,6 +566,224 @@ func (s *Strategy) CalcAssetValue(price fixedpoint.Value) fixedpoint.Value {
return balances[s.Market.BaseCurrency].Total().Mul(price).Add(balances[s.Market.QuoteCurrency].Total())
}
func (s *Strategy) klineHandler1m(ctx context.Context, kline types.KLine) {
s.kline1m.Set(&kline)
s.drift1m.Update(s.GetSource(&kline).Float64(), kline.Volume.Float64())
if s.Status != types.StrategyStatusRunning {
return
}
if s.NoTrailingStopLoss || s.TrailingStopLossType == "realtime" {
return
}
// for doing the trailing stoploss during backtesting
atr := s.atr.Last()
price := s.getLastPrice()
pricef := price.Float64()
stoploss := s.StopLoss.Float64()
var err error
numPending := 0
if numPending, err = s.smartCancel(ctx, pricef, atr); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
if numPending > 0 {
log.Infof("pending orders: %d, exit", numPending)
return
}
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
s.positionLock.Lock()
if s.lowestPrice > 0 && lowf < s.lowestPrice {
s.lowestPrice = lowf
}
if s.highestPrice > 0 && highf > s.highestPrice {
s.highestPrice = highf
}
//log.Infof("d1m: %f, hf: %f, lf: %f", s.drift1m.Last(), highf, lowf)
exitShortCondition := s.sellPrice > 0 && (s.sellPrice*(1.+stoploss) <= highf ||
s.trailingCheck(highf, "short") /* || s.drift1m.Last() > 0*/)
exitLongCondition := s.buyPrice > 0 && (s.buyPrice*(1.-stoploss) >= lowf ||
s.trailingCheck(lowf, "long") /* || s.drift1m.Last() < 0*/)
if exitShortCondition || exitLongCondition {
s.positionLock.Unlock()
_ = s.ClosePosition(ctx, fixedpoint.One)
} else {
s.positionLock.Unlock()
}
}
func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
var driftPred, atr float64
var drift []float64
s.frameKLine.Set(&kline)
source := s.GetSource(s.frameKLine)
sourcef := source.Float64()
s.priceLines.Update(sourcef)
s.ma.Update(sourcef)
s.trendLine.Update(sourcef)
s.drift.Update(sourcef, kline.Volume.Float64())
s.atr.PushK(kline)
drift = s.drift.Array(2)
ddrift := s.drift.drift.Array(2)
driftPred = s.drift.Predict(s.PredictOffset)
ddriftPred := s.drift.drift.Predict(s.PredictOffset)
atr = s.atr.Last()
price := s.getLastPrice()
pricef := price.Float64()
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
lowdiff := s.ma.Last() - lowf
s.stdevLow.Update(lowdiff)
highdiff := highf - s.ma.Last()
s.stdevHigh.Update(highdiff)
if s.Status != types.StrategyStatusRunning {
return
}
stoploss := s.StopLoss.Float64()
s.positionLock.Lock()
log.Errorf("highdiff: %3.2f ma: %.2f, close: %8v, high: %8v, low: %8v, time: %v %v", s.stdevHigh.Last(), s.ma.Last(), kline.Close, kline.High, kline.Low, kline.StartTime, kline.EndTime)
if s.lowestPrice > 0 && lowf < s.lowestPrice {
s.lowestPrice = lowf
}
if s.highestPrice > 0 && highf > s.highestPrice {
s.highestPrice = highf
}
if !s.NoRebalance {
s.Rebalance(ctx)
}
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
bbgo.Notify("source: %.4f, price: %.4f, driftPred: %.4f, ddriftPred: %.4f, drift[1]: %.4f, ddrift[1]: %.4f, atr: %.4f, lowf %.4f, highf: %.4f lowest: %.4f highest: %.4f sp %.4f bp %.4f",
sourcef, pricef, driftPred, ddriftPred, drift[1], ddrift[1], atr, lowf, highf, s.lowestPrice, s.highestPrice, s.sellPrice, s.buyPrice)
// Notify will parse args to strings and process separately
bbgo.Notify("balances: [Total] %v %s [Base] %s(%v %s) [Quote] %s",
s.CalcAssetValue(price),
s.Market.QuoteCurrency,
balances[s.Market.BaseCurrency].String(),
balances[s.Market.BaseCurrency].Total().Mul(price),
s.Market.QuoteCurrency,
balances[s.Market.QuoteCurrency].String(),
)
shortCondition := (drift[1] >= s.DriftFilterNeg || ddrift[1] >= 0) && (driftPred <= s.DDriftFilterNeg || ddriftPred <= 0) || drift[1] < 0 && drift[0] < 0
longCondition := (drift[1] <= s.DriftFilterPos || ddrift[1] <= 0) && (driftPred >= s.DDriftFilterPos || ddriftPred >= 0) || drift[1] > 0 && drift[0] > 0
if shortCondition && longCondition {
if drift[1] > drift[0] {
longCondition = false
} else {
shortCondition = false
}
}
exitShortCondition := s.sellPrice > 0 && !shortCondition && !longCondition && (s.sellPrice*(1.+stoploss) <= highf ||
s.trailingCheck(pricef, "short"))
exitLongCondition := s.buyPrice > 0 && !longCondition && !shortCondition && (s.buyPrice*(1.-stoploss) >= lowf ||
s.trailingCheck(pricef, "long"))
if exitShortCondition || exitLongCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
_ = s.ClosePosition(ctx, fixedpoint.One)
return
}
if longCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
source = source.Sub(fixedpoint.NewFromFloat(s.stdevLow.Last() * s.HighLowVarianceMultiplier))
if source.Compare(price) > 0 {
source = price
}
sourcef = source.Float64()
log.Infof("source in long %v %v %f", source, price, s.stdevLow.Last())
quoteBalance, ok := s.Session.GetAccount().Balance(s.Market.QuoteCurrency)
if !ok {
log.Errorf("unable to get quoteCurrency")
s.positionLock.Unlock()
return
}
if s.Market.IsDustQuantity(
quoteBalance.Available.Div(source), source) {
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
quantity := quoteBalance.Available.Div(source)
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "long",
})
log.Infof("orders %v", createdOrders)
if err != nil {
log.WithError(err).Errorf("cannot place buy order")
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
if shortCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
baseBalance, ok := s.Session.GetAccount().Balance(s.Market.BaseCurrency)
if !ok {
log.Errorf("unable to get baseBalance")
s.positionLock.Unlock()
return
}
source = source.Add(fixedpoint.NewFromFloat(s.stdevHigh.Last() * s.HighLowVarianceMultiplier))
if source.Compare(price) < 0 {
source = price
}
sourcef = source.Float64()
log.Infof("source in short: %v", source)
if s.Market.IsDustQuantity(baseBalance.Available, source) {
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
// Cleanup pending StopOrders
quantity := baseBalance.Available
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "short",
})
if err != nil {
log.WithError(err).Errorf("cannot place sell order")
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
s.positionLock.Unlock()
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
instanceID := s.InstanceID()
// Will be set by persistence if there's any from DB
@ -613,14 +802,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
if s.TradeStats == nil {
s.TradeStats = types.NewTradeStats(s.Symbol)
}
startTime := s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, startTime))
// StrategyController
s.Status = types.StrategyStatusRunning
// Get source function from config input
s.getSource = s.SourceFuncGenerator()
s.OnSuspend(func() {
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
@ -729,21 +912,20 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.frameKLine = &types.KLine{}
s.kline1m = &types.KLine{}
priceLine := types.NewQueue(300)
if err := s.initIndicators(priceLine); err != nil {
log.WithError(err).Errorf("initIndicator failed")
return nil
}
s.initTickerFunctions(ctx)
s.priceLines = types.NewQueue(300)
s.initTickerFunctions(ctx)
startTime := s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, startTime))
stoploss := s.StopLoss.Float64()
// default value: use 1m kline
if !s.NoTrailingStopLoss && s.IsBackTesting() || s.TrailingStopLossType == "" {
s.TrailingStopLossType = "kline"
}
bbgo.RegisterCommand("/draw", "Draw Indicators", func(reply interact.Reply) {
canvas := s.DrawIndicators(s.frameKLine.StartTime, priceLine)
canvas := s.DrawIndicators(s.frameKLine.StartTime)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render indicators in drift")
@ -798,225 +980,22 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
reply.Message(buffer.String())
})
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if kline.Symbol != s.Symbol {
return
}
var driftPred, atr float64
var drift []float64
if !kline.Closed {
return
}
// event trigger order: s.Interval => Interval1m
store, ok := session.SerialMarketDataStore(s.Symbol, []types.Interval{s.Interval, types.Interval1m})
if !ok {
panic("cannot get 1m history")
}
if err := s.initIndicators(store); err != nil {
log.WithError(err).Errorf("initIndicator failed")
return nil
}
store.OnKLineClosed(func(kline types.KLine) {
s.minutesCounter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Minutes())
if kline.Interval == types.Interval1m {
s.kline1m.Set(&kline)
s.drift1m.Update(s.getSource(&kline).Float64(), kline.Volume.Float64())
s.minutesCounter += 1
if s.Status != types.StrategyStatusRunning {
return
}
if s.NoTrailingStopLoss || s.TrailingStopLossType == "realtime" {
return
}
// for doing the trailing stoploss during backtesting
atr = s.atr.Last()
price := s.getLastPrice()
pricef := price.Float64()
var err error
numPending := 0
if numPending, err = s.smartCancel(ctx, pricef, atr); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
if numPending > 0 {
log.Infof("pending orders: %d, exit", numPending)
return
}
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
s.positionLock.Lock()
if s.lowestPrice > 0 && lowf < s.lowestPrice {
s.lowestPrice = lowf
}
if s.highestPrice > 0 && highf > s.highestPrice {
s.highestPrice = highf
}
exitShortCondition := s.sellPrice > 0 && (s.sellPrice*(1.+stoploss) <= highf ||
s.trailingCheck(highf, "short") || s.drift1m.Last() > 0)
exitLongCondition := s.buyPrice > 0 && (s.buyPrice*(1.-stoploss) >= lowf ||
s.trailingCheck(lowf, "long") || s.drift1m.Last() < 0)
if exitShortCondition || exitLongCondition {
s.positionLock.Unlock()
_ = s.ClosePosition(ctx, fixedpoint.One)
} else {
s.positionLock.Unlock()
}
return
s.klineHandler1m(ctx, kline)
} else if kline.Interval == s.Interval {
s.klineHandler(ctx, kline)
}
if kline.Interval != s.Interval {
return
}
s.frameKLine.Set(&kline)
source := s.getSource(s.frameKLine)
sourcef := source.Float64()
priceLine.Update(sourcef)
s.ma.Update(sourcef)
s.trendLine.Update(sourcef)
s.drift.Update(sourcef, kline.Volume.Float64())
s.atr.PushK(kline)
drift = s.drift.Array(2)
ddrift := s.drift.drift.Array(2)
driftPred = s.drift.Predict(s.PredictOffset)
ddriftPred := s.drift.drift.Predict(s.PredictOffset)
atr = s.atr.Last()
price := s.getLastPrice()
pricef := price.Float64()
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
lowdiff := s.ma.Last() - lowf
s.stdevLow.Update(lowdiff)
highdiff := highf - s.ma.Last()
s.stdevHigh.Update(highdiff)
if s.Status != types.StrategyStatusRunning {
return
}
s.positionLock.Lock()
log.Errorf("highdiff: %3.2f ma: %.2f, close: %8v, high: %8v, low: %8v, time: %v %v", s.stdevHigh.Last(), s.ma.Last(), kline.Close, kline.High, kline.Low, kline.StartTime, kline.EndTime)
if s.lowestPrice > 0 && lowf < s.lowestPrice {
s.lowestPrice = lowf
}
if s.highestPrice > 0 && highf > s.highestPrice {
s.highestPrice = highf
}
if !s.NoRebalance {
s.Rebalance(ctx)
}
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
bbgo.Notify("source: %.4f, price: %.4f, driftPred: %.4f, ddriftPred: %.4f, drift[1]: %.4f, ddrift[1]: %.4f, atr: %.4f, lowf %.4f, highf: %.4f lowest: %.4f highest: %.4f sp %.4f bp %.4f",
sourcef, pricef, driftPred, ddriftPred, drift[1], ddrift[1], atr, lowf, highf, s.lowestPrice, s.highestPrice, s.sellPrice, s.buyPrice)
// Notify will parse args to strings and process separately
bbgo.Notify("balances: [Total] %v %s [Base] %s(%v %s) [Quote] %s",
s.CalcAssetValue(price),
s.Market.QuoteCurrency,
balances[s.Market.BaseCurrency].String(),
balances[s.Market.BaseCurrency].Total().Mul(price),
s.Market.QuoteCurrency,
balances[s.Market.QuoteCurrency].String(),
)
shortCondition := (drift[1] >= s.DriftFilterNeg || ddrift[1] >= 0) && (driftPred <= s.DDriftFilterNeg || ddriftPred <= 0) || drift[1] < 0 && drift[0] < 0
longCondition := (drift[1] <= s.DriftFilterPos || ddrift[1] <= 0) && (driftPred >= s.DDriftFilterPos || ddriftPred >= 0) || drift[1] > 0 && drift[0] > 0
if shortCondition && longCondition {
if drift[1] > drift[0] {
longCondition = false
} else {
shortCondition = false
}
}
exitShortCondition := s.sellPrice > 0 && !shortCondition && !longCondition && (s.sellPrice*(1.+stoploss) <= highf ||
s.trailingCheck(pricef, "short"))
exitLongCondition := s.buyPrice > 0 && !longCondition && !shortCondition && (s.buyPrice*(1.-stoploss) >= lowf ||
s.trailingCheck(pricef, "long"))
if exitShortCondition || exitLongCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
_ = s.ClosePosition(ctx, fixedpoint.One)
return
}
if longCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
source = source.Sub(fixedpoint.NewFromFloat(s.stdevLow.Last() * s.HighLowVarianceMultiplier))
if source.Compare(price) > 0 {
source = price
}
sourcef = source.Float64()
quoteBalance, ok := s.Session.GetAccount().Balance(s.Market.QuoteCurrency)
if !ok {
log.Errorf("unable to get quoteCurrency")
s.positionLock.Unlock()
return
}
if s.Market.IsDustQuantity(
quoteBalance.Available.Div(source), source) {
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
quantity := quoteBalance.Available.Div(source)
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "long",
})
if err != nil {
log.WithError(err).Errorf("cannot place buy order")
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
if shortCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
s.positionLock.Unlock()
return
}
baseBalance, ok := s.Session.GetAccount().Balance(s.Market.BaseCurrency)
if !ok {
log.Errorf("unable to get baseBalance")
s.positionLock.Unlock()
return
}
source = source.Add(fixedpoint.NewFromFloat(s.stdevHigh.Last() * s.HighLowVarianceMultiplier))
if source.Compare(price) < 0 {
source = price
}
sourcef = source.Float64()
if s.Market.IsDustQuantity(baseBalance.Available, source) {
s.positionLock.Unlock()
return
}
s.positionLock.Unlock()
// Cleanup pending StopOrders
quantity := baseBalance.Available
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "short",
})
if err != nil {
log.WithError(err).Errorf("cannot place sell order")
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
s.positionLock.Unlock()
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
@ -1034,7 +1013,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
os.Stdout.Write(buffer.Bytes())
if s.GenerateGraph {
s.Draw(s.frameKLine.StartTime, priceLine, &profit, &cumProfit)
s.Draw(s.frameKLine.StartTime, &profit, &cumProfit)
}
wg.Done()

View File

@ -0,0 +1,138 @@
package elliottwave
import (
"bytes"
"fmt"
"os"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/interact"
"github.com/c9s/bbgo/pkg/types"
"github.com/wcharczuk/go-chart/v2"
)
func (s *Strategy) InitDrawCommands(store *bbgo.SerialMarketDataStore, profit, cumProfit types.Series) {
bbgo.RegisterCommand("/draw", "Draw Indicators", func(reply interact.Reply) {
canvas := s.DrawIndicators(store)
if canvas == nil {
reply.Message("cannot render indicators")
return
}
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render indicators in ewo")
reply.Message(fmt.Sprintf("[error] cannot render indicators in ewo: %v", err))
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("/pnl", "Draw PNL(%) per trade", func(reply interact.Reply) {
canvas := s.DrawPNL(profit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render pnl in drift")
reply.Message(fmt.Sprintf("[error] cannot render pnl in ewo: %v", err))
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("/cumpnl", "Draw Cummulative PNL(Quote)", func(reply interact.Reply) {
canvas := s.DrawCumPNL(cumProfit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render cumpnl in drift")
reply.Message(fmt.Sprintf("[error] canot render cumpnl in drift: %v", err))
return
}
bbgo.SendPhoto(&buffer)
})
}
func (s *Strategy) DrawIndicators(store *bbgo.SerialMarketDataStore) *types.Canvas {
klines, ok := store.KLinesOfInterval(types.Interval1m)
if !ok {
return nil
}
time := (*klines)[len(*klines)-1].StartTime
canvas := types.NewCanvas(s.InstanceID(), s.Interval)
Length := s.priceLines.Length()
if Length > 300 {
Length = 300
}
log.Infof("draw indicators with %d data", Length)
mean := s.priceLines.Mean(Length)
high := s.priceLines.Highest(Length)
low := s.priceLines.Lowest(Length)
ehigh := types.Highest(s.ewo, Length)
elow := types.Lowest(s.ewo, Length)
canvas.Plot("ewo", types.Add(types.Mul(s.ewo, (high-low)/(ehigh-elow)), mean), time, Length)
canvas.Plot("zero", types.NumberSeries(mean), time, Length)
canvas.Plot("price", s.priceLines, time, Length)
return canvas
}
func (s *Strategy) DrawPNL(profit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
length := profit.Length()
log.Errorf("pnl Highest: %f, Lowest: %f", types.Highest(profit, length), types.Lowest(profit, length))
canvas.PlotRaw("pnl %", profit, length)
canvas.YAxis = chart.YAxis{
ValueFormatter: func(v interface{}) string {
if vf, isFloat := v.(float64); isFloat {
return fmt.Sprintf("%.4f", vf)
}
return ""
},
}
canvas.PlotRaw("1", types.NumberSeries(1), length)
return canvas
}
func (s *Strategy) DrawCumPNL(cumProfit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
canvas.PlotRaw("cummulative pnl", cumProfit, cumProfit.Length())
canvas.YAxis = chart.YAxis{
ValueFormatter: func(v interface{}) string {
if vf, isFloat := v.(float64); isFloat {
return fmt.Sprintf("%.4f", vf)
}
return ""
},
}
return canvas
}
func (s *Strategy) Draw(store *bbgo.SerialMarketDataStore, profit, cumProfit types.Series) {
canvas := s.DrawIndicators(store)
f, err := os.Create(s.GraphIndicatorPath)
if err != nil {
log.WithError(err).Errorf("cannot create on path " + s.GraphIndicatorPath)
return
}
defer f.Close()
if err = canvas.Render(chart.PNG, f); err != nil {
log.WithError(err).Errorf("cannot render elliottwave")
}
canvas = s.DrawPNL(profit)
f, err = os.Create(s.GraphPNLPath)
if err != nil {
log.WithError(err).Errorf("cannot create on path " + s.GraphPNLPath)
return
}
defer f.Close()
if err = canvas.Render(chart.PNG, f); err != nil {
log.WithError(err).Errorf("cannot render pnl")
return
}
canvas = s.DrawCumPNL(cumProfit)
f, err = os.Create(s.GraphCumPNLPath)
if err != nil {
log.WithError(err).Errorf("cannot create on path " + s.GraphCumPNLPath)
return
}
defer f.Close()
if err = canvas.Render(chart.PNG, f); err != nil {
log.WithError(err).Errorf("cannot render cumpnl")
}
}

View File

@ -0,0 +1,25 @@
package elliottwave
import "github.com/c9s/bbgo/pkg/indicator"
type ElliottWave struct {
maSlow *indicator.SMA
maQuick *indicator.SMA
}
func (s *ElliottWave) Index(i int) float64 {
return s.maQuick.Index(i)/s.maSlow.Index(i) - 1.0
}
func (s *ElliottWave) Last() float64 {
return s.maQuick.Last()/s.maSlow.Last() - 1.0
}
func (s *ElliottWave) Length() int {
return s.maSlow.Length()
}
func (s *ElliottWave) Update(v float64) {
s.maSlow.Update(v)
s.maQuick.Update(v)
}

View File

@ -0,0 +1,46 @@
package elliottwave
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type HeikinAshi struct {
Values []types.KLine
size int
}
func NewHeikinAshi(size int) *HeikinAshi {
return &HeikinAshi{
Values: make([]types.KLine, size),
size: size,
}
}
func (inc *HeikinAshi) Last() *types.KLine {
if len(inc.Values) == 0 {
return &types.KLine{}
}
return &inc.Values[len(inc.Values)-1]
}
func (inc *HeikinAshi) Update(kline types.KLine) {
open := kline.Open
cloze := kline.Close
high := kline.High
low := kline.Low
lastOpen := inc.Last().Open
lastClose := inc.Last().Close
newClose := open.Add(high).Add(low).Add(cloze).Div(Four)
newOpen := lastOpen.Add(lastClose).Div(Two)
kline.Close = newClose
kline.Open = newOpen
kline.High = fixedpoint.Max(fixedpoint.Max(high, newOpen), newClose)
kline.Low = fixedpoint.Max(fixedpoint.Min(low, newOpen), newClose)
inc.Values = append(inc.Values, kline)
if len(inc.Values) > inc.size {
inc.Values = inc.Values[len(inc.Values)-inc.size:]
}
}

View File

@ -0,0 +1,554 @@
package elliottwave
import (
"bytes"
"context"
"errors"
"fmt"
"math"
"os"
"sync"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/sirupsen/logrus"
)
const ID = "elliottwave"
var log = logrus.WithField("strategy", ID)
var Two fixedpoint.Value = fixedpoint.NewFromInt(2)
var Three fixedpoint.Value = fixedpoint.NewFromInt(3)
var Four fixedpoint.Value = fixedpoint.NewFromInt(4)
var Delta fixedpoint.Value = fixedpoint.NewFromFloat(0.00001)
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
type SourceFunc func(*types.KLine) fixedpoint.Value
type Strategy struct {
Symbol string `json:"symbol"`
bbgo.StrategyController
bbgo.SourceSelector
types.Market
Session *bbgo.ExchangeSession
Interval types.Interval `json:"interval"`
Stoploss fixedpoint.Value `json:"stoploss"`
WindowATR int `json:"windowATR"`
WindowQuick int `json:"windowQuick"`
WindowSlow int `json:"windowSlow"`
PendingMinutes int `json:"pendingMinutes"`
UseHeikinAshi bool `json:"useHeikinAshi"`
// whether to draw graph or not by the end of backtest
DrawGraph bool `json:"drawGraph"`
GraphIndicatorPath string `json:"graphIndicatorPath"`
GraphPNLPath string `json:"graphPNLPath"`
GraphCumPNLPath string `json:"graphCumPNLPath"`
*bbgo.Environment
*bbgo.GeneralOrderExecutor
*types.Position `persistence:"position"`
*types.ProfitStats `persistence:"profit_stats"`
*types.TradeStats `persistence:"trade_stats"`
ewo *ElliottWave
atr *indicator.ATR
heikinAshi *HeikinAshi
priceLines *types.Queue
getLastPrice func() fixedpoint.Value
// for smart cancel
orderPendingCounter map[uint64]int
startTime time.Time
minutesCounter int
// for position
buyPrice float64 `persistence:"buy_price"`
sellPrice float64 `persistence:"sell_price"`
highestPrice float64 `persistence:"highest_price"`
lowestPrice float64 `persistence:"lowest_price"`
TrailingCallbackRate []float64 `json:"trailingCallbackRate"`
TrailingActivationRatio []float64 `json:"trailingActivationRatio"`
ExitMethods bbgo.ExitMethodSet `json:"exits"`
midPrice fixedpoint.Value
lock sync.RWMutex `ignore:"true"`
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s:%v", ID, s.Symbol, bbgo.IsBackTesting)
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
// by default, bbgo only pre-subscribe 1000 klines.
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
bbgo.KLinePreloadLimit = int64((s.Interval.Minutes()*s.WindowSlow/1000 + 1) + 1000)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
Interval: types.Interval1m,
})
if !bbgo.IsBackTesting {
session.Subscribe(types.BookTickerChannel, s.Symbol, types.SubscribeOptions{})
}
s.ExitMethods.SetAndSubscribe(session, s)
}
func (s *Strategy) CurrentPosition() *types.Position {
return s.Position
}
func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Value) error {
order := s.Position.NewMarketCloseOrder(percentage)
if order == nil {
return nil
}
order.Tag = "close"
order.TimeInForce = ""
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
baseBalance := balances[s.Market.BaseCurrency].Available
price := s.getLastPrice()
if order.Side == types.SideTypeBuy {
quoteAmount := balances[s.Market.QuoteCurrency].Available.Div(price)
if order.Quantity.Compare(quoteAmount) > 0 {
order.Quantity = quoteAmount
}
} else if order.Side == types.SideTypeSell && order.Quantity.Compare(baseBalance) > 0 {
order.Quantity = baseBalance
}
for {
if s.Market.IsDustQuantity(order.Quantity, price) {
return nil
}
_, err := s.GeneralOrderExecutor.SubmitOrders(ctx, *order)
if err != nil {
order.Quantity = order.Quantity.Mul(fixedpoint.One.Sub(Delta))
continue
}
return nil
}
}
func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
s.priceLines = types.NewQueue(300)
maSlow := &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.WindowSlow}}
maQuick := &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.WindowQuick}}
s.ewo = &ElliottWave{
maSlow, maQuick,
}
s.atr = &indicator.ATR{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.WindowATR}}
klines, ok := store.KLinesOfInterval(s.Interval)
klineLength := len(*klines)
if !ok || klineLength == 0 {
return errors.New("klines not exists")
}
tmpK := (*klines)[klineLength-1]
s.startTime = tmpK.StartTime.Time().Add(tmpK.Interval.Duration())
s.heikinAshi = NewHeikinAshi(500)
for _, kline := range *klines {
source := s.GetSource(&kline).Float64()
s.ewo.Update(source)
s.atr.PushK(kline)
s.priceLines.Update(source)
s.heikinAshi.Update(kline)
}
return nil
}
// FIXME: stdevHigh
func (s *Strategy) smartCancel(ctx context.Context, pricef float64) int {
nonTraded := s.GeneralOrderExecutor.ActiveMakerOrders().Orders()
if len(nonTraded) > 0 {
left := 0
for _, order := range nonTraded {
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
continue
}
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
toCancel := false
if s.minutesCounter-s.orderPendingCounter[order.OrderID] >= s.PendingMinutes {
toCancel = true
} else if order.Side == types.SideTypeBuy {
if order.Price.Float64()+s.atr.Last()*2 <= pricef {
toCancel = true
}
} else if order.Side == types.SideTypeSell {
// 75% of the probability
if order.Price.Float64()-s.atr.Last()*2 >= pricef {
toCancel = true
}
} else {
panic("not supported side for the order")
}
if toCancel {
err := s.GeneralOrderExecutor.GracefulCancelOrder(ctx, order)
if err == nil {
delete(s.orderPendingCounter, order.OrderID)
} else {
log.WithError(err).Errorf("failed to cancel %v", order.OrderID)
}
log.Warnf("cancel %v", order.OrderID)
} else {
left += 1
}
}
return left
}
return len(nonTraded)
}
func (s *Strategy) trailingCheck(price float64, direction string) bool {
if s.highestPrice > 0 && s.highestPrice < price {
s.highestPrice = price
}
if s.lowestPrice > 0 && s.lowestPrice > price {
s.lowestPrice = price
}
isShort := direction == "short"
for i := len(s.TrailingCallbackRate) - 1; i >= 0; i-- {
trailingCallbackRate := s.TrailingCallbackRate[i]
trailingActivationRatio := s.TrailingActivationRatio[i]
if isShort {
if (s.sellPrice-s.lowestPrice)/s.lowestPrice > trailingActivationRatio {
return (price-s.lowestPrice)/s.lowestPrice > trailingCallbackRate
}
} else {
if (s.highestPrice-s.buyPrice)/s.buyPrice > trailingActivationRatio {
return (s.highestPrice-price)/price > trailingCallbackRate
}
}
}
return false
}
func (s *Strategy) initTickerFunctions() {
if s.IsBackTesting() {
s.getLastPrice = func() fixedpoint.Value {
lastPrice, ok := s.Session.LastPrice(s.Symbol)
if !ok {
log.Error("cannot get lastprice")
}
return lastPrice
}
} else {
s.Session.MarketDataStream.OnBookTickerUpdate(func(ticker types.BookTicker) {
bestBid := ticker.Buy
bestAsk := ticker.Sell
if !util.TryLock(&s.lock) {
return
}
if !bestAsk.IsZero() && !bestBid.IsZero() {
s.midPrice = bestAsk.Add(bestBid).Div(Two)
} else if !bestAsk.IsZero() {
s.midPrice = bestAsk
} else if !bestBid.IsZero() {
s.midPrice = bestBid
}
s.lock.Unlock()
})
s.getLastPrice = func() (lastPrice fixedpoint.Value) {
var ok bool
s.lock.RLock()
defer s.lock.RUnlock()
if s.midPrice.IsZero() {
lastPrice, ok = s.Session.LastPrice(s.Symbol)
if !ok {
log.Error("cannot get lastprice")
return lastPrice
}
} else {
lastPrice = s.midPrice
}
return lastPrice
}
}
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
instanceID := s.InstanceID()
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market)
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market)
}
if s.TradeStats == nil {
s.TradeStats = types.NewTradeStats(s.Symbol)
}
// StrategyController
s.Status = types.StrategyStatusRunning
s.OnSuspend(func() {
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
})
s.OnEmergencyStop(func() {
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
_ = s.ClosePosition(ctx, fixedpoint.One)
})
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.BindEnvironment(s.Environment)
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(p *types.Position) {
bbgo.Sync(s)
})
s.GeneralOrderExecutor.Bind()
s.orderPendingCounter = make(map[uint64]int)
s.minutesCounter = 0
for _, method := range s.ExitMethods {
method.Bind(session, s.GeneralOrderExecutor)
}
profit := floats.Slice{1., 1.}
price, _ := s.Session.LastPrice(s.Symbol)
initAsset := s.CalcAssetValue(price).Float64()
cumProfit := floats.Slice{initAsset, initAsset}
modify := func(p float64) float64 {
return p
}
s.GeneralOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, _profit, _netProfit fixedpoint.Value) {
price := trade.Price.Float64()
if s.buyPrice > 0 {
profit.Update(modify(price / s.buyPrice))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
} else if s.sellPrice > 0 {
profit.Update(modify(s.sellPrice / price))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
}
if s.Position.IsDust(trade.Price) {
s.buyPrice = 0
s.sellPrice = 0
s.highestPrice = 0
s.lowestPrice = 0
} else if s.Position.IsLong() {
s.buyPrice = price
s.sellPrice = 0
s.highestPrice = s.buyPrice
s.lowestPrice = 0
} else {
s.sellPrice = price
s.buyPrice = 0
s.highestPrice = 0
s.lowestPrice = s.sellPrice
}
})
s.initTickerFunctions()
startTime := s.Environment.StartTime()
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, startTime))
// event trigger order: s.Interval => Interval1m
store, ok := session.SerialMarketDataStore(s.Symbol, []types.Interval{s.Interval, types.Interval1m})
if !ok {
panic("cannot get 1m history")
}
if err := s.initIndicators(store); err != nil {
log.WithError(err).Errorf("initIndicator failed")
return nil
}
s.InitDrawCommands(store, &profit, &cumProfit)
store.OnKLineClosed(func(kline types.KLine) {
s.minutesCounter = int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Minutes())
if kline.Interval == types.Interval1m {
s.klineHandler1m(ctx, kline)
} else if kline.Interval == s.Interval {
s.klineHandler(ctx, kline)
}
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
var buffer bytes.Buffer
for _, daypnl := range s.TradeStats.IntervalProfits[types.Interval1d].GetNonProfitableIntervals() {
fmt.Fprintf(&buffer, "%s\n", daypnl)
}
fmt.Fprintln(&buffer, s.TradeStats.BriefString())
os.Stdout.Write(buffer.Bytes())
if s.DrawGraph {
s.Draw(store, &profit, &cumProfit)
}
wg.Done()
})
return nil
}
func (s *Strategy) CalcAssetValue(price fixedpoint.Value) fixedpoint.Value {
balances := s.Session.GetAccount().Balances()
return balances[s.Market.BaseCurrency].Total().Mul(price).Add(balances[s.Market.QuoteCurrency].Total())
}
func (s *Strategy) klineHandler1m(ctx context.Context, kline types.KLine) {
if s.Status != types.StrategyStatusRunning {
return
}
stoploss := s.Stoploss.Float64()
price := s.getLastPrice()
pricef := price.Float64()
atr := s.atr.Last()
numPending := s.smartCancel(ctx, pricef)
if numPending > 0 {
log.Infof("pending orders: %d, exit", numPending)
return
}
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
if s.lowestPrice > 0 && lowf < s.lowestPrice {
s.lowestPrice = lowf
}
if s.highestPrice > 0 && highf > s.highestPrice {
s.highestPrice = highf
}
exitShortCondition := s.sellPrice > 0 && (s.sellPrice*(1.+stoploss) <= highf || s.sellPrice+atr <= highf ||
s.trailingCheck(highf, "short"))
exitLongCondition := s.buyPrice > 0 && (s.buyPrice*(1.-stoploss) >= lowf || s.buyPrice-atr >= lowf ||
s.trailingCheck(lowf, "long"))
if exitShortCondition || exitLongCondition {
_ = s.ClosePosition(ctx, fixedpoint.One)
}
}
func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine) {
s.heikinAshi.Update(kline)
source := s.GetSource(&kline)
sourcef := source.Float64()
s.priceLines.Update(sourcef)
if s.UseHeikinAshi {
source := s.GetSource(s.heikinAshi.Last())
sourcef := source.Float64()
s.ewo.Update(sourcef)
} else {
s.ewo.Update(sourcef)
}
s.atr.PushK(kline)
if s.Status != types.StrategyStatusRunning {
return
}
stoploss := s.Stoploss.Float64()
price := s.getLastPrice()
pricef := price.Float64()
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Min(kline.High.Float64(), pricef)
s.smartCancel(ctx, pricef)
atr := s.atr.Last()
ewo := types.Array(s.ewo, 4)
bull := kline.Close.Compare(kline.Open) > 0
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
bbgo.Notify("source: %.4f, price: %.4f lowest: %.4f highest: %.4f sp %.4f bp %.4f", sourcef, pricef, s.lowestPrice, s.highestPrice, s.sellPrice, s.buyPrice)
bbgo.Notify("balances: [Total] %v %s [Base] %s(%v %s) [Quote] %s",
s.CalcAssetValue(price),
s.Market.QuoteCurrency,
balances[s.Market.BaseCurrency].String(),
balances[s.Market.BaseCurrency].Total().Mul(price),
s.Market.QuoteCurrency,
balances[s.Market.QuoteCurrency].String(),
)
shortCondition := ewo[0] < ewo[1] && ewo[1] >= ewo[2] && (ewo[1] <= ewo[2] || ewo[2] >= ewo[3]) || s.sellPrice == 0 && ewo[0] < ewo[1] && ewo[1] < ewo[2]
longCondition := ewo[0] > ewo[1] && ewo[1] <= ewo[2] && (ewo[1] >= ewo[2] || ewo[2] <= ewo[3]) || s.buyPrice == 0 && ewo[0] > ewo[1] && ewo[1] > ewo[2]
exitShortCondition := s.sellPrice > 0 && !shortCondition && s.sellPrice*(1.+stoploss) <= highf || s.sellPrice+atr <= highf || s.trailingCheck(highf, "short")
exitLongCondition := s.buyPrice > 0 && !longCondition && s.buyPrice*(1.-stoploss) >= lowf || s.buyPrice-atr >= lowf || s.trailingCheck(lowf, "long")
if exitShortCondition || exitLongCondition {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
s.ClosePosition(ctx, fixedpoint.One)
}
if longCondition && bull {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
if source.Compare(price) > 0 {
source = price
sourcef = source.Float64()
}
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
quoteBalance, ok := balances[s.Market.QuoteCurrency]
if !ok {
log.Errorf("unable to get quoteCurrency")
return
}
if s.Market.IsDustQuantity(
quoteBalance.Available.Div(source), source) {
return
}
quantity := quoteBalance.Available.Div(source)
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "long",
})
if err != nil {
log.WithError(err).Errorf("cannot place buy order")
log.Errorf("%v %v %v", quoteBalance, source, kline)
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
if shortCondition && !bull {
if err := s.GeneralOrderExecutor.GracefulCancel(ctx); err != nil {
log.WithError(err).Errorf("cannot cancel orders")
return
}
if source.Compare(price) < 0 {
source = price
sourcef = price.Float64()
}
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
baseBalance, ok := balances[s.Market.BaseCurrency]
if !ok {
log.Errorf("unable to get baseCurrency")
return
}
if s.Market.IsDustQuantity(baseBalance.Available, source) {
return
}
quantity := baseBalance.Available
createdOrders, err := s.GeneralOrderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Price: source,
Quantity: quantity,
Tag: "short",
})
if err != nil {
log.WithError(err).Errorf("cannot place sell order")
return
}
s.orderPendingCounter[createdOrders[0].OrderID] = s.minutesCounter
return
}
}

View File

@ -3,13 +3,18 @@ package types
import (
"encoding/json"
"fmt"
"strings"
"time"
)
type Interval string
func (i Interval) Minutes() int {
return SupportedIntervals[i]
m, ok := SupportedIntervals[i]
if !ok {
return ParseInterval(i)
}
return m
}
func (i Interval) Duration() time.Duration {
@ -41,6 +46,7 @@ func (s IntervalSlice) StringSlice() (slice []string) {
}
var Interval1m = Interval("1m")
var Interval3m = Interval("3m")
var Interval5m = Interval("5m")
var Interval15m = Interval("15m")
var Interval30m = Interval("30m")
@ -55,8 +61,37 @@ var Interval1w = Interval("1w")
var Interval2w = Interval("2w")
var Interval1mo = Interval("1mo")
func ParseInterval(input Interval) int {
t := 0
index := 0
for i, rn := range string(input) {
if rn >= '0' && rn <= '9' {
t = t*10 + int(rn-'0')
} else {
index = i
break
}
}
switch strings.ToLower(string(input[index:])) {
case "m":
return t
case "h":
t *= 60
case "d":
t *= 60 * 24
case "w":
t *= 60 * 24 * 7
case "mo":
t *= 60 * 24 * 30
default:
panic("unknown input: " + input)
}
return t
}
var SupportedIntervals = map[Interval]int{
Interval1m: 1,
Interval3m: 3,
Interval5m: 5,
Interval15m: 15,
Interval30m: 30,

View File

@ -0,0 +1,14 @@
package types
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestParseInterval(t *testing.T) {
assert.Equal(t, ParseInterval("3m"), 3)
assert.Equal(t, ParseInterval("15h"), 15*60)
assert.Equal(t, ParseInterval("72d"), 72*24*60)
assert.Equal(t, ParseInterval("3Mo"), 3*30*24*60)
}

View File

@ -91,6 +91,20 @@ func (k *KLine) Set(o *KLine) {
k.Closed = o.Closed
}
func (k *KLine) Merge(o *KLine) {
k.EndTime = o.EndTime
k.Close = o.Close
k.High = fixedpoint.Max(k.High, o.High)
k.Low = fixedpoint.Min(k.Low, o.Low)
k.Volume = k.Volume.Add(o.Volume)
k.QuoteVolume = k.QuoteVolume.Add(o.QuoteVolume)
k.TakerBuyBaseAssetVolume = k.TakerBuyBaseAssetVolume.Add(o.TakerBuyBaseAssetVolume)
k.TakerBuyQuoteAssetVolume = k.TakerBuyQuoteAssetVolume.Add(o.TakerBuyQuoteAssetVolume)
k.LastTradeID = o.LastTradeID
k.NumberOfTrades += o.NumberOfTrades
k.Closed = o.Closed
}
func (k KLine) GetStartTime() Time {
return k.StartTime
}

58
pkg/types/pca.go Normal file
View File

@ -0,0 +1,58 @@
package types
import (
"fmt"
"gonum.org/v1/gonum/mat"
)
type PCA struct {
svd *mat.SVD
}
func (pca *PCA) FitTransform(x []SeriesExtend, lookback, feature int) ([]SeriesExtend, error) {
if err := pca.Fit(x, lookback); err != nil {
return nil, err
}
return pca.Transform(x, lookback, feature), nil
}
func (pca *PCA) Fit(x []SeriesExtend, lookback int) error {
vec := make([]float64, lookback*len(x))
for i, xx := range x {
mean := xx.Mean(lookback)
for j := 0; j < lookback; j++ {
vec[i+j*i] = xx.Index(j) - mean
}
}
pca.svd = &mat.SVD{}
diffMatrix := mat.NewDense(lookback, len(x), vec)
if ok := pca.svd.Factorize(diffMatrix, mat.SVDThin); !ok {
return fmt.Errorf("Unable to factorize")
}
return nil
}
func (pca *PCA) Transform(x []SeriesExtend, lookback int, features int) (result []SeriesExtend) {
result = make([]SeriesExtend, features)
vTemp := new(mat.Dense)
pca.svd.VTo(vTemp)
var ret mat.Dense
vec := make([]float64, lookback*len(x))
for i, xx := range x {
for j := 0; j < lookback; j++ {
vec[i+j*i] = xx.Index(j)
}
}
newX := mat.NewDense(lookback, len(x), vec)
ret.Mul(newX, vTemp)
newMatrix := mat.NewDense(lookback, features, nil)
newMatrix.Copy(&ret)
for i := 0; i < features; i++ {
queue := NewQueue(lookback)
for j := 0; j < lookback; j++ {
queue.Update(newMatrix.At(lookback-j-1, i))
}
result[i] = queue
}
return result
}