Merge pull request #45 from c9s/strategy/grid

strategy: grid trading strategy
This commit is contained in:
Yo-An Lin 2020-10-31 18:34:43 +08:00 committed by GitHub
commit ecbe17837c
18 changed files with 508 additions and 27 deletions

50
config/grid.yaml Normal file
View File

@ -0,0 +1,50 @@
---
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "btc"
"^ETH": "eth"
"^BNB": "bnb"
# object routing rules
routing:
trade: "$symbol"
order: "$symbol"
submitOrder: "$session" # not supported yet
pnL: "bbgo-pnl"
sessions:
binance:
exchange: binance
envVarPrefix: binance
riskControls:
# This is the session-based risk controller, which let you configure different risk controller by session.
sessionBased:
# "max" is the session name that you want to configure the risk control
binance:
# orderExecutors is one of the risk control
orderExecutors:
# symbol-routed order executor
bySymbol:
BNBUSDT:
# basic risk control order executor
basic:
minQuoteBalance: 100.0
maxBaseAssetBalance: 50.0
minBaseAssetBalance: 1.0
maxOrderAmount: 100.0
exchangeStrategies:
- on: binance
grid:
symbol: BNBUSDT
interval: 1m
baseQuantity: 1.0
gridPips: 0.02
gridNumber: 2

2
go.mod
View File

@ -41,7 +41,7 @@ require (
github.com/x-cray/logrus-prefixed-formatter v0.5.2
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c // indirect
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
gonum.org/v1/gonum v0.8.1 // indirect
gonum.org/v1/gonum v0.8.1
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c
)

View File

@ -158,7 +158,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.marketDataStores[kline.Symbol].AddKLine(kline)
})
session.Stream.OnTrade(func(trade types.Trade) {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
// append trades
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], trade)

View File

@ -21,6 +21,7 @@ func NewStandardIndicatorSet(symbol string, store *MarketDataStore) *StandardInd
Symbol: symbol,
SMA: make(map[types.IntervalWindow]*indicator.SMA),
EWMA: make(map[types.IntervalWindow]*indicator.EWMA),
BOLL: make(map[types.IntervalWindow]*indicator.BOLL),
store: store,
}
@ -45,6 +46,19 @@ func NewStandardIndicatorSet(symbol string, store *MarketDataStore) *StandardInd
return set
}
// GetBOLL returns the bollinger band indicator of the given interval and the window,
// Please note that the K for std dev is fixed and defaults to 2.0
func (set *StandardIndicatorSet) GetBOLL(iw types.IntervalWindow) *indicator.BOLL {
inc, ok := set.BOLL[iw]
if !ok {
inc := &indicator.BOLL{IntervalWindow: iw, K: 2.0}
inc.Bind(set.store)
set.BOLL[iw] = inc
}
return inc
}
// GetSMA returns the simple moving average indicator of the given interval and the window size.
func (set *StandardIndicatorSet) GetSMA(iw types.IntervalWindow) *indicator.SMA {
inc, ok := set.SMA[iw]

View File

@ -106,7 +106,7 @@ func (trader *Trader) Run(ctx context.Context) error {
for sessionName := range trader.environment.sessions {
var session = trader.environment.sessions[sessionName]
if trader.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
trader.tradeReporter.Report(trade)
})
}
@ -290,7 +290,7 @@ func (trader *OrderExecutor) RunStrategy(ctx context.Context, strategy SingleExc
trader.reportPnL()
})
stream.OnTrade(func(trade *types.Trade) {
stream.OnTradeUpdate(func(trade *types.Trade) {
trader.NotifyTrade(trade)
trader.ProfitAndLossCalculator.AddTrade(*trade)
_, err := trader.Context.StockManager.AddTrades([]types.Trade{*trade})

View File

@ -3,6 +3,7 @@ package cmd
// import built-in strategies
import (
_ "github.com/c9s/bbgo/pkg/strategy/buyandhold"
_ "github.com/c9s/bbgo/pkg/strategy/grid"
_ "github.com/c9s/bbgo/pkg/strategy/pricealert"
_ "github.com/c9s/bbgo/pkg/strategy/swing"
_ "github.com/c9s/bbgo/pkg/strategy/xpuremaker"

View File

@ -75,7 +75,8 @@ type ExecutionReportEvent struct {
CurrentExecutionType string `json:"x"`
CurrentOrderStatus string `json:"X"`
OrderID uint64 `json:"i"`
OrderID int64 `json:"i"`
Ignored int64 `json:"I"`
TradeID int64 `json:"t"`
TransactionTime int64 `json:"T"`
@ -108,7 +109,7 @@ func (e *ExecutionReportEvent) Order() (*types.Order, error) {
Price: util.MustParseFloat(e.OrderPrice),
TimeInForce: e.TimeInForce,
},
OrderID: e.OrderID,
OrderID: uint64(e.OrderID),
Status: toGlobalOrderStatus(binance.OrderStatusType(e.CurrentOrderStatus)),
ExecutedQuantity: util.MustParseFloat(e.CumulativeFilledQuantity),
CreationTime: orderCreationTime,

View File

@ -130,14 +130,24 @@ func NewStream(client *binance.Client) *Stream {
stream.OnExecutionReportEvent(func(e *ExecutionReportEvent) {
switch e.CurrentExecutionType {
case "NEW", "CANCELED", "REJECTED", "EXPIRED", "REPLACED":
order, err := e.Order()
if err != nil {
log.WithError(err).Error("order convert error")
return
}
stream.EmitOrderUpdate(*order)
case "TRADE":
trade, err := e.Trade()
if err != nil {
log.WithError(err).Error("trade convert error")
break
return
}
stream.EmitTrade(*trade)
stream.EmitTradeUpdate(*trade)
}
})

View File

@ -58,7 +58,7 @@ func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
MinLot: 1.0 / math.Pow10(m.BaseUnitPrecision), // make it like 0.0001
MinQuantity: m.MinBaseAmount,
MaxQuantity: 10000.0,
MinPrice: 0.1,
MinPrice: 1.0 / math.Pow10(m.QuoteUnitPrecision), // used in the price formatter
MaxPrice: 10000.0,
TickSize: 0.001,
}

View File

@ -17,14 +17,14 @@ type OrderUpdate struct {
Event string `json:"e"`
ID uint64 `json:"i"`
Side string `json:"sd"`
OrderType string `json:"ot"`
OrderType OrderType `json:"ot"`
Price string `json:"p"`
StopPrice string `json:"sp"`
Volume string `json:"v"`
AveragePrice string `json:"ap"`
State string `json:"S"`
State OrderState `json:"S"`
Market string `json:"M"`
RemainingVolume string `json:"rv"`
@ -37,6 +37,7 @@ type OrderUpdate struct {
CreatedAtMs int64 `json:"T"`
}
type OrderUpdateEvent struct {
BaseEvent
@ -49,8 +50,8 @@ func parserOrderUpdate(v *fastjson.Value) OrderUpdate {
ID: v.GetUint64("i"),
Side: string(v.GetStringBytes("sd")),
Market: string(v.GetStringBytes("M")),
OrderType: string(v.GetStringBytes("ot")),
State: string(v.GetStringBytes("S")),
OrderType: OrderType(v.GetStringBytes("ot")),
State: OrderState(v.GetStringBytes("S")),
Price: string(v.GetStringBytes("p")),
StopPrice: string(v.GetStringBytes("sp")),
AveragePrice: string(v.GetStringBytes("ap")),

View File

@ -6,7 +6,9 @@ import (
"time"
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
var logger = log.WithField("exchange", "max")
@ -28,7 +30,29 @@ func NewStream(key, secret string) *Stream {
logger.Infof("M: %s", message)
})
// wss.OnTradeEvent(func(e max.PublicTradeEvent) { })
wss.OnOrderSnapshotEvent(func(e max.OrderSnapshotEvent) {
for _, o := range e.Orders {
globalOrder, err := toGlobalOrderUpdate(o)
if err != nil {
log.WithError(err).Error("websocket order snapshot convert error")
continue
}
stream.EmitOrderUpdate(*globalOrder)
}
})
wss.OnOrderUpdateEvent(func(e max.OrderUpdateEvent) {
for _, o := range e.Orders {
globalOrder, err := toGlobalOrderUpdate(o)
if err != nil {
log.WithError(err).Error("websocket order update convert error")
continue
}
stream.EmitOrderUpdate(*globalOrder)
}
})
wss.OnTradeUpdateEvent(func(e max.TradeUpdateEvent) {
for _, tradeUpdate := range e.Trades {
@ -38,7 +62,7 @@ func NewStream(key, secret string) *Stream {
return
}
stream.EmitTrade(*trade)
stream.EmitTradeUpdate(*trade)
}
})
@ -141,3 +165,32 @@ func convertWebSocketTrade(t max.TradeUpdate) (*types.Trade, error) {
Time: mts,
}, nil
}
func toGlobalOrderUpdate(u max.OrderUpdate) (*types.Order, error) {
executedVolume, err := fixedpoint.NewFromString(u.ExecutedVolume)
if err != nil {
return nil, err
}
remainingVolume, err := fixedpoint.NewFromString(u.RemainingVolume)
if err != nil {
return nil, err
}
return &types.Order{
SubmitOrder: types.SubmitOrder{
ClientOrderID: u.ClientOID,
Symbol: u.Market,
Side: toGlobalSideType(u.Side),
Type: toGlobalOrderType(u.OrderType),
Quantity: util.MustParseFloat(u.Volume),
Price: util.MustParseFloat(u.Price),
StopPrice: util.MustParseFloat(u.StopPrice),
TimeInForce: "GTC", // MAX only supports GTC
},
OrderID: u.ID,
Status: toGlobalOrderStatus(u.State, executedVolume, remainingVolume),
ExecutedQuantity: executedVolume.Float64(),
CreationTime: time.Unix(0, u.CreatedAtMs*int64(time.Millisecond)),
}, nil
}

View File

@ -20,6 +20,8 @@ Bollinger Bands
Bollinger Bands Technical indicator guide:
- https://www.fidelity.com/learning-center/trading-investing/technical-analysis/technical-indicator-guide/bollinger-bands
*/
//go:generate callbackgen -type BOLL
type BOLL struct {
types.IntervalWindow
@ -32,6 +34,16 @@ type BOLL struct {
DownBand Float64Slice
EndTime time.Time
updateCallbacks []func(sma, upBand, downBand float64)
}
func (inc *BOLL) LastUpBand() float64 {
return inc.UpBand[len(inc.UpBand)-1]
}
func (inc *BOLL) LastDownBand() float64 {
return inc.DownBand[len(inc.DownBand)-1]
}
func (inc *BOLL) LastSMA() float64 {
@ -62,14 +74,18 @@ func (inc *BOLL) calculateAndUpdate(kLines []types.KLine) {
var std = stat.StdDev(prices, nil)
inc.StdDev.Push(std)
var upBand = sma + inc.K*std
var band = inc.K * std
var upBand = sma + band
inc.UpBand.Push(upBand)
var downBand = sma - inc.K*std
var downBand = sma - band
inc.DownBand.Push(downBand)
// update end time
inc.EndTime = kLines[index].EndTime
inc.EmitUpdate(sma, upBand, downBand)
}
func (inc *BOLL) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) {

View File

@ -0,0 +1,15 @@
// Code generated by "callbackgen -type BOLL"; DO NOT EDIT.
package indicator
import ()
func (inc *BOLL) OnUpdate(cb func(sma float64, upBand float64, downBand float64)) {
inc.updateCallbacks = append(inc.updateCallbacks, cb)
}
func (inc *BOLL) EmitUpdate(sma float64, upBand float64, downBand float64) {
for _, cb := range inc.updateCallbacks {
cb(sma, upBand, downBand)
}
}

View File

@ -0,0 +1,297 @@
package grid
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
)
var log = logrus.WithField("strategy", "grid")
// The indicators (SMA and EWMA) that we want to use are returning float64 data.
type Float64Indicator interface {
Last() float64
}
func init() {
// Register the pointer of the strategy struct,
// so that bbgo knows what struct to be used to unmarshal the configs (YAML or JSON)
// Note: built-in strategies need to imported manually in the bbgo cmd package.
bbgo.RegisterStrategy("grid", &Strategy{})
}
type Strategy struct {
// The notification system will be injected into the strategy automatically.
// This field will be injected automatically since it's a single exchange strategy.
*bbgo.Notifiability
// OrderExecutor is an interface for submitting order.
// This field will be injected automatically since it's a single exchange strategy.
bbgo.OrderExecutor
// if Symbol string field is defined, bbgo will know it's a symbol-based strategy
// The following embedded fields will be injected with the corresponding instances.
// MarketDataStore is a pointer only injection field. public trades, k-lines (candlestick)
// and order book updates are maintained in the market data store.
// This field will be injected automatically since we defined the Symbol field.
*bbgo.MarketDataStore
// StandardIndicatorSet contains the standard indicators of a market (symbol)
// This field will be injected automatically since we defined the Symbol field.
*bbgo.StandardIndicatorSet
// Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc
// This field will be injected automatically since we defined the Symbol field.
types.Market
// These fields will be filled from the config file (it translates YAML to JSON)
Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
// GridPips is the pips of grid, e.g., 0.001
GridPips fixedpoint.Value `json:"gridPips"`
// GridNum is the grid number (order numbers)
GridNum int `json:"gridNumber"`
BaseQuantity float64 `json:"baseQuantity"`
activeBidOrders map[uint64]types.Order
activeAskOrders map[uint64]types.Order
boll *indicator.BOLL
mu sync.Mutex
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
// currently we need the 1m kline to update the last close price and indicators
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()})
}
func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
quoteCurrency := s.Market.QuoteCurrency
balances := session.Account.Balances()
balance, ok := balances[quoteCurrency]
if !ok || balance.Available <= 0.0 {
return
}
var numOrders = s.GridNum - len(s.activeBidOrders)
if numOrders <= 0 {
return
}
var downBand = s.boll.LastDownBand()
var startPrice = downBand
var submitOrders []types.SubmitOrder
for i := 0; i < numOrders; i++ {
submitOrders = append(submitOrders, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Market: s.Market,
Quantity: s.BaseQuantity,
Price: startPrice,
TimeInForce: "GTC",
})
startPrice -= s.GridPips.Float64()
}
orders, err := orderExecutor.SubmitOrders(context.Background(), submitOrders...)
if err != nil {
log.WithError(err).Error("submit bid order error")
return
}
s.mu.Lock()
for i := range orders {
var order = orders[i]
log.Infof("adding order %d to the active bid order pool...", order.OrderID)
s.activeBidOrders[order.OrderID] = order
}
s.mu.Unlock()
}
func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
baseCurrency := s.Market.BaseCurrency
balances := session.Account.Balances()
balance, ok := balances[baseCurrency]
if !ok || balance.Available <= 0.0 {
return
}
var numOrders = s.GridNum - len(s.activeAskOrders)
if numOrders <= 0 {
return
}
var upBand = s.boll.LastUpBand()
var startPrice = upBand
var submitOrders []types.SubmitOrder
for i := 0; i < numOrders; i++ {
submitOrders = append(submitOrders, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
Type: types.OrderTypeLimit,
Market: s.Market,
Quantity: s.BaseQuantity,
Price: startPrice,
TimeInForce: "GTC",
})
startPrice += s.GridPips.Float64()
}
orders, err := orderExecutor.SubmitOrders(context.Background(), submitOrders...)
if err != nil {
log.WithError(err).Error("submit ask order error")
return
}
s.mu.Lock()
for i := range orders {
var order = orders[i]
log.Infof("adding order %d to the active ask order pool...", order.OrderID)
s.activeAskOrders[order.OrderID] = order
}
s.mu.Unlock()
}
func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
log.Infof("checking grid orders, bids=%d asks=%d", len(s.activeBidOrders), len(s.activeAskOrders))
for _, o := range s.activeBidOrders {
log.Infof("bid order: %d -> %s", o.OrderID, o.Status)
}
for _, o := range s.activeAskOrders {
log.Infof("ask order: %d -> %s", o.OrderID, o.Status)
}
if len(s.activeBidOrders) < s.GridNum {
log.Infof("active bid orders not enough: %d < %d, updating...", len(s.activeBidOrders), s.GridNum)
s.updateBidOrders(orderExecutor, session)
}
if len(s.activeAskOrders) < s.GridNum {
log.Infof("active ask orders not enough: %d < %d, updating...", len(s.activeAskOrders), s.GridNum)
s.updateAskOrders(orderExecutor, session)
}
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
if s.GridNum == 0 {
s.GridNum = 2
}
s.boll = s.StandardIndicatorSet.GetBOLL(types.IntervalWindow{
Interval: s.Interval,
Window: 21,
})
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
// TODO: pull this map out and add mutex lock
s.activeBidOrders = make(map[uint64]types.Order)
s.activeAskOrders = make(map[uint64]types.Order)
session.Stream.OnOrderUpdate(func(order types.Order) {
log.Infof("received order update: %+v", order)
if order.Symbol != s.Symbol {
return
}
s.mu.Lock()
defer s.mu.Unlock()
switch order.Status {
case types.OrderStatusFilled:
switch order.Side {
case types.SideTypeSell:
// find the filled bid to remove
for id, o := range s.activeBidOrders {
if o.Status == types.OrderStatusFilled {
delete(s.activeBidOrders, id)
delete(s.activeAskOrders, order.OrderID)
break
}
}
case types.SideTypeBuy:
// find the filled ask order to remove
for id, o := range s.activeAskOrders {
if o.Status == types.OrderStatusFilled {
delete(s.activeAskOrders, id)
delete(s.activeBidOrders, order.OrderID)
break
}
}
}
case types.OrderStatusCanceled, types.OrderStatusRejected:
log.Infof("order status %s, removing %d from the active order pool...", order.Status, order.OrderID)
switch order.Side {
case types.SideTypeSell:
delete(s.activeAskOrders, order.OrderID)
case types.SideTypeBuy:
delete(s.activeBidOrders, order.OrderID)
}
default:
log.Infof("order status %s, updating %d to the active order pool...", order.Status, order.OrderID)
switch order.Side {
case types.SideTypeSell:
s.activeAskOrders[order.OrderID] = order
case types.SideTypeBuy:
s.activeBidOrders[order.OrderID] = order
}
}
})
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
s.updateOrders(orderExecutor, session)
defer func() {
for _, o := range s.activeBidOrders {
_ = session.Exchange.CancelOrders(context.Background(), o)
}
for _, o := range s.activeAskOrders {
_ = session.Exchange.CancelOrders(context.Background(), o)
}
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// see if we have enough balances and then we create limit orders on the up band and the down band.
s.updateOrders(orderExecutor, session)
}
}
}()
return nil
}

View File

@ -44,7 +44,10 @@ func (m Market) FormatPriceCurrency(val float64) string {
}
func (m Market) FormatPrice(val float64) string {
p := math.Pow10(m.PricePrecision)
// p := math.Pow10(m.PricePrecision)
prec := int(math.Abs(math.Log10(m.MinPrice)))
p := math.Pow10(prec)
val = math.Trunc(val*p) / p
return strconv.FormatFloat(val, 'f', m.PricePrecision, 64)
}

View File

@ -40,6 +40,13 @@ func (slice PriceVolumeSlice) Copy() PriceVolumeSlice {
return append(slice[:0:0], slice...)
}
func (slice PriceVolumeSlice) First() (PriceVolume, bool) {
if len(slice) > 0 {
return slice[0], true
}
return PriceVolume{}, false
}
func (slice PriceVolumeSlice) IndexByVolumeDepth(requiredVolume fixedpoint.Value) int {
var tv int64 = 0
for x, el := range slice {

View File

@ -2,18 +2,26 @@
package types
import ()
func (stream *StandardStream) OnTrade(cb func(trade Trade)) {
stream.tradeCallbacks = append(stream.tradeCallbacks, cb)
func (stream *StandardStream) OnTradeUpdate(cb func(trade Trade)) {
stream.tradeUpdateCallbacks = append(stream.tradeUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitTrade(trade Trade) {
for _, cb := range stream.tradeCallbacks {
func (stream *StandardStream) EmitTradeUpdate(trade Trade) {
for _, cb := range stream.tradeUpdateCallbacks {
cb(trade)
}
}
func (stream *StandardStream) OnOrderUpdate(cb func(order Order)) {
stream.orderUpdateCallbacks = append(stream.orderUpdateCallbacks, cb)
}
func (stream *StandardStream) EmitOrderUpdate(order Order) {
for _, cb := range stream.orderUpdateCallbacks {
cb(order)
}
}
func (stream *StandardStream) OnBalanceSnapshot(cb func(balances map[string]Balance)) {
stream.balanceSnapshotCallbacks = append(stream.balanceSnapshotCallbacks, cb)
}
@ -75,7 +83,9 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) {
}
type StandardStreamEventHub interface {
OnTrade(cb func(trade Trade))
OnTradeUpdate(cb func(trade Trade))
OnOrderUpdate(cb func(order Order))
OnBalanceSnapshot(cb func(balances map[string]Balance))

View File

@ -22,8 +22,11 @@ var KLineChannel = Channel("kline")
type StandardStream struct {
Subscriptions []Subscription
// private trade callbacks
tradeCallbacks []func(trade Trade)
// private trade update callbacks
tradeUpdateCallbacks []func(trade Trade)
// private order update callbacks
orderUpdateCallbacks []func(order Order)
// balance snapshot callbacks
balanceSnapshotCallbacks []func(balances map[string]Balance)