Merge pull request #1008 from zenixls2/feature/speedup_live_trading

improve: speed-up live trade
This commit is contained in:
Yo-An Lin 2022-11-24 16:55:16 +08:00 committed by GitHub
commit e43e56a9fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 814 additions and 124 deletions

View File

@ -38,7 +38,8 @@ type GeneralOrderExecutor struct {
marginBaseMaxBorrowable, marginQuoteMaxBorrowable fixedpoint.Value
closing int64
disableNotify bool
closing int64
}
func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position) *GeneralOrderExecutor {
@ -66,6 +67,10 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg
return executor
}
func (e *GeneralOrderExecutor) DisableNotify() {
e.disableNotify = true
}
func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) {
marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService)
if !ok {
@ -110,6 +115,10 @@ func (e *GeneralOrderExecutor) marginAssetMaxBorrowableUpdater(ctx context.Conte
}
}
func (e *GeneralOrderExecutor) OrderStore() *OrderStore {
return e.orderStore
}
func (e *GeneralOrderExecutor) ActiveMakerOrders() *ActiveOrderBook {
return e.activeMakerOrders
}
@ -148,15 +157,17 @@ func (e *GeneralOrderExecutor) Bind() {
e.activeMakerOrders.BindStream(e.session.UserDataStream)
e.orderStore.BindStream(e.session.UserDataStream)
// trade notify
e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
Notify(trade)
})
if !e.disableNotify {
// trade notify
e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
Notify(trade)
})
e.tradeCollector.OnPositionUpdate(func(position *types.Position) {
log.Infof("position changed: %s", position)
Notify(position)
})
e.tradeCollector.OnPositionUpdate(func(position *types.Position) {
log.Infof("position changed: %s", position)
Notify(position)
})
}
e.tradeCollector.BindStream(e.session.UserDataStream)
}
@ -170,6 +181,36 @@ func (e *GeneralOrderExecutor) CancelOrders(ctx context.Context, orders ...types
return err
}
// FastSubmitOrders send []types.SubmitOrder directly to the exchange without blocking wait on the status update.
// This is a faster version of SubmitOrders(). Created orders will be consumed in newly created goroutine (in non-backteset session).
// @param ctx: golang context type.
// @param submitOrders: Lists of types.SubmitOrder to be sent to the exchange.
// @return *types.SubmitOrder: SubmitOrder with calculated quantity and price.
// @return error: Error message.
func (e *GeneralOrderExecutor) FastSubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := e.session.FormatOrders(submitOrders)
if err != nil {
return nil, err
}
createdOrders, errIdx, err := BatchPlaceOrder(ctx, e.session.Exchange, formattedOrders...)
if len(errIdx) > 0 {
return nil, err
}
if IsBackTesting {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
} else {
go func() {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
}()
}
return createdOrders, err
}
func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := e.session.FormatOrders(submitOrders)
if err != nil {
@ -262,7 +303,12 @@ func (e *GeneralOrderExecutor) reduceQuantityAndSubmitOrder(ctx context.Context,
return nil, multierr.Append(ErrExceededSubmitOrderRetryLimit, err)
}
func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPositionOptions) (types.OrderSlice, error) {
// Create new submitOrder from OpenPositionOptions.
// @param ctx: golang context type.
// @param options: OpenPositionOptions to control the generated SubmitOrder in a higher level way. Notice that the Price in options will be updated as the submitOrder price.
// @return *types.SubmitOrder: SubmitOrder with calculated quantity and price.
// @return error: Error message.
func (e *GeneralOrderExecutor) NewOrderFromOpenPosition(ctx context.Context, options *OpenPositionOptions) (*types.SubmitOrder, error) {
price := options.Price
submitOrder := types.SubmitOrder{
Symbol: e.position.Symbol,
@ -284,9 +330,11 @@ func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPos
if options.Long {
// use higher price to buy (this ensures that our order will be filled)
price = price.Mul(one.Add(options.LimitOrderTakerRatio))
options.Price = price
} else if options.Short {
// use lower price to sell (this ensures that our order will be filled)
price = price.Mul(one.Sub(options.LimitOrderTakerRatio))
options.Price = price
}
}
@ -320,14 +368,7 @@ func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPos
submitOrder.Side = types.SideTypeBuy
submitOrder.Quantity = quantity
Notify("Opening %s long position with quantity %v at price %v", e.position.Symbol, quantity, price)
createdOrder, err := e.SubmitOrders(ctx, submitOrder)
if err == nil {
return createdOrder, nil
}
return e.reduceQuantityAndSubmitOrder(ctx, price, submitOrder)
return &submitOrder, nil
} else if options.Short {
if quantity.IsZero() {
var err error
@ -350,13 +391,42 @@ func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPos
submitOrder.Side = types.SideTypeSell
submitOrder.Quantity = quantity
Notify("Opening %s short position with quantity %v at price %v", e.position.Symbol, quantity, price)
return e.reduceQuantityAndSubmitOrder(ctx, price, submitOrder)
return &submitOrder, nil
}
return nil, errors.New("options Long or Short must be set")
}
// OpenPosition sends the orders generated from OpenPositionOptions to the exchange by calling SubmitOrders or reduceQuantityAndSubmitOrder.
// @param ctx: golang context type.
// @param options: OpenPositionOptions to control the generated SubmitOrder in a higher level way. Notice that the Price in options will be updated as the submitOrder price.
// @return types.OrderSlice: Created orders with information from exchange.
// @return error: Error message.
func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPositionOptions) (types.OrderSlice, error) {
submitOrder, err := e.NewOrderFromOpenPosition(ctx, &options)
if err != nil {
return nil, err
}
if submitOrder == nil {
return nil, nil
}
price := options.Price
side := "long"
if submitOrder.Side == types.SideTypeSell {
side = "short"
}
Notify("Opening %s %s position with quantity %v at price %v", e.position.Symbol, side, submitOrder.Quantity, price)
createdOrder, err := e.SubmitOrders(ctx, *submitOrder)
if err == nil {
return createdOrder, nil
}
return e.reduceQuantityAndSubmitOrder(ctx, price, *submitOrder)
}
// GracefulCancelActiveOrderBook cancels the orders from the active orderbook.
func (e *GeneralOrderExecutor) GracefulCancelActiveOrderBook(ctx context.Context, activeOrders *ActiveOrderBook) error {
if activeOrders.NumOfOrders() == 0 {

View File

@ -89,6 +89,10 @@ func (s *OrderStore) Add(orders ...types.Order) {
defer s.mu.Unlock()
for _, o := range orders {
old, ok := s.orders[o.OrderID]
if ok && o.Tag == "" && old.Tag != "" {
o.Tag = old.Tag
}
s.orders[o.OrderID] = o
}
}
@ -120,11 +124,11 @@ func (s *OrderStore) BindStream(stream types.Stream) {
return
}
s.handleOrderUpdate(order)
s.HandleOrderUpdate(order)
})
}
func (s *OrderStore) handleOrderUpdate(order types.Order) {
func (s *OrderStore) HandleOrderUpdate(order types.Order) {
switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled, types.OrderStatusFilled:

View File

@ -56,6 +56,10 @@ func (c *TradeCollector) Position() *types.Position {
return c.position
}
func (c *TradeCollector) TradeStore() *TradeStore {
return c.tradeStore
}
func (c *TradeCollector) SetPosition(position *types.Position) {
c.position = position
}
@ -197,9 +201,11 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
key := trade.Key()
// if it's already done, remove the trade from the trade store
c.mu.Lock()
if _, done := c.doneTrades[key]; done {
return false
}
c.mu.Unlock()
if c.processTrade(trade) {
return true

View File

@ -32,3 +32,18 @@ const (
OrderStatusTypeRejected OrderStatusType = binance.OrderStatusTypeRejected
OrderStatusTypeExpired OrderStatusType = binance.OrderStatusTypeExpired
)
type CancelReplaceModeType string
const (
StopOnFailure CancelReplaceModeType = "STOP_ON_FAILURE"
AllowFailure CancelReplaceModeType = "ALLOW_FAILURE"
)
type OrderRespType string
const (
Ack OrderRespType = "ACK"
Result OrderRespType = "RESULT"
Full OrderRespType = "FULL"
)

View File

@ -0,0 +1,47 @@
package binanceapi
import (
"github.com/adshao/go-binance/v2"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/requestgen"
)
type CancelReplaceSpotOrderData struct {
CancelResult string `json:"cancelResult"`
NewOrderResult string `json:"newOrderResult"`
NewOrderResponse *binance.Order `json:"newOrderResponse"`
}
type CancelReplaceSpotOrderResponse struct {
Code int `json:"code,omitempty"`
Msg string `json:"msg,omitempty"`
Data *CancelReplaceSpotOrderData `json:"data"`
}
//go:generate requestgen -method POST -url "/api/v3/order/cancelReplace" -type CancelReplaceSpotOrderRequest -responseType .CancelReplaceSpotOrderResponse
type CancelReplaceSpotOrderRequest struct {
client requestgen.AuthenticatedAPIClient
symbol string `param:"symbol"`
side SideType `param:"side"`
cancelReplaceMode CancelReplaceModeType `param:"cancelReplaceMode"`
timeInForce string `param:"timeInForce"`
quantity string `param:"quantity"`
quoteOrderQty string `param:"quoteOrderQty"`
price string `param:"price"`
cancelNewClientOrderId string `param:"cancelNewClientOrderId"`
cancelOrigClientOrderId string `param:"cancelOrigClientOrderId"`
cancelOrderId int `param:"cancelOrderId"`
newClientOrderId string `param:"newClientOrderId"`
strategyId int `param:"strategyId"`
strategyType int `param:"strategyType"`
stopPrice string `param:"stopPrice"`
trailingDelta int `param:"trailingDelta"`
icebergQty string `param:"icebergQty"`
newOrderRespType OrderRespType `param:"newOrderRespType"`
recvWindow int `param:"recvWindow"`
timestamp types.MillisecondTimestamp `param:"timestamp"`
}
func (c *RestClient) NewCancelReplaceSpotOrderRequest() *CancelReplaceSpotOrderRequest {
return &CancelReplaceSpotOrderRequest{client: c}
}

View File

@ -0,0 +1,351 @@
// Code generated by "requestgen -method POST -url /api/v3/order/cancelReplace -type CancelReplaceSpotOrderRequest -responseType .CancelReplaceSpotOrderResponse"; DO NOT EDIT.
package binanceapi
import (
"context"
"encoding/json"
"fmt"
"github.com/adshao/go-binance/v2"
"github.com/c9s/bbgo/pkg/types"
"net/url"
"reflect"
"regexp"
)
func (c *CancelReplaceSpotOrderRequest) Symbol(symbol string) *CancelReplaceSpotOrderRequest {
c.symbol = symbol
return c
}
func (c *CancelReplaceSpotOrderRequest) Side(side binance.SideType) *CancelReplaceSpotOrderRequest {
c.side = side
return c
}
func (c *CancelReplaceSpotOrderRequest) CancelReplaceMode(cancelReplaceMode CancelReplaceModeType) *CancelReplaceSpotOrderRequest {
c.cancelReplaceMode = cancelReplaceMode
return c
}
func (c *CancelReplaceSpotOrderRequest) TimeInForce(timeInForce string) *CancelReplaceSpotOrderRequest {
c.timeInForce = timeInForce
return c
}
func (c *CancelReplaceSpotOrderRequest) Quantity(quantity string) *CancelReplaceSpotOrderRequest {
c.quantity = quantity
return c
}
func (c *CancelReplaceSpotOrderRequest) QuoteOrderQty(quoteOrderQty string) *CancelReplaceSpotOrderRequest {
c.quoteOrderQty = quoteOrderQty
return c
}
func (c *CancelReplaceSpotOrderRequest) Price(price string) *CancelReplaceSpotOrderRequest {
c.price = price
return c
}
func (c *CancelReplaceSpotOrderRequest) CancelNewClientOrderId(cancelNewClientOrderId string) *CancelReplaceSpotOrderRequest {
c.cancelNewClientOrderId = cancelNewClientOrderId
return c
}
func (c *CancelReplaceSpotOrderRequest) CancelOrigClientOrderId(cancelOrigClientOrderId string) *CancelReplaceSpotOrderRequest {
c.cancelOrigClientOrderId = cancelOrigClientOrderId
return c
}
func (c *CancelReplaceSpotOrderRequest) CancelOrderId(cancelOrderId int) *CancelReplaceSpotOrderRequest {
c.cancelOrderId = cancelOrderId
return c
}
func (c *CancelReplaceSpotOrderRequest) NewClientOrderId(newClientOrderId string) *CancelReplaceSpotOrderRequest {
c.newClientOrderId = newClientOrderId
return c
}
func (c *CancelReplaceSpotOrderRequest) StrategyId(strategyId int) *CancelReplaceSpotOrderRequest {
c.strategyId = strategyId
return c
}
func (c *CancelReplaceSpotOrderRequest) StrategyType(strategyType int) *CancelReplaceSpotOrderRequest {
c.strategyType = strategyType
return c
}
func (c *CancelReplaceSpotOrderRequest) StopPrice(stopPrice string) *CancelReplaceSpotOrderRequest {
c.stopPrice = stopPrice
return c
}
func (c *CancelReplaceSpotOrderRequest) TrailingDelta(trailingDelta int) *CancelReplaceSpotOrderRequest {
c.trailingDelta = trailingDelta
return c
}
func (c *CancelReplaceSpotOrderRequest) IcebergQty(icebergQty string) *CancelReplaceSpotOrderRequest {
c.icebergQty = icebergQty
return c
}
func (c *CancelReplaceSpotOrderRequest) NewOrderRespType(newOrderRespType OrderRespType) *CancelReplaceSpotOrderRequest {
c.newOrderRespType = newOrderRespType
return c
}
func (c *CancelReplaceSpotOrderRequest) RecvWindow(recvWindow int) *CancelReplaceSpotOrderRequest {
c.recvWindow = recvWindow
return c
}
func (c *CancelReplaceSpotOrderRequest) Timestamp(timestamp types.MillisecondTimestamp) *CancelReplaceSpotOrderRequest {
c.timestamp = timestamp
return c
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (c *CancelReplaceSpotOrderRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (c *CancelReplaceSpotOrderRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
// check symbol field -> json key symbol
symbol := c.symbol
// assign parameter of symbol
params["symbol"] = symbol
// check side field -> json key side
side := c.side
// assign parameter of side
params["side"] = side
// check cancelReplaceMode field -> json key cancelReplaceMode
cancelReplaceMode := c.cancelReplaceMode
// TEMPLATE check-valid-values
switch cancelReplaceMode {
case StopOnFailure, AllowFailure:
params["cancelReplaceMode"] = cancelReplaceMode
default:
return nil, fmt.Errorf("cancelReplaceMode value %v is invalid", cancelReplaceMode)
}
// END TEMPLATE check-valid-values
// assign parameter of cancelReplaceMode
params["cancelReplaceMode"] = cancelReplaceMode
// check timeInForce field -> json key timeInForce
timeInForce := c.timeInForce
// assign parameter of timeInForce
params["timeInForce"] = timeInForce
// check quantity field -> json key quantity
quantity := c.quantity
// assign parameter of quantity
params["quantity"] = quantity
// check quoteOrderQty field -> json key quoteOrderQty
quoteOrderQty := c.quoteOrderQty
// assign parameter of quoteOrderQty
params["quoteOrderQty"] = quoteOrderQty
// check price field -> json key price
price := c.price
// assign parameter of price
params["price"] = price
// check cancelNewClientOrderId field -> json key cancelNewClientOrderId
cancelNewClientOrderId := c.cancelNewClientOrderId
// assign parameter of cancelNewClientOrderId
params["cancelNewClientOrderId"] = cancelNewClientOrderId
// check cancelOrigClientOrderId field -> json key cancelOrigClientOrderId
cancelOrigClientOrderId := c.cancelOrigClientOrderId
// assign parameter of cancelOrigClientOrderId
params["cancelOrigClientOrderId"] = cancelOrigClientOrderId
// check cancelOrderId field -> json key cancelOrderId
cancelOrderId := c.cancelOrderId
// assign parameter of cancelOrderId
params["cancelOrderId"] = cancelOrderId
// check newClientOrderId field -> json key newClientOrderId
newClientOrderId := c.newClientOrderId
// assign parameter of newClientOrderId
params["newClientOrderId"] = newClientOrderId
// check strategyId field -> json key strategyId
strategyId := c.strategyId
// assign parameter of strategyId
params["strategyId"] = strategyId
// check strategyType field -> json key strategyType
strategyType := c.strategyType
// assign parameter of strategyType
params["strategyType"] = strategyType
// check stopPrice field -> json key stopPrice
stopPrice := c.stopPrice
// assign parameter of stopPrice
params["stopPrice"] = stopPrice
// check trailingDelta field -> json key trailingDelta
trailingDelta := c.trailingDelta
// assign parameter of trailingDelta
params["trailingDelta"] = trailingDelta
// check icebergQty field -> json key icebergQty
icebergQty := c.icebergQty
// assign parameter of icebergQty
params["icebergQty"] = icebergQty
// check newOrderRespType field -> json key newOrderRespType
newOrderRespType := c.newOrderRespType
// TEMPLATE check-valid-values
switch newOrderRespType {
case Ack, Result, Full:
params["newOrderRespType"] = newOrderRespType
default:
return nil, fmt.Errorf("newOrderRespType value %v is invalid", newOrderRespType)
}
// END TEMPLATE check-valid-values
// assign parameter of newOrderRespType
params["newOrderRespType"] = newOrderRespType
// check recvWindow field -> json key recvWindow
recvWindow := c.recvWindow
// assign parameter of recvWindow
params["recvWindow"] = recvWindow
// check timestamp field -> json key timestamp
timestamp := c.timestamp
// assign parameter of timestamp
params["timestamp"] = timestamp
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (c *CancelReplaceSpotOrderRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := c.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if c.isVarSlice(_v) {
c.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (c *CancelReplaceSpotOrderRequest) GetParametersJSON() ([]byte, error) {
params, err := c.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (c *CancelReplaceSpotOrderRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (c *CancelReplaceSpotOrderRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (c *CancelReplaceSpotOrderRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (c *CancelReplaceSpotOrderRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (c *CancelReplaceSpotOrderRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := c.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
func (c *CancelReplaceSpotOrderRequest) Do(ctx context.Context) (*CancelReplaceSpotOrderResponse, error) {
params, err := c.GetParameters()
if err != nil {
return nil, err
}
query := url.Values{}
apiURL := "/api/v3/order/cancelReplace"
req, err := c.client.NewAuthenticatedRequest(ctx, "POST", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := c.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse CancelReplaceSpotOrderResponse
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
return &apiResponse, nil
}

View File

@ -5,8 +5,10 @@ import (
"context"
"crypto/hmac"
"crypto/sha256"
"crypto/tls"
"encoding/json"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
@ -19,13 +21,31 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
const defaultHTTPTimeout = time.Second * 15
const defaultHTTPTimeout = time.Second * 2
const RestBaseURL = "https://api.binance.com"
const SandboxRestBaseURL = "https://testnet.binance.vision"
const DebugRequestResponse = false
var dialer = &net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}
var defaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialer.DialContext,
MaxIdleConns: 100,
MaxConnsPerHost: 100,
MaxIdleConnsPerHost: 100,
//TLSNextProto: make(map[string]func(string, *tls.Conn) http.RoundTripper),
ExpectContinueTimeout: 0,
ForceAttemptHTTP2: true,
TLSClientConfig: &tls.Config{},
}
var DefaultHttpClient = &http.Client{
Timeout: defaultHTTPTimeout,
Timeout: defaultHTTPTimeout,
Transport: defaultTransport,
}
type RestClient struct {

View File

@ -0,0 +1,70 @@
package binance
import (
"context"
"fmt"
"github.com/adshao/go-binance/v2"
"github.com/c9s/bbgo/pkg/exchange/binance/binanceapi"
"github.com/c9s/bbgo/pkg/types"
)
func (e *Exchange) CancelReplace(ctx context.Context, cancelReplaceMode types.CancelReplaceModeType, o types.Order) (*types.Order, error) {
if err := orderLimiter.Wait(ctx); err != nil {
log.WithError(err).Errorf("order rate limiter wait error")
}
if e.IsFutures || e.IsMargin {
// Not supported at the moment
return nil, nil
}
var req = e.client2.NewCancelReplaceSpotOrderRequest()
req.Symbol(o.Symbol)
req.Side(binance.SideType(o.Side))
if o.OrderID > 0 {
req.CancelOrderId(int(o.OrderID))
} else {
return nil, types.NewOrderError(fmt.Errorf("cannot cancel %s order", o.Symbol), o)
}
req.CancelReplaceMode(binanceapi.CancelReplaceModeType(cancelReplaceMode))
if len(o.TimeInForce) > 0 {
// TODO: check the TimeInForce value
req.TimeInForce(string(binance.TimeInForceType(o.TimeInForce)))
} else {
switch o.Type {
case types.OrderTypeLimit, types.OrderTypeStopLimit:
req.TimeInForce(string(binance.TimeInForceTypeGTC))
}
}
if o.Market.Symbol != "" {
req.Quantity(o.Market.FormatQuantity(o.Quantity))
} else {
req.Quantity(o.Quantity.FormatString(8))
}
switch o.Type {
case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker:
if o.Market.Symbol != "" {
req.Price(o.Market.FormatPrice(o.Price))
} else {
// TODO: report error
req.Price(o.Price.FormatString(8))
}
}
switch o.Type {
case types.OrderTypeStopLimit, types.OrderTypeStopMarket:
if o.Market.Symbol != "" {
req.StopPrice(o.Market.FormatPrice(o.StopPrice))
} else {
// TODO report error
req.StopPrice(o.StopPrice.FormatString(8))
}
}
req.NewOrderRespType(binanceapi.Full)
resp, err := req.Do(ctx)
if resp != nil && resp.Data != nil && resp.Data.NewOrderResponse != nil {
return toGlobalOrder(resp.Data.NewOrderResponse, e.IsMargin)
}
return nil, err
}

View File

@ -1 +0,0 @@
package statistics

View File

@ -64,7 +64,6 @@ type Strategy struct {
*types.ProfitStats `persistence:"profit_stats"`
*types.TradeStats `persistence:"trade_stats"`
p *types.Position
MinInterval types.Interval `json:"MinInterval"` // minimum interval referred for doing stoploss/trailing exists and updating highest/lowest
elapsed *types.Queue
@ -173,10 +172,22 @@ func (s *Strategy) CurrentPosition() *types.Position {
return s.Position
}
func (s *Strategy) SubmitOrder(ctx context.Context, submitOrder types.SubmitOrder) (*types.Order, error) {
formattedOrder, err := s.Session.FormatOrder(submitOrder)
if err != nil {
return nil, err
}
createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, s.Session.Exchange, formattedOrder)
if len(errIdx) > 0 {
return nil, err
}
return &createdOrders[0], err
}
const closeOrderRetryLimit = 5
func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Value) error {
order := s.p.NewMarketCloseOrder(percentage)
order := s.Position.NewMarketCloseOrder(percentage)
if order == nil {
return nil
}
@ -186,10 +197,10 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu
order.MarginSideEffect = types.SideEffectTypeAutoRepay
for i := 0; i < closeOrderRetryLimit; i++ {
price := s.getLastPrice()
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
baseBalance := balances[s.Market.BaseCurrency].Available
balances := s.Session.GetAccount().Balances()
baseBalance := balances[s.Market.BaseCurrency].Total()
if order.Side == types.SideTypeBuy {
quoteAmount := balances[s.Market.QuoteCurrency].Available.Div(price)
quoteAmount := balances[s.Market.QuoteCurrency].Total().Div(price)
if order.Quantity.Compare(quoteAmount) > 0 {
order.Quantity = quoteAmount
}
@ -199,11 +210,17 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu
if s.Market.IsDustQuantity(order.Quantity, price) {
return nil
}
_, err := s.GeneralOrderExecutor.SubmitOrders(ctx, *order)
o, err := s.SubmitOrder(ctx, *order)
if err != nil {
order.Quantity = order.Quantity.Mul(fixedpoint.One.Sub(Delta))
continue
}
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
log.Errorf("created Order when Close: %v", o)
}
}
return nil
}
return errors.New("exceed retry limit")
@ -300,12 +317,13 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64, syscoun
}
if toCancel {
err := s.GeneralOrderExecutor.FastCancel(ctx)
s.pendingLock.Lock()
counters := s.orderPendingCounter
s.orderPendingCounter = make(map[uint64]int)
s.pendingLock.Unlock()
// TODO: clean orderPendingCounter on cancel/trade
for _, order := range nonTraded {
s.pendingLock.Lock()
counter := s.orderPendingCounter[order.OrderID]
delete(s.orderPendingCounter, order.OrderID)
s.pendingLock.Unlock()
counter := counters[order.OrderID]
if order.Side == types.SideTypeSell {
if s.maxCounterSellCanceled < counter {
s.maxCounterSellCanceled = counter
@ -411,47 +429,47 @@ func (s *Strategy) Rebalance(ctx context.Context) {
quoteBalance := balances[s.Market.QuoteCurrency].Total()
total := baseBalance.Add(quoteBalance.Div(price))
percentage := fixedpoint.One.Sub(Delta)
log.Infof("rebalance beta %f %v", beta, s.p)
log.Infof("rebalance beta %f %v", beta, s.Position)
if beta > s.RebalanceFilter {
if total.Mul(percentage).Compare(baseBalance) > 0 {
q := total.Mul(percentage).Sub(baseBalance)
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q.Neg()
s.p.Quote = q.Mul(price)
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q.Neg()
s.Position.Quote = q.Mul(price)
s.Position.AverageCost = price
}
} else if beta <= -s.RebalanceFilter {
if total.Mul(percentage).Compare(quoteBalance.Div(price)) > 0 {
q := total.Mul(percentage).Sub(quoteBalance.Div(price))
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q
s.p.Quote = q.Mul(price).Neg()
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q
s.Position.Quote = q.Mul(price).Neg()
s.Position.AverageCost = price
}
} else {
if total.Div(Two).Compare(quoteBalance.Div(price)) > 0 {
q := total.Div(Two).Sub(quoteBalance.Div(price))
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q
s.p.Quote = q.Mul(price).Neg()
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q
s.Position.Quote = q.Mul(price).Neg()
s.Position.AverageCost = price
} else if total.Div(Two).Compare(baseBalance) > 0 {
q := total.Div(Two).Sub(baseBalance)
s.p.Lock()
defer s.p.Unlock()
s.p.Base = q.Neg()
s.p.Quote = q.Mul(price)
s.p.AverageCost = price
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Base = q.Neg()
s.Position.Quote = q.Mul(price)
s.Position.AverageCost = price
} else {
s.p.Lock()
defer s.p.Unlock()
s.p.Reset()
s.Position.Lock()
defer s.Position.Unlock()
s.Position.Reset()
}
}
log.Infof("rebalanceafter %v %v %v", baseBalance, quoteBalance, s.p)
log.Infof("rebalanceafter %v %v %v", baseBalance, quoteBalance, s.Position)
s.beta = beta
}
@ -514,14 +532,14 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
sourcef := source.Float64()
s.priceLines.Update(sourcef)
// s.ma.Update(sourcef)
s.ma.Update(sourcef)
s.trendLine.Update(sourcef)
s.drift.Update(sourcef, kline.Volume.Abs().Float64())
s.atr.PushK(kline)
atr := s.atr.Last()
price := kline.Close // s.getLastPrice()
price := kline.Close //s.getLastPrice()
pricef := price.Float64()
lowf := math.Min(kline.Low.Float64(), pricef)
highf := math.Max(kline.High.Float64(), pricef)
@ -600,6 +618,10 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
}
return
}
if exitCondition {
_ = s.ClosePosition(ctx, fixedpoint.One)
return
}
if longCondition {
source = source.Sub(fixedpoint.NewFromFloat(s.stdevLow.Last() * s.HighLowVarianceMultiplier))
@ -607,19 +629,18 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
source = price
}
log.Infof("source in long %v %v %f", source, price, s.stdevLow.Last())
opt := s.OpenPositionOptions
opt.Long = true
opt.LimitOrder = true
// force to use market taker
if counter-s.maxCounterBuyCanceled <= s.PendingMinInterval {
if counter-s.maxCounterBuyCanceled <= s.PendingMinInterval && s.maxCounterBuyCanceled > s.maxCounterSellCanceled {
opt.LimitOrder = false
source = price
}
opt.Price = source
opt.Tags = []string{"long"}
createdOrders, err := s.GeneralOrderExecutor.OpenPosition(ctx, opt)
submitOrder, err := s.GeneralOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
if err != nil {
errs := filterErrors(multierr.Errors(err))
if len(errs) > 0 {
@ -628,15 +649,26 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
}
return
}
if submitOrder == nil {
return
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
for _, o := range createdOrders {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
log.Infof("source in long %v %v %f", source, price, s.stdevLow.Last())
o, err := s.SubmitOrder(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("cannot place buy order")
return
}
log.Infof("order %v", o)
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[o.OrderID]; !ok {
s.orderPendingCounter[o.OrderID] = counter
s.pendingLock.Unlock()
}
s.pendingLock.Unlock()
}
}
return
@ -647,17 +679,18 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
source = price
}
log.Infof("source in short: %v", source)
log.Infof("source in short: %v %v %f", source, price, s.stdevLow.Last())
opt := s.OpenPositionOptions
opt.Short = true
opt.Price = source
opt.LimitOrder = true
if counter-s.maxCounterSellCanceled <= s.PendingMinInterval {
if counter-s.maxCounterSellCanceled <= s.PendingMinInterval && s.maxCounterSellCanceled > s.maxCounterBuyCanceled {
opt.LimitOrder = false
source = price
}
opt.Price = source
opt.Tags = []string{"short"}
createdOrders, err := s.GeneralOrderExecutor.OpenPosition(ctx, opt)
submitOrder, err := s.GeneralOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
if err != nil {
errs := filterErrors(multierr.Errors(err))
if len(errs) > 0 {
@ -665,14 +698,22 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
}
return
}
log.Infof("orders %v", createdOrders)
if createdOrders != nil {
for _, o := range createdOrders {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
if submitOrder == nil {
return
}
o, err := s.SubmitOrder(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("cannot place sell order")
return
}
log.Infof("order %v", o)
if o != nil {
if o.Status == types.OrderStatusNew || o.Status == types.OrderStatusPartiallyFilled {
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[o.OrderID]; !ok {
s.orderPendingCounter[o.OrderID] = counter
s.pendingLock.Unlock()
}
s.pendingLock.Unlock()
}
}
return
@ -687,12 +728,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Will be set by persistence if there's any from DB
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market)
s.p = types.NewPositionFromMarket(s.Market)
} else {
s.p = types.NewPositionFromMarket(s.Market)
s.p.Base = s.Position.Base
s.p.Quote = s.Position.Quote
s.p.AverageCost = s.Position.AverageCost
}
if s.Session.MakerFeeRate.Sign() > 0 || s.Session.TakerFeeRate.Sign() > 0 {
s.Position.SetExchangeFeeRate(s.Session.ExchangeName, types.ExchangeFee{
MakerFeeRate: s.Session.MakerFeeRate,
TakerFeeRate: s.Session.TakerFeeRate,
})
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market)
@ -712,23 +753,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
_ = s.ClosePosition(ctx, fixedpoint.One)
})
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.BindEnvironment(s.Environment)
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(ctx, s)
})
s.GeneralOrderExecutor.Bind()
s.orderPendingCounter = make(map[uint64]int)
// Exit methods from config
for _, method := range s.ExitMethods {
method.Bind(session, s.GeneralOrderExecutor)
}
profit := floats.Slice{1., 1.}
profitChart := floats.Slice{1., 1.}
price, _ := s.Session.LastPrice(s.Symbol)
initAsset := s.CalcAssetValue(price).Float64()
cumProfit := floats.Slice{initAsset, initAsset}
@ -740,33 +765,100 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return p * (1. - Fee)
}
}
s.GeneralOrderExecutor.TradeCollector().OnTrade(func(trade types.Trade, _profit, _netProfit fixedpoint.Value) {
s.p.AddTrade(trade)
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.DisableNotify()
orderStore := s.GeneralOrderExecutor.OrderStore()
orderStore.AddOrderUpdate = true
orderStore.RemoveCancelled = true
orderStore.RemoveFilled = true
activeOrders := s.GeneralOrderExecutor.ActiveMakerOrders()
tradeCollector := s.GeneralOrderExecutor.TradeCollector()
tradeStore := tradeCollector.TradeStore()
syscounter := 0
// Modify activeOrders to force write order updates
s.Session.UserDataStream.OnOrderUpdate(func(order types.Order) {
hasSymbol := len(activeOrders.Symbol) > 0
if hasSymbol && order.Symbol != activeOrders.Symbol {
return
}
switch order.Status {
case types.OrderStatusFilled:
s.pendingLock.Lock()
s.orderPendingCounter = make(map[uint64]int)
s.pendingLock.Unlock()
// make sure we have the order and we remove it
activeOrders.Remove(order)
case types.OrderStatusPartiallyFilled:
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[order.OrderID]; !ok {
s.orderPendingCounter[order.OrderID] = syscounter
}
s.pendingLock.Unlock()
activeOrders.Add(order)
case types.OrderStatusNew:
s.pendingLock.Lock()
if _, ok := s.orderPendingCounter[order.OrderID]; !ok {
s.orderPendingCounter[order.OrderID] = syscounter
}
s.pendingLock.Unlock()
activeOrders.Add(order)
case types.OrderStatusCanceled, types.OrderStatusRejected:
log.Debugf("[ActiveOrderBook] order status %s, removing order %s", order.Status, order)
s.pendingLock.Lock()
s.orderPendingCounter = make(map[uint64]int)
s.pendingLock.Unlock()
activeOrders.Remove(order)
default:
log.Errorf("unhandled order status: %s", order.Status)
}
orderStore.HandleOrderUpdate(order)
})
s.Session.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
if trade.Symbol != s.Symbol {
return
}
profit, netProfit, madeProfit := s.Position.AddTrade(trade)
tradeStore.Add(trade)
if madeProfit {
p := s.Position.NewProfit(trade, profit, netProfit)
s.Environment.RecordPosition(s.Position, trade, &p)
s.TradeStats.Add(&p)
s.ProfitStats.AddTrade(trade)
s.ProfitStats.AddProfit(p)
bbgo.Notify(&p)
bbgo.Notify(s.ProfitStats)
}
price := trade.Price.Float64()
s.pendingLock.Lock()
delete(s.orderPendingCounter, trade.OrderID)
s.pendingLock.Unlock()
if s.buyPrice > 0 {
profit.Update(modify(price / s.buyPrice))
profitChart.Update(modify(price / s.buyPrice))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
} else if s.sellPrice > 0 {
profit.Update(modify(s.sellPrice / price))
profitChart.Update(modify(s.sellPrice / price))
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
}
s.positionLock.Lock()
if s.p.IsDust(trade.Price) {
if s.Position.IsDust(trade.Price) {
s.buyPrice = 0
s.sellPrice = 0
s.highestPrice = 0
s.lowestPrice = 0
} else if s.p.IsLong() {
s.buyPrice = s.p.ApproximateAverageCost.Float64()
} else if s.Position.IsLong() {
s.buyPrice = s.Position.ApproximateAverageCost.Float64()
s.sellPrice = 0
s.highestPrice = math.Max(s.buyPrice, s.highestPrice)
s.lowestPrice = s.buyPrice
} else if s.p.IsShort() {
s.sellPrice = s.p.ApproximateAverageCost.Float64()
} else if s.Position.IsShort() {
s.sellPrice = s.Position.ApproximateAverageCost.Float64()
s.buyPrice = 0
s.highestPrice = s.sellPrice
if s.lowestPrice == 0 {
@ -778,6 +870,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.positionLock.Unlock()
})
s.orderPendingCounter = make(map[uint64]int)
// Exit methods from config
for _, method := range s.ExitMethods {
method.Bind(session, s.GeneralOrderExecutor)
}
s.frameKLine = &types.KLine{}
s.klineMin = &types.KLine{}
s.priceLines = types.NewQueue(300)
@ -788,7 +887,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, s.startTime))
s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1w, s.startTime))
s.InitDrawCommands(&profit, &cumProfit)
s.InitDrawCommands(&profitChart, &cumProfit)
bbgo.RegisterCommand("/config", "Show latest config", func(reply interact.Reply) {
var buffer bytes.Buffer
@ -796,10 +895,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
reply.Message(buffer.String())
})
bbgo.RegisterCommand("/pos", "Show internal position", func(reply interact.Reply) {
reply.Message(s.p.String())
})
bbgo.RegisterCommand("/dump", "Dump internal params", func(reply interact.Reply) {
reply.Message("Please enter series output length:")
}).Next(func(length string, reply interact.Reply) {
@ -825,9 +920,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return nil
}
// var lastK types.KLine
store.OnKLineClosed(func(kline types.KLine) {
counter := int(kline.StartTime.Time().Add(kline.Interval.Duration()).Sub(s.startTime).Milliseconds()) / s.MinInterval.Milliseconds()
syscounter = counter
if kline.Interval == s.Interval {
s.klineHandler(ctx, kline, counter)
} else if kline.Interval == s.MinInterval {
@ -837,6 +932,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
if !bbgo.IsBackTesting {
bbgo.Sync(ctx, s)
}
var buffer bytes.Buffer
s.Print(&buffer, true, true)
@ -847,10 +946,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
fmt.Fprintln(&buffer, s.TradeStats.BriefString())
fmt.Fprintf(&buffer, "%v\n", s.orderPendingCounter)
os.Stdout.Write(buffer.Bytes())
if s.GenerateGraph {
s.Draw(s.frameKLine.StartTime, &profit, &cumProfit)
s.Draw(s.frameKLine.StartTime, &profitChart, &cumProfit)
}
wg.Done()
})

View File

@ -20,6 +20,13 @@ func init() {
_ = PlainText(&Order{})
}
type CancelReplaceModeType string
var (
StopOnFailure CancelReplaceModeType = "STOP_ON_FAILURE"
AllowFailure CancelReplaceModeType = "ALLOW_FAILURE"
)
type TimeInForce string
var (