diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 80bdba8a5..1c4d85426 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -3,8 +3,8 @@ package bbgo import ( "context" "fmt" + "strconv" "strings" - "sync/atomic" "time" "github.com/pkg/errors" @@ -14,6 +14,7 @@ import ( "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" + "github.com/c9s/bbgo/pkg/util/backoff" ) var ErrExceededSubmitOrderRetryLimit = errors.New("exceeded submit order retry limit") @@ -42,7 +43,6 @@ type GeneralOrderExecutor struct { maxRetries uint disableNotify bool - closing int64 } func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position) *GeneralOrderExecutor { @@ -442,23 +442,22 @@ func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context, orders ...typ return nil } +var ErrPositionAlreadyClosing = errors.New("position is already in closing process, can't close it again") + // ClosePosition closes the current position by a percentage. // percentage 0.1 means close 10% position // tag is the order tag you want to attach, you may pass multiple tags, the tags will be combined into one tag string by commas. func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fixedpoint.Value, tags ...string) error { + if !e.position.SetClosing(true) { + return ErrPositionAlreadyClosing + } + defer e.position.SetClosing(false) + submitOrder := e.position.NewMarketCloseOrder(percentage) if submitOrder == nil { return nil } - if e.closing > 0 { - log.Errorf("position is already closing") - return nil - } - - atomic.AddInt64(&e.closing, 1) - defer atomic.StoreInt64(&e.closing, 0) - if e.session.Futures { // Futures: Use base qty in e.position submitOrder.Quantity = e.position.GetBase().Abs() submitOrder.ReduceOnly = true @@ -496,8 +495,34 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix Notify("Closing %s position %s with tags: %s", e.symbol, percentage.Percentage(), tagStr) - _, err := e.SubmitOrders(ctx, *submitOrder) - return err + createdOrders, err := e.SubmitOrders(ctx, *submitOrder) + if err != nil { + return err + } + + if queryOrderService, ok := e.session.Exchange.(types.ExchangeOrderQueryService); ok { + switch submitOrder.Type { + case types.OrderTypeMarket: + _ = backoff.RetryGeneral(ctx, func() error { + order, err2 := queryOrderService.QueryOrder(ctx, types.OrderQuery{ + Symbol: e.symbol, + OrderID: strconv.FormatUint(createdOrders[0].OrderID, 10), + }) + + if err2 != nil { + return err2 + } + + if order.Status != types.OrderStatusFilled { + return errors.New("order is not filled yet") + } + + return nil + }) + } + } + + return nil } func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector { diff --git a/pkg/types/position.go b/pkg/types/position.go index 6cee34e75..5f2bf58f7 100644 --- a/pkg/types/position.go +++ b/pkg/types/position.go @@ -58,6 +58,9 @@ type Position struct { AccumulatedProfit fixedpoint.Value `json:"accumulatedProfit,omitempty" db:"accumulated_profit"` + // closing is a flag for marking this position is closing + closing bool + sync.Mutex // Modify position callbacks @@ -428,6 +431,25 @@ func (p *Position) BindStream(stream Stream) { }) } +func (p *Position) SetClosing(c bool) bool { + p.Lock() + defer p.Unlock() + + if p.closing && c { + return false + } + + p.closing = c + return true +} + +func (p *Position) IsClosing() (c bool) { + p.Lock() + c = p.closing + p.Unlock() + return c +} + func (p *Position) AddTrades(trades []Trade) (fixedpoint.Value, fixedpoint.Value, bool) { var totalProfitAmount, totalNetProfit fixedpoint.Value for _, trade := range trades {