add warnings and fix subscription

This commit is contained in:
c9s 2020-12-03 09:26:10 +08:00
parent 4f399ebb9f
commit 2b264905f9

View File

@ -4,14 +4,17 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync"
log "github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var log = logrus.WithField("strategy", "movingstop")
// The indicators (SMA and EWMA) that we want to use are returning float64 data. // The indicators (SMA and EWMA) that we want to use are returning float64 data.
type Float64Indicator interface { type Float64Indicator interface {
Last() float64 Last() float64
@ -25,6 +28,8 @@ func init() {
} }
type Strategy struct { type Strategy struct {
*bbgo.Graceful
SourceExchangeName string `json:"sourceExchange"` SourceExchangeName string `json:"sourceExchange"`
TargetExchangeName string `json:"targetExchange"` TargetExchangeName string `json:"targetExchange"`
@ -61,6 +66,7 @@ type Strategy struct {
func (s *Strategy) Subscribe(sessions map[string]*bbgo.ExchangeSession) { func (s *Strategy) Subscribe(sessions map[string]*bbgo.ExchangeSession) {
sourceSession := sessions[s.SourceExchangeName] sourceSession := sessions[s.SourceExchangeName]
sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()}) sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()})
sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.MovingAverageInterval.String()})
// make sure we have the connection alive // make sure we have the connection alive
targetSession := sessions[s.TargetExchangeName] targetSession := sessions[s.TargetExchangeName]
@ -83,15 +89,17 @@ func (s *Strategy) place(ctx context.Context, orderExecutor *bbgo.ExchangeOrderE
// skip it if it's near zero because it's not loaded yet // skip it if it's near zero because it's not loaded yet
if movingAveragePrice < 0.0001 { if movingAveragePrice < 0.0001 {
log.Warn("moving average price is near 0: %f", movingAveragePrice)
return return
} }
// place stop limit order only when the closed price is greater than the moving average price // place stop limit order only when the closed price is greater than the moving average price
if closePrice <= movingAveragePrice { if closePrice <= movingAveragePrice {
log.Warnf("close price %f is less than moving average price %f", closePrice, movingAveragePrice)
return return
} }
var price float64 = 0.0 var price = 0.0
var orderType = types.OrderTypeStopMarket var orderType = types.OrderTypeStopMarket
switch strings.ToLower(s.OrderType) { switch strings.ToLower(s.OrderType) {
@ -158,6 +166,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutionRouter, session
} }
lastPrice, _ := session.LastPrice(s.Symbol)
s.place(ctx, &orderExecutor, indicator, lastPrice)
session.Stream.OnOrderUpdate(s.handleOrderUpdate) session.Stream.OnOrderUpdate(s.handleOrderUpdate)
// session.Stream.OnKLineClosed // session.Stream.OnKLineClosed
@ -175,8 +186,14 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutionRouter, session
// ok, it's our call, we need to cancel the stop limit order first // ok, it's our call, we need to cancel the stop limit order first
s.clear(ctx, session) s.clear(ctx, session)
s.place(ctx, &orderExecutor, indicator, closePrice) s.place(ctx, &orderExecutor, indicator, closePrice)
}) })
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling movingstop order...")
s.clear(ctx, session)
})
return nil return nil
} }