adjust max api call rate limiting

This commit is contained in:
c9s 2021-02-22 15:01:05 +08:00
parent 724dad70bb
commit eaad414706
6 changed files with 43 additions and 20 deletions

View File

@ -21,6 +21,9 @@ const SyncNotStarted = 0
const Syncing = 1
const SyncDone = 2
// session is configured, check if we're syncing data
let syncStatusPoller = null
export default function MyApp(props) {
const {Component, pageProps} = props;
@ -38,8 +41,6 @@ export default function MyApp(props) {
if (sessions.length > 0) {
setSyncing(true)
// session is configured, check if we're syncing data
let poller = null
const pollSyncStatus = () => {
querySyncStatus((status) => {
switch (status) {
@ -49,9 +50,9 @@ export default function MyApp(props) {
setSyncing(true);
break;
case SyncDone:
clearInterval(syncStatusPoller);
setLoading(false);
setSyncing(false);
clearInterval(poller);
break;
}
}).catch((err) => {
@ -59,7 +60,7 @@ export default function MyApp(props) {
})
}
poller = setInterval(pollSyncStatus, 1000)
syncStatusPoller = setInterval(pollSyncStatus, 1000)
} else {
// no session found, so we can not sync any data
setLoading(false)

View File

@ -50,7 +50,6 @@ func RegisterStrategy(key string, s interface{}) {
var emptyTime time.Time
type SyncStatus int
const (
@ -91,6 +90,7 @@ func NewEnvironment() *Environment {
sessions: make(map[string]*ExchangeSession),
startTime: time.Now(),
syncStatus: SyncNotStarted,
PersistenceServiceFacade: &service.PersistenceServiceFacade{
Memory: service.NewMemoryService(),
},
@ -497,6 +497,7 @@ func (environ *Environment) IsSyncing() (status SyncStatus) {
func (environ *Environment) setSyncing(status SyncStatus) {
environ.syncStatusMutex.Lock()
environ.syncStatus = status
log.Infof("setting sync status to %d", environ.syncStatus)
environ.syncStatusMutex.Unlock()
}
@ -528,11 +529,17 @@ func (environ *Environment) SyncSession(ctx context.Context, session *ExchangeSe
}
func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSession, defaultSymbols ...string) error {
if err := session.Init(ctx, environ) ; err != nil {
return err
}
symbols, err := getSessionSymbols(session, defaultSymbols...)
if err != nil {
return err
}
log.Infof("syncing symbols %v from session %s", symbols, session.Name)
return environ.TradeSync.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...)
}

View File

@ -521,7 +521,7 @@ func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) {
func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err error) {
// If the session is an isolated margin session, there will be only the isolated margin symbol
if session.IsolatedMargin {
if session.Margin && session.IsolatedMargin {
return []string{
session.IsolatedMarginSymbol,
}, nil

View File

@ -131,7 +131,7 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
logrus.WithError(err).Error("rate limit error")
}
logrus.Debugf("querying %s trades from id=%d limit=%d", symbol, lastTradeID, options.Limit)
logrus.Infof("querying %s trades from id=%d limit=%d", symbol, lastTradeID, options.Limit)
var err error
var trades []types.Trade

View File

@ -19,7 +19,10 @@ import (
"github.com/c9s/bbgo/pkg/util"
)
var limiter = rate.NewLimiter(rate.Every(5*time.Second), 2)
var closedOrderQueryLimiter = rate.NewLimiter(rate.Every(10*time.Second), 1)
var tradeQuerylimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
var accountQuerylimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
var marketDataLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
var log = logrus.WithField("exchange", "max")
@ -66,17 +69,21 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke
}
func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
var ret = make(map[string]types.Ticker)
if err := marketDataLimiter.Wait(ctx) ; err != nil {
return nil, err
}
var tickers = make(map[string]types.Ticker)
if len(symbol) == 1 {
ticker, err := e.QueryTicker(ctx, symbol[0])
if err != nil {
return nil, err
}
ret[toGlobalSymbol(symbol[0])] = *ticker
tickers[toGlobalSymbol(symbol[0])] = *ticker
} else {
tickers, err := e.client.PublicService.Tickers()
maxTickers, err := e.client.PublicService.Tickers()
if err != nil {
return nil, err
}
@ -87,11 +94,11 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri
m[toGlobalSymbol(s)] = exists
}
for k, v := range tickers {
for k, v := range maxTickers {
if _, ok := m[toGlobalSymbol(k)]; len(symbol) != 0 && !ok {
continue
}
ret[toGlobalSymbol(k)] = types.Ticker{
tickers[toGlobalSymbol(k)] = types.Ticker{
Time: v.Time,
Volume: util.MustParseFloat(v.Volume),
Last: util.MustParseFloat(v.Last),
@ -104,7 +111,7 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri
}
}
return ret, nil
return tickers, nil
}
func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) {
@ -167,7 +174,7 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
// lastOrderID is not supported on MAX
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
if err := limiter.Wait(ctx) ; err != nil {
if err := closedOrderQueryLimiter.Wait(ctx) ; err != nil {
return nil, err
}
@ -369,7 +376,7 @@ func (e *Exchange) PlatformFeeCurrency() string {
}
func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
if err := limiter.Wait(ctx) ; err != nil {
if err := accountQuerylimiter.Wait(ctx) ; err != nil {
return nil, err
}
@ -514,7 +521,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
}
func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) {
if err := limiter.Wait(ctx) ; err != nil {
if err := accountQuerylimiter.Wait(ctx) ; err != nil {
return nil, err
}
@ -537,7 +544,7 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap,
}
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
if err := limiter.Wait(ctx) ; err != nil {
if err := tradeQuerylimiter.Wait(ctx) ; err != nil {
return nil, err
}
@ -577,7 +584,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
}
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
if err := limiter.Wait(ctx) ; err != nil {
if err := marketDataLimiter.Wait(ctx) ; err != nil {
return nil, err
}

View File

@ -119,7 +119,15 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
tradeKeys[key] = struct{}{}
logrus.Infof("inserting trade: %d %s %-4s price: %-13f volume: %-11f %5s %s", trade.ID, trade.Symbol, trade.Side, trade.Price, trade.Quantity, trade.MakerOrTakerLabel(), trade.Time.String())
logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s",
trade.Exchange,
trade.ID,
trade.Symbol,
trade.Side,
trade.Price,
trade.Quantity,
trade.MakerOrTakerLabel(),
trade.Time.String())
if err := s.TradeService.Insert(trade); err != nil {
return err