From 29105eb57f1ace8638b6b14d21852117eea41741 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 9 Sep 2022 18:41:06 +0800 Subject: [PATCH] all: simplify underlying exchange submitOrder method - Replace SubmitOrders with SubmitOrder - Accept only one submit order and return one created order - Add bbgo.BatchPlaceOrders helper method and bbgo.BatchRetryPlaceOrders method --- examples/exchange-api/binance-margin/main.go | 2 +- pkg/backtest/exchange.go | 34 ++--- pkg/bbgo/exit_trailing_stop_test.go | 4 +- pkg/bbgo/order_execution.go | 40 ++++- pkg/bbgo/order_executor_general.go | 29 +--- pkg/cmd/orders.go | 4 +- pkg/exchange/binance/exchange.go | 37 ++--- pkg/exchange/ftx/exchange.go | 111 +++++++------- pkg/exchange/ftx/exchange_test.go | 2 +- pkg/exchange/kucoin/exchange.go | 134 ++++++++--------- pkg/exchange/max/exchange.go | 127 ++++++++-------- pkg/exchange/okex/exchange.go | 147 +++++++++++-------- pkg/grpc/server.go | 16 +- pkg/strategy/fmaker/strategy.go | 11 +- pkg/strategy/xgap/strategy.go | 3 +- pkg/types/exchange.go | 2 +- pkg/types/mocks/mock_exchange.go | 19 +-- 17 files changed, 366 insertions(+), 356 deletions(-) diff --git a/examples/exchange-api/binance-margin/main.go b/examples/exchange-api/binance-margin/main.go index 1b06efa19..aeb8c3250 100644 --- a/examples/exchange-api/binance-margin/main.go +++ b/examples/exchange-api/binance-margin/main.go @@ -94,7 +94,7 @@ var rootCmd = &cobra.Command{ time.Sleep(time.Second) - createdOrders, err := exchange.SubmitOrders(ctx, types.SubmitOrder{ + createdOrders, err := exchange.SubmitOrder(ctx, types.SubmitOrder{ Symbol: symbol, Market: market, Side: types.SideTypeBuy, diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index 58b0bbaa0..3b51bda08 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -169,31 +169,23 @@ func (e *Exchange) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.O return nil, nil } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { - for _, order := range orders { - symbol := order.Symbol - matching, ok := e.matchingBook(symbol) - if !ok { - return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol) - } +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (createdOrder *types.Order, err error) { + symbol := order.Symbol + matching, ok := e.matchingBook(symbol) + if !ok { + return nil, fmt.Errorf("matching engine is not initialized for symbol %s", symbol) + } - createdOrder, _, err := matching.PlaceOrder(order) - if err != nil { - return nil, err - } - - if createdOrder != nil { - createdOrders = append(createdOrders, *createdOrder) - - // market order can be closed immediately. - switch createdOrder.Status { - case types.OrderStatusFilled, types.OrderStatusCanceled, types.OrderStatusRejected: - e.addClosedOrder(*createdOrder) - } + createdOrder, _, err = matching.PlaceOrder(order) + if createdOrder != nil { + // market order can be closed immediately. + switch createdOrder.Status { + case types.OrderStatusFilled, types.OrderStatusCanceled, types.OrderStatusRejected: + e.addClosedOrder(*createdOrder) } } - return createdOrders, nil + return createdOrder, err } func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) { diff --git a/pkg/bbgo/exit_trailing_stop_test.go b/pkg/bbgo/exit_trailing_stop_test.go index e585899ce..42bd37e88 100644 --- a/pkg/bbgo/exit_trailing_stop_test.go +++ b/pkg/bbgo/exit_trailing_stop_test.go @@ -35,7 +35,7 @@ func TestTrailingStop_ShortPosition(t *testing.T) { mockEx := mocks.NewMockExchange(mockCtrl) mockEx.EXPECT().NewStream().Return(&types.StandardStream{}).Times(2) - mockEx.EXPECT().SubmitOrders(gomock.Any(), types.SubmitOrder{ + mockEx.EXPECT().SubmitOrder(gomock.Any(), types.SubmitOrder{ Symbol: "BTCUSDT", Side: types.SideTypeBuy, Type: types.OrderTypeMarket, @@ -113,7 +113,7 @@ func TestTrailingStop_LongPosition(t *testing.T) { mockEx := mocks.NewMockExchange(mockCtrl) mockEx.EXPECT().NewStream().Return(&types.StandardStream{}).Times(2) - mockEx.EXPECT().SubmitOrders(gomock.Any(), types.SubmitOrder{ + mockEx.EXPECT().SubmitOrder(gomock.Any(), types.SubmitOrder{ Symbol: "BTCUSDT", Side: types.SideTypeSell, Type: types.OrderTypeMarket, diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index a4e93baf7..c2caf469c 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "go.uber.org/multierr" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -42,7 +43,41 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi return nil, err } - return es.Exchange.SubmitOrders(ctx, formattedOrders...) + createdOrders, _, err := BatchPlaceOrder(ctx, es.Exchange, formattedOrders...) + return createdOrders, err +} + +func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) { + var createdOrders types.OrderSlice + var err error + for _, idx := range errIdx { + createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrders[idx]) + if err2 != nil { + err = multierr.Append(err, err2) + } else if createdOrder != nil { + createdOrders = append(createdOrders, *createdOrder) + } + } + + return createdOrders, err +} + +func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) { + var createdOrders types.OrderSlice + var err error + var errIndexes []int + for i, submitOrder := range submitOrders { + createdOrder, err2 := exchange.SubmitOrder(ctx, submitOrder) + if err2 != nil { + err = multierr.Append(err, err2) + errIndexes = append(errIndexes, i) + } else if createdOrder != nil { + createdOrder.Tag = submitOrder.Tag + createdOrders = append(createdOrders, *createdOrder) + } + } + + return createdOrders, errIndexes, err } func (e *ExchangeOrderExecutionRouter) CancelOrdersTo(ctx context.Context, session string, orders ...types.Order) error { @@ -105,7 +140,8 @@ func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...type e.notifySubmitOrders(formattedOrders...) - return e.Session.Exchange.SubmitOrders(ctx, formattedOrders...) + createdOrders, _, err := BatchPlaceOrder(ctx, e.Session.Exchange, formattedOrders...) + return createdOrders, err } func (e *ExchangeOrderExecutor) CancelOrders(ctx context.Context, orders ...types.Order) error { diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index f8828236a..fb92c8229 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -7,6 +7,7 @@ import ( "strings" log "github.com/sirupsen/logrus" + "go.uber.org/multierr" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -110,28 +111,14 @@ func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders .. return nil, err } - var createdOrders types.OrderSlice - - retOrders, err := e.session.Exchange.SubmitOrders(ctx, formattedOrders...) - if len(retOrders) > 0 { - createdOrders = append(createdOrders, retOrders...) - } - - if err != nil { - // retry once - retOrders, err = e.session.Exchange.SubmitOrders(ctx, formattedOrders...) - if len(retOrders) > 0 { - createdOrders = append(createdOrders, retOrders...) + createdOrders, errIdx, err := BatchPlaceOrder(ctx, e.session.Exchange, formattedOrders...) + if len(errIdx) > 0 { + createdOrders2, err2 := BatchRetryPlaceOrder(ctx, e.session.Exchange, errIdx, formattedOrders...) + if err2 != nil { + err = multierr.Append(err, err2) + } else { + createdOrders = append(createdOrders, createdOrders2...) } - - if err != nil { - err = fmt.Errorf("can not place orders: %w", err) - } - } - - // FIXME: map by price and volume - for i := 0; i < len(createdOrders); i++ { - createdOrders[i].Tag = formattedOrders[i].Tag } e.orderStore.Add(createdOrders...) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index f5dbf496c..3e7ac116c 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -384,12 +384,12 @@ var submitOrderCmd = &cobra.Command{ so.TimeInForce = types.TimeInForceGTC } - co, err := session.Exchange.SubmitOrders(ctx, so) + co, err := session.Exchange.SubmitOrder(ctx, so) if err != nil { return err } - log.Infof("submitted order: %+v\ncreated order: %+v", so, co[0]) + log.Infof("submitted order: %+v\ncreated order: %+v", so, co) return nil }, } diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index c6ef35577..3f1de9c0e 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -1251,33 +1251,20 @@ func (e *Exchange) submitSpotOrder(ctx context.Context, order types.SubmitOrder) return createdOrder, err } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { - for _, order := range orders { - if err := orderLimiter.Wait(ctx); err != nil { - log.WithError(err).Errorf("order rate limiter wait error") - } - - var createdOrder *types.Order - if e.IsMargin { - createdOrder, err = e.submitMarginOrder(ctx, order) - } else if e.IsFutures { - createdOrder, err = e.submitFuturesOrder(ctx, order) - } else { - createdOrder, err = e.submitSpotOrder(ctx, order) - } - - if err != nil { - return createdOrders, err - } - - if createdOrder == nil { - return createdOrders, errors.New("nil converted order") - } - - createdOrders = append(createdOrders, *createdOrder) +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (createdOrder *types.Order, err error) { + if err := orderLimiter.Wait(ctx); err != nil { + log.WithError(err).Errorf("order rate limiter wait error") } - return createdOrders, err + if e.IsMargin { + createdOrder, err = e.submitMarginOrder(ctx, order) + } else if e.IsFutures { + createdOrder, err = e.submitFuturesOrder(ctx, order) + } else { + createdOrder, err = e.submitSpotOrder(ctx, order) + } + + return createdOrder, err } // QueryKLines queries the Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time. diff --git a/pkg/exchange/ftx/exchange.go b/pkg/exchange/ftx/exchange.go index 4f62cc052..04595e318 100644 --- a/pkg/exchange/ftx/exchange.go +++ b/pkg/exchange/ftx/exchange.go @@ -290,7 +290,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ } } - resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), options.StartTime, options.EndTime) + resp, err := e.newRest().marketRequest.HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), options.StartTime, options.EndTime) if err != nil { return nil, err } @@ -401,61 +401,54 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, return } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) { - var createdOrders types.OrderSlice +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) { // TODO: currently only support limit and market order // TODO: support time in force - for _, so := range orders { - if err := requestLimit.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - orderType, err := toLocalOrderType(so.Type) - if err != nil { - logrus.WithError(err).Error("type error") - } - - submitQuantity := so.Quantity - switch orderType { - case ftxapi.OrderTypeLimit, ftxapi.OrderTypeStopLimit: - submitQuantity = so.Quantity.Div(e.orderAmountReduceFactor) - } - - req := e.client.NewPlaceOrderRequest() - req.Market(toLocalSymbol(TrimUpperString(so.Symbol))) - req.OrderType(orderType) - req.Side(ftxapi.Side(TrimLowerString(string(so.Side)))) - req.Size(submitQuantity) - - switch so.Type { - case types.OrderTypeLimit, types.OrderTypeLimitMaker: - req.Price(so.Price) - - } - - if so.Type == types.OrderTypeLimitMaker { - req.PostOnly(true) - } - - if so.TimeInForce == types.TimeInForceIOC { - req.Ioc(true) - } - - req.ClientID(newSpotClientOrderID(so.ClientOrderID)) - - or, err := req.Do(ctx) - if err != nil { - return createdOrders, fmt.Errorf("failed to place order %+v: %w", so, err) - } - - globalOrder, err := toGlobalOrderNew(*or) - if err != nil { - return createdOrders, fmt.Errorf("failed to convert response to global order") - } - - createdOrders = append(createdOrders, globalOrder) + so := order + if err := requestLimit.Wait(ctx); err != nil { + logrus.WithError(err).Error("rate limit error") } - return createdOrders, nil + + orderType, err := toLocalOrderType(so.Type) + if err != nil { + logrus.WithError(err).Error("type error") + } + + submitQuantity := so.Quantity + switch orderType { + case ftxapi.OrderTypeLimit, ftxapi.OrderTypeStopLimit: + submitQuantity = so.Quantity.Div(e.orderAmountReduceFactor) + } + + req := e.client.NewPlaceOrderRequest() + req.Market(toLocalSymbol(TrimUpperString(so.Symbol))) + req.OrderType(orderType) + req.Side(ftxapi.Side(TrimLowerString(string(so.Side)))) + req.Size(submitQuantity) + + switch so.Type { + case types.OrderTypeLimit, types.OrderTypeLimitMaker: + req.Price(so.Price) + + } + + if so.Type == types.OrderTypeLimitMaker { + req.PostOnly(true) + } + + if so.TimeInForce == types.TimeInForceIOC { + req.Ioc(true) + } + + req.ClientID(newSpotClientOrderID(so.ClientOrderID)) + + or, err := req.Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to place order %+v: %w", so, err) + } + + globalOrder, err := toGlobalOrderNew(*or) + return &globalOrder, err } func (e *Exchange) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.Order, error) { @@ -470,8 +463,12 @@ func (e *Exchange) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.O return nil, err } - order, err := toGlobalOrderNew(*ftxOrder) - return &order, err + o, err := toGlobalOrderNew(*ftxOrder) + if err != nil { + return nil, err + } + + return &o, err } func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) { @@ -572,7 +569,6 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke } func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { - var tickers = make(map[string]types.Ticker) markets, err := e._queryMarkets(ctx) @@ -586,7 +582,6 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri } rest := e.newRest() - for k, v := range markets { // if we provide symbol as condition then we only query the gieven symbol , @@ -603,7 +598,7 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri now := time.Now() since := now.Add(time.Duration(-1) * time.Hour) until := now - prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, &since, &until) + prices, err := rest.marketRequest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, &since, &until) if err != nil || !prices.Success || len(prices.Result) == 0 { continue } diff --git a/pkg/exchange/ftx/exchange_test.go b/pkg/exchange/ftx/exchange_test.go index eb7cc731a..1f4f46ebd 100644 --- a/pkg/exchange/ftx/exchange_test.go +++ b/pkg/exchange/ftx/exchange_test.go @@ -34,7 +34,7 @@ func TestExchange_IOCOrder(t *testing.T) { } ex := NewExchange(key, secret, "") - createdOrder, err := ex.SubmitOrders(context.Background(), types.SubmitOrder{ + createdOrder, err := ex.SubmitOrder(context.Background(), types.SubmitOrder{ Symbol: "LTCUSDT", Side: types.SideTypeBuy, Type: types.OrderTypeLimitMaker, diff --git a/pkg/exchange/kucoin/exchange.go b/pkg/exchange/kucoin/exchange.go index 28da3a04e..2f6746a2e 100644 --- a/pkg/exchange/kucoin/exchange.go +++ b/pkg/exchange/kucoin/exchange.go @@ -207,78 +207,74 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type return klines, nil } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { - for _, order := range orders { - req := e.client.TradeService.NewPlaceOrderRequest() - req.Symbol(toLocalSymbol(order.Symbol)) - req.Side(toLocalSide(order.Side)) +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (createdOrder *types.Order, err error) { + req := e.client.TradeService.NewPlaceOrderRequest() + req.Symbol(toLocalSymbol(order.Symbol)) + req.Side(toLocalSide(order.Side)) - if order.ClientOrderID != "" { - req.ClientOrderID(order.ClientOrderID) - } - - if order.Market.Symbol != "" { - req.Size(order.Market.FormatQuantity(order.Quantity)) - } else { - // TODO: report error? - req.Size(order.Quantity.FormatString(8)) - } - - // set price field for limit orders - switch order.Type { - case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker: - if order.Market.Symbol != "" { - req.Price(order.Market.FormatPrice(order.Price)) - } else { - // TODO: report error? - req.Price(order.Price.FormatString(8)) - } - } - - if order.Type == types.OrderTypeLimitMaker { - req.PostOnly(true) - } - - switch order.TimeInForce { - case "FOK": - req.TimeInForce(kucoinapi.TimeInForceFOK) - case "IOC": - req.TimeInForce(kucoinapi.TimeInForceIOC) - default: - // default to GTC - req.TimeInForce(kucoinapi.TimeInForceGTC) - } - - switch order.Type { - case types.OrderTypeStopLimit: - req.OrderType(kucoinapi.OrderTypeStopLimit) - - case types.OrderTypeLimit, types.OrderTypeLimitMaker: - req.OrderType(kucoinapi.OrderTypeLimit) - - case types.OrderTypeMarket: - req.OrderType(kucoinapi.OrderTypeMarket) - } - - orderResponse, err := req.Do(ctx) - if err != nil { - return createdOrders, err - } - - createdOrders = append(createdOrders, types.Order{ - SubmitOrder: order, - Exchange: types.ExchangeKucoin, - OrderID: hashStringID(orderResponse.OrderID), - UUID: orderResponse.OrderID, - Status: types.OrderStatusNew, - ExecutedQuantity: fixedpoint.Zero, - IsWorking: true, - CreationTime: types.Time(time.Now()), - UpdateTime: types.Time(time.Now()), - }) + if order.ClientOrderID != "" { + req.ClientOrderID(order.ClientOrderID) } - return createdOrders, err + if order.Market.Symbol != "" { + req.Size(order.Market.FormatQuantity(order.Quantity)) + } else { + // TODO: report error? + req.Size(order.Quantity.FormatString(8)) + } + + // set price field for limit orders + switch order.Type { + case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker: + if order.Market.Symbol != "" { + req.Price(order.Market.FormatPrice(order.Price)) + } else { + // TODO: report error? + req.Price(order.Price.FormatString(8)) + } + } + + if order.Type == types.OrderTypeLimitMaker { + req.PostOnly(true) + } + + switch order.TimeInForce { + case "FOK": + req.TimeInForce(kucoinapi.TimeInForceFOK) + case "IOC": + req.TimeInForce(kucoinapi.TimeInForceIOC) + default: + // default to GTC + req.TimeInForce(kucoinapi.TimeInForceGTC) + } + + switch order.Type { + case types.OrderTypeStopLimit: + req.OrderType(kucoinapi.OrderTypeStopLimit) + + case types.OrderTypeLimit, types.OrderTypeLimitMaker: + req.OrderType(kucoinapi.OrderTypeLimit) + + case types.OrderTypeMarket: + req.OrderType(kucoinapi.OrderTypeMarket) + } + + orderResponse, err := req.Do(ctx) + if err != nil { + return createdOrder, err + } + + return &types.Order{ + SubmitOrder: order, + Exchange: types.ExchangeKucoin, + OrderID: hashStringID(orderResponse.OrderID), + UUID: orderResponse.OrderID, + Status: types.OrderStatusNew, + ExecutedQuantity: fixedpoint.Zero, + IsWorking: true, + CreationTime: types.Time(time.Now()), + UpdateTime: types.Time(time.Now()), + }, nil } // QueryOpenOrders diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index e19604c05..e09ed0f9d 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -485,80 +485,73 @@ func (e *Exchange) Withdraw(ctx context.Context, asset string, amount fixedpoint return nil } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (createdOrder *types.Order, err error) { walletType := maxapi.WalletTypeSpot if e.MarginSettings.IsMargin { walletType = maxapi.WalletTypeMargin } - for _, o := range orders { - orderType, err := toLocalOrderType(o.Type) - if err != nil { - return createdOrders, err - } - - // case IOC type - if orderType == maxapi.OrderTypeLimit && o.TimeInForce == types.TimeInForceIOC { - orderType = maxapi.OrderTypeIOCLimit - } - - var quantityString string - if o.Market.Symbol != "" { - quantityString = o.Market.FormatQuantity(o.Quantity) - } else { - quantityString = o.Quantity.String() - } - - clientOrderID := NewClientOrderID(o.ClientOrderID) - - req := e.v3order.NewCreateWalletOrderRequest(walletType) - req.Market(toLocalSymbol(o.Symbol)). - Side(toLocalSideType(o.Side)). - Volume(quantityString). - OrderType(orderType). - ClientOrderID(clientOrderID) - - switch o.Type { - case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker: - var priceInString string - if o.Market.Symbol != "" { - priceInString = o.Market.FormatPrice(o.Price) - } else { - priceInString = o.Price.String() - } - req.Price(priceInString) - } - - // set stop price field for limit orders - switch o.Type { - case types.OrderTypeStopLimit, types.OrderTypeStopMarket: - var priceInString string - if o.Market.Symbol != "" { - priceInString = o.Market.FormatPrice(o.StopPrice) - } else { - priceInString = o.StopPrice.String() - } - req.StopPrice(priceInString) - } - - retOrder, err := req.Do(ctx) - if err != nil { - return createdOrders, err - } - - if retOrder == nil { - return createdOrders, errors.New("returned nil order") - } - - createdOrder, err := toGlobalOrder(*retOrder) - if err != nil { - return createdOrders, err - } - - createdOrders = append(createdOrders, *createdOrder) + o := order + orderType, err := toLocalOrderType(o.Type) + if err != nil { + return createdOrder, err } - return createdOrders, err + // case IOC type + if orderType == maxapi.OrderTypeLimit && o.TimeInForce == types.TimeInForceIOC { + orderType = maxapi.OrderTypeIOCLimit + } + + var quantityString string + if o.Market.Symbol != "" { + quantityString = o.Market.FormatQuantity(o.Quantity) + } else { + quantityString = o.Quantity.String() + } + + clientOrderID := NewClientOrderID(o.ClientOrderID) + + req := e.v3order.NewCreateWalletOrderRequest(walletType) + req.Market(toLocalSymbol(o.Symbol)). + Side(toLocalSideType(o.Side)). + Volume(quantityString). + OrderType(orderType). + ClientOrderID(clientOrderID) + + switch o.Type { + case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker: + var priceInString string + if o.Market.Symbol != "" { + priceInString = o.Market.FormatPrice(o.Price) + } else { + priceInString = o.Price.String() + } + req.Price(priceInString) + } + + // set stop price field for limit orders + switch o.Type { + case types.OrderTypeStopLimit, types.OrderTypeStopMarket: + var priceInString string + if o.Market.Symbol != "" { + priceInString = o.Market.FormatPrice(o.StopPrice) + } else { + priceInString = o.StopPrice.String() + } + req.StopPrice(priceInString) + } + + retOrder, err := req.Do(ctx) + if err != nil { + return createdOrder, err + } + + if retOrder == nil { + return createdOrder, errors.New("returned nil order") + } + + createdOrder, err = toGlobalOrder(*retOrder) + return createdOrder, err } // PlatformFeeCurrency diff --git a/pkg/exchange/okex/exchange.go b/pkg/exchange/okex/exchange.go index ab8dcddd2..0bebbc62f 100644 --- a/pkg/exchange/okex/exchange.go +++ b/pkg/exchange/okex/exchange.go @@ -159,78 +159,97 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, return balanceMap, nil } -func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { - var reqs []*okexapi.PlaceOrderRequest - for _, order := range orders { - orderReq := e.client.TradeService.NewPlaceOrderRequest() +func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) { + orderReq := e.client.TradeService.NewPlaceOrderRequest() - orderType, err := toLocalOrderType(order.Type) - if err != nil { - return nil, err - } - - orderReq.InstrumentID(toLocalSymbol(order.Symbol)) - orderReq.Side(toLocalSideType(order.Side)) - - if order.Market.Symbol != "" { - orderReq.Quantity(order.Market.FormatQuantity(order.Quantity)) - } else { - // TODO report error - orderReq.Quantity(order.Quantity.FormatString(8)) - } - - // set price field for limit orders - switch order.Type { - case types.OrderTypeStopLimit, types.OrderTypeLimit: - if order.Market.Symbol != "" { - orderReq.Price(order.Market.FormatPrice(order.Price)) - } else { - // TODO report error - orderReq.Price(order.Price.FormatString(8)) - } - } - - switch order.TimeInForce { - case "FOK": - orderReq.OrderType(okexapi.OrderTypeFOK) - case "IOC": - orderReq.OrderType(okexapi.OrderTypeIOC) - default: - orderReq.OrderType(orderType) - } - - reqs = append(reqs, orderReq) - } - - batchReq := e.client.TradeService.NewBatchPlaceOrderRequest() - batchReq.Add(reqs...) - orderHeads, err := batchReq.Do(ctx) + orderType, err := toLocalOrderType(order.Type) if err != nil { return nil, err } - for idx, orderHead := range orderHeads { - orderID, err := strconv.ParseInt(orderHead.OrderID, 10, 64) - if err != nil { - return createdOrders, err - } + orderReq.InstrumentID(toLocalSymbol(order.Symbol)) + orderReq.Side(toLocalSideType(order.Side)) - submitOrder := orders[idx] - createdOrders = append(createdOrders, types.Order{ - SubmitOrder: submitOrder, - Exchange: types.ExchangeOKEx, - OrderID: uint64(orderID), - Status: types.OrderStatusNew, - ExecutedQuantity: fixedpoint.Zero, - IsWorking: true, - CreationTime: types.Time(time.Now()), - UpdateTime: types.Time(time.Now()), - IsMargin: false, - IsIsolated: false, - }) + if order.Market.Symbol != "" { + orderReq.Quantity(order.Market.FormatQuantity(order.Quantity)) + } else { + // TODO report error + orderReq.Quantity(order.Quantity.FormatString(8)) } - return createdOrders, nil + // set price field for limit orders + switch order.Type { + case types.OrderTypeStopLimit, types.OrderTypeLimit: + if order.Market.Symbol != "" { + orderReq.Price(order.Market.FormatPrice(order.Price)) + } else { + // TODO report error + orderReq.Price(order.Price.FormatString(8)) + } + } + + switch order.TimeInForce { + case "FOK": + orderReq.OrderType(okexapi.OrderTypeFOK) + case "IOC": + orderReq.OrderType(okexapi.OrderTypeIOC) + default: + orderReq.OrderType(orderType) + } + + orderHead, err := orderReq.Do(ctx) + if err != nil { + return nil, err + } + + orderID, err := strconv.ParseInt(orderHead.OrderID, 10, 64) + if err != nil { + return nil, err + } + + return &types.Order{ + SubmitOrder: order, + Exchange: types.ExchangeOKEx, + OrderID: uint64(orderID), + Status: types.OrderStatusNew, + ExecutedQuantity: fixedpoint.Zero, + IsWorking: true, + CreationTime: types.Time(time.Now()), + UpdateTime: types.Time(time.Now()), + IsMargin: false, + IsIsolated: false, + }, nil + + // TODO: move this to batch place orders interface + /* + batchReq := e.client.TradeService.NewBatchPlaceOrderRequest() + batchReq.Add(reqs...) + orderHeads, err := batchReq.Do(ctx) + if err != nil { + return nil, err + } + + for idx, orderHead := range orderHeads { + orderID, err := strconv.ParseInt(orderHead.OrderID, 10, 64) + if err != nil { + return createdOrder, err + } + + submitOrder := order[idx] + createdOrder = append(createdOrder, types.Order{ + SubmitOrder: submitOrder, + Exchange: types.ExchangeOKEx, + OrderID: uint64(orderID), + Status: types.OrderStatusNew, + ExecutedQuantity: fixedpoint.Zero, + IsWorking: true, + CreationTime: types.Time(time.Now()), + UpdateTime: types.Time(time.Now()), + IsMargin: false, + IsIsolated: false, + }) + } + */ } func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) { diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go index ad6df8799..c7e31bbfc 100644 --- a/pkg/grpc/server.go +++ b/pkg/grpc/server.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "go.uber.org/multierr" "google.golang.org/grpc" "google.golang.org/grpc/reflection" @@ -46,20 +47,27 @@ func (s *TradingService) SubmitOrder(ctx context.Context, request *pb.SubmitOrde } } - createdOrders, err := session.Exchange.SubmitOrders(ctx, submitOrders...) - if err != nil { - return nil, err + createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, session.Exchange, submitOrders...) + if len(errIdx) > 0 { + createdOrders2, err2 := bbgo.BatchRetryPlaceOrder(ctx, session.Exchange, errIdx, submitOrders...) + if err2 != nil { + err = multierr.Append(err, err2) + } else { + createdOrders = append(createdOrders, createdOrders2...) + } } + // convert response resp := &pb.SubmitOrderResponse{ Session: sessionName, Orders: nil, } + for _, createdOrder := range createdOrders { resp.Orders = append(resp.Orders, transOrder(session, createdOrder)) } - return resp, nil + return resp, err } func (s *TradingService) CancelOrder(ctx context.Context, request *pb.CancelOrderRequest) (*pb.CancelOrderResponse, error) { diff --git a/pkg/strategy/fmaker/strategy.go b/pkg/strategy/fmaker/strategy.go index 9a1c1c5c4..ef0faca8b 100644 --- a/pkg/strategy/fmaker/strategy.go +++ b/pkg/strategy/fmaker/strategy.go @@ -132,13 +132,14 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu // s.Notify("Submitting %s %s order to close position by %v", s.Symbol, side.String(), percentage, submitOrder) - createdOrders, err := s.session.Exchange.SubmitOrders(ctx, submitOrder) + createdOrder, err := s.session.Exchange.SubmitOrder(ctx, submitOrder) if err != nil { log.WithError(err).Errorf("can not place position close order") + } else if createdOrder != nil { + s.orderStore.Add(*createdOrder) + s.activeMakerOrders.Add(*createdOrder) } - s.orderStore.Add(createdOrders...) - s.activeMakerOrders.Add(createdOrders...) return err } func (s *Strategy) InstanceID() string { @@ -464,7 +465,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // Price: kline.Close.Mul(fixedpoint.One.Add(s.Spread)), // Quantity: fixedpoint.NewFromFloat(math.Max(math.Min(eq, 0.003), 0.0005)), //0.0005 // } - // createdOrders, err = orderExecutor.SubmitOrders(ctx, submitOrder) + // createdOrders, err = orderExecutor.SubmitOrder(ctx, submitOrder) // if err != nil { // log.WithError(err).Errorf("can not place orders") // } @@ -495,7 +496,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // Price: kline.Close.Mul(fixedpoint.One.Sub(s.Spread)), // Quantity: fixedpoint.NewFromFloat(math.Max(math.Min(eq, 0.003), 0.0005)), //0.0005 // } - // createdOrders, err = orderExecutor.SubmitOrders(ctx, submitOrder) + // createdOrders, err = orderExecutor.SubmitOrder(ctx, submitOrder) // if err != nil { // log.WithError(err).Errorf("can not place orders") // } diff --git a/pkg/strategy/xgap/strategy.go b/pkg/strategy/xgap/strategy.go index 7406e5551..0758456ed 100644 --- a/pkg/strategy/xgap/strategy.go +++ b/pkg/strategy/xgap/strategy.go @@ -350,7 +350,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se s.tradingMarket.MinNotional.Mul(NotionModifier).Div(price)) } - createdOrders, err := tradingSession.Exchange.SubmitOrders(ctx, types.SubmitOrder{ + createdOrders, _, err := bbgo.BatchPlaceOrder(ctx, tradingSession.Exchange, types.SubmitOrder{ Symbol: s.Symbol, Side: types.SideTypeBuy, Type: types.OrderTypeLimit, @@ -369,6 +369,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se // TimeInForce: types.TimeInForceGTC, GroupID: s.groupID, }) + if err != nil { log.WithError(err).Error("order submit error") } diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 243c663d3..cb6708f94 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -96,7 +96,7 @@ type ExchangeTradeService interface { QueryAccountBalances(ctx context.Context) (BalanceMap, error) - SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error) + SubmitOrder(ctx context.Context, order SubmitOrder) (createdOrder *Order, err error) QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error) diff --git a/pkg/types/mocks/mock_exchange.go b/pkg/types/mocks/mock_exchange.go index 731ab94de..d1836c01d 100644 --- a/pkg/types/mocks/mock_exchange.go +++ b/pkg/types/mocks/mock_exchange.go @@ -206,22 +206,17 @@ func (mr *MockExchangeMockRecorder) QueryTickers(arg0 interface{}, arg1 ...inter return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryTickers", reflect.TypeOf((*MockExchange)(nil).QueryTickers), varargs...) } -// SubmitOrders mocks base method. -func (m *MockExchange) SubmitOrders(arg0 context.Context, arg1 ...types.SubmitOrder) (types.OrderSlice, error) { +// SubmitOrder mocks base method. +func (m *MockExchange) SubmitOrder(arg0 context.Context, arg1 types.SubmitOrder) (*types.Order, error) { m.ctrl.T.Helper() - varargs := []interface{}{arg0} - for _, a := range arg1 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "SubmitOrders", varargs...) - ret0, _ := ret[0].(types.OrderSlice) + ret := m.ctrl.Call(m, "SubmitOrder", arg0, arg1) + ret0, _ := ret[0].(*types.Order) ret1, _ := ret[1].(error) return ret0, ret1 } -// SubmitOrders indicates an expected call of SubmitOrders. -func (mr *MockExchangeMockRecorder) SubmitOrders(arg0 interface{}, arg1 ...interface{}) *gomock.Call { +// SubmitOrder indicates an expected call of SubmitOrder. +func (mr *MockExchangeMockRecorder) SubmitOrder(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - varargs := append([]interface{}{arg0}, arg1...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitOrders", reflect.TypeOf((*MockExchange)(nil).SubmitOrders), varargs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitOrder", reflect.TypeOf((*MockExchange)(nil).SubmitOrder), arg0, arg1) }