Merge pull request #50 from c9s/feature/order-management

enhancement: add more active order management components
This commit is contained in:
Yo-An Lin 2020-11-01 21:02:09 +08:00 committed by GitHub
commit c1d72c14a7
15 changed files with 380 additions and 108 deletions

54
config/grid-max.yaml Normal file
View File

@ -0,0 +1,54 @@
---
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "btc"
"^ETH": "eth"
"^BNB": "bnb"
# object routing rules
routing:
trade: "$symbol"
order: "$symbol"
submitOrder: "$session" # not supported yet
pnL: "bbgo-pnl"
sessions:
binance:
exchange: binance
envVarPrefix: binance
max:
exchange: max
envVarPrefix: max
riskControls:
# This is the session-based risk controller, which let you configure different risk controller by session.
sessionBased:
# "max" is the session name that you want to configure the risk control
max:
# orderExecutors is one of the risk control
orderExecutors:
# symbol-routed order executor
bySymbol:
MAXUSDT:
# basic risk control order executor
basic:
minQuoteBalance: 1000.0
maxBaseAssetBalance: 5000.0
minBaseAssetBalance: 10.0
maxOrderAmount: 200.0
exchangeStrategies:
- on: max
grid:
symbol: MAXUSDT
interval: 1m
baseQuantity: 200.0
gridPips: 0.02
gridNumber: 2

View File

@ -22,11 +22,15 @@ sessions:
exchange: binance
envVarPrefix: binance
max:
exchange: max
envVarPrefix: max
riskControls:
# This is the session-based risk controller, which let you configure different risk controller by session.
sessionBased:
# "max" is the session name that you want to configure the risk control
binance:
max:
# orderExecutors is one of the risk control
orderExecutors:
# symbol-routed order executor

View File

@ -135,7 +135,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
for interval := range types.SupportedIntervals {
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval.String(), types.KLineQueryOptions{
EndTime: &now,
Limit: 100,
Limit: 500, // indicators need at least 100
})
if err != nil {
return err

View File

@ -17,7 +17,7 @@ type ExchangeOrderExecutionRouter struct {
sessions map[string]*ExchangeSession
}
func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) ([]types.Order, error) {
func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (types.OrderSlice, error) {
es, ok := e.sessions[session]
if !ok {
return nil, errors.Errorf("exchange session %s not found", session)
@ -28,7 +28,6 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi
return nil, err
}
// e.Notify(":memo: Submitting order to %s %s %s %s with quantity: %s", session, order.Symbol, order.Type, order.Side, order.QuantityString, order)
return es.Exchange.SubmitOrders(ctx, formattedOrders...)
}
@ -44,14 +43,14 @@ func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder)
// pass submit order as an interface object.
channel, ok := e.RouteObject(&order)
if ok {
e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, &order)
e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %s at price: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order.PriceString, &order)
} else {
e.Notify(":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, &order)
e.Notify(":memo: Submitting %s %s %s order with quantity: %s at price: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order.PriceString, &order)
}
}
}
func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) {
func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := formatOrders(orders, e.session)
if err != nil {
return nil, err
@ -81,7 +80,7 @@ type BasicRiskControlOrderExecutor struct {
MaxOrderAmount fixedpoint.Value `json:"maxOrderAmount,omitempty"`
}
func (e *BasicRiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) {
func (e *BasicRiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) {
var formattedOrders []types.SubmitOrder
for _, order := range orders {
currentPrice, ok := e.session.LastPrice(order.Symbol)

View File

@ -17,7 +17,7 @@ type RiskControlOrderExecutors struct {
BySymbol map[string]*SymbolBasedOrderExecutor `json:"bySymbol,omitempty" yaml:"bySymbol,omitempty"`
}
func (e *RiskControlOrderExecutors) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) {
func (e *RiskControlOrderExecutors) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) {
var symbolOrders = make(map[string][]types.SubmitOrder, len(orders))
for _, order := range orders {
symbolOrders[order.Symbol] = append(symbolOrders[order.Symbol], order)

View File

@ -295,10 +295,10 @@ func (trader *Trader) ReportPnL() *PnLReporterManager {
}
type OrderExecutor interface {
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders []types.Order, err error)
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
}
type OrderExecutionRouter interface {
// SubmitOrderTo submit order to a specific exchange session
SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (createdOrders []types.Order, err error)
SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
}

View File

@ -310,7 +310,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) (err
return err2
}
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders []types.Order, err error) {
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
for _, order := range orders {
orderType, err := toLocalOrderType(order.Type)
if err != nil {

View File

@ -111,7 +111,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) (err
return err2
}
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders []types.Order, err error) {
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
for _, order := range orders {
orderType, err := toLocalOrderType(order.Type)
if err != nil {
@ -362,26 +362,33 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, opt
limit = options.Limit
}
i, err := maxapi.ParseInterval(interval)
if err != nil {
return nil, err
}
// workaround for the kline query
if options.EndTime != nil && options.StartTime == nil {
startTime := options.EndTime.Add(- time.Duration(limit) * time.Minute * time.Duration(i))
options.StartTime = &startTime
}
if options.StartTime == nil {
return nil, errors.New("start time can not be empty")
}
if options.EndTime != nil {
return nil, errors.New("end time is not supported")
}
log.Infof("querying kline %s %s %v", symbol, interval, options)
log.Infof("querying kline %s %s %+v", symbol, interval, options)
// avoid rate limit
time.Sleep(100 * time.Millisecond)
localKlines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), interval, *options.StartTime, limit)
localKLines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), interval, *options.StartTime, limit)
if err != nil {
return nil, err
}
var kLines []types.KLine
for _, k := range localKlines {
for _, k := range localKLines {
kLines = append(kLines, k.KLine())
}

View File

@ -152,7 +152,7 @@ func mustParseTicker(v *fastjson.Value) Ticker {
type Interval int64
func parseResolution(a string) (Interval, error) {
func ParseInterval(a string) (Interval, error) {
switch strings.ToLower(a) {
case "1m":
@ -170,12 +170,21 @@ func parseResolution(a string) (Interval, error) {
case "1h":
return 60, nil
case "2h":
return 60 * 2, nil
case "3h":
return 60 * 3, nil
case "4h":
return 60 * 4, nil
case "6h":
return 60 * 6, nil
case "8h":
return 60 * 8, nil
case "12h":
return 60 * 12, nil
@ -190,7 +199,7 @@ func parseResolution(a string) (Interval, error) {
}
return 0, errors.New("incorrect resolution")
return 0, errors.Errorf("incorrect resolution: %q", a)
}
type KLine struct {
@ -224,7 +233,7 @@ func (s *PublicService) KLines(symbol string, resolution string, start time.Time
queries := url.Values{}
queries.Set("market", symbol)
interval, err := parseResolution(resolution)
interval, err := ParseInterval(resolution)
if err != nil {
return nil, err
}
@ -232,7 +241,7 @@ func (s *PublicService) KLines(symbol string, resolution string, start time.Time
nilTime := time.Time{}
if start != nilTime {
queries.Set("timestamp", strconv.FormatInt(start.Unix(), 64))
queries.Set("timestamp", strconv.FormatInt(start.Unix(), 10))
}
if limit > 0 {

View File

@ -39,10 +39,18 @@ type BOLL struct {
}
func (inc *BOLL) LastUpBand() float64 {
if len(inc.UpBand) == 0 {
return 0.0
}
return inc.UpBand[len(inc.UpBand)-1]
}
func (inc *BOLL) LastDownBand() float64 {
if len(inc.DownBand) == 0 {
return 0.0
}
return inc.DownBand[len(inc.DownBand)-1]
}
@ -85,6 +93,8 @@ func (inc *BOLL) calculateAndUpdate(kLines []types.KLine) {
// update end time
inc.EndTime = kLines[index].EndTime
// log.Infof("update boll: sma=%f, up=%f, down=%f", sma, upBand, downBand)
inc.EmitUpdate(sma, upBand, downBand)
}

View File

@ -2,7 +2,6 @@ package grid
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
@ -65,11 +64,9 @@ type Strategy struct {
BaseQuantity float64 `json:"baseQuantity"`
activeBidOrders map[uint64]types.Order
activeAskOrders map[uint64]types.Order
activeOrders *types.LocalActiveOrderBook
boll *indicator.BOLL
mu sync.Mutex
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
@ -86,12 +83,16 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb
return
}
var numOrders = s.GridNum - len(s.activeBidOrders)
var numOrders = s.GridNum - s.activeOrders.NumOfBids()
if numOrders <= 0 {
return
}
var downBand = s.boll.LastDownBand()
if downBand <= 0.0 {
return
}
var startPrice = downBand
var submitOrders []types.SubmitOrder
@ -115,13 +116,7 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb
return
}
s.mu.Lock()
for i := range orders {
var order = orders[i]
log.Infof("adding order %d to the active bid order pool...", order.OrderID)
s.activeBidOrders[order.OrderID] = order
}
s.mu.Unlock()
s.activeOrders.Add(orders...)
}
func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
@ -133,12 +128,16 @@ func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bb
return
}
var numOrders = s.GridNum - len(s.activeAskOrders)
var numOrders = s.GridNum - s.activeOrders.NumOfAsks()
if numOrders <= 0 {
return
}
var upBand = s.boll.LastUpBand()
if upBand <= 0.0 {
return
}
var startPrice = upBand
var submitOrders []types.SubmitOrder
@ -162,33 +161,22 @@ func (s *Strategy) updateAskOrders(orderExecutor bbgo.OrderExecutor, session *bb
return
}
s.mu.Lock()
for i := range orders {
var order = orders[i]
log.Infof("adding order %d to the active ask order pool...", order.OrderID)
s.activeAskOrders[order.OrderID] = order
}
s.mu.Unlock()
log.Infof("adding orders to the active ask order pool...")
s.activeOrders.Add(orders...)
}
func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
log.Infof("checking grid orders, bids=%d asks=%d", len(s.activeBidOrders), len(s.activeAskOrders))
log.Infof("checking grid orders, bids=%d asks=%d", s.activeOrders.Bids.Len(), s.activeOrders.Asks.Len())
for _, o := range s.activeBidOrders {
log.Infof("bid order: %d -> %s", o.OrderID, o.Status)
}
s.activeOrders.Print()
for _, o := range s.activeAskOrders {
log.Infof("ask order: %d -> %s", o.OrderID, o.Status)
}
if len(s.activeBidOrders) < s.GridNum {
log.Infof("active bid orders not enough: %d < %d, updating...", len(s.activeBidOrders), s.GridNum)
if s.activeOrders.Bids.Len() < s.GridNum {
log.Infof("active bid orders not enough: %d < %d, updating...", s.activeOrders.Bids.Len(), s.GridNum)
s.updateBidOrders(orderExecutor, session)
}
if len(s.activeAskOrders) < s.GridNum {
log.Infof("active ask orders not enough: %d < %d, updating...", len(s.activeAskOrders), s.GridNum)
if s.activeOrders.Asks.Len() < s.GridNum {
log.Infof("active ask orders not enough: %d < %d, updating...", s.activeOrders.Asks.Len(), s.GridNum)
s.updateAskOrders(orderExecutor, session)
}
}
@ -204,9 +192,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
// TODO: pull this map out and add mutex lock
s.activeBidOrders = make(map[uint64]types.Order)
s.activeAskOrders = make(map[uint64]types.Order)
s.activeOrders = types.NewLocalActiveOrderBook()
session.Stream.OnOrderUpdate(func(order types.Order) {
log.Infof("received order update: %+v", order)
@ -215,53 +201,17 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return
}
s.mu.Lock()
defer s.mu.Unlock()
switch order.Status {
case types.OrderStatusFilled:
switch order.Side {
case types.SideTypeSell:
// find the filled bid to remove
for id, o := range s.activeBidOrders {
if o.Status == types.OrderStatusFilled {
delete(s.activeBidOrders, id)
delete(s.activeAskOrders, order.OrderID)
break
}
}
case types.SideTypeBuy:
// find the filled ask order to remove
for id, o := range s.activeAskOrders {
if o.Status == types.OrderStatusFilled {
delete(s.activeAskOrders, id)
delete(s.activeBidOrders, order.OrderID)
break
}
}
}
s.activeOrders.WriteOff(order)
case types.OrderStatusCanceled, types.OrderStatusRejected:
log.Infof("order status %s, removing %d from the active order pool...", order.Status, order.OrderID)
switch order.Side {
case types.SideTypeSell:
delete(s.activeAskOrders, order.OrderID)
case types.SideTypeBuy:
delete(s.activeBidOrders, order.OrderID)
}
s.activeOrders.Delete(order)
default:
log.Infof("order status %s, updating %d to the active order pool...", order.Status, order.OrderID)
switch order.Side {
case types.SideTypeSell:
s.activeAskOrders[order.OrderID] = order
case types.SideTypeBuy:
s.activeBidOrders[order.OrderID] = order
}
s.activeOrders.Add(order)
}
})
@ -272,13 +222,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.updateOrders(orderExecutor, session)
defer func() {
for _, o := range s.activeBidOrders {
_ = session.Exchange.CancelOrders(context.Background(), o)
}
for _, o := range s.activeAskOrders {
_ = session.Exchange.CancelOrders(context.Background(), o)
}
_ = session.Exchange.CancelOrders(context.Background(), s.activeOrders.Orders()...)
}()
for {

90
pkg/types/active_book.go Normal file
View File

@ -0,0 +1,90 @@
package types
import log "github.com/sirupsen/logrus"
// LocalActiveOrderBook manages the local active order books.
type LocalActiveOrderBook struct {
Bids *SyncOrderMap
Asks *SyncOrderMap
}
func NewLocalActiveOrderBook() *LocalActiveOrderBook {
return &LocalActiveOrderBook{
Bids: NewSyncOrderMap(),
Asks: NewSyncOrderMap(),
}
}
func (b *LocalActiveOrderBook) Print() {
for _, o := range b.Bids.Orders() {
log.Infof("bid order: %d -> %s", o.OrderID, o.Status)
}
for _, o := range b.Asks.Orders() {
log.Infof("ask order: %d -> %s", o.OrderID, o.Status)
}
}
func (b *LocalActiveOrderBook) Add(orders ...Order) {
for _, order := range orders {
switch order.Side {
case SideTypeBuy:
b.Bids.Add(order)
case SideTypeSell:
b.Asks.Add(order)
}
}
}
func (b *LocalActiveOrderBook) NumOfBids() int {
return b.Bids.Len()
}
func (b *LocalActiveOrderBook) NumOfAsks() int {
return b.Asks.Len()
}
func (b *LocalActiveOrderBook) Delete(order Order) {
switch order.Side {
case SideTypeBuy:
b.Bids.Delete(order.OrderID)
case SideTypeSell:
b.Asks.Delete(order.OrderID)
}
}
// WriteOff writes off the filled order on the opposite side.
// This method does not write off order by order amount or order quantity.
func (b *LocalActiveOrderBook) WriteOff(order Order) bool {
if order.Status != OrderStatusFilled {
return false
}
switch order.Side {
case SideTypeSell:
// find the filled bid to remove
if filledOrder, ok := b.Bids.AnyFilled(); ok {
b.Bids.Delete(filledOrder.OrderID)
b.Asks.Delete(order.OrderID)
return true
}
case SideTypeBuy:
// find the filled ask order to remove
if filledOrder, ok := b.Asks.AnyFilled(); ok {
b.Asks.Delete(filledOrder.OrderID)
b.Bids.Delete(order.OrderID)
return true
}
}
return false
}
func (b *LocalActiveOrderBook) Orders() OrderSlice {
return append(b.Asks.Orders(), b.Bids.Orders()...)
}

View File

@ -54,7 +54,7 @@ type Exchange interface {
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)
SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders []Order, err error)
SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error)
QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error)

View File

@ -45,11 +45,10 @@ func (m Market) FormatPriceCurrency(val float64) string {
func (m Market) FormatPrice(val float64) string {
// p := math.Pow10(m.PricePrecision)
prec := int(math.Abs(math.Log10(m.MinPrice)))
p := math.Pow10(prec)
val = math.Trunc(val*p) / p
return strconv.FormatFloat(val, 'f', m.PricePrecision, 64)
return strconv.FormatFloat(val, 'f', prec, 64)
}
func (m Market) FormatQuantity(val float64) string {

156
pkg/types/ordermap.go Normal file
View File

@ -0,0 +1,156 @@
package types
import "sync"
// OrderMap is used for storing orders by their order id
type OrderMap map[uint64]Order
func (m OrderMap) Add(o Order) {
m[o.OrderID] = o
}
func (m OrderMap) Delete(orderID uint64) {
delete(m, orderID)
}
func (m OrderMap) IDs() (ids []uint64) {
for id := range m {
ids = append(ids, id)
}
return ids
}
func (m OrderMap) Exists(orderID uint64) bool {
_, ok := m[orderID]
return ok
}
func (m OrderMap) FindByStatus(status OrderStatus) (orders OrderSlice) {
for _, o := range m {
if o.Status == status {
orders = append(orders, o)
}
}
return orders
}
func (m OrderMap) Filled() OrderSlice {
return m.FindByStatus(OrderStatusFilled)
}
func (m OrderMap) Canceled() OrderSlice {
return m.FindByStatus(OrderStatusCanceled)
}
func (m OrderMap) Orders() (orders OrderSlice) {
for _, o := range m {
orders = append(orders, o)
}
return orders
}
type SyncOrderMap struct {
orders OrderMap
sync.RWMutex
}
func NewSyncOrderMap() *SyncOrderMap {
return &SyncOrderMap{
orders: make(OrderMap),
}
}
func (m *SyncOrderMap) Delete(orderID uint64) {
m.Lock()
defer m.Unlock()
m.orders.Delete(orderID)
}
func (m *SyncOrderMap) Add(o Order) {
m.Lock()
defer m.Unlock()
m.orders.Add(o)
}
func (m *SyncOrderMap) Iterate(it func(id uint64, order Order) bool) {
m.Lock()
defer m.Unlock()
for id := range m.orders {
if it(id, m.orders[id]) {
break
}
}
}
func (m *SyncOrderMap) Exists(orderID uint64) bool {
m.RLock()
defer m.RUnlock()
return m.orders.Exists(orderID)
}
func (m *SyncOrderMap) Len() int {
m.RLock()
defer m.RUnlock()
return len(m.orders)
}
func (m *SyncOrderMap) IDs() []uint64 {
m.RLock()
defer m.RUnlock()
return m.orders.IDs()
}
func (m *SyncOrderMap) FindByStatus(status OrderStatus) OrderSlice {
m.RLock()
defer m.RUnlock()
return m.orders.FindByStatus(status)
}
func (m *SyncOrderMap) Filled() OrderSlice {
return m.FindByStatus(OrderStatusFilled)
}
// AnyFilled find any order is filled and stop iterating the order map
func (m *SyncOrderMap) AnyFilled() (order Order, ok bool) {
m.RLock()
defer m.RUnlock()
for _, o := range m.orders {
if o.Status == OrderStatusFilled {
ok = true
order = o
return order, ok
}
}
return
}
func (m *SyncOrderMap) Canceled() OrderSlice {
return m.FindByStatus(OrderStatusCanceled)
}
func (m *SyncOrderMap) Orders() OrderSlice {
m.RLock()
defer m.RUnlock()
return m.orders.Orders()
}
type OrderSlice []Order
func (s OrderSlice) IDs() (ids []uint64) {
for _, o := range s {
ids = append(ids, o.OrderID)
}
return ids
}