mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-26 08:45:16 +00:00
fix: SourceSelector json marshal/unmarshal without Init from strategy. smartCancel check on order status
This commit is contained in:
parent
67e57b49eb
commit
36a5579660
|
@ -23,7 +23,7 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/util"
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
var KLineLimit int64 = 1000
|
var KLinePreloadLimit int64 = 1000
|
||||||
|
|
||||||
// ExchangeSession presents the exchange connection Session
|
// ExchangeSession presents the exchange connection Session
|
||||||
// It also maintains and collects the data returned from the stream.
|
// It also maintains and collects the data returned from the stream.
|
||||||
|
@ -421,7 +421,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
|
||||||
// avoid querying the last unclosed kline
|
// avoid querying the last unclosed kline
|
||||||
endTime := environ.startTime
|
endTime := environ.startTime
|
||||||
var i int64
|
var i int64
|
||||||
for i = 0; i < KLineLimit; i += 1000 {
|
for i = 0; i < KLinePreloadLimit; i += 1000 {
|
||||||
var duration time.Duration = time.Duration(-i * int64(interval.Duration()))
|
var duration time.Duration = time.Duration(-i * int64(interval.Duration()))
|
||||||
e := endTime.Add(duration)
|
e := endTime.Add(duration)
|
||||||
|
|
||||||
|
@ -441,7 +441,6 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
|
||||||
// update last prices by the given kline
|
// update last prices by the given kline
|
||||||
lastKLine := kLines[len(kLines)-1]
|
lastKLine := kLines[len(kLines)-1]
|
||||||
if interval == types.Interval1m {
|
if interval == types.Interval1m {
|
||||||
log.Infof("last kline %+v", lastKLine)
|
|
||||||
session.lastPrices[symbol] = lastKLine.Close
|
session.lastPrices[symbol] = lastKLine.Close
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package bbgo
|
package bbgo
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
@ -10,16 +11,32 @@ import (
|
||||||
|
|
||||||
type SourceFunc func(*types.KLine) fixedpoint.Value
|
type SourceFunc func(*types.KLine) fixedpoint.Value
|
||||||
|
|
||||||
var Four fixedpoint.Value = fixedpoint.NewFromInt(4)
|
type selectorInternal struct {
|
||||||
var Three fixedpoint.Value = fixedpoint.NewFromInt(3)
|
Source string
|
||||||
var Two fixedpoint.Value = fixedpoint.NewFromInt(2)
|
|
||||||
|
|
||||||
type SourceSelector struct {
|
|
||||||
Source string `json:"source,omitempty"`
|
|
||||||
getSource SourceFunc
|
getSource SourceFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceSelector) Init() {
|
func (s *selectorInternal) UnmarshalJSON(d []byte) error {
|
||||||
|
if err := json.Unmarshal(d, &s.Source); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.init()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s selectorInternal) MarshalJSON() ([]byte, error) {
|
||||||
|
if s.Source == "" {
|
||||||
|
s.Source = "close"
|
||||||
|
s.init()
|
||||||
|
}
|
||||||
|
return []byte("\"" + s.Source + "\""), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type SourceSelector struct {
|
||||||
|
Source selectorInternal `json:"source,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *selectorInternal) init() {
|
||||||
switch strings.ToLower(s.Source) {
|
switch strings.ToLower(s.Source) {
|
||||||
case "close":
|
case "close":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Close }
|
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Close }
|
||||||
|
@ -28,23 +45,36 @@ func (s *SourceSelector) Init() {
|
||||||
case "low":
|
case "low":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Low }
|
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Low }
|
||||||
case "hl2":
|
case "hl2":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(Two) }
|
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(fixedpoint.Two) }
|
||||||
case "hlc3":
|
case "hlc3":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value {
|
s.getSource = func(kline *types.KLine) fixedpoint.Value {
|
||||||
return kline.High.Add(kline.Low).Add(kline.Close).Div(Three)
|
return kline.High.Add(kline.Low).Add(kline.Close).Div(fixedpoint.Three)
|
||||||
}
|
}
|
||||||
case "ohlc4":
|
case "ohlc4":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value {
|
s.getSource = func(kline *types.KLine) fixedpoint.Value {
|
||||||
return kline.High.Add(kline.Low).Add(kline.Close).Add(kline.Open).Div(Four)
|
return kline.High.Add(kline.Low).Add(kline.Close).Add(kline.Open).Div(fixedpoint.Four)
|
||||||
}
|
}
|
||||||
case "open":
|
case "open":
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Open }
|
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.Open }
|
||||||
default:
|
default:
|
||||||
log.Infof("source not set: %s, use hl2 by default", s.Source)
|
log.Infof("source not set: %s, use hl2 by default", s.Source)
|
||||||
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(Two) }
|
s.getSource = func(kline *types.KLine) fixedpoint.Value { return kline.High.Add(kline.Low).Div(fixedpoint.Two) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SourceSelector) GetSource(kline *types.KLine) fixedpoint.Value {
|
func (s *selectorInternal) String() string {
|
||||||
return s.getSource(kline)
|
if s.Source == "" {
|
||||||
|
s.Source = "close"
|
||||||
|
s.init()
|
||||||
|
}
|
||||||
|
return s.Source
|
||||||
|
}
|
||||||
|
|
||||||
|
// lazy init if empty struct is passed in
|
||||||
|
func (s *SourceSelector) GetSource(kline *types.KLine) fixedpoint.Value {
|
||||||
|
if s.Source.Source == "" {
|
||||||
|
s.Source.Source = "close"
|
||||||
|
s.Source.init()
|
||||||
|
}
|
||||||
|
return s.Source.getSource(kline)
|
||||||
}
|
}
|
||||||
|
|
34
pkg/bbgo/source_test.go
Normal file
34
pkg/bbgo/source_test.go
Normal file
|
@ -0,0 +1,34 @@
|
||||||
|
package bbgo
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSource(t *testing.T) {
|
||||||
|
input := "{\"source\":\"high\"}"
|
||||||
|
type Strategy struct {
|
||||||
|
SourceSelector
|
||||||
|
}
|
||||||
|
s := Strategy{}
|
||||||
|
assert.NoError(t, json.Unmarshal([]byte(input), &s))
|
||||||
|
assert.Equal(t, s.Source.Source, "high")
|
||||||
|
assert.NotNil(t, s.Source.getSource)
|
||||||
|
e, err := json.Marshal(&s)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, input, string(e))
|
||||||
|
|
||||||
|
input = "{}"
|
||||||
|
s = Strategy{}
|
||||||
|
assert.NoError(t, json.Unmarshal([]byte(input), &s))
|
||||||
|
assert.Equal(t, fixedpoint.Zero, s.GetSource(&types.KLine{}))
|
||||||
|
|
||||||
|
e, err = json.Marshal(&Strategy{})
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Equal(t, "{\"source\":\"close\"}", string(e))
|
||||||
|
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package dynamic
|
package dynamic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -16,7 +17,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func DefaultWhiteList() []string {
|
func DefaultWhiteList() []string {
|
||||||
return []string{"Window", "Interval", "Symbol"}
|
return []string{"Window", "RightWindow", "Interval", "Symbol", "Source"}
|
||||||
}
|
}
|
||||||
|
|
||||||
// @param s: strategy object
|
// @param s: strategy object
|
||||||
|
@ -96,6 +97,9 @@ func PrintConfig(s interface{}, f io.Writer, style *table.Style, withColor bool,
|
||||||
}
|
}
|
||||||
redundantSet[name] = struct{}{}
|
redundantSet[name] = struct{}{}
|
||||||
value := field.Field(j).Interface()
|
value := field.Field(j).Interface()
|
||||||
|
if e, err := json.Marshal(value); err == nil {
|
||||||
|
value = string(e)
|
||||||
|
}
|
||||||
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: tt.Type.String(), Value: value})
|
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: tt.Type.String(), Value: value})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -106,7 +110,11 @@ func PrintConfig(s interface{}, f io.Writer, style *table.Style, withColor bool,
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
redundantSet[name] = struct{}{}
|
redundantSet[name] = struct{}{}
|
||||||
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: t.Type.String(), Value: val.Field(i).Interface()})
|
value := val.Field(i).Interface()
|
||||||
|
if e, err := json.Marshal(value); err == nil {
|
||||||
|
value = string(e)
|
||||||
|
}
|
||||||
|
values = append(values, types.JsonStruct{Key: fieldName, Json: name, Type: t.Type.String(), Value: value})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
sort.Sort(values)
|
sort.Sort(values)
|
||||||
|
|
7
pkg/fixedpoint/const.go
Normal file
7
pkg/fixedpoint/const.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
package fixedpoint
|
||||||
|
|
||||||
|
var (
|
||||||
|
Two Value = NewFromInt(2)
|
||||||
|
Three Value = NewFromInt(3)
|
||||||
|
Four Value = NewFromInt(3)
|
||||||
|
)
|
|
@ -126,7 +126,7 @@ func (s *Strategy) InstanceID() string {
|
||||||
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
||||||
// by default, bbgo only pre-subscribe 1000 klines.
|
// by default, bbgo only pre-subscribe 1000 klines.
|
||||||
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
|
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
|
||||||
bbgo.KLineLimit = int64((s.Interval.Minutes()*s.Window/1000 + 1) * 1000)
|
bbgo.KLinePreloadLimit = int64((s.Interval.Minutes()*s.Window/1000 + 1) * 1000)
|
||||||
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
|
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
|
||||||
Interval: types.Interval1m,
|
Interval: types.Interval1m,
|
||||||
})
|
})
|
||||||
|
@ -255,6 +255,9 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64) (int, e
|
||||||
|
|
||||||
drift := s.drift1m.Array(2)
|
drift := s.drift1m.Array(2)
|
||||||
for _, order := range nonTraded {
|
for _, order := range nonTraded {
|
||||||
|
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
|
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
|
||||||
if s.minutesCounter-s.orderPendingCounter[order.OrderID] > s.PendingMinutes {
|
if s.minutesCounter-s.orderPendingCounter[order.OrderID] > s.PendingMinutes {
|
||||||
if order.Side == types.SideTypeBuy && drift[1] < drift[0] {
|
if order.Side == types.SideTypeBuy && drift[1] < drift[0] {
|
||||||
|
@ -801,8 +804,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
}
|
}
|
||||||
// StrategyController
|
// StrategyController
|
||||||
s.Status = types.StrategyStatusRunning
|
s.Status = types.StrategyStatusRunning
|
||||||
// Get source function from config input
|
|
||||||
s.SourceSelector.Init()
|
|
||||||
|
|
||||||
s.OnSuspend(func() {
|
s.OnSuspend(func() {
|
||||||
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
|
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
|
||||||
|
|
|
@ -99,7 +99,7 @@ func (s *Strategy) InstanceID() string {
|
||||||
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
||||||
// by default, bbgo only pre-subscribe 1000 klines.
|
// by default, bbgo only pre-subscribe 1000 klines.
|
||||||
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
|
// this is not enough if we're subscribing 30m intervals using SerialMarketDataStore
|
||||||
bbgo.KLineLimit = int64((s.Interval.Minutes()*s.WindowSlow/1000 + 1) + 1000)
|
bbgo.KLinePreloadLimit = int64((s.Interval.Minutes()*s.WindowSlow/1000 + 1) + 1000)
|
||||||
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
|
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{
|
||||||
Interval: types.Interval1m,
|
Interval: types.Interval1m,
|
||||||
})
|
})
|
||||||
|
@ -178,6 +178,9 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef float64) int {
|
||||||
if len(nonTraded) > 0 {
|
if len(nonTraded) > 0 {
|
||||||
left := 0
|
left := 0
|
||||||
for _, order := range nonTraded {
|
for _, order := range nonTraded {
|
||||||
|
if order.Status != types.OrderStatusNew && order.Status != types.OrderStatusPartiallyFilled {
|
||||||
|
continue
|
||||||
|
}
|
||||||
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
|
log.Warnf("%v | counter: %d, system: %d", order, s.orderPendingCounter[order.OrderID], s.minutesCounter)
|
||||||
toCancel := false
|
toCancel := false
|
||||||
if s.minutesCounter-s.orderPendingCounter[order.OrderID] >= s.PendingMinutes {
|
if s.minutesCounter-s.orderPendingCounter[order.OrderID] >= s.PendingMinutes {
|
||||||
|
@ -291,8 +294,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
}
|
}
|
||||||
// StrategyController
|
// StrategyController
|
||||||
s.Status = types.StrategyStatusRunning
|
s.Status = types.StrategyStatusRunning
|
||||||
// Get source function from config input
|
|
||||||
s.SourceSelector.Init()
|
|
||||||
s.OnSuspend(func() {
|
s.OnSuspend(func() {
|
||||||
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
|
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue
Block a user