mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
Merge pull request #596 from c9s/improve-persistence-api
improve persistence api
This commit is contained in:
commit
948cf7a1c4
|
@ -102,6 +102,11 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
if err := trader.LoadState() ; err != nil {
|
||||
log.WithError(err).Error("failed to load strategy states")
|
||||
return
|
||||
}
|
||||
|
||||
// for setup mode, we don't start the trader
|
||||
go func() {
|
||||
if err := trader.Run(ctx); err != nil {
|
||||
|
|
|
@ -110,6 +110,11 @@ func main() {
|
|||
return
|
||||
}
|
||||
|
||||
if err := trader.LoadState() ; err != nil {
|
||||
log.WithError(err).Error("failed to load strategy states")
|
||||
return
|
||||
}
|
||||
|
||||
// for setup mode, we don't start the trader
|
||||
if err := trader.Run(ctx); err != nil {
|
||||
log.WithError(err).Error("failed to start trader")
|
||||
|
|
|
@ -106,9 +106,6 @@ exchangeStrategies:
|
|||
# buyBelowNeutralSMA: when this set, it will only place buy order when the current price is below the SMA line.
|
||||
buyBelowNeutralSMA: false
|
||||
|
||||
persistence:
|
||||
type: redis
|
||||
|
||||
# Set up your stop order, this is optional
|
||||
# sometimes the stop order might decrease your total profit.
|
||||
# you can setup multiple stop,
|
||||
|
|
20
pkg/bbgo/graceful_shutdown.go
Normal file
20
pkg/bbgo/graceful_shutdown.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package bbgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
//go:generate callbackgen -type Graceful
|
||||
type Graceful struct {
|
||||
shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup)
|
||||
}
|
||||
|
||||
func (g *Graceful) Shutdown(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(g.shutdownCallbacks))
|
||||
|
||||
go g.EmitShutdown(ctx, &wg)
|
||||
|
||||
wg.Wait()
|
||||
}
|
|
@ -7,24 +7,6 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func isSymbolBasedStrategy(rs reflect.Value) (string, bool) {
|
||||
field := rs.FieldByName("Symbol")
|
||||
if !field.IsValid() {
|
||||
return "", false
|
||||
}
|
||||
|
||||
if field.Kind() != reflect.String {
|
||||
return "", false
|
||||
}
|
||||
|
||||
return field.String(), true
|
||||
}
|
||||
|
||||
func hasField(rs reflect.Value, fieldName string) (field reflect.Value, ok bool) {
|
||||
field = rs.FieldByName(fieldName)
|
||||
return field, field.IsValid()
|
||||
}
|
||||
|
||||
func injectField(rs reflect.Value, fieldName string, obj interface{}, pointerOnly bool) error {
|
||||
field := rs.FieldByName(fieldName)
|
||||
if !field.IsValid() {
|
||||
|
|
|
@ -2,6 +2,7 @@ package bbgo
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
|
@ -74,3 +75,101 @@ func (p *Persistence) Save(val interface{}, subIDs ...string) error {
|
|||
store := ps.NewStore(p.PersistenceSelector.StoreID, subIDs...)
|
||||
return store.Save(val)
|
||||
}
|
||||
|
||||
func (p *Persistence) Sync(obj interface{}) error {
|
||||
id := callID(obj)
|
||||
if len(id) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ps := p.Facade.Get()
|
||||
return storePersistenceFields(obj, id, ps)
|
||||
}
|
||||
|
||||
type StructFieldIterator func(tag string, ft reflect.StructField, fv reflect.Value) error
|
||||
|
||||
func iterateFieldsByTag(obj interface{}, tagName string, cb StructFieldIterator) error {
|
||||
sv := reflect.ValueOf(obj)
|
||||
st := reflect.TypeOf(obj)
|
||||
|
||||
if st.Kind() != reflect.Ptr {
|
||||
return fmt.Errorf("f needs to be a pointer of a struct, %s given", st)
|
||||
}
|
||||
|
||||
// solve the reference
|
||||
st = st.Elem()
|
||||
sv = sv.Elem()
|
||||
|
||||
if st.Kind() != reflect.Struct {
|
||||
return fmt.Errorf("f needs to be a struct, %s given", st)
|
||||
}
|
||||
|
||||
for i := 0; i < sv.NumField(); i++ {
|
||||
fv := sv.Field(i)
|
||||
ft := st.Field(i)
|
||||
|
||||
fvt := fv.Type()
|
||||
_ = fvt
|
||||
|
||||
// skip unexported fields
|
||||
if !st.Field(i).IsExported() {
|
||||
continue
|
||||
}
|
||||
|
||||
tag, ok := ft.Tag.Lookup(tagName)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := cb(tag, ft, fv); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// https://github.com/xiaojun207/go-base-utils/blob/master/utils/Clone.go
|
||||
func newTypeValueInterface(typ reflect.Type) interface{} {
|
||||
if typ.Kind() == reflect.Ptr {
|
||||
typ = typ.Elem()
|
||||
dst := reflect.New(typ).Elem()
|
||||
return dst.Addr().Interface()
|
||||
}
|
||||
dst := reflect.New(typ)
|
||||
return dst.Interface()
|
||||
}
|
||||
|
||||
func loadPersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
|
||||
return iterateFieldsByTag(obj, "persistence", func(tag string, field reflect.StructField, value reflect.Value) error {
|
||||
newValueInf := newTypeValueInterface(value.Type())
|
||||
// inf := value.Interface()
|
||||
store := persistence.NewStore("state", id, tag)
|
||||
if err := store.Load(&newValueInf); err != nil {
|
||||
if err == service.ErrPersistenceNotExists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
newValue := reflect.ValueOf(newValueInf)
|
||||
if value.Kind() != reflect.Ptr && newValue.Kind() == reflect.Ptr {
|
||||
newValue = newValue.Elem()
|
||||
}
|
||||
|
||||
// log.Debugf("%v = %v (%s) -> %v (%s)\n", field, value, value.Type(), newValue, newValue.Type())
|
||||
|
||||
value.Set(newValue)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func storePersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
|
||||
return iterateFieldsByTag(obj, "persistence", func(tag string, ft reflect.StructField, fv reflect.Value) error {
|
||||
inf := fv.Interface()
|
||||
|
||||
store := persistence.NewStore("state", id, tag)
|
||||
return store.Save(inf)
|
||||
})
|
||||
}
|
||||
|
|
96
pkg/bbgo/persistence_test.go
Normal file
96
pkg/bbgo/persistence_test.go
Normal file
|
@ -0,0 +1,96 @@
|
|||
package bbgo
|
||||
|
||||
import (
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
type TestStruct struct {
|
||||
Position *types.Position `persistence:"position"`
|
||||
Integer int64 `persistence:"integer"`
|
||||
Integer2 int64 `persistence:"integer2"`
|
||||
Float int64 `persistence:"float"`
|
||||
String string `persistence:"string"`
|
||||
}
|
||||
|
||||
func (t *TestStruct) InstanceID() string {
|
||||
return "test-struct"
|
||||
}
|
||||
|
||||
func preparePersistentServices() []service.PersistenceService {
|
||||
mem := service.NewMemoryService()
|
||||
jsonDir := &service.JsonPersistenceService{Directory: "testoutput/persistence"}
|
||||
pss := []service.PersistenceService{
|
||||
mem,
|
||||
jsonDir,
|
||||
}
|
||||
|
||||
if _, ok := os.LookupEnv("TEST_REDIS"); ok {
|
||||
redisP := service.NewRedisPersistenceService(&service.RedisPersistenceConfig{
|
||||
Host: "localhost",
|
||||
Port: "6379",
|
||||
DB: 0,
|
||||
})
|
||||
pss = append(pss, redisP)
|
||||
}
|
||||
|
||||
return pss
|
||||
}
|
||||
|
||||
func Test_callID(t *testing.T) {
|
||||
id := callID(&TestStruct{})
|
||||
assert.NotEmpty(t, id)
|
||||
}
|
||||
|
||||
func Test_storePersistenceFields(t *testing.T) {
|
||||
var pss = preparePersistentServices()
|
||||
|
||||
var a = &TestStruct{
|
||||
Integer: 1,
|
||||
Integer2: 2,
|
||||
Float: 3.0,
|
||||
String: "foobar",
|
||||
Position: types.NewPosition("BTCUSDT", "BTC", "USDT"),
|
||||
}
|
||||
|
||||
a.Position.Base = fixedpoint.NewFromFloat(10.0)
|
||||
a.Position.AverageCost = fixedpoint.NewFromFloat(3343.0)
|
||||
|
||||
for _, ps := range pss {
|
||||
t.Run(reflect.TypeOf(ps).Elem().String(), func(t *testing.T) {
|
||||
id := callID(a)
|
||||
err := storePersistenceFields(a, id, ps)
|
||||
assert.NoError(t, err)
|
||||
|
||||
var i int64
|
||||
store := ps.NewStore("state", "test-struct", "integer")
|
||||
err = store.Load(&i)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(1), i)
|
||||
|
||||
var p *types.Position
|
||||
store = ps.NewStore("state", "test-struct", "position")
|
||||
err = store.Load(&p)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fixedpoint.NewFromFloat(10.0), p.Base)
|
||||
assert.Equal(t, fixedpoint.NewFromFloat(3343.0), p.AverageCost)
|
||||
|
||||
var b = &TestStruct{}
|
||||
err = loadPersistenceFields(b, id, ps)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, a.Integer, b.Integer)
|
||||
assert.Equal(t, a.Integer2, b.Integer2)
|
||||
assert.Equal(t, a.Float, b.Float)
|
||||
assert.Equal(t, a.String, b.String)
|
||||
assert.Equal(t, a.Position, b.Position)
|
||||
})
|
||||
}
|
||||
|
||||
}
|
39
pkg/bbgo/reflect.go
Normal file
39
pkg/bbgo/reflect.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package bbgo
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
)
|
||||
|
||||
type InstanceIDProvider interface{
|
||||
InstanceID() string
|
||||
}
|
||||
|
||||
|
||||
func callID(obj interface{}) string {
|
||||
sv := reflect.ValueOf(obj)
|
||||
st := reflect.TypeOf(obj)
|
||||
if st.Implements(reflect.TypeOf((*InstanceIDProvider)(nil)).Elem()) {
|
||||
m := sv.MethodByName("InstanceID")
|
||||
ret := m.Call(nil)
|
||||
return ret[0].String()
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func isSymbolBasedStrategy(rs reflect.Value) (string, bool) {
|
||||
field := rs.FieldByName("Symbol")
|
||||
if !field.IsValid() {
|
||||
return "", false
|
||||
}
|
||||
|
||||
if field.Kind() != reflect.String {
|
||||
return "", false
|
||||
}
|
||||
|
||||
return field.String(), true
|
||||
}
|
||||
|
||||
func hasField(rs reflect.Value, fieldName string) (field reflect.Value, ok bool) {
|
||||
field = rs.FieldByName(fieldName)
|
||||
return field, field.IsValid()
|
||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
@ -43,20 +42,6 @@ type Validator interface {
|
|||
Validate() error
|
||||
}
|
||||
|
||||
//go:generate callbackgen -type Graceful
|
||||
type Graceful struct {
|
||||
shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup)
|
||||
}
|
||||
|
||||
func (g *Graceful) Shutdown(ctx context.Context) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(g.shutdownCallbacks))
|
||||
|
||||
go g.EmitShutdown(ctx, &wg)
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
type Logging interface {
|
||||
EnableLogging()
|
||||
DisableLogging()
|
||||
|
@ -349,6 +334,72 @@ func (trader *Trader) Run(ctx context.Context) error {
|
|||
return trader.environment.Connect(ctx)
|
||||
}
|
||||
|
||||
func (trader *Trader) LoadState() error {
|
||||
if trader.environment.BacktestService != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if trader.environment.PersistenceServiceFacade == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ps := trader.environment.PersistenceServiceFacade.Get()
|
||||
|
||||
log.Infof("loading strategies states...")
|
||||
for _, strategies := range trader.exchangeStrategies {
|
||||
for _, strategy := range strategies {
|
||||
if err := loadPersistenceFields(strategy, strategy.ID(), ps); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, strategy := range trader.crossExchangeStrategies {
|
||||
if err := loadPersistenceFields(strategy, strategy.ID(), ps); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (trader *Trader) SaveState() error {
|
||||
if trader.environment.BacktestService != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if trader.environment.PersistenceServiceFacade == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ps := trader.environment.PersistenceServiceFacade.Get()
|
||||
|
||||
log.Infof("saving strategies states...")
|
||||
for _, strategies := range trader.exchangeStrategies {
|
||||
for _, strategy := range strategies {
|
||||
id := callID(strategy)
|
||||
if len(id) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := storePersistenceFields(strategy, id, ps); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, strategy := range trader.crossExchangeStrategies {
|
||||
id := callID(strategy)
|
||||
if len(id) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := storePersistenceFields(strategy, id, ps); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
var defaultPersistenceSelector = &PersistenceSelector{
|
||||
StoreID: "default",
|
||||
Type: "memory",
|
||||
|
|
|
@ -189,6 +189,10 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
|
|||
return err
|
||||
}
|
||||
|
||||
if err := trader.LoadState(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := trader.Run(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -228,6 +232,10 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
|
|||
trader.Graceful.Shutdown(shutdownCtx)
|
||||
cancelShutdown()
|
||||
|
||||
if err := trader.SaveState(); err != nil {
|
||||
log.WithError(err).Errorf("can not save strategy states")
|
||||
}
|
||||
|
||||
for _, session := range environ.Sessions() {
|
||||
if err := session.MarketDataStream.Close(); err != nil {
|
||||
log.WithError(err).Errorf("[%s] market data stream close error", session.Name)
|
||||
|
|
|
@ -36,7 +36,8 @@ func (store *MemoryStore) Save(val interface{}) error {
|
|||
func (store *MemoryStore) Load(val interface{}) error {
|
||||
v := reflect.ValueOf(val)
|
||||
if data, ok := store.memory.Slots[store.Key]; ok {
|
||||
v.Elem().Set(reflect.ValueOf(data).Elem())
|
||||
dataRV := reflect.ValueOf(data)
|
||||
v.Elem().Set(dataRV)
|
||||
} else {
|
||||
return ErrPersistenceNotExists
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@ func TestMemoryService(t *testing.T) {
|
|||
store := service.NewStore("test")
|
||||
|
||||
i := 3
|
||||
err := store.Save(&i)
|
||||
err := store.Save(i)
|
||||
|
||||
assert.NoError(t, err)
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ func (store JsonStore) Load(val interface{}) error {
|
|||
|
||||
func (store JsonStore) Save(val interface{}) error {
|
||||
if _, err := os.Stat(store.Directory); os.IsNotExist(err) {
|
||||
if err2 := os.Mkdir(store.Directory, 0777); err2 != nil {
|
||||
if err2 := os.MkdirAll(store.Directory, 0777); err2 != nil {
|
||||
return err2
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,17 +5,15 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/indicator"
|
||||
"github.com/c9s/bbgo/pkg/util"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/exchange/max"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
|
@ -37,8 +35,12 @@ func init() {
|
|||
bbgo.RegisterStrategy(ID, &Strategy{})
|
||||
}
|
||||
|
||||
// Deprecated: State is deprecated, please use the persistence tag
|
||||
type State struct {
|
||||
Position *types.Position `json:"position,omitempty"`
|
||||
// Deprecated: Position is deprecated, please define the Position field in the strategy struct directly.
|
||||
Position *types.Position `json:"position,omitempty"`
|
||||
|
||||
// Deprecated: ProfitStats is deprecated, please define the ProfitStats field in the strategy struct directly.
|
||||
ProfitStats types.ProfitStats `json:"profitStats,omitempty"`
|
||||
}
|
||||
|
||||
|
@ -147,6 +149,10 @@ type Strategy struct {
|
|||
|
||||
state *State
|
||||
|
||||
// persistence fields
|
||||
Position *types.Position `json:"position,omitempty" persistence:"position"`
|
||||
ProfitStats *types.ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
|
||||
|
||||
activeMakerOrders *bbgo.LocalActiveOrderBook
|
||||
orderStore *bbgo.OrderStore
|
||||
tradeCollector *bbgo.TradeCollector
|
||||
|
@ -169,6 +175,10 @@ func (s *Strategy) ID() string {
|
|||
return ID
|
||||
}
|
||||
|
||||
func (s *Strategy) InstanceID() string {
|
||||
return fmt.Sprintf("%s:%s", ID, s.Symbol)
|
||||
}
|
||||
|
||||
func (s *Strategy) Initialize() error {
|
||||
return s.SmartStops.InitializeStopControllers(s.Symbol)
|
||||
}
|
||||
|
@ -202,13 +212,13 @@ func (s *Strategy) Validate() error {
|
|||
}
|
||||
|
||||
func (s *Strategy) CurrentPosition() *types.Position {
|
||||
return s.state.Position
|
||||
return s.Position
|
||||
}
|
||||
|
||||
func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Value) error {
|
||||
base := s.state.Position.GetBase()
|
||||
base := s.Position.GetBase()
|
||||
if base.IsZero() {
|
||||
return fmt.Errorf("no opened %s position", s.state.Position.Symbol)
|
||||
return fmt.Errorf("no opened %s position", s.Position.Symbol)
|
||||
}
|
||||
|
||||
// make it negative
|
||||
|
@ -256,19 +266,12 @@ func (s *Strategy) Suspend(ctx context.Context) error {
|
|||
log.WithError(err).Errorf("graceful cancel order error")
|
||||
s.Notify("graceful cancel order error")
|
||||
} else {
|
||||
s.Notify("All orders cancelled.")
|
||||
s.Notify("All orders are cancelled.")
|
||||
}
|
||||
|
||||
s.tradeCollector.Process()
|
||||
|
||||
// Save state
|
||||
if err := s.SaveState(); err != nil {
|
||||
log.WithError(err).Errorf("can not save state: %+v", s.state)
|
||||
} else {
|
||||
log.Infof("%s position is saved.", s.Symbol)
|
||||
}
|
||||
|
||||
return nil
|
||||
return s.Persistence.Sync(s)
|
||||
}
|
||||
|
||||
func (s *Strategy) Resume(ctx context.Context) error {
|
||||
|
@ -288,41 +291,13 @@ func (s *Strategy) EmergencyStop(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *Strategy) SaveState() error {
|
||||
if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("state is saved => %+v", s.state)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Deprecated: LoadState method is migrated to the persistence struct tag.
|
||||
func (s *Strategy) LoadState() error {
|
||||
var state State
|
||||
|
||||
// load position
|
||||
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err != nil {
|
||||
if err != service.ErrPersistenceNotExists {
|
||||
return err
|
||||
}
|
||||
|
||||
s.state = &State{}
|
||||
} else {
|
||||
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err == nil {
|
||||
s.state = &state
|
||||
log.Infof("state is restored: %+v", s.state)
|
||||
}
|
||||
|
||||
// if position is nil, we need to allocate a new position for calculation
|
||||
if s.state.Position == nil {
|
||||
s.state.Position = types.NewPositionFromMarket(s.Market)
|
||||
}
|
||||
|
||||
// init profit states
|
||||
s.state.ProfitStats.Symbol = s.Market.Symbol
|
||||
s.state.ProfitStats.BaseCurrency = s.Market.BaseCurrency
|
||||
s.state.ProfitStats.QuoteCurrency = s.Market.QuoteCurrency
|
||||
if s.state.ProfitStats.AccumulatedSince == 0 {
|
||||
s.state.ProfitStats.AccumulatedSince = time.Now().Unix()
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -353,7 +328,7 @@ func (s *Strategy) placeOrders(ctx context.Context, orderExecutor bbgo.OrderExec
|
|||
|
||||
askPrice := midPrice.Mul(fixedpoint.One.Add(askSpread))
|
||||
bidPrice := midPrice.Mul(fixedpoint.One.Sub(bidSpread))
|
||||
base := s.state.Position.GetBase()
|
||||
base := s.Position.GetBase()
|
||||
balances := s.session.GetAccount().Balances()
|
||||
|
||||
log.Infof("mid price:%v spread: %s ask:%v bid: %v position: %s",
|
||||
|
@ -361,7 +336,7 @@ func (s *Strategy) placeOrders(ctx context.Context, orderExecutor bbgo.OrderExec
|
|||
s.Spread.Percentage(),
|
||||
askPrice,
|
||||
bidPrice,
|
||||
s.state.Position,
|
||||
s.Position,
|
||||
)
|
||||
|
||||
sellQuantity := s.QuantityOrAmount.CalculateQuantity(askPrice)
|
||||
|
@ -488,7 +463,7 @@ func (s *Strategy) placeOrders(ctx context.Context, orderExecutor bbgo.OrderExec
|
|||
canSell = false
|
||||
}
|
||||
|
||||
if midPrice.Compare(s.state.Position.AverageCost.Mul(fixedpoint.One.Add(s.MinProfitSpread))) < 0 {
|
||||
if midPrice.Compare(s.Position.AverageCost.Mul(fixedpoint.One.Add(s.MinProfitSpread))) < 0 {
|
||||
canSell = false
|
||||
}
|
||||
|
||||
|
@ -509,7 +484,7 @@ func (s *Strategy) placeOrders(ctx context.Context, orderExecutor bbgo.OrderExec
|
|||
|
||||
// condition for lower the average cost
|
||||
/*
|
||||
if midPrice < s.state.Position.AverageCost.MulFloat64(1.0-s.MinProfitSpread.Float64()) && canBuy {
|
||||
if midPrice < s.Position.AverageCost.MulFloat64(1.0-s.MinProfitSpread.Float64()) && canBuy {
|
||||
submitOrders = append(submitOrders, buyOrder)
|
||||
}
|
||||
*/
|
||||
|
@ -597,17 +572,37 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
|||
s.defaultBoll = s.StandardIndicatorSet.BOLL(s.DefaultBollinger.IntervalWindow, s.DefaultBollinger.BandWidth)
|
||||
|
||||
// calculate group id for orders
|
||||
instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol)
|
||||
s.groupID = max.GenerateGroupID(instanceID)
|
||||
log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID)
|
||||
instanceID := s.InstanceID()
|
||||
s.groupID = util.FNV32(instanceID)
|
||||
|
||||
// restore state
|
||||
if err := s.LoadState(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.state.Position.Strategy = ID
|
||||
s.state.Position.StrategyInstanceID = instanceID
|
||||
// If position is nil, we need to allocate a new position for calculation
|
||||
if s.Position == nil {
|
||||
// fallback to the legacy position struct in the state
|
||||
if s.state != nil && s.state.Position != nil {
|
||||
s.Position = s.state.Position
|
||||
} else {
|
||||
s.Position = types.NewPositionFromMarket(s.Market)
|
||||
}
|
||||
}
|
||||
|
||||
if s.ProfitStats == nil {
|
||||
if s.state != nil {
|
||||
// copy profit stats
|
||||
p2 := s.state.ProfitStats
|
||||
s.ProfitStats = &p2
|
||||
} else {
|
||||
s.ProfitStats = types.NewProfitStats(s.Market)
|
||||
}
|
||||
}
|
||||
|
||||
// Always update the position fields
|
||||
s.Position.Strategy = ID
|
||||
s.Position.StrategyInstanceID = instanceID
|
||||
|
||||
s.stopC = make(chan struct{})
|
||||
|
||||
|
@ -617,7 +612,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
|||
s.orderStore = bbgo.NewOrderStore(s.Symbol)
|
||||
s.orderStore.BindStream(session.UserDataStream)
|
||||
|
||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore)
|
||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
||||
|
||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||
// StrategyController
|
||||
|
@ -626,27 +621,27 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
|||
}
|
||||
|
||||
s.Notifiability.Notify(trade)
|
||||
s.state.ProfitStats.AddTrade(trade)
|
||||
s.ProfitStats.AddTrade(trade)
|
||||
|
||||
if profit.Compare(fixedpoint.Zero) == 0 {
|
||||
s.Environment.RecordPosition(s.state.Position, trade, nil)
|
||||
s.Environment.RecordPosition(s.Position, trade, nil)
|
||||
} else {
|
||||
log.Infof("%s generated profit: %v", s.Symbol, profit)
|
||||
p := s.state.Position.NewProfit(trade, profit, netProfit)
|
||||
p := s.Position.NewProfit(trade, profit, netProfit)
|
||||
p.Strategy = ID
|
||||
p.StrategyInstanceID = instanceID
|
||||
s.Notify(&p)
|
||||
|
||||
s.state.ProfitStats.AddProfit(p)
|
||||
s.Notify(&s.state.ProfitStats)
|
||||
s.ProfitStats.AddProfit(p)
|
||||
s.Notify(&s.ProfitStats)
|
||||
|
||||
s.Environment.RecordPosition(s.state.Position, trade, &p)
|
||||
s.Environment.RecordPosition(s.Position, trade, &p)
|
||||
}
|
||||
})
|
||||
|
||||
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
|
||||
log.Infof("position changed: %s", s.state.Position)
|
||||
s.Notify(s.state.Position)
|
||||
log.Infof("position changed: %s", s.Position)
|
||||
s.Notify(s.Position)
|
||||
})
|
||||
|
||||
s.tradeCollector.BindStream(session.UserDataStream)
|
||||
|
@ -712,10 +707,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
|||
}
|
||||
|
||||
s.tradeCollector.Process()
|
||||
|
||||
if err := s.SaveState(); err != nil {
|
||||
log.WithError(err).Errorf("can not save state: %+v", s.state)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
|
|
|
@ -9,10 +9,10 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/exchange/max"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
"github.com/c9s/bbgo/pkg/util"
|
||||
)
|
||||
|
||||
const ID = "grid"
|
||||
|
@ -586,7 +586,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
|||
}
|
||||
|
||||
instanceID := s.InstanceID()
|
||||
s.groupID = max.GenerateGroupID(instanceID)
|
||||
s.groupID = util.FNV32(instanceID)
|
||||
log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID)
|
||||
|
||||
if err := s.LoadState(); err != nil {
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/exchange/max"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
|
@ -232,7 +231,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
|
|||
s.tradingSession.UserDataStream.OnTradeUpdate(s.handleTradeUpdate)
|
||||
|
||||
instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol)
|
||||
s.groupID = max.GenerateGroupID(instanceID)
|
||||
s.groupID = util.FNV32(instanceID)
|
||||
log.Infof("using group id %d from fnv32(%s)", s.groupID, instanceID)
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -9,12 +9,17 @@ import (
|
|||
|
||||
type State struct {
|
||||
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty"`
|
||||
|
||||
// Deprecated:
|
||||
Position *types.Position `json:"position,omitempty"`
|
||||
|
||||
// Deprecated:
|
||||
ProfitStats ProfitStats `json:"profitStats,omitempty"`
|
||||
}
|
||||
|
||||
type ProfitStats struct {
|
||||
types.ProfitStats
|
||||
*types.ProfitStats
|
||||
|
||||
lock sync.Mutex
|
||||
|
||||
MakerExchange types.ExchangeName `json:"makerExchange"`
|
||||
|
|
|
@ -11,10 +11,8 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/exchange/max"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/indicator"
|
||||
"github.com/c9s/bbgo/pkg/service"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
"github.com/c9s/bbgo/pkg/util"
|
||||
)
|
||||
|
@ -99,6 +97,11 @@ type Strategy struct {
|
|||
|
||||
state *State
|
||||
|
||||
// persistence fields
|
||||
Position *types.Position `json:"position,omitempty" persistence:"position"`
|
||||
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
|
||||
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
|
||||
|
||||
book *types.StreamOrderBook
|
||||
activeMakerOrders *bbgo.LocalActiveOrderBook
|
||||
|
||||
|
@ -120,6 +123,10 @@ func (s *Strategy) ID() string {
|
|||
return ID
|
||||
}
|
||||
|
||||
func (s *Strategy) InstanceID() string {
|
||||
return fmt.Sprintf("%s:%s", ID, s.Symbol)
|
||||
}
|
||||
|
||||
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
|
||||
sourceSession, ok := sessions[s.SourceExchange]
|
||||
if !ok {
|
||||
|
@ -271,7 +278,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
// 1. place bid orders when we already bought too much
|
||||
// 2. place ask orders when we already sold too much
|
||||
if s.MaxExposurePosition.Sign() > 0 {
|
||||
pos := s.state.Position.GetBase()
|
||||
pos := s.Position.GetBase()
|
||||
|
||||
if pos.Compare(s.MaxExposurePosition.Neg()) > 0 {
|
||||
// stop sell if we over-sell
|
||||
|
@ -566,9 +573,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
|
|||
|
||||
// if it's selling, than we should add positive position
|
||||
if side == types.SideTypeSell {
|
||||
s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity)
|
||||
s.CoveredPosition = s.CoveredPosition.Add(quantity)
|
||||
} else {
|
||||
s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity.Neg())
|
||||
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
|
||||
}
|
||||
|
||||
s.orderStore.Add(returnOrders...)
|
||||
|
@ -594,39 +601,10 @@ func (s *Strategy) LoadState() error {
|
|||
var state State
|
||||
|
||||
// load position
|
||||
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err != nil {
|
||||
if err != service.ErrPersistenceNotExists {
|
||||
return err
|
||||
}
|
||||
|
||||
s.state = &State{}
|
||||
} else {
|
||||
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err == nil {
|
||||
s.state = &state
|
||||
}
|
||||
|
||||
// if position is nil, we need to allocate a new position for calculation
|
||||
if s.state.Position == nil {
|
||||
s.state.Position = types.NewPositionFromMarket(s.makerMarket)
|
||||
}
|
||||
s.state.Position.Market = s.makerMarket
|
||||
|
||||
s.state.ProfitStats.Symbol = s.makerMarket.Symbol
|
||||
s.state.ProfitStats.BaseCurrency = s.makerMarket.BaseCurrency
|
||||
s.state.ProfitStats.QuoteCurrency = s.makerMarket.QuoteCurrency
|
||||
s.state.ProfitStats.MakerExchange = s.makerSession.ExchangeName
|
||||
if s.state.ProfitStats.AccumulatedSince == 0 {
|
||||
s.state.ProfitStats.AccumulatedSince = time.Now().Unix()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Strategy) SaveState() error {
|
||||
if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Infof("%s state is saved => %+v", ID, s.state)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -709,25 +687,54 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
}, 1.0)
|
||||
|
||||
// restore state
|
||||
instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol)
|
||||
s.groupID = max.GenerateGroupID(instanceID)
|
||||
instanceID := s.InstanceID()
|
||||
s.groupID = util.FNV32(instanceID)
|
||||
log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID)
|
||||
|
||||
if err := s.LoadState(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
s.Notify("xmaker: %s position is restored", s.Symbol, s.state.Position)
|
||||
s.Notify("xmaker: %s position is restored", s.Symbol, s.Position)
|
||||
}
|
||||
|
||||
if s.Position == nil {
|
||||
if s.state != nil && s.state.Position != nil {
|
||||
s.Position = s.state.Position
|
||||
} else {
|
||||
s.Position = types.NewPositionFromMarket(s.makerMarket)
|
||||
}
|
||||
|
||||
// force update for legacy code
|
||||
s.Position.Market = s.makerMarket
|
||||
}
|
||||
|
||||
if s.ProfitStats == nil {
|
||||
if s.state != nil {
|
||||
p2 := s.state.ProfitStats
|
||||
s.ProfitStats = &p2
|
||||
} else {
|
||||
s.ProfitStats = &ProfitStats{
|
||||
ProfitStats: types.NewProfitStats(s.makerMarket),
|
||||
MakerExchange: s.makerSession.ExchangeName,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if s.CoveredPosition.IsZero() {
|
||||
if s.state != nil && !s.CoveredPosition.IsZero() {
|
||||
s.CoveredPosition = s.state.CoveredPosition
|
||||
}
|
||||
}
|
||||
|
||||
if s.makerSession.MakerFeeRate.Sign() > 0 || s.makerSession.TakerFeeRate.Sign() > 0 {
|
||||
s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{
|
||||
s.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{
|
||||
MakerFeeRate: s.makerSession.MakerFeeRate,
|
||||
TakerFeeRate: s.makerSession.TakerFeeRate,
|
||||
})
|
||||
}
|
||||
|
||||
if s.sourceSession.MakerFeeRate.Sign() > 0 || s.sourceSession.TakerFeeRate.Sign() > 0 {
|
||||
s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{
|
||||
s.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{
|
||||
MakerFeeRate: s.sourceSession.MakerFeeRate,
|
||||
TakerFeeRate: s.sourceSession.TakerFeeRate,
|
||||
})
|
||||
|
@ -743,7 +750,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
s.orderStore.BindStream(s.sourceSession.UserDataStream)
|
||||
s.orderStore.BindStream(s.makerSession.UserDataStream)
|
||||
|
||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore)
|
||||
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
|
||||
|
||||
if s.NotifyTrade {
|
||||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||
|
@ -754,27 +761,23 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
|
||||
c := trade.PositionChange()
|
||||
if trade.Exchange == s.sourceSession.ExchangeName {
|
||||
s.state.CoveredPosition = s.state.CoveredPosition.Add(c)
|
||||
s.CoveredPosition = s.CoveredPosition.Add(c)
|
||||
}
|
||||
|
||||
s.state.ProfitStats.AddTrade(trade)
|
||||
s.ProfitStats.AddTrade(trade)
|
||||
|
||||
if profit.Compare(fixedpoint.Zero) == 0 {
|
||||
s.Environment.RecordPosition(s.state.Position, trade, nil)
|
||||
s.Environment.RecordPosition(s.Position, trade, nil)
|
||||
} else {
|
||||
log.Infof("%s generated profit: %v", s.Symbol, profit)
|
||||
|
||||
p := s.state.Position.NewProfit(trade, profit, netProfit)
|
||||
p := s.Position.NewProfit(trade, profit, netProfit)
|
||||
p.Strategy = ID
|
||||
p.StrategyInstanceID = instanceID
|
||||
s.Notify(&p)
|
||||
s.state.ProfitStats.AddProfit(p)
|
||||
s.ProfitStats.AddProfit(p)
|
||||
|
||||
s.Environment.RecordPosition(s.state.Position, trade, &p)
|
||||
}
|
||||
|
||||
if err := s.SaveState(); err != nil {
|
||||
log.WithError(err).Error("save state error")
|
||||
s.Environment.RecordPosition(s.Position, trade, &p)
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -825,7 +828,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
s.updateQuote(ctx, orderExecutionRouter)
|
||||
|
||||
case <-reportTicker.C:
|
||||
s.Notifiability.Notify(&s.state.ProfitStats)
|
||||
s.Notifiability.Notify(&s.ProfitStats)
|
||||
|
||||
case <-tradeScanTicker.C:
|
||||
log.Infof("scanning trades from %s ago...", tradeScanInterval)
|
||||
|
@ -847,15 +850,15 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
// uncover position = -5 - -3 (covered position) = -2
|
||||
s.tradeCollector.Process()
|
||||
|
||||
position := s.state.Position.GetBase()
|
||||
position := s.Position.GetBase()
|
||||
|
||||
uncoverPosition := position.Sub(s.state.CoveredPosition)
|
||||
uncoverPosition := position.Sub(s.CoveredPosition)
|
||||
absPos := uncoverPosition.Abs()
|
||||
if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 {
|
||||
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
|
||||
s.Symbol,
|
||||
position,
|
||||
s.state.CoveredPosition,
|
||||
s.CoveredPosition,
|
||||
uncoverPosition,
|
||||
)
|
||||
|
||||
|
@ -880,11 +883,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
|
|||
log.WithError(err).Errorf("graceful cancel error")
|
||||
}
|
||||
|
||||
if err := s.SaveState(); err != nil {
|
||||
log.WithError(err).Errorf("can not save state: %+v", s.state)
|
||||
} else {
|
||||
s.Notify("%s: %s position is saved", ID, s.Symbol, s.state.Position)
|
||||
}
|
||||
s.Notify("%s: %s position", ID, s.Symbol, s.Position)
|
||||
})
|
||||
|
||||
return nil
|
||||
|
|
|
@ -218,6 +218,16 @@ type ProfitStats struct {
|
|||
TodaySince int64 `json:"todaySince,omitempty"`
|
||||
}
|
||||
|
||||
func NewProfitStats(market Market) *ProfitStats {
|
||||
return &ProfitStats{
|
||||
Symbol: market.Symbol,
|
||||
BaseCurrency: market.BaseCurrency,
|
||||
QuoteCurrency: market.QuoteCurrency,
|
||||
AccumulatedSince: time.Now().Unix(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func (s *ProfitStats) Init(market Market) {
|
||||
s.Symbol = market.Symbol
|
||||
s.BaseCurrency = market.BaseCurrency
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
package max
|
||||
package util
|
||||
|
||||
import "hash/fnv"
|
||||
|
||||
func GenerateGroupID(s string) uint32 {
|
||||
func FNV32(s string) uint32 {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(s))
|
||||
return h.Sum32()
|
Loading…
Reference in New Issue
Block a user