mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
refactor hot reloader and market data store
This commit is contained in:
parent
3e573ebb10
commit
afcde6827f
|
@ -11,41 +11,40 @@ var Interval5m = Interval("5m")
|
|||
var Interval1h = Interval("1h")
|
||||
var Interval1d = Interval("1d")
|
||||
|
||||
type KLineStore struct {
|
||||
// MaxChanges stores the max change kline per interval
|
||||
MaxChanges map[Interval]types.KLine `json:"-"`
|
||||
type MarketDataStore struct {
|
||||
// MaxChangeKLines stores the max change kline per interval
|
||||
MaxChangeKLines map[Interval]types.KLine `json:"-"`
|
||||
|
||||
// Windows stores all loaded klines per interval
|
||||
Windows map[Interval]types.KLineWindow `json:"-"`
|
||||
// KLineWindows stores all loaded klines per interval
|
||||
KLineWindows map[Interval]types.KLineWindow `json:"-"`
|
||||
}
|
||||
|
||||
func NewKLineStore() *KLineStore {
|
||||
return &KLineStore{
|
||||
MaxChanges: make(map[Interval]types.KLine),
|
||||
func NewMarketDataStore() *MarketDataStore {
|
||||
return &MarketDataStore{
|
||||
MaxChangeKLines: make(map[Interval]types.KLine),
|
||||
|
||||
// Windows stores all loaded klines per interval
|
||||
Windows: make(map[Interval]types.KLineWindow),
|
||||
// KLineWindows stores all loaded klines per interval
|
||||
KLineWindows: make(map[Interval]types.KLineWindow),
|
||||
}
|
||||
}
|
||||
|
||||
func (store *KLineStore) BindPrivateStream(stream *types.StandardPrivateStream) {
|
||||
func (store *MarketDataStore) BindPrivateStream(stream *types.StandardPrivateStream) {
|
||||
stream.OnKLineClosed(store.handleKLineClosed)
|
||||
}
|
||||
|
||||
func (store *KLineStore) handleKLineClosed(kline *types.KLine) {
|
||||
func (store *MarketDataStore) handleKLineClosed(kline *types.KLine) {
|
||||
store.AddKLine(*kline)
|
||||
}
|
||||
|
||||
func (store *KLineStore) AddKLine(kline types.KLine) {
|
||||
func (store *MarketDataStore) AddKLine(kline types.KLine) {
|
||||
var interval = Interval(kline.Interval)
|
||||
|
||||
var window = store.Windows[interval]
|
||||
var window = store.KLineWindows[interval]
|
||||
window.Add(kline)
|
||||
|
||||
|
||||
if _, ok := store.MaxChanges[interval] ; ok {
|
||||
if kline.GetMaxChange() > store.MaxChanges[interval].GetMaxChange() {
|
||||
store.MaxChanges[interval] = kline
|
||||
if _, ok := store.MaxChangeKLines[interval] ; ok {
|
||||
if kline.GetMaxChange() > store.MaxChangeKLines[interval].GetMaxChange() {
|
||||
store.MaxChangeKLines[interval] = kline
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,9 +6,11 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/jmoiron/sqlx"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo/config"
|
||||
"github.com/c9s/bbgo/pkg/bbgo/service"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo/exchange/binance"
|
||||
|
@ -136,6 +138,82 @@ func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func (trader *Trader) RunStrategyWithHotReload(ctx context.Context, strategy Strategy, configFile string) (chan struct{}, error) {
|
||||
var done = make(chan struct{})
|
||||
var configWatcherDone = make(chan struct{})
|
||||
|
||||
log.Infof("watching config file: %v", configFile)
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer watcher.Close()
|
||||
|
||||
if err := watcher.Add(configFile); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
go func() {
|
||||
strategyContext, strategyCancel := context.WithCancel(ctx)
|
||||
defer strategyCancel()
|
||||
defer close(done)
|
||||
|
||||
traderDone, err := trader.RunStrategy(strategyContext, strategy)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var configReloadTimer *time.Timer = nil
|
||||
defer close(configWatcherDone)
|
||||
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-traderDone:
|
||||
log.Infof("reloading config file %s", configFile)
|
||||
if err := config.LoadConfigFile(configFile, strategy); err != nil {
|
||||
log.WithError(err).Error("error load config file")
|
||||
}
|
||||
|
||||
trader.Notifier.Notify("config reloaded, restarting trader")
|
||||
|
||||
traderDone, err = trader.RunStrategy(strategyContext, strategy)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("[trader] error:", err)
|
||||
return
|
||||
}
|
||||
|
||||
case event := <-watcher.Events:
|
||||
log.Infof("[fsnotify] event: %+v", event)
|
||||
|
||||
if event.Op&fsnotify.Write == fsnotify.Write {
|
||||
log.Info("[fsnotify] modified file:", event.Name)
|
||||
}
|
||||
|
||||
if configReloadTimer != nil {
|
||||
configReloadTimer.Stop()
|
||||
}
|
||||
|
||||
configReloadTimer = time.AfterFunc(3*time.Second, func() {
|
||||
strategyCancel()
|
||||
})
|
||||
|
||||
case err := <-watcher.Errors:
|
||||
log.WithError(err).Error("[fsnotify] error:", err)
|
||||
return
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return done, nil
|
||||
}
|
||||
|
||||
func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) {
|
||||
if err := strategy.Load(trader.Context, trader); err != nil {
|
||||
return nil, err
|
||||
|
@ -147,7 +225,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
|
|||
}
|
||||
|
||||
// bind kline store to the stream
|
||||
klineStore := NewKLineStore()
|
||||
klineStore := NewMarketDataStore()
|
||||
klineStore.BindPrivateStream(&stream.StandardPrivateStream)
|
||||
|
||||
trader.Account.BindPrivateStream(stream)
|
||||
|
|
Loading…
Reference in New Issue
Block a user