bbgo_origin/pkg/exchange/okex/exchange.go

661 lines
20 KiB
Go

package okex
import (
"context"
"fmt"
"regexp"
"strconv"
"time"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
var (
// clientOrderIdRegex combine of case-sensitive alphanumerics, all numbers, or all letters of up to 32 characters.
clientOrderIdRegex = regexp.MustCompile("^[a-zA-Z0-9]{0,32}$")
// Rate Limit: 20 requests per 2 seconds, Rate limit rule: IP + instrumentType.
// Currently, calls are not made very frequently, so only IP is considered.
queryMarketLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
// Rate Limit: 20 requests per 2 seconds, Rate limit rule: IP
queryTickerLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
// Rate Limit: 20 requests per 2 seconds, Rate limit rule: IP
queryTickersLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
// Rate Limit: 10 requests per 2 seconds, Rate limit rule: UserID
queryAccountLimiter = rate.NewLimiter(rate.Every(200*time.Millisecond), 1)
// Rate Limit: 60 requests per 2 seconds, Rate limit rule (except Options): UserID + Instrument ID.
// TODO: support UserID + Instrument ID
placeOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 60 requests per 2 seconds, Rate limit rule (except Options): UserID + Instrument ID
// TODO: support UserID + Instrument ID
batchCancelOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 60 requests per 2 seconds, Rate limit rule: UserID
queryOpenOrderLimiter = rate.NewLimiter(rate.Every(33*time.Millisecond), 1)
// Rate Limit: 20 requests per 2 seconds, Rate limit rule: UserID
queryClosedOrderRateLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
// Rate Limit: 10 requests per 2 seconds, Rate limit rule: UserID
queryTradeLimiter = rate.NewLimiter(rate.Every(200*time.Millisecond), 1)
// Rate Limit: 40 requests per 2 seconds, Rate limit rule: IP
queryKLineLimiter = rate.NewLimiter(rate.Every(50*time.Millisecond), 1)
)
const (
ID = "okex"
// PlatformToken is the platform currency of OKEx, pre-allocate static string here
PlatformToken = "OKB"
defaultQueryLimit = 100
maxHistoricalDataQueryPeriod = 90 * 24 * time.Hour
threeDaysHistoricalPeriod = 3 * 24 * time.Hour
)
var log = logrus.WithFields(logrus.Fields{
"exchange": ID,
})
var ErrSymbolRequired = errors.New("symbol is a required parameter")
type Exchange struct {
key, secret, passphrase string
client *okexapi.RestClient
timeNowFunc func() time.Time
}
func New(key, secret, passphrase string) *Exchange {
client := okexapi.NewClient()
if len(key) > 0 && len(secret) > 0 {
client.Auth(key, secret, passphrase)
}
return &Exchange{
key: key,
secret: secret,
passphrase: passphrase,
client: client,
timeNowFunc: time.Now,
}
}
func (e *Exchange) Name() types.ExchangeName {
return types.ExchangeOKEx
}
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
if err := queryMarketLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("markets rate limiter wait error: %w", err)
}
instruments, err := e.client.NewGetInstrumentsInfoRequest().Do(ctx)
if err != nil {
return nil, err
}
markets := types.MarketMap{}
for _, instrument := range instruments {
symbol := toGlobalSymbol(instrument.InstrumentID)
market := types.Market{
Exchange: types.ExchangeOKEx,
Symbol: symbol,
LocalSymbol: instrument.InstrumentID,
QuoteCurrency: instrument.QuoteCurrency,
BaseCurrency: instrument.BaseCurrency,
// convert tick size OKEx to precision
PricePrecision: instrument.TickSize.NumFractionalDigits(),
VolumePrecision: instrument.LotSize.NumFractionalDigits(),
// TickSize: OKEx's price tick, for BTC-USDT it's "0.1"
TickSize: instrument.TickSize,
// Quantity step size, for BTC-USDT, it's "0.00000001"
StepSize: instrument.LotSize,
// for BTC-USDT, it's "0.00001"
MinQuantity: instrument.MinSize,
// OKEx does not offer minimal notional, use 1 USD here.
MinNotional: fixedpoint.One,
MinAmount: fixedpoint.One,
}
markets[symbol] = market
}
return markets, nil
}
func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticker, error) {
if err := queryTickerLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("ticker rate limiter wait error: %w", err)
}
symbol = toLocalSymbol(symbol)
marketTicker, err := e.client.NewGetTickerRequest().InstId(symbol).Do(ctx)
if err != nil {
return nil, err
}
if len(marketTicker) != 1 {
return nil, fmt.Errorf("unexpected length of %s market ticker, got: %v", symbol, marketTicker)
}
return toGlobalTicker(marketTicker[0]), nil
}
func (e *Exchange) QueryTickers(ctx context.Context, symbols ...string) (map[string]types.Ticker, error) {
if err := queryTickersLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("tickers rate limiter wait error: %w", err)
}
marketTickers, err := e.client.NewGetTickersRequest().Do(ctx)
if err != nil {
return nil, err
}
tickers := make(map[string]types.Ticker)
for _, marketTicker := range marketTickers {
symbol := toGlobalSymbol(marketTicker.InstrumentID)
ticker := toGlobalTicker(marketTicker)
tickers[symbol] = *ticker
}
if len(symbols) == 0 {
return tickers, nil
}
selectedTickers := make(map[string]types.Ticker, len(symbols))
for _, symbol := range symbols {
if ticker, ok := tickers[symbol]; ok {
selectedTickers[symbol] = ticker
}
}
return selectedTickers, nil
}
func (e *Exchange) PlatformFeeCurrency() string {
return PlatformToken
}
func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
bals, err := e.QueryAccountBalances(ctx)
if err != nil {
return nil, err
}
account := types.NewAccount()
account.UpdateBalances(bals)
return account, nil
}
func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) {
if err := queryAccountLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("account rate limiter wait error: %w", err)
}
accountBalances, err := e.client.NewGetAccountInfoRequest().Do(ctx)
if err != nil {
return nil, err
}
if len(accountBalances) != 1 {
return nil, fmt.Errorf("unexpected length of balances: %v", accountBalances)
}
return toGlobalBalance(&accountBalances[0]), nil
}
func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (*types.Order, error) {
orderReq := e.client.NewPlaceOrderRequest()
orderReq.InstrumentID(toLocalSymbol(order.Symbol))
orderReq.Side(toLocalSideType(order.Side))
orderReq.Size(order.Market.FormatQuantity(order.Quantity))
// set price field for limit orders
switch order.Type {
case types.OrderTypeStopLimit, types.OrderTypeLimit, types.OrderTypeLimitMaker:
orderReq.Price(order.Market.FormatPrice(order.Price))
case types.OrderTypeMarket:
// Because our order.Quantity unit is base coin, so we indicate the target currency to Base.
if order.Side == types.SideTypeBuy {
orderReq.Size(order.Market.FormatQuantity(order.Quantity))
orderReq.TargetCurrency(okexapi.TargetCurrencyBase)
} else {
orderReq.Size(order.Market.FormatQuantity(order.Quantity))
orderReq.TargetCurrency(okexapi.TargetCurrencyQuote)
}
}
orderType, err := toLocalOrderType(order.Type)
if err != nil {
return nil, err
}
switch order.TimeInForce {
case types.TimeInForceFOK:
orderReq.OrderType(okexapi.OrderTypeFOK)
case types.TimeInForceIOC:
orderReq.OrderType(okexapi.OrderTypeIOC)
default:
orderReq.OrderType(orderType)
}
if err := placeOrderLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("place order rate limiter wait error: %w", err)
}
if len(order.ClientOrderID) > 0 {
if ok := clientOrderIdRegex.MatchString(order.ClientOrderID); !ok {
return nil, fmt.Errorf("client order id should be case-sensitive alphanumerics, all numbers, or all letters of up to 32 characters: %s", order.ClientOrderID)
}
orderReq.ClientOrderID(order.ClientOrderID)
}
timeNow := time.Now()
orders, err := orderReq.Do(ctx)
if err != nil {
return nil, err
}
if len(orders) != 1 {
return nil, fmt.Errorf("unexpected length of order response: %v", orders)
}
orderID, err := strconv.ParseUint(orders[0].OrderID, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse response order id: %w", err)
}
return &types.Order{
SubmitOrder: order,
Exchange: types.ExchangeOKEx,
OrderID: orderID,
Status: types.OrderStatusNew,
ExecutedQuantity: fixedpoint.Zero,
IsWorking: true,
CreationTime: types.Time(timeNow),
UpdateTime: types.Time(timeNow),
}, nil
// TODO: move this to batch place orders interface
/*
batchReq := e.client.TradeService.NewBatchPlaceOrderRequest()
batchReq.Add(reqs...)
orderHeads, err := batchReq.Do(ctx)
if err != nil {
return nil, err
}
for idx, orderHead := range orderHeads {
orderID, err := strconv.ParseInt(orderHead.OrderID, 10, 64)
if err != nil {
return createdOrder, err
}
submitOrder := order[idx]
createdOrder = append(createdOrder, types.Order{
SubmitOrder: submitOrder,
Exchange: types.ExchangeOKEx,
OrderID: uint64(orderID),
Status: types.OrderStatusNew,
ExecutedQuantity: fixedpoint.Zero,
IsWorking: true,
CreationTime: types.Time(time.Now()),
UpdateTime: types.Time(time.Now()),
IsMargin: false,
IsIsolated: false,
})
}
*/
}
// QueryOpenOrders retrieves the pending orders. The data returned is ordered by createdTime, and we utilized the
// `After` parameter to acquire all orders.
func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
instrumentID := toLocalSymbol(symbol)
nextCursor := int64(0)
for {
if err := queryOpenOrderLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query open orders rate limiter wait error: %w", err)
}
req := e.client.NewGetOpenOrdersRequest().
InstrumentID(instrumentID).
After(strconv.FormatInt(nextCursor, 10))
openOrders, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query open orders: %w", err)
}
for _, o := range openOrders {
o, err := orderDetailToGlobal(&o.OrderDetail)
if err != nil {
return nil, fmt.Errorf("failed to convert order, err: %v", err)
}
orders = append(orders, *o)
}
orderLen := len(openOrders)
// a defensive programming to ensure the length of order response is expected.
if orderLen > defaultQueryLimit {
return nil, fmt.Errorf("unexpected open orders length %d", orderLen)
}
if orderLen < defaultQueryLimit {
break
}
nextCursor = int64(openOrders[orderLen-1].OrderId)
}
return orders, err
}
func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
if len(orders) == 0 {
return nil
}
var reqs []*okexapi.CancelOrderRequest
for _, order := range orders {
if len(order.Symbol) == 0 {
return ErrSymbolRequired
}
req := e.client.NewCancelOrderRequest()
req.InstrumentID(toLocalSymbol(order.Symbol))
req.OrderID(strconv.FormatUint(order.OrderID, 10))
if len(order.ClientOrderID) > 0 {
if ok := clientOrderIdRegex.MatchString(order.ClientOrderID); !ok {
return fmt.Errorf("client order id should be case-sensitive alphanumerics, all numbers, or all letters of up to 32 characters: %s", order.ClientOrderID)
}
req.ClientOrderID(order.ClientOrderID)
}
reqs = append(reqs, req)
}
if err := batchCancelOrderLimiter.Wait(ctx); err != nil {
return fmt.Errorf("batch cancel order rate limiter wait error: %w", err)
}
batchReq := e.client.NewBatchCancelOrderRequest()
batchReq.Add(reqs...)
_, err := batchReq.Do(ctx)
return err
}
func (e *Exchange) NewStream() types.Stream {
return NewStream(e.client, e)
}
func (e *Exchange) QueryKLines(
ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions,
) ([]types.KLine, error) {
if err := queryKLineLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query k line rate limiter wait error: %w", err)
}
intervalParam, err := toLocalInterval(interval)
if err != nil {
return nil, fmt.Errorf("failed to get interval: %w", err)
}
req := e.client.NewGetCandlesRequest().InstrumentID(toLocalSymbol(symbol))
req.Bar(intervalParam)
if options.StartTime != nil {
req.After(*options.StartTime)
}
if options.EndTime != nil {
req.Before(*options.EndTime)
}
candles, err := req.Do(ctx)
if err != nil {
return nil, err
}
var klines []types.KLine
for _, candle := range candles {
klines = append(klines, kLineToGlobal(candle, interval, symbol))
}
return klines, nil
}
func (e *Exchange) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.Order, error) {
if len(q.Symbol) == 0 {
return nil, ErrSymbolRequired
}
if len(q.OrderID) == 0 && len(q.ClientOrderID) == 0 {
return nil, errors.New("okex.QueryOrder: OrderId or ClientOrderId is required parameter")
}
req := e.client.NewGetOrderDetailsRequest()
req.InstrumentID(toLocalSymbol(q.Symbol)).
OrderID(q.OrderID).
ClientOrderID(q.ClientOrderID)
var order *okexapi.OrderDetails
order, err := req.Do(ctx)
if err != nil {
return nil, err
}
return toGlobalOrder(order)
}
// QueryOrderTrades quires order trades can query trades in last 3 months.
func (e *Exchange) QueryOrderTrades(ctx context.Context, q types.OrderQuery) (trades []types.Trade, err error) {
if len(q.ClientOrderID) != 0 {
log.Warn("!!!OKEX EXCHANGE API NOTICE!!! Okex does not support searching for trades using OrderClientId.")
}
req := e.client.NewGetTransactionHistoryRequest()
if len(q.Symbol) != 0 {
req.InstrumentID(toLocalSymbol(q.Symbol))
}
if len(q.OrderID) != 0 {
req.OrderID(q.OrderID)
}
if err := queryTradeLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("order trade rate limiter wait error: %w", err)
}
response, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to query order trades, err: %w", err)
}
for _, trade := range response {
trades = append(trades, tradeToGlobal(trade))
}
return trades, nil
}
/*
QueryClosedOrders can query closed orders in last 3 months, there are no time interval limitations, as long as until >= since.
Please Use lastOrderID as cursor, only return orders later than that order, that order is not included.
If you want to query all orders within a large time range (e.g. total orders > 100), we recommend using batch.ClosedOrderBatchQuery.
** since and until are inclusive, you can include the lastTradeId as well. **
*/
func (e *Exchange) QueryClosedOrders(
ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64,
) (orders []types.Order, err error) {
if symbol == "" {
return nil, ErrSymbolRequired
}
newSince := since
now := time.Now()
if time.Since(newSince) > maxHistoricalDataQueryPeriod {
newSince = now.Add(-maxHistoricalDataQueryPeriod)
log.Warnf("!!!OKX EXCHANGE API NOTICE!!! The closed order API cannot query data beyond 90 days from the current date, update %s -> %s", since, newSince)
}
if until.Before(newSince) {
log.Warnf("!!!OKX EXCHANGE API NOTICE!!! The 'until' comes before 'since', update until to now(%s -> %s).", until, now)
until = now
}
if until.Sub(newSince) > maxHistoricalDataQueryPeriod {
return nil, fmt.Errorf("the start time %s and end time %s cannot exceed 90 days", newSince, until)
}
if err := queryClosedOrderRateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query closed order rate limiter wait error: %w", err)
}
res, err := e.client.NewGetOrderHistoryRequest().
InstrumentID(toLocalSymbol(symbol)).
StartTime(since).
EndTime(until).
Limit(defaultQueryLimit).
Before(strconv.FormatUint(lastOrderID, 10)).
Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to call get order histories error: %w", err)
}
for _, order := range res {
o, err2 := orderDetailToGlobal(&order)
if err2 != nil {
err = multierr.Append(err, err2)
continue
}
orders = append(orders, *o)
}
if err != nil {
return nil, err
}
return types.SortOrdersAscending(orders), nil
}
/*
QueryTrades can query trades in last 3 months, there are no time interval limitations, as long as end_time >= start_time.
okx does not provide an API to query by trade ID, so we use the bill ID to do it. The trades result is ordered by timestamp.
REMARK: If your start time is 90 days earlier, we will update it to now - 90 days.
** StartTime and EndTime are inclusive. **
** StartTime and EndTime cannot exceed 90 days. **
** StartTime, EndTime, FromTradeId can be used together. **
If you want to query all trades within a large time range (e.g. total orders > 100), we recommend using batch.TradeBatchQuery.
We don't support the last trade id as a filter because okx supports bill ID only.
*/
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
if symbol == "" {
return nil, ErrSymbolRequired
}
limit := options.Limit
if limit > defaultQueryLimit || limit <= 0 {
log.Infof("limit is exceeded default limit %d or zero, got: %d, use default limit", defaultQueryLimit, limit)
limit = defaultQueryLimit
}
timeNow := e.timeNowFunc()
newStartTime := timeNow.Add(-threeDaysHistoricalPeriod)
if options.StartTime != nil {
newStartTime = *options.StartTime
if timeNow.Sub(newStartTime) > maxHistoricalDataQueryPeriod {
newStartTime = timeNow.Add(-maxHistoricalDataQueryPeriod)
log.Warnf("!!!OKX EXCHANGE API NOTICE!!! The trade API cannot query data beyond 90 days from the current date, update %s -> %s", *options.StartTime, newStartTime)
}
}
endTime := timeNow
if options.EndTime != nil {
if options.EndTime.Before(newStartTime) {
return nil, fmt.Errorf("end time %s before start %s", *options.EndTime, newStartTime)
}
if options.EndTime.Sub(newStartTime) > maxHistoricalDataQueryPeriod {
return nil, fmt.Errorf("start time %s and end time %s cannot greater than 90 days", newStartTime, options.EndTime)
}
endTime = *options.EndTime
}
if options.LastTradeID != 0 {
// we don't support the last trade id as a filter because okx supports bill ID only.
// we don't have any more fields (types.Trade) to store it.
log.Infof("Last trade id not supported on QueryTrades")
}
if timeNow.Sub(newStartTime) <= threeDaysHistoricalPeriod {
c := e.client.NewGetThreeDaysTransactionHistoryRequest().
InstrumentID(toLocalSymbol(symbol)).
StartTime(newStartTime).
EndTime(endTime).
Limit(uint64(limit))
return getTrades(ctx, limit, func(ctx context.Context, billId string) ([]okexapi.Trade, error) {
c.Before(billId)
return c.Do(ctx)
})
}
c := e.client.NewGetTransactionHistoryRequest().
InstrumentID(toLocalSymbol(symbol)).
StartTime(newStartTime).
EndTime(endTime).
Limit(uint64(limit))
return getTrades(ctx, limit, func(ctx context.Context, billId string) ([]okexapi.Trade, error) {
c.Before(billId)
return c.Do(ctx)
})
}
func getTrades(ctx context.Context, limit int64, doFunc func(ctx context.Context, billId string) ([]okexapi.Trade, error)) (trades []types.Trade, err error) {
billId := "0"
for {
response, err := doFunc(ctx, billId)
if err != nil {
return nil, fmt.Errorf("failed to query trades, err: %w", err)
}
for _, trade := range response {
trades = append(trades, tradeToGlobal(trade))
}
tradeLen := int64(len(response))
// a defensive programming to ensure the length of order response is expected.
if tradeLen > limit {
return nil, fmt.Errorf("unexpected trade length %d", tradeLen)
}
if tradeLen < limit {
break
}
// use Before filter to get all data.
billId = response[tradeLen-1].BillId.String()
}
return trades, nil
}
func (e *Exchange) SupportedInterval() map[types.Interval]int {
return SupportedIntervals
}
func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
_, ok := SupportedIntervals[interval]
return ok
}