Merge pull request #1470 from c9s/narumi/xnav/schedule

FEATURE: [xnav] add cron schedule
This commit is contained in:
c9s 2024-01-03 16:38:19 +08:00 committed by GitHub
commit 3dca9aaf98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 26 deletions

View File

@ -30,6 +30,7 @@ crossExchangeStrategies:
- xnav:
interval: 1h
# schedule: "0 * * * *" # every hour
reportOnStart: true
ignoreDusts: true

View File

@ -2,19 +2,19 @@ package xnav
import (
"context"
"fmt"
"sync"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/templateutil"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/slack-go/slack"
)
const ID = "xnav"
@ -59,16 +59,30 @@ type Strategy struct {
*bbgo.Environment
Interval types.Interval `json:"interval"`
Schedule string `json:"schedule"`
ReportOnStart bool `json:"reportOnStart"`
IgnoreDusts bool `json:"ignoreDusts"`
State *State `persistence:"state"`
cron *cron.Cron
}
func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) Initialize() error {
return nil
}
func (s *Strategy) Validate() error {
if s.Interval == "" && s.Schedule == "" {
return fmt.Errorf("interval or schedule is required")
}
return nil
}
var Ten = fixedpoint.NewFromInt(10)
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {}
@ -138,10 +152,6 @@ func (s *Strategy) recordNetAssetValue(ctx context.Context, sessions map[string]
}
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
if s.Interval == "" {
return errors.New("interval can not be empty")
}
if s.State == nil {
s.State = &State{}
s.State.Reset()
@ -161,25 +171,32 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
log.Warnf("xnav does not support backtesting")
}
// TODO: if interval is supported, we can use kline as the ticker
if _, ok := types.SupportedIntervals[s.Interval]; ok {
if s.Interval != "" {
go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000))
defer ticker.Stop()
}
for {
select {
case <-ctx.Done():
return
go func() {
ticker := time.NewTicker(util.MillisecondsJitter(s.Interval.Duration(), 1000))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.recordNetAssetValue(ctx, sessions)
case <-ticker.C:
s.recordNetAssetValue(ctx, sessions)
}
}
}()
} else if s.Schedule != "" {
s.cron = cron.New()
_, err := s.cron.AddFunc(s.Schedule, func() {
s.recordNetAssetValue(ctx, sessions)
})
if err != nil {
return err
}
}()
s.cron.Start()
}
return nil
}