move time.Sleep to batch processor to avoid rate limit

This commit is contained in:
c9s 2021-01-20 02:32:55 +08:00
parent c79c7d1b11
commit 0e99d9bdcb
4 changed files with 46 additions and 30 deletions

View File

@ -45,11 +45,6 @@ var SyncCmd = &cobra.Command{
return err
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil {
return err
}
since, err := cmd.Flags().GetString("since")
if err != nil {
return err
@ -81,33 +76,54 @@ var SyncCmd = &cobra.Command{
}
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil {
return err
}
symbol, err := cmd.Flags().GetString("symbol")
if err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
if len(sessionName) > 0 {
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
return syncSession(ctx, environ, session, symbol, startTime)
}
if session.IsIsolatedMargin {
log.Infof("session is configured as isolated margin, using isolated margin symbol %s instead", session.IsolatedMarginSymbol)
symbol = session.IsolatedMarginSymbol
for _, session := range environ.Sessions() {
if err := syncSession(ctx, environ, session, symbol, startTime); err != nil {
return err
}
}
log.Infof("syncing trades from exchange session %s...", sessionName)
if err := environ.TradeSync.SyncTrades(ctx, session.Exchange, symbol, startTime); err != nil {
return err
}
log.Infof("syncing orders from exchange session %s...", sessionName)
if err := environ.TradeSync.SyncOrders(ctx, session.Exchange, symbol, startTime); err != nil {
return err
}
log.Info("synchronization done")
return nil
},
}
func syncSession(ctx context.Context, environ *bbgo.Environment, session *bbgo.ExchangeSession, symbol string, startTime time.Time) error {
log.Infof("starting syncing exchange session %s", session.Name)
if session.IsIsolatedMargin {
log.Infof("session is configured as isolated margin session, using isolated margin symbol %s instead of %s", session.IsolatedMarginSymbol, symbol)
symbol = session.IsolatedMarginSymbol
}
log.Infof("syncing trades from exchange session %s...", session.Name)
if err := environ.TradeSync.SyncTrades(ctx, session.Exchange, symbol, startTime); err != nil {
return err
}
log.Infof("syncing orders from exchange session %s...", session.Name)
if err := environ.TradeSync.SyncOrders(ctx, session.Exchange, symbol, startTime); err != nil {
return err
}
log.Infof("exchange session %s synchronization done", session.Name)
return nil
}

View File

@ -138,7 +138,8 @@ func toGlobalSideType(side binance.SideType) types.SideType {
func toGlobalOrderType(orderType binance.OrderType) types.OrderType {
switch orderType {
case binance.OrderTypeLimit:
case binance.OrderTypeLimit,
binance.OrderTypeLimitMaker, binance.OrderTypeTakeProfitLimit:
return types.OrderTypeLimit
case binance.OrderTypeMarket:

View File

@ -320,7 +320,6 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
until = since.Add(24*time.Hour - time.Millisecond)
}
time.Sleep(3 * time.Second)
log.Infof("querying closed orders %s from %s <=> %s ...", symbol, since, until)
if e.IsMargin {
@ -572,8 +571,6 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
log.Infof("querying kline %s %s %v", symbol, interval, options)
// avoid rate limit
time.Sleep(500 * time.Millisecond)
req := e.Client.NewKlinesService().
Symbol(symbol).
Interval(string(interval)).
@ -700,9 +697,6 @@ func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol string, interval
allKLines = append(allKLines, kline)
startTime = kline.EndTime
}
// avoid rate limit
time.Sleep(100 * time.Millisecond)
}
return allKLines, nil

View File

@ -95,6 +95,8 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb
}
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
log.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
orders, err := e.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
@ -134,6 +136,8 @@ func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol str
defer close(errC)
for startTime.Before(endTime) {
time.Sleep(5 * time.Second)
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
@ -184,6 +188,7 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
defer close(errC)
for {
time.Sleep(5 * time.Second)
log.Infof("querying %s trades from %s, limit=%d", symbol, startTime, options.Limit)
trades, err := e.QueryTrades(ctx, symbol, &TradeQueryOptions{