Merge pull request #1206 from c9s/improve/concurrent-stop-loss

IMPROVE: improve stop loss methods
This commit is contained in:
c9s 2023-06-29 14:25:02 +08:00 committed by GitHub
commit 3da145877f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 181 additions and 68 deletions

View File

@ -8,8 +8,17 @@ import (
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/dynamic"
"github.com/c9s/bbgo/pkg/util"
)
var enableMarketTradeStop = true
func init() {
if v, defined := util.GetEnvVarBool("ENABLE_MARKET_TRADE_STOP"); defined {
enableMarketTradeStop = v
}
}
type ExitMethodSet []ExitMethod
func (s *ExitMethodSet) SetAndSubscribe(session *ExchangeSession, parent interface{}) {

View File

@ -9,8 +9,6 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
const enableMarketTradeStop = false
// ProtectiveStopLoss provides a way to protect your profit but also keep a room for the price volatility
// Set ActivationRatio to 1% means if the price is away from your average cost by 1%, we will activate the protective stop loss
// and the StopLossRatio is the minimal profit ratio you want to keep for your position.
@ -124,14 +122,17 @@ func (s *ProtectiveStopLoss) Bind(session *ExchangeSession, orderExecutor *Gener
})
position := orderExecutor.Position()
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {
f := func(kline types.KLine) {
isPositionOpened := !position.IsClosed() && !position.IsDust(kline.Close)
if isPositionOpened {
s.handleChange(context.Background(), position, kline.Close, s.orderExecutor)
} else {
s.stopLossPrice = fixedpoint.Zero
}
}))
}
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, f))
session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, types.Interval1m, f))
if !IsBackTesting && enableMarketTradeStop {
session.MarketDataStream.OnMarketTrade(func(trade types.Trade) {
@ -170,8 +171,12 @@ func (s *ProtectiveStopLoss) handleChange(ctx context.Context, position *types.P
s.stopLossPrice = position.AverageCost.Mul(one.Add(s.StopLossRatio))
}
Notify("[ProtectiveStopLoss] %s protection stop loss activated, current price = %f, average cost = %f, stop loss price = %f",
position.Symbol, closePrice.Float64(), position.AverageCost.Float64(), s.stopLossPrice.Float64())
Notify("[ProtectiveStopLoss] %s protection (%s) stop loss activated, SL = %f, currentPrice = %f, averageCost = %f",
position.Symbol,
s.StopLossRatio.Percentage(),
s.stopLossPrice.Float64(),
closePrice.Float64(),
position.AverageCost.Float64())
if s.PlaceStopOrder {
if err := s.placeStopOrder(ctx, position, orderExecutor); err != nil {
@ -195,7 +200,11 @@ func (s *ProtectiveStopLoss) checkStopPrice(closePrice fixedpoint.Value, positio
}
if s.shouldStop(closePrice, position) {
Notify("[ProtectiveStopLoss] protection stop order is triggered at price %f", closePrice.Float64(), position)
Notify("[ProtectiveStopLoss] %s protection stop (%s) is triggered at price %f",
s.Symbol,
s.StopLossRatio.Percentage(),
closePrice.Float64(),
position)
if err := s.orderExecutor.ClosePosition(context.Background(), one, "protectiveStopLoss"); err != nil {
log.WithError(err).Errorf("failed to close position")
}

View File

@ -26,9 +26,12 @@ func (s *RoiStopLoss) Bind(session *ExchangeSession, orderExecutor *GeneralOrder
s.orderExecutor = orderExecutor
position := orderExecutor.Position()
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {
f := func(kline types.KLine) {
s.checkStopPrice(kline.Close, position)
}))
}
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, f))
session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, types.Interval1m, f))
if !IsBackTesting && enableMarketTradeStop {
session.MarketDataStream.OnMarketTrade(func(trade types.Trade) {
@ -50,7 +53,7 @@ func (s *RoiStopLoss) checkStopPrice(closePrice fixedpoint.Value, position *type
// logrus.Debugf("ROIStopLoss: price=%f roi=%s stop=%s", closePrice.Float64(), roi.Percentage(), s.Percentage.Neg().Percentage())
if roi.Compare(s.Percentage.Neg()) < 0 {
// stop loss
Notify("[RoiStopLoss] %s stop loss triggered by ROI %s/%s, price: %f", position.Symbol, roi.Percentage(), s.Percentage.Neg().Percentage(), closePrice.Float64())
Notify("[RoiStopLoss] %s stop loss triggered by ROI %s/%s, currentPrice = %f", position.Symbol, roi.Percentage(), s.Percentage.Neg().Percentage(), closePrice.Float64())
if s.CancelActiveOrders {
_ = s.orderExecutor.GracefulCancel(context.Background())
}

View File

@ -52,11 +52,14 @@ func (s *TrailingStop2) Bind(session *ExchangeSession, orderExecutor *GeneralOrd
s.latestHigh = fixedpoint.Zero
position := orderExecutor.Position()
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
f := func(kline types.KLine) {
if err := s.checkStopPrice(kline.Close, position); err != nil {
log.WithError(err).Errorf("error")
}
}))
}
session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, f))
session.MarketDataStream.OnKLine(types.KLineWith(s.Symbol, s.Interval, f))
if !IsBackTesting && enableMarketTradeStop {
session.MarketDataStream.OnMarketTrade(types.TradeWith(position.Symbol, func(trade types.Trade) {

View File

@ -4,13 +4,13 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -42,7 +42,6 @@ type GeneralOrderExecutor struct {
maxRetries uint
disableNotify bool
closing int64
}
func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position) *GeneralOrderExecutor {
@ -390,6 +389,10 @@ func (e *GeneralOrderExecutor) NewOrderFromOpenPosition(ctx context.Context, opt
// @return types.OrderSlice: Created orders with information from exchange.
// @return error: Error message.
func (e *GeneralOrderExecutor) OpenPosition(ctx context.Context, options OpenPositionOptions) (types.OrderSlice, error) {
if e.position.IsClosing() {
return nil, errors.Wrap(ErrPositionAlreadyClosing, "unable to open position")
}
submitOrder, err := e.NewOrderFromOpenPosition(ctx, &options)
if err != nil {
return nil, err
@ -442,23 +445,22 @@ func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context, orders ...typ
return nil
}
var ErrPositionAlreadyClosing = errors.New("position is already in closing process")
// ClosePosition closes the current position by a percentage.
// percentage 0.1 means close 10% position
// tag is the order tag you want to attach, you may pass multiple tags, the tags will be combined into one tag string by commas.
func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fixedpoint.Value, tags ...string) error {
if !e.position.SetClosing(true) {
return ErrPositionAlreadyClosing
}
defer e.position.SetClosing(false)
submitOrder := e.position.NewMarketCloseOrder(percentage)
if submitOrder == nil {
return nil
}
if e.closing > 0 {
log.Errorf("position is already closing")
return nil
}
atomic.AddInt64(&e.closing, 1)
defer atomic.StoreInt64(&e.closing, 0)
if e.session.Futures { // Futures: Use base qty in e.position
submitOrder.Quantity = e.position.GetBase().Abs()
submitOrder.ReduceOnly = true
@ -496,8 +498,22 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix
Notify("Closing %s position %s with tags: %s", e.symbol, percentage.Percentage(), tagStr)
_, err := e.SubmitOrders(ctx, *submitOrder)
return err
createdOrders, err := e.SubmitOrders(ctx, *submitOrder)
if err != nil {
return err
}
if queryOrderService, ok := e.session.Exchange.(types.ExchangeOrderQueryService); ok && !IsBackTesting {
switch submitOrder.Type {
case types.OrderTypeMarket:
_, err2 := retry.QueryOrderUntilSuccessful(ctx, queryOrderService, createdOrders[0].Symbol, createdOrders[0].OrderID)
if err2 != nil {
log.WithError(err2).Errorf("unable to query order")
}
}
}
return nil
}
func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector {

View File

@ -0,0 +1,76 @@
package retry
import (
"context"
"errors"
"strconv"
backoff2 "github.com/cenkalti/backoff/v4"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util/backoff"
)
type advancedOrderCancelService interface {
CancelAllOrders(ctx context.Context) ([]types.Order, error)
CancelOrdersBySymbol(ctx context.Context, symbol string) ([]types.Order, error)
CancelOrdersByGroupID(ctx context.Context, groupID uint32) ([]types.Order, error)
}
func QueryOrderUntilSuccessful(ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64) (o *types.Order, err error) {
err = backoff.RetryGeneral(ctx, func() (err2 error) {
o, err2 = queryOrderService.QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(orderId, 10),
})
if err2 != nil || o == nil {
return err2
}
if o.Status != types.OrderStatusFilled {
return errors.New("order is not filled yet")
}
return err2
})
return o, err
}
func GeneralBackoff(ctx context.Context, op backoff2.Operation) (err error) {
err = backoff2.Retry(op, backoff2.WithContext(
backoff2.WithMaxRetries(
backoff2.NewExponentialBackOff(),
101),
ctx))
return err
}
func QueryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) {
var op = func() (err2 error) {
openOrders, err2 = ex.QueryOpenOrders(ctx, symbol)
return err2
}
err = GeneralBackoff(ctx, op)
return openOrders, err
}
func CancelAllOrdersUntilSuccessful(ctx context.Context, service advancedOrderCancelService) error {
var op = func() (err2 error) {
_, err2 = service.CancelAllOrders(ctx)
return err2
}
return GeneralBackoff(ctx, op)
}
func CancelOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, orders ...types.Order) error {
var op = func() (err2 error) {
err2 = ex.CancelOrders(ctx, orders...)
return err2
}
return GeneralBackoff(ctx, op)
}

View File

@ -11,7 +11,6 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -19,6 +18,7 @@ import (
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -974,7 +974,7 @@ func (s *Strategy) cancelAll(ctx context.Context) error {
for {
s.logger.Infof("checking %s open orders...", s.Symbol)
openOrders, err := queryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Errorf("CancelOrdersByGroupID api call error")
werr = multierr.Append(werr, err)
@ -987,7 +987,7 @@ func (s *Strategy) cancelAll(ctx context.Context) error {
s.logger.Infof("found %d open orders left, using cancel all orders api", len(openOrders))
s.logger.Infof("using cancal all orders api for canceling grid orders...")
if err := cancelAllOrdersUntilSuccessful(ctx, service); err != nil {
if err := retry.CancelAllOrdersUntilSuccessful(ctx, service); err != nil {
s.logger.WithError(err).Errorf("CancelAllOrders api call error")
werr = multierr.Append(werr, err)
}
@ -1393,12 +1393,12 @@ func (s *Strategy) generateGridOrders(totalQuote, totalBase, lastPrice fixedpoin
func (s *Strategy) clearOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
// clear open orders when start
openOrders, err := queryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
return cancelOrdersUntilSuccessful(ctx, session.Exchange, openOrders...)
return retry.CancelOrdersUntilSuccessful(ctx, session.Exchange, openOrders...)
}
func (s *Strategy) getLastTradePrice(ctx context.Context, session *bbgo.ExchangeSession) (fixedpoint.Value, error) {
@ -1996,7 +1996,7 @@ func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSessio
}
func (s *Strategy) recoverByScanningOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := queryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
@ -2047,7 +2047,7 @@ func (s *Strategy) openOrdersMismatches(ctx context.Context, session *bbgo.Excha
}
func (s *Strategy) cancelDuplicatedPriceOpenOrders(ctx context.Context, session *bbgo.ExchangeSession) error {
openOrders, err := queryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
return err
}
@ -2105,40 +2105,3 @@ func (s *Strategy) newClientOrderID() string {
}
return ""
}
func generalBackoff(ctx context.Context, op backoff.Operation) (err error) {
err = backoff.Retry(op, backoff.WithContext(
backoff.WithMaxRetries(
backoff.NewExponentialBackOff(),
101),
ctx))
return err
}
func cancelAllOrdersUntilSuccessful(ctx context.Context, service advancedOrderCancelApi) error {
var op = func() (err2 error) {
_, err2 = service.CancelAllOrders(ctx)
return err2
}
return generalBackoff(ctx, op)
}
func cancelOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, orders ...types.Order) error {
var op = func() (err2 error) {
err2 = ex.CancelOrders(ctx, orders...)
return err2
}
return generalBackoff(ctx, op)
}
func queryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) {
var op = func() (err2 error) {
openOrders, err2 = ex.QueryOpenOrders(ctx, symbol)
return err2
}
err = generalBackoff(ctx, op)
return openOrders, err
}

View File

@ -58,6 +58,9 @@ type Position struct {
AccumulatedProfit fixedpoint.Value `json:"accumulatedProfit,omitempty" db:"accumulated_profit"`
// closing is a flag for marking this position is closing
closing bool
sync.Mutex
// Modify position callbacks
@ -428,6 +431,25 @@ func (p *Position) BindStream(stream Stream) {
})
}
func (p *Position) SetClosing(c bool) bool {
p.Lock()
defer p.Unlock()
if p.closing && c {
return false
}
p.closing = c
return true
}
func (p *Position) IsClosing() (c bool) {
p.Lock()
c = p.closing
p.Unlock()
return c
}
func (p *Position) AddTrades(trades []Trade) (fixedpoint.Value, fixedpoint.Value, bool) {
var totalProfitAmount, totalNetProfit fixedpoint.Value
for _, trade := range trades {

View File

@ -313,3 +313,15 @@ func TestPosition(t *testing.T) {
})
}
}
func TestPosition_SetClosing(t *testing.T) {
p := NewPosition("BTCUSDT", "BTC", "USDT")
ret := p.SetClosing(true)
assert.True(t, ret)
ret = p.SetClosing(true)
assert.False(t, ret)
ret = p.SetClosing(false)
assert.True(t, ret)
}