From eaad414706352f60823037d5a3c0b18d9710aa86 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 22 Feb 2021 15:01:05 +0800 Subject: [PATCH] adjust max api call rate limiting --- frontend/pages/_app.tsx | 9 +++++---- pkg/bbgo/environment.go | 9 ++++++++- pkg/bbgo/session.go | 2 +- pkg/exchange/batch/batch.go | 2 +- pkg/exchange/max/exchange.go | 31 +++++++++++++++++++------------ pkg/service/sync.go | 10 +++++++++- 6 files changed, 43 insertions(+), 20 deletions(-) diff --git a/frontend/pages/_app.tsx b/frontend/pages/_app.tsx index f1a4b0cc5..745ffacb7 100644 --- a/frontend/pages/_app.tsx +++ b/frontend/pages/_app.tsx @@ -21,6 +21,9 @@ const SyncNotStarted = 0 const Syncing = 1 const SyncDone = 2 +// session is configured, check if we're syncing data +let syncStatusPoller = null + export default function MyApp(props) { const {Component, pageProps} = props; @@ -38,8 +41,6 @@ export default function MyApp(props) { if (sessions.length > 0) { setSyncing(true) - // session is configured, check if we're syncing data - let poller = null const pollSyncStatus = () => { querySyncStatus((status) => { switch (status) { @@ -49,9 +50,9 @@ export default function MyApp(props) { setSyncing(true); break; case SyncDone: + clearInterval(syncStatusPoller); setLoading(false); setSyncing(false); - clearInterval(poller); break; } }).catch((err) => { @@ -59,7 +60,7 @@ export default function MyApp(props) { }) } - poller = setInterval(pollSyncStatus, 1000) + syncStatusPoller = setInterval(pollSyncStatus, 1000) } else { // no session found, so we can not sync any data setLoading(false) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index a6d804425..e80501eb5 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -50,7 +50,6 @@ func RegisterStrategy(key string, s interface{}) { var emptyTime time.Time - type SyncStatus int const ( @@ -91,6 +90,7 @@ func NewEnvironment() *Environment { sessions: make(map[string]*ExchangeSession), startTime: time.Now(), + syncStatus: SyncNotStarted, PersistenceServiceFacade: &service.PersistenceServiceFacade{ Memory: service.NewMemoryService(), }, @@ -497,6 +497,7 @@ func (environ *Environment) IsSyncing() (status SyncStatus) { func (environ *Environment) setSyncing(status SyncStatus) { environ.syncStatusMutex.Lock() environ.syncStatus = status + log.Infof("setting sync status to %d", environ.syncStatus) environ.syncStatusMutex.Unlock() } @@ -528,11 +529,17 @@ func (environ *Environment) SyncSession(ctx context.Context, session *ExchangeSe } func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSession, defaultSymbols ...string) error { + if err := session.Init(ctx, environ) ; err != nil { + return err + } + symbols, err := getSessionSymbols(session, defaultSymbols...) if err != nil { return err } + log.Infof("syncing symbols %v from session %s", symbols, session.Name) + return environ.TradeSync.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index a7477a8b1..c77eab6cc 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -521,7 +521,7 @@ func (session *ExchangeSession) UpdatePrices(ctx context.Context) (err error) { func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err error) { // If the session is an isolated margin session, there will be only the isolated margin symbol - if session.IsolatedMargin { + if session.Margin && session.IsolatedMargin { return []string{ session.IsolatedMarginSymbol, }, nil diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 8f6773002..36f462cbf 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -131,7 +131,7 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str logrus.WithError(err).Error("rate limit error") } - logrus.Debugf("querying %s trades from id=%d limit=%d", symbol, lastTradeID, options.Limit) + logrus.Infof("querying %s trades from id=%d limit=%d", symbol, lastTradeID, options.Limit) var err error var trades []types.Trade diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index da3d14432..3a84fb688 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -19,7 +19,10 @@ import ( "github.com/c9s/bbgo/pkg/util" ) -var limiter = rate.NewLimiter(rate.Every(5*time.Second), 2) +var closedOrderQueryLimiter = rate.NewLimiter(rate.Every(10*time.Second), 1) +var tradeQuerylimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) +var accountQuerylimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) +var marketDataLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) var log = logrus.WithField("exchange", "max") @@ -66,17 +69,21 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke } func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { - var ret = make(map[string]types.Ticker) + if err := marketDataLimiter.Wait(ctx) ; err != nil { + return nil, err + } + var tickers = make(map[string]types.Ticker) if len(symbol) == 1 { ticker, err := e.QueryTicker(ctx, symbol[0]) if err != nil { return nil, err } - ret[toGlobalSymbol(symbol[0])] = *ticker + tickers[toGlobalSymbol(symbol[0])] = *ticker } else { - tickers, err := e.client.PublicService.Tickers() + + maxTickers, err := e.client.PublicService.Tickers() if err != nil { return nil, err } @@ -87,11 +94,11 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri m[toGlobalSymbol(s)] = exists } - for k, v := range tickers { + for k, v := range maxTickers { if _, ok := m[toGlobalSymbol(k)]; len(symbol) != 0 && !ok { continue } - ret[toGlobalSymbol(k)] = types.Ticker{ + tickers[toGlobalSymbol(k)] = types.Ticker{ Time: v.Time, Volume: util.MustParseFloat(v.Volume), Last: util.MustParseFloat(v.Last), @@ -104,7 +111,7 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri } } - return ret, nil + return tickers, nil } func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { @@ -167,7 +174,7 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [ // lastOrderID is not supported on MAX func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) { - if err := limiter.Wait(ctx) ; err != nil { + if err := closedOrderQueryLimiter.Wait(ctx) ; err != nil { return nil, err } @@ -369,7 +376,7 @@ func (e *Exchange) PlatformFeeCurrency() string { } func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { - if err := limiter.Wait(ctx) ; err != nil { + if err := accountQuerylimiter.Wait(ctx) ; err != nil { return nil, err } @@ -514,7 +521,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) { - if err := limiter.Wait(ctx) ; err != nil { + if err := accountQuerylimiter.Wait(ctx) ; err != nil { return nil, err } @@ -537,7 +544,7 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, } func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) { - if err := limiter.Wait(ctx) ; err != nil { + if err := tradeQuerylimiter.Wait(ctx) ; err != nil { return nil, err } @@ -577,7 +584,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { - if err := limiter.Wait(ctx) ; err != nil { + if err := marketDataLimiter.Wait(ctx) ; err != nil { return nil, err } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index ba6b2edc0..cb9d5e7e8 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -119,7 +119,15 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s tradeKeys[key] = struct{}{} - logrus.Infof("inserting trade: %d %s %-4s price: %-13f volume: %-11f %5s %s", trade.ID, trade.Symbol, trade.Side, trade.Price, trade.Quantity, trade.MakerOrTakerLabel(), trade.Time.String()) + logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", + trade.Exchange, + trade.ID, + trade.Symbol, + trade.Side, + trade.Price, + trade.Quantity, + trade.MakerOrTakerLabel(), + trade.Time.String()) if err := s.TradeService.Insert(trade); err != nil { return err