refactor environment, market data store, injection and add swing strategy

This commit is contained in:
c9s 2020-10-28 16:27:25 +08:00
parent 90ca829915
commit 2680ad5072
15 changed files with 410 additions and 106 deletions

51
config/swing.yaml Normal file
View File

@ -0,0 +1,51 @@
---
notifications:
slack:
defaultChannel: "#dev-bbgo"
errorChannel: "#bbgo-error"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "#btc"
"^ETH": "#eth"
"^BNB": "#bnb"
# object routing rules
routing:
trade: "$symbol"
order: "$symbol"
submitOrder: "$session" # not supported yet
pnL: "#bbgo-pnl"
sessions:
binance:
exchange: binance
envVarPrefix: binance
riskControls:
# This is the session-based risk controller, which let you configure different risk controller by session.
sessionBased:
# "max" is the session name that you want to configure the risk control
binance:
# orderExecutors is one of the risk control
orderExecutors:
# symbol-routed order executor
bySymbol:
BNBUSDT:
# basic risk control order executor
basic:
minQuoteBalance: 1000.0
maxBaseAssetBalance: 50.0
minBaseAssetBalance: 10.0
maxOrderAmount: 100.0
exchangeStrategies:
- on: binance
swing:
symbol: BNBUSDT
interval: 1m
minChange: 0.01
baseQuantity: 1.0
movingAverageType: EWMA
movingAverageInterval: 1m
movingAverageWindow: 99

View File

@ -11,7 +11,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
@ -59,8 +58,10 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s
return session
}
// Init prepares the data that will be used by the strategies
func (environ *Environment) Init(ctx context.Context) (err error) {
for _, session := range environ.sessions {
for n := range environ.sessions {
var session = environ.sessions[n]
var markets types.MarketMap
err = WithCache(fmt.Sprintf("%s-markets", session.Exchange.Name()), &markets, func() (interface{}, error) {
@ -75,37 +76,9 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
}
session.markets = markets
}
return nil
}
// SyncTradesFrom overrides the default trade scan time (-7 days)
func (environ *Environment) SyncTradesFrom(t time.Time) *Environment {
environ.tradeScanTime = t
return environ
}
func (environ *Environment) Connect(ctx context.Context) error {
var err error
for n := range environ.sessions {
// avoid using the placeholder variable for the session because we use that in the callbacks
var session = environ.sessions[n]
var log = log.WithField("session", n)
loadedSymbols := make(map[string]struct{})
for _, s := range session.Subscriptions {
symbol := strings.ToUpper(s.Symbol)
loadedSymbols[symbol] = struct{}{}
log.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}
// trade sync and market data store depends on subscribed symbols so we have to do this here.
for symbol := range loadedSymbols {
for symbol := range session.loadedSymbols {
var trades []types.Trade
if environ.TradeSync != nil {
@ -130,14 +103,14 @@ func (environ *Environment) Connect(ctx context.Context) error {
session.Trades[symbol] = trades
currentPrice, err := session.Exchange.QueryAveragePrice(ctx, symbol)
averagePrice, err := session.Exchange.QueryAveragePrice(ctx, symbol)
if err != nil {
return err
}
session.lastPrices[symbol] = currentPrice
session.lastPrices[symbol] = averagePrice
marketDataStore := store.NewMarketDataStore(symbol)
marketDataStore := NewMarketDataStore(symbol)
marketDataStore.BindStream(session.Stream)
session.marketDataStores[symbol] = marketDataStore
@ -146,11 +119,8 @@ func (environ *Environment) Connect(ctx context.Context) error {
session.standardIndicatorSets[symbol] = standardIndicatorSet
}
// move market data store dispatch to here, use one callback to dispatch the market data
// session.Stream.OnKLineClosed(func(kline types.KLine) { })
now := time.Now()
for symbol := range loadedSymbols {
for symbol := range session.loadedSymbols {
marketDataStore, ok := session.marketDataStores[symbol]
if !ok {
return errors.Errorf("symbol %s is not defined", symbol)
@ -197,12 +167,37 @@ func (environ *Environment) Connect(ctx context.Context) error {
}
})
// move market data store dispatch to here, use one callback to dispatch the market data
// session.Stream.OnKLineClosed(func(kline types.KLine) { })
}
return nil
}
// SyncTradesFrom overrides the default trade scan time (-7 days)
func (environ *Environment) SyncTradesFrom(t time.Time) *Environment {
environ.tradeScanTime = t
return environ
}
func (environ *Environment) Connect(ctx context.Context) error {
for n := range environ.sessions {
// avoid using the placeholder variable for the session because we use that in the callbacks
var session = environ.sessions[n]
var logger = log.WithField("session", n)
if len(session.Subscriptions) == 0 {
log.Warnf("no subscriptions, exchange session %s will not be connected", session.Name)
logger.Warnf("no subscriptions, exchange session %s will not be connected", session.Name)
continue
}
log.Infof("connecting session %s...", session.Name)
// add the subscribe requests to the stream
for _, s := range session.Subscriptions {
logger.Infof("subscribing %s %s %v", s.Symbol, s.Channel, s.Options)
session.Stream.Subscribe(s.Channel, s.Symbol, s.Options)
}
logger.Infof("connecting session %s...", session.Name)
if err := session.Stream.Connect(ctx); err != nil {
return err
}

View File

@ -7,16 +7,34 @@ import (
"github.com/sirupsen/logrus"
)
func injectStrategyField(strategy SingleExchangeStrategy, rs reflect.Value, fieldName string, obj interface{}) error {
func isSymbolBasedStrategy(rs reflect.Value) (string, bool) {
field := rs.FieldByName("Symbol")
if !field.IsValid() {
return "", false
}
if field.Kind() != reflect.String {
return "", false
}
return field.String(), true
}
func hasField(rs reflect.Value, fieldName string) bool {
field := rs.FieldByName(fieldName)
return field.IsValid()
}
func injectField(rs reflect.Value, fieldName string, obj interface{}) error {
field := rs.FieldByName(fieldName)
if !field.IsValid() {
return nil
}
logrus.Infof("found %s in strategy %T, injecting %T...", fieldName, strategy, obj)
logrus.Infof("found %s in %T, injecting %T...", fieldName, rs.Type(), obj)
if !field.CanSet() {
return errors.Errorf("field %s of strategy %T can not be set", fieldName, strategy)
return errors.Errorf("field %s of %T can not be set", fieldName, rs.Type())
}
rv := reflect.ValueOf(obj)

View File

@ -1,9 +1,8 @@
package store
package bbgo
import (
"github.com/c9s/bbgo/pkg/types"
)
import "github.com/c9s/bbgo/pkg/types"
// MarketDataStore receives and maintain the public market data
//go:generate callbackgen -type MarketDataStore
type MarketDataStore struct {
Symbol string
@ -13,7 +12,7 @@ type MarketDataStore struct {
LastKLine types.KLine
kLineUpdateCallbacks []func(kline types.KLine)
kLineWindowUpdateCallbacks []func(interval types.Interval, kline types.KLineWindow)
orderBook *types.StreamOrderBook
@ -86,5 +85,5 @@ func (store *MarketDataStore) AddKLine(kline types.KLine) {
store.LastKLine = kline
store.EmitKLineUpdate(kline)
store.EmitKLineWindowUpdate(kline.Interval, window)
}

View File

@ -1,18 +1,18 @@
// Code generated by "callbackgen -type MarketDataStore"; DO NOT EDIT.
package store
package bbgo
import (
"github.com/c9s/bbgo/pkg/types"
)
func (store *MarketDataStore) OnKLineUpdate(cb func(kline types.KLine)) {
store.kLineUpdateCallbacks = append(store.kLineUpdateCallbacks, cb)
func (store *MarketDataStore) OnKLineWindowUpdate(cb func(interval types.Interval, kline types.KLineWindow)) {
store.kLineWindowUpdateCallbacks = append(store.kLineWindowUpdateCallbacks, cb)
}
func (store *MarketDataStore) EmitKLineUpdate(kline types.KLine) {
for _, cb := range store.kLineUpdateCallbacks {
cb(kline)
func (store *MarketDataStore) EmitKLineWindowUpdate(interval types.Interval, kline types.KLineWindow) {
for _, cb := range store.kLineWindowUpdateCallbacks {
cb(interval, kline)
}
}

View File

@ -62,8 +62,8 @@ type BasicRiskControlOrderExecutor struct {
func (e *BasicRiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) {
var formattedOrders []types.SubmitOrder
for _, order := range orders {
currentPrice, ok := e.session.lastPrices[order.Symbol]
if ok {
currentPrice, ok := e.session.LastPrice(order.Symbol)
if !ok {
return nil, errors.Errorf("the last price of symbol %q is not found", order.Symbol)
}

View File

@ -2,7 +2,6 @@ package bbgo
import (
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
@ -46,7 +45,7 @@ func NewStandardIndicatorSet(symbol string) *StandardIndicatorSet {
return set
}
func (set *StandardIndicatorSet) BindMarketDataStore(store *store.MarketDataStore) {
func (set *StandardIndicatorSet) BindMarketDataStore(store *MarketDataStore) {
for _, inc := range set.SMA {
inc.BindMarketDataStore(store)
}
@ -82,12 +81,14 @@ type ExchangeSession struct {
Trades map[string][]types.Trade
// marketDataStores contains the market data store of each market
marketDataStores map[string]*store.MarketDataStore
marketDataStores map[string]*MarketDataStore
// standard indicators of each market
standardIndicatorSets map[string]*StandardIndicatorSet
tradeReporter *TradeReporter
loadedSymbols map[string]struct{}
}
func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
@ -95,12 +96,16 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
Name: name,
Exchange: exchange,
Stream: exchange.NewStream(),
Account: &types.Account{},
Subscriptions: make(map[types.Subscription]types.Subscription),
markets: make(map[string]types.Market),
Account: &types.Account{},
Trades: make(map[string][]types.Trade),
markets: make(map[string]types.Market),
lastPrices: make(map[string]float64),
marketDataStores: make(map[string]*store.MarketDataStore),
marketDataStores: make(map[string]*MarketDataStore),
standardIndicatorSets: make(map[string]*StandardIndicatorSet),
loadedSymbols: make(map[string]struct{}),
}
}
@ -110,7 +115,7 @@ func (session *ExchangeSession) StandardIndicatorSet(symbol string) (*StandardIn
}
// MarketDataStore returns the market data store of a symbol
func (session *ExchangeSession) MarketDataStore(symbol string) (s *store.MarketDataStore, ok bool) {
func (session *ExchangeSession) MarketDataStore(symbol string) (s *MarketDataStore, ok bool) {
s, ok = session.marketDataStores[symbol]
return s, ok
}
@ -138,6 +143,8 @@ func (session *ExchangeSession) Subscribe(channel types.Channel, symbol string,
Options: options,
}
// add to the loaded symbol table
session.loadedSymbols[symbol] = struct{}{}
session.Subscriptions[sub] = sub
return session
}

View File

@ -27,6 +27,10 @@ type SingleExchangeStrategy interface {
Run(ctx context.Context, orderExecutor OrderExecutor, session *ExchangeSession) error
}
type ExchangeSessionSubscriber interface {
Subscribe(session *ExchangeSession)
}
type CrossExchangeStrategy interface {
Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error
}
@ -85,14 +89,24 @@ func (trader *Trader) SetRiskControls(riskControls *RiskControls) {
}
func (trader *Trader) Run(ctx context.Context) error {
// pre-subscribe the data
for sessionName, strategies := range trader.exchangeStrategies {
session := trader.environment.sessions[sessionName]
for _, strategy := range strategies {
if subscriber, ok := strategy.(ExchangeSessionSubscriber); ok {
subscriber.Subscribe(session)
}
}
}
if err := trader.environment.Init(ctx); err != nil {
return err
}
// load and run session strategies
for sessionName, strategies := range trader.exchangeStrategies {
session := trader.environment.sessions[sessionName]
// session based trade reporter
for sessionName := range trader.environment.sessions {
var session = trader.environment.sessions[sessionName]
if session.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
session.tradeReporter.Report(trade)
@ -102,6 +116,11 @@ func (trader *Trader) Run(ctx context.Context) error {
trader.tradeReporter.Report(trade)
})
}
}
// load and run session strategies
for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName]
var baseOrderExecutor = &ExchangeOrderExecutor{
// copy the parent notifiers and session
@ -133,12 +152,32 @@ func (trader *Trader) Run(ctx context.Context) error {
// get the struct element
rs = rs.Elem()
if err := injectStrategyField(strategy, rs, "Notifiability", &trader.Notifiability); err != nil {
log.WithError(err).Errorf("strategy notifiability injection failed")
if err := injectField(rs, "Notifiability", &trader.Notifiability); err != nil {
log.WithError(err).Errorf("strategy Notifiability injection failed")
}
if err := injectStrategyField(strategy, rs, "OrderExecutor", orderExecutor); err != nil {
log.WithError(err).Errorf("strategy orderExecutor injection failed")
if err := injectField(rs, "OrderExecutor", orderExecutor); err != nil {
log.WithError(err).Errorf("strategy OrderExecutor injection failed")
}
if symbol, ok := isSymbolBasedStrategy(rs); ok {
log.Infof("found symbol based strategy from %T", rs.Type())
if hasField(rs, "Market") {
if market, ok := session.Market(symbol); ok {
// let's make the market object passed by pointer
if err := injectField(rs, "Market", &market); err != nil {
log.WithError(err).Errorf("strategy Market injection failed")
}
}
}
if hasField(rs, "MarketDataStore") {
if store, ok := session.MarketDataStore(symbol); ok {
if err := injectField(rs, "MarketDataStore", store); err != nil {
log.WithError(err).Errorf("strategy MarketDataStore injection failed")
}
}
}
}
}

9
pkg/cmd/builtin.go Normal file
View File

@ -0,0 +1,9 @@
package cmd
// import built-in strategies
import (
_ "github.com/c9s/bbgo/pkg/strategy/buyandhold"
_ "github.com/c9s/bbgo/pkg/strategy/pricealert"
_ "github.com/c9s/bbgo/pkg/strategy/swing"
_ "github.com/c9s/bbgo/pkg/strategy/xpuremaker"
)

View File

@ -25,10 +25,6 @@ import (
"github.com/c9s/bbgo/pkg/slack/slacklog"
"github.com/c9s/bbgo/pkg/types"
// import built-in strategies
_ "github.com/c9s/bbgo/pkg/strategy/buyandhold"
_ "github.com/c9s/bbgo/pkg/strategy/pricealert"
_ "github.com/c9s/bbgo/pkg/strategy/xpuremaker"
)
var errSlackTokenUndefined = errors.New("slack token is not defined.")

View File

@ -3,7 +3,6 @@ package indicator
import (
"time"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
@ -14,6 +13,10 @@ type EWMA struct {
EndTime time.Time
}
func (inc *EWMA) Last() float64 {
return inc.Values[len(inc.Values)-1]
}
func (inc *EWMA) calculateAndUpdate(kLines []types.KLine) {
if len(kLines) < inc.Window {
// we can't calculate
@ -42,10 +45,13 @@ func (inc *EWMA) calculateAndUpdate(kLines []types.KLine) {
inc.EndTime = kLines[index].EndTime
}
func (inc *EWMA) BindMarketDataStore(store *store.MarketDataStore) {
store.OnKLineUpdate(func(kline types.KLine) {
// kline guard
if inc.Interval != kline.Interval {
type KLineWindowUpdater interface {
OnKLineWindowUpdate(func(interval types.Interval, window types.KLineWindow))
}
func (inc *EWMA) BindMarketDataStore(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(func(interval types.Interval, window types.KLineWindow) {
if inc.Interval != interval {
return
}
@ -53,8 +59,6 @@ func (inc *EWMA) BindMarketDataStore(store *store.MarketDataStore) {
return
}
if kLines, ok := store.KLinesOfInterval(types.Interval(kline.Interval)); ok {
inc.calculateAndUpdate(kLines)
}
inc.calculateAndUpdate(window)
})
}

View File

@ -3,7 +3,6 @@ package indicator
import (
"time"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
)
@ -22,6 +21,10 @@ type SMA struct {
EndTime time.Time
}
func (inc *SMA) Last() float64 {
return inc.Values[len(inc.Values)-1]
}
func (inc *SMA) calculateAndUpdate(kLines []types.KLine) {
if len(kLines) < inc.Window {
return
@ -40,16 +43,17 @@ func (inc *SMA) calculateAndUpdate(kLines []types.KLine) {
inc.EndTime = kLines[index].EndTime
}
func (inc *SMA) BindMarketDataStore(store *store.MarketDataStore) {
store.OnKLineUpdate(func(kline types.KLine) {
// kline guard
if inc.Interval != kline.Interval {
func (inc *SMA) BindMarketDataStore(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(func(interval types.Interval, window types.KLineWindow) {
if inc.Interval != interval {
return
}
if kLines, ok := store.KLinesOfInterval(kline.Interval); ok {
inc.calculateAndUpdate(kLines)
if inc.EndTime != zeroTime && inc.EndTime.Before(inc.EndTime) {
return
}
inc.calculateAndUpdate(window)
})
}

View File

@ -25,11 +25,21 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
session.Stream.OnKLine(func(kline types.KLine) {
// skip k-lines from other symbols
if kline.Symbol != s.Symbol {
return
}
changePercentage := kline.GetChange() / kline.Open
log.Infof("change %f <=> %f", changePercentage, s.MinDropPercentage)
})
session.Stream.OnKLineClosed(func(kline types.KLine) {
// skip k-lines from other symbols
if kline.Symbol != s.Symbol {
return
}
changePercentage := kline.GetChange() / kline.Open
if changePercentage > s.MinDropPercentage {

View File

@ -0,0 +1,159 @@
package swing
import (
"context"
"math"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
)
func init() {
bbgo.RegisterExchangeStrategy("swing", &Strategy{})
}
type Strategy struct {
// The notification system will be injected into the strategy automatically.
*bbgo.Notifiability
*bbgo.MarketDataStore
*types.Market
bbgo.OrderExecutor
// These fields will be filled from the config file (it translates YAML to JSON)
Symbol string `json:"symbol"`
Interval string `json:"interval"`
MinChange float64 `json:"minChange"`
BaseQuantity float64 `json:"baseQuantity"`
MovingAverageType string `json:"movingAverageType"`
MovingAverageInterval types.Interval `json:"movingAverageInterval"`
MovingAverageWindow int `json:"movingAverageWindow"`
}
type Float64Indicator interface {
Last() float64
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
market, ok := session.Market(s.Symbol)
if !ok {
return errors.Errorf("market config of %s is not configured", s.Symbol)
}
marketDataStore, ok := session.MarketDataStore(s.Symbol)
if !ok {
return errors.Errorf("market data store of %s is not configured", s.Symbol)
}
indicatorSet, ok := session.StandardIndicatorSet(s.Symbol)
if !ok {
return errors.Errorf("indicatorSet of %s is not configured", s.Symbol)
}
var inc Float64Indicator
var iw = bbgo.IntervalWindow{Interval: s.MovingAverageInterval, Window: s.MovingAverageWindow}
switch s.MovingAverageType {
case "SMA":
inc, ok = indicatorSet.SMA[iw]
if !ok {
inc := &indicator.SMA{
Interval: iw.Interval,
Window: iw.Window,
}
inc.BindMarketDataStore(marketDataStore)
indicatorSet.SMA[iw] = inc
}
case "EWMA", "EMA":
inc, ok = indicatorSet.EWMA[iw]
if !ok {
inc := &indicator.EWMA{
Interval: iw.Interval,
Window: iw.Window,
}
inc.BindMarketDataStore(marketDataStore)
indicatorSet.EWMA[iw] = inc
}
default:
return errors.Errorf("unsupported moving average type: %s", s.MovingAverageType)
}
session.Stream.OnKLine(func(kline types.KLine) {
// skip k-lines from other symbols
if kline.Symbol != s.Symbol {
return
}
movingAveragePrice := inc.Last()
// skip it if it's near zero
if movingAveragePrice < 0.0001 {
return
}
// skip if the change is not above the minChange
if math.Abs(kline.GetChange()) < s.MinChange {
return
}
closePrice := kline.Close
changePercentage := kline.GetChange() / kline.Open
quantity := s.BaseQuantity * (1.0 + math.Abs(changePercentage))
trend := kline.GetTrend()
switch trend {
case 1:
// if it goes up and it's above the moving average price, then we sell
if closePrice > movingAveragePrice {
s.notify(":chart_with_upwards_trend: closePrice %f is above movingAveragePrice %f, submitting sell order", closePrice, movingAveragePrice)
_, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Market: market,
Side: types.SideTypeSell,
Type: types.OrderTypeMarket,
Quantity: quantity,
})
if err != nil {
log.WithError(err).Error("submit order error")
}
}
case -1:
// if it goes down and it's below the moving average price, then we buy
if closePrice < movingAveragePrice {
s.notify(":chart_with_downwards_trend: closePrice %f is below movingAveragePrice %f, submitting buy order", closePrice, movingAveragePrice)
_, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,
Market: market,
Side: types.SideTypeBuy,
Type: types.OrderTypeMarket,
Quantity: quantity,
})
if err != nil {
log.WithError(err).Error("submit order error")
}
}
}
})
return nil
}
func (s *Strategy) notify(format string, args ...interface{}) {
if channel, ok := s.RouteSymbol(s.Symbol); ok {
s.NotifyTo(channel, format, args...)
} else {
s.Notify(format, args...)
}
}

View File

@ -1,7 +1,20 @@
package types
import "encoding/json"
type Interval string
func (i *Interval) UnmarshalJSON(b []byte) (err error) {
var a string
err = json.Unmarshal(b, &a)
if err != nil {
return err
}
*i = Interval(a)
return
}
func (i Interval) String() string {
return string(i)
}