Merge pull request #1424 from c9s/c9s/start-up-balance-query

IMPROVE: improve startup balance query process
This commit is contained in:
c9s 2023-11-20 17:30:30 +08:00 committed by GitHub
commit da3150e288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 21 deletions

View File

@ -329,8 +329,13 @@ type ServiceConfig struct {
type EnvironmentConfig struct { type EnvironmentConfig struct {
DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"` DisableDefaultKLineSubscription bool `json:"disableDefaultKLineSubscription"`
DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"` DisableHistoryKLinePreload bool `json:"disableHistoryKLinePreload"`
DisableSessionTradeBuffer bool `json:"disableSessionTradeBuffer"`
MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"` // DisableStartUpBalanceQuery disables the balance query in the startup process
// which initializes the session.Account with the QueryAccount method.
DisableStartupBalanceQuery bool `json:"disableStartupBalanceQuery"`
DisableSessionTradeBuffer bool `json:"disableSessionTradeBuffer"`
MaxSessionTradeBufferSize int `json:"maxSessionTradeBufferSize"`
} }
type Config struct { type Config struct {

View File

@ -15,6 +15,7 @@ import (
"github.com/c9s/bbgo/pkg/cache" "github.com/c9s/bbgo/pkg/cache"
"github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/util/templateutil" "github.com/c9s/bbgo/pkg/util/templateutil"
exchange2 "github.com/c9s/bbgo/pkg/exchange" exchange2 "github.com/c9s/bbgo/pkg/exchange"
@ -177,12 +178,16 @@ func (session *ExchangeSession) UpdateAccount(ctx context.Context) (*types.Accou
return nil, err return nil, err
} }
session.accountMutex.Lock() session.setAccount(account)
session.Account = account
session.accountMutex.Unlock()
return account, nil return account, nil
} }
func (session *ExchangeSession) setAccount(a *types.Account) {
session.accountMutex.Lock()
session.Account = a
session.accountMutex.Unlock()
}
// Init initializes the basic data structure and market information by its exchange. // Init initializes the basic data structure and market information by its exchange.
// Note that the subscribed symbols are not loaded in this stage. // Note that the subscribed symbols are not loaded in this stage.
func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) error { func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) error {
@ -256,18 +261,23 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
} }
} }
logger.Infof("querying account balances...") disableStartupBalanceQuery := environ.environmentConfig != nil && environ.environmentConfig.DisableStartupBalanceQuery
account, err := session.Exchange.QueryAccount(ctx) if disableStartupBalanceQuery {
if err != nil { session.accountMutex.Lock()
return err session.Account = types.NewAccount()
session.accountMutex.Unlock()
} else {
logger.Infof("querying account balances...")
account, err := retry.QueryAccountUntilSuccessful(ctx, session.Exchange)
if err != nil {
return err
}
session.setAccount(account)
session.metricsBalancesUpdater(account.Balances())
logger.Infof("account %s balances:\n%s", session.Name, account.Balances().String())
} }
session.accountMutex.Lock()
session.Account = account
session.accountMutex.Unlock()
logger.Infof("account %s balances:\n%s", session.Name, account.Balances().String())
// forward trade updates and order updates to the order executor // forward trade updates and order updates to the order executor
session.UserDataStream.OnTradeUpdate(session.OrderExecutor.EmitTradeUpdate) session.UserDataStream.OnTradeUpdate(session.OrderExecutor.EmitTradeUpdate)
session.UserDataStream.OnOrderUpdate(session.OrderExecutor.EmitOrderUpdate) session.UserDataStream.OnOrderUpdate(session.OrderExecutor.EmitOrderUpdate)
@ -288,7 +298,6 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
// if metrics mode is enabled, we bind the callbacks to update metrics // if metrics mode is enabled, we bind the callbacks to update metrics
if viper.GetBool("metrics") { if viper.GetBool("metrics") {
session.metricsBalancesUpdater(account.Balances())
session.bindUserDataStreamMetrics(session.UserDataStream) session.bindUserDataStreamMetrics(session.UserDataStream)
} }
} }

View File

@ -16,7 +16,9 @@ type advancedOrderCancelService interface {
CancelOrdersByGroupID(ctx context.Context, groupID uint32) ([]types.Order, error) CancelOrdersByGroupID(ctx context.Context, groupID uint32) ([]types.Order, error)
} }
func QueryOrderUntilFilled(ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64) (o *types.Order, err error) { func QueryOrderUntilFilled(
ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64,
) (o *types.Order, err error) {
var op = func() (err2 error) { var op = func() (err2 error) {
o, err2 = queryOrderService.QueryOrder(ctx, types.OrderQuery{ o, err2 = queryOrderService.QueryOrder(ctx, types.OrderQuery{
Symbol: symbol, Symbol: symbol,
@ -56,7 +58,9 @@ func GeneralLiteBackoff(ctx context.Context, op backoff.Operation) (err error) {
return err return err
} }
func QueryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) { func QueryOpenOrdersUntilSuccessful(
ctx context.Context, ex types.Exchange, symbol string,
) (openOrders []types.Order, err error) {
var op = func() (err2 error) { var op = func() (err2 error) {
openOrders, err2 = ex.QueryOpenOrders(ctx, symbol) openOrders, err2 = ex.QueryOpenOrders(ctx, symbol)
return err2 return err2
@ -66,7 +70,9 @@ func QueryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symb
return openOrders, err return openOrders, err
} }
func QueryOpenOrdersUntilSuccessfulLite(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) { func QueryOpenOrdersUntilSuccessfulLite(
ctx context.Context, ex types.Exchange, symbol string,
) (openOrders []types.Order, err error) {
var op = func() (err2 error) { var op = func() (err2 error) {
openOrders, err2 = ex.QueryOpenOrders(ctx, symbol) openOrders, err2 = ex.QueryOpenOrders(ctx, symbol)
return err2 return err2
@ -76,7 +82,21 @@ func QueryOpenOrdersUntilSuccessfulLite(ctx context.Context, ex types.Exchange,
return openOrders, err return openOrders, err
} }
func QueryOrderUntilSuccessful(ctx context.Context, query types.ExchangeOrderQueryService, opts types.OrderQuery) (order *types.Order, err error) { func QueryAccountUntilSuccessful(
ctx context.Context, ex types.ExchangeAccountService,
) (account *types.Account, err error) {
var op = func() (err2 error) {
account, err2 = ex.QueryAccount(ctx)
return err2
}
err = GeneralBackoff(ctx, op)
return account, err
}
func QueryOrderUntilSuccessful(
ctx context.Context, query types.ExchangeOrderQueryService, opts types.OrderQuery,
) (order *types.Order, err error) {
var op = func() (err2 error) { var op = func() (err2 error) {
order, err2 = query.QueryOrder(ctx, opts) order, err2 = query.QueryOrder(ctx, opts)
return err2 return err2

View File

@ -103,8 +103,26 @@ type IsolatedMarginAccountInfo struct {
func NewAccount() *Account { func NewAccount() *Account {
return &Account{ return &Account{
balances: make(BalanceMap), AccountType: "spot",
FuturesInfo: nil,
MarginInfo: nil,
IsolatedMarginInfo: nil,
MarginLevel: fixedpoint.Zero,
MarginTolerance: fixedpoint.Zero,
BorrowEnabled: false,
TransferEnabled: false,
MarginRatio: fixedpoint.Zero,
LiquidationPrice: fixedpoint.Zero,
LiquidationRate: fixedpoint.Zero,
MakerFeeRate: fixedpoint.Zero,
TakerFeeRate: fixedpoint.Zero,
TotalAccountValue: fixedpoint.Zero,
CanDeposit: false,
CanTrade: false,
CanWithdraw: false,
balances: make(BalanceMap),
} }
} }
// Balances lock the balances and returned the copied balances // Balances lock the balances and returned the copied balances