Merge pull request #160 from c9s/feature/grid-options

This commit is contained in:
Yo-An Lin 2021-03-16 12:59:14 +08:00 committed by GitHub
commit f5b65e795e
18 changed files with 245 additions and 47 deletions

View File

@ -53,22 +53,28 @@ go get -u github.com/c9s/bbgo/cmd/bbgo
Add your dotenv file:
```
# optional
SLACK_TOKEN=
# optional
TELEGRAM_BOT_TOKEN=
TELEGRAM_BOT_AUTH_TOKEN=
# if you have one
BINANCE_API_KEY=
BINANCE_API_SECRET=
# if you have one
MAX_API_KEY=
MAX_API_SECRET=
# if you have one
FTX_API_KEY=
FTX_API_SECRET=
# specify it if credentials are for subaccount
FTX_SUBACCOUNT=
# optional, if you have the db setup
MYSQL_URL=root@tcp(127.0.0.1:3306)/bbgo?parseTime=true
```
@ -226,6 +232,16 @@ then the following types could be injected automatically:
- `*bbgo.ExchangeSession`
- `types.Market`
## Strategy Execution Phases
1. Load config from the config file.
2. Allocate and initialize exchange sessions.
3. Add exchange sessions to the environment (the data layer).
4. Use the given environment to initialize the trader object (the logic layer).
5. The trader initializes the environment and start the exchange connections.
6. Call strategy.Run() method sequentially.
## Exchange API Examples
Please check out the example directory: [examples](examples)
@ -250,6 +266,10 @@ streambook := types.NewStreamBook(symbol)
streambook.BindStream(stream)
```
## How To Add A New Exchange
(TBD)
## Telegram Integration
- In telegram: @botFather

View File

@ -211,8 +211,22 @@ func (e Exchange) QueryTrades(ctx context.Context, symbol string, options *types
}
func (e Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
// Not using Tickers in back test (yet)
return nil, ErrUnimplemented
matching, ok := e.matchingBooks[symbol]
if !ok {
return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol)
}
kline := matching.LastKLine
return &types.Ticker{
Time: kline.EndTime,
Volume: kline.Volume,
Last: kline.Close,
Open: kline.Open,
High: kline.High,
Low: kline.Low,
Buy: kline.Close,
Sell: kline.Close,
}, nil
}
func (e Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {

View File

@ -42,6 +42,7 @@ type SimplePriceMatching struct {
askOrders []types.Order
LastPrice fixedpoint.Value
LastKLine types.KLine
CurrentTime time.Time
Account *types.Account
@ -400,6 +401,7 @@ func (m *SimplePriceMatching) SellToPrice(price fixedpoint.Value) (closedOrders
func (m *SimplePriceMatching) processKLine(kline types.KLine) {
m.CurrentTime = kline.EndTime
m.LastKLine = kline
switch kline.Direction() {
case types.DirectionDown:

View File

@ -22,6 +22,10 @@ func NewLocalActiveOrderBook() *LocalActiveOrderBook {
}
}
func (b *LocalActiveOrderBook) Backup() []types.SubmitOrder {
return append(b.Bids.Backup(), b.Asks.Backup()...)
}
func (b *LocalActiveOrderBook) BindStream(stream types.Stream) {
stream.OnOrderUpdate(b.orderUpdateHandler)
}

View File

@ -246,6 +246,7 @@ func NewExchangeSessionFromConfig(name string, sessionConfig *ExchangeSession) (
session.EnvVarPrefix = sessionConfig.EnvVarPrefix
session.Key = sessionConfig.Key
session.Secret = sessionConfig.Secret
session.SubAccount = sessionConfig.SubAccount
session.PublicOnly = sessionConfig.PublicOnly
session.Margin = sessionConfig.Margin
session.IsolatedMargin = sessionConfig.IsolatedMargin
@ -475,7 +476,8 @@ func (environ *Environment) Connect(ctx context.Context) error {
var logger = log.WithField("session", n)
if len(session.Subscriptions) == 0 {
logger.Warnf("exchange session %s has no subscriptions", session.Name)
logger.Warnf("exchange session %s has no subscriptions, skipping", session.Name)
continue
} else {
// add the subscribe requests to the stream
for _, s := range session.Subscriptions {

View File

@ -238,7 +238,6 @@ func (rule *SlideRule) Scale() (Scale, error) {
return nil, errors.New("no any scale is defined")
}
// PriceVolumeScale defines the scale DSL for strategy, e.g.,
//
// scaleQuantity:
@ -282,7 +281,7 @@ func (q *PriceVolumeScale) ScaleByPrice(price float64) (float64, error) {
return 0, err
}
if err := scale.Solve() ; err != nil {
if err := scale.Solve(); err != nil {
return 0, err
}
@ -300,7 +299,7 @@ func (q *PriceVolumeScale) ScaleByVolume(volume float64) (float64, error) {
return 0, err
}
if err := scale.Solve() ; err != nil {
if err := scale.Solve(); err != nil {
return 0, err
}

View File

@ -310,6 +310,8 @@ func (s *Stream) Connect(ctx context.Context) error {
}
go s.read(ctx)
s.EmitStart()
return nil
}
@ -366,6 +368,8 @@ func (s *Stream) read(ctx context.Context) {
log.Info("websocket connection closed, going away")
}
s.EmitDisconnect()
// reconnect
for err != nil {
select {

View File

@ -53,7 +53,7 @@ type WebSocketService struct {
Subscriptions []Subscription
connectCallbacks []func(conn *websocket.Conn)
disconnectCallbacks []func(conn *websocket.Conn)
disconnectCallbacks []func()
errorCallbacks []func(err error)
messageCallbacks []func(message []byte)
@ -163,6 +163,7 @@ func (s *WebSocketService) read(ctx context.Context) {
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
s.EmitDisconnect()
// emit reconnect to start a new connection
s.emitReconnect()
return

View File

@ -16,13 +16,13 @@ func (s *WebSocketService) EmitConnect(conn *websocket.Conn) {
}
}
func (s *WebSocketService) OnDisconnect(cb func(conn *websocket.Conn)) {
func (s *WebSocketService) OnDisconnect(cb func()) {
s.disconnectCallbacks = append(s.disconnectCallbacks, cb)
}
func (s *WebSocketService) EmitDisconnect(conn *websocket.Conn) {
func (s *WebSocketService) EmitDisconnect() {
for _, cb := range s.disconnectCallbacks {
cb(conn)
cb()
}
}

View File

@ -42,6 +42,8 @@ func NewStream(key, secret string) *Stream {
}
})
wss.OnDisconnect(stream.EmitDisconnect)
wss.OnMessage(func(message []byte) {
logger.Debugf("M: %s", message)
})
@ -169,7 +171,13 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, options types.S
}
func (s *Stream) Connect(ctx context.Context) error {
return s.websocketService.Connect(ctx)
err := s.websocketService.Connect(ctx)
if err != nil {
return err
}
s.EmitStart()
return nil
}
func (s *Stream) Close() error {

View File

@ -18,7 +18,7 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 100)
records, err := s.QueryLast(ex.Name(), 10)
if err != nil {
return err
}
@ -56,7 +56,6 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
return nil
}
func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) {
sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{

View File

@ -18,7 +18,7 @@ func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 100)
records, err := s.QueryLast(ex.Name(), 10)
if err != nil {
return err
}

View File

@ -56,25 +56,38 @@ type Strategy struct {
LowerPrice fixedpoint.Value `json:"lowerPrice" yaml:"lowerPrice"`
// Quantity is the quantity you want to submit for each order.
Quantity fixedpoint.Value `json:"quantity,omitempty"`
Quantity fixedpoint.Value `json:"quantity,omitempty"`
// ScaleQuantity helps user to define the quantity by price scale or volume scale
ScaleQuantity *bbgo.PriceVolumeScale `json:"scaleQuantity,omitempty"`
// FixedAmount is used for fixed amount (dynamic quantity) if you don't want to use fixed quantity.
FixedAmount fixedpoint.Value `json:"amount,omitempty" yaml:"amount"`
// Side is the initial maker orders side. defaults to "both"
Side types.SideType `json:"side" yaml:"side"`
// CatchUp let the maker grid catch up with the price change.
CatchUp bool `json:"catchUp" yaml:"catchUp"`
// Long means you want to hold more base asset than the quote asset.
Long bool `json:"long,omitempty" yaml:"long,omitempty"`
filledBuyGrids map[fixedpoint.Value]struct{}
filledSellGrids map[fixedpoint.Value]struct{}
// orderStore is used to store all the created orders, so that we can filter the trades.
orderStore *bbgo.OrderStore
// activeOrders is the locally maintained active order book of the maker orders.
activeOrders *bbgo.LocalActiveOrderBook
position fixedpoint.Value
position bbgo.Position
// any created orders for tracking trades
orders map[uint64]types.Order
// groupID is the group ID used for the strategy instance for canceling orders
groupID int64
}
@ -141,6 +154,11 @@ func (s *Strategy) generateGridSellOrders(session *bbgo.ExchangeSession) ([]type
baseBalance.Available.Float64())
}
if _, filled := s.filledSellGrids[price]; filled {
log.Debugf("sell grid at price %f is already filled, skipping", price.Float64())
continue
}
orders = append(orders, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeSell,
@ -152,12 +170,15 @@ func (s *Strategy) generateGridSellOrders(session *bbgo.ExchangeSession) ([]type
GroupID: s.groupID,
})
baseBalance.Available -= quantity
s.filledSellGrids[price] = struct{}{}
}
return orders, nil
}
func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types.SubmitOrder, error) {
// session.Exchange.QueryTicker()
currentPriceFloat, ok := session.LastPrice(s.Symbol)
if !ok {
return nil, fmt.Errorf("%s last price not found, skipping", s.Symbol)
@ -217,6 +238,11 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types
quoteQuantity.Float64())
}
if _, filled := s.filledBuyGrids[price]; filled {
log.Debugf("buy grid at price %f is already filled, skipping", price.Float64())
continue
}
orders = append(orders, types.SubmitOrder{
Symbol: s.Symbol,
Side: types.SideTypeBuy,
@ -228,40 +254,74 @@ func (s *Strategy) generateGridBuyOrders(session *bbgo.ExchangeSession) ([]types
GroupID: s.groupID,
})
balance.Available -= quoteQuantity
s.filledBuyGrids[price] = struct{}{}
}
return orders, nil
}
func (s *Strategy) placeGridSellOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
orderForms, err := s.generateGridSellOrders(session)
if err != nil {
return err
}
if len(orderForms) > 0 {
createdOrders, err := orderExecutor.SubmitOrders(context.Background(), orderForms...)
if err != nil {
return err
}
s.activeOrders.Add(createdOrders...)
}
return nil
}
func (s *Strategy) placeGridBuyOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
orderForms, err := s.generateGridBuyOrders(session)
if err != nil {
return err
}
if len(orderForms) > 0 {
createdOrders, err := orderExecutor.SubmitOrders(context.Background(), orderForms...)
if err != nil {
return err
} else {
s.activeOrders.Add(createdOrders...)
}
}
return nil
}
func (s *Strategy) placeGridOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
log.Infof("placing grid orders...")
sellOrders, err := s.generateGridSellOrders(session)
if err != nil {
log.Warn(err.Error())
}
if len(sellOrders) > 0 {
createdSellOrders, err := orderExecutor.SubmitOrders(context.Background(), sellOrders...)
if err != nil {
log.WithError(err).Error(err.Error())
} else {
s.activeOrders.Add(createdSellOrders...)
switch s.Side {
case types.SideTypeBuy:
if err := s.placeGridBuyOrders(orderExecutor, session); err != nil {
log.Warn(err.Error())
}
case types.SideTypeSell:
if err := s.placeGridSellOrders(orderExecutor, session); err != nil {
log.Warn(err.Error())
}
case types.SideTypeBoth:
if err := s.placeGridSellOrders(orderExecutor, session); err != nil {
log.Warn(err.Error())
}
if err := s.placeGridBuyOrders(orderExecutor, session); err != nil {
log.Warn(err.Error())
}
}
buyOrders, err := s.generateGridBuyOrders(session)
if err != nil {
log.Warn(err.Error())
}
if len(buyOrders) > 0 {
createdBuyOrders, err := orderExecutor.SubmitOrders(context.Background(), buyOrders...)
if err != nil {
log.WithError(err).Error(err.Error())
} else {
s.activeOrders.Add(createdBuyOrders...)
}
}
}
func (s *Strategy) tradeUpdateHandler(trade types.Trade) {
@ -271,11 +331,14 @@ func (s *Strategy) tradeUpdateHandler(trade types.Trade) {
if s.orderStore.Exists(trade.OrderID) {
log.Infof("received trade update of order %d: %+v", trade.OrderID, trade)
switch trade.Side {
case types.SideTypeBuy:
s.position.AtomicAdd(fixedpoint.NewFromFloat(trade.Quantity))
case types.SideTypeSell:
s.position.AtomicAdd(-fixedpoint.NewFromFloat(trade.Quantity))
if trade.Side == types.SideTypeSelf {
return
}
profit, madeProfit := s.position.AddTrade(trade)
if madeProfit {
s.Notify("profit: %f", profit.Float64())
}
}
}
@ -333,16 +396,25 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.GridNum = 10
}
if s.Side == "" {
s.Side = types.SideTypeBoth
}
if s.UpperPrice <= s.LowerPrice {
return fmt.Errorf("upper price (%f) should not be less than lower price (%f)", s.UpperPrice.Float64(), s.LowerPrice.Float64())
}
s.filledBuyGrids = make(map[fixedpoint.Value]struct{})
s.filledSellGrids = make(map[fixedpoint.Value]struct{})
position, ok := session.Position(s.Symbol)
if !ok {
return fmt.Errorf("position not found")
}
log.Infof("position: %+v", position)
s.position = *position
s.Notify("current position %+v", position)
instanceID := fmt.Sprintf("grid-%s-%d", s.Symbol, s.GridNum)
s.groupID = generateGroupID(instanceID)
@ -360,14 +432,21 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
defer wg.Done()
log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error")
}
})
if s.CatchUp {
session.Stream.OnKLineClosed(func(kline types.KLine) {
log.Infof("catchUp mode is enabled, updating grid orders...")
// update grid
s.placeGridOrders(orderExecutor, session)
})
}
session.Stream.OnTradeUpdate(s.tradeUpdateHandler)
session.Stream.OnConnect(func() {
session.Stream.OnStart(func() {
s.placeGridOrders(orderExecutor, session)
})

View File

@ -155,6 +155,14 @@ type Order struct {
IsIsolated bool `json:"isIsolated" db:"is_isolated"`
}
// Backup backs up the current order quantity to a SubmitOrder object
// so that we can post the order later when we want to restore the orders.
func (o Order) Backup() SubmitOrder {
so := o.SubmitOrder
so.Quantity = o.Quantity - o.ExecutedQuantity
return so
}
func (o Order) String() string {
return fmt.Sprintf("order %s %f/%f at %f -> %s", o.Side, o.ExecutedQuantity, o.Quantity, o.Price, o.Status)
}

View File

@ -5,6 +5,14 @@ import "sync"
// OrderMap is used for storing orders by their order id
type OrderMap map[uint64]Order
func (m OrderMap) Backup() (orderForms []SubmitOrder) {
for _, order := range m {
orderForms = append(orderForms, order.Backup())
}
return orderForms
}
func (m OrderMap) Add(o Order) {
m[o.OrderID] = o
}
@ -70,6 +78,12 @@ func NewSyncOrderMap() *SyncOrderMap {
}
}
func (m *SyncOrderMap) Backup() []SubmitOrder {
m.Lock()
defer m.Unlock()
return m.orders.Backup()
}
func (m *SyncOrderMap) Remove(orderID uint64) (exists bool) {
m.Lock()
defer m.Unlock()

View File

@ -1,5 +1,10 @@
package types
import (
"encoding/json"
"strings"
)
// SideType define side type of order
type SideType string
@ -7,8 +12,33 @@ const (
SideTypeBuy = SideType("BUY")
SideTypeSell = SideType("SELL")
SideTypeSelf = SideType("SELF")
// SideTypeBoth is only used for the configuration context
SideTypeBoth = SideType("BOTH")
)
func (side *SideType) UnmarshalJSON(data []byte) (err error) {
var s string
err = json.Unmarshal(data, &s)
if err != nil {
return err
}
switch strings.ToLower(s) {
case "buy":
*side = SideTypeBuy
case "sell":
*side = SideTypeSell
case "both":
*side = SideTypeBoth
}
return err
}
func (side SideType) Reverse() SideType {
switch side {
case SideTypeBuy:

View File

@ -4,6 +4,16 @@ package types
import ()
func (stream *StandardStream) OnStart(cb func()) {
stream.startCallbacks = append(stream.startCallbacks, cb)
}
func (stream *StandardStream) EmitStart() {
for _, cb := range stream.startCallbacks {
cb()
}
}
func (stream *StandardStream) OnConnect(cb func()) {
stream.connectCallbacks = append(stream.connectCallbacks, cb)
}
@ -105,6 +115,8 @@ func (stream *StandardStream) EmitBookSnapshot(book OrderBook) {
}
type StandardStreamEventHub interface {
OnStart(cb func())
OnConnect(cb func())
OnDisconnect(cb func())

View File

@ -23,6 +23,8 @@ var KLineChannel = Channel("kline")
type StandardStream struct {
Subscriptions []Subscription
startCallbacks []func()
connectCallbacks []func()
disconnectCallbacks []func()