bbgo: improve account updating

This commit is contained in:
c9s 2022-04-22 18:53:06 +08:00
parent 9e48a850bd
commit cf055c3f7d
2 changed files with 29 additions and 14 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/c9s/bbgo/pkg/cache"
@ -190,7 +191,8 @@ type ExchangeSession struct {
// ---------------------------
// The exchange account states
Account *types.Account `json:"-" yaml:"-"`
Account *types.Account `json:"-" yaml:"-"`
accountMutex sync.Mutex
IsInitialized bool `json:"-" yaml:"-"`
@ -280,6 +282,18 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
return session
}
// UpdateAccount locks the account mutex and update the account object
func (session *ExchangeSession) UpdateAccount(ctx context.Context) error {
account, err := session.Exchange.QueryAccount(ctx)
if err != nil {
return err
}
session.accountMutex.Lock()
session.Account = account
session.accountMutex.Unlock()
return nil
}
// Init initializes the basic data structure and market information by its exchange.
// Note that the subscribed symbols are not loaded in this stage.
func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) error {
@ -316,7 +330,9 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
return err
}
session.accountMutex.Lock()
session.Account = account
session.accountMutex.Unlock()
log.Infof("%s account", session.Name)
account.Balances().Print()
@ -324,7 +340,18 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
// forward trade updates and order updates to the order executor
session.UserDataStream.OnTradeUpdate(session.OrderExecutor.EmitTradeUpdate)
session.UserDataStream.OnOrderUpdate(session.OrderExecutor.EmitOrderUpdate)
session.Account.BindStream(session.UserDataStream)
session.UserDataStream.OnBalanceSnapshot(func(balances types.BalanceMap) {
session.accountMutex.Lock()
session.Account.UpdateBalances(balances)
session.accountMutex.Unlock()
})
session.UserDataStream.OnBalanceUpdate(func(balances types.BalanceMap) {
session.accountMutex.Lock()
session.Account.UpdateBalances(balances)
session.accountMutex.Unlock()
})
session.bindConnectionStatusNotification(session.UserDataStream, "user data")

View File

@ -418,18 +418,6 @@ func (a *Account) UpdateBalances(balances BalanceMap) {
}
}
func printBalanceUpdate(balances BalanceMap) {
logrus.Infof("balance update: %+v", balances)
}
func (a *Account) BindStream(stream Stream) {
stream.OnBalanceUpdate(a.UpdateBalances)
stream.OnBalanceSnapshot(a.UpdateBalances)
if debugBalance {
stream.OnBalanceUpdate(printBalanceUpdate)
}
}
func (a *Account) Print() {
a.Lock()
defer a.Unlock()