diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index fed6ae51e..1c99e92e2 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -48,7 +48,9 @@ type ExchangeOrderExecutionRouter struct { executors map[string]OrderExecutor } -func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (types.OrderSlice, error) { +func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo( + ctx context.Context, session string, orders ...types.SubmitOrder, +) (types.OrderSlice, error) { if executor, ok := e.executors[session]; ok { return executor.SubmitOrders(ctx, orders...) } @@ -80,6 +82,7 @@ func (e *ExchangeOrderExecutionRouter) CancelOrdersTo(ctx context.Context, sessi } // ExchangeOrderExecutor is an order executor wrapper for single exchange instance. +// Deprecated: please use GeneralOrderExecutor instead. // //go:generate callbackgen -type ExchangeOrderExecutor type ExchangeOrderExecutor struct { @@ -128,7 +131,9 @@ type BasicRiskController struct { // 1. Increase the quantity by the minimal requirement // 2. Decrease the quantity by risk controls // 3. If the quantity does not meet minimal requirement, we should ignore the submit order. -func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...types.SubmitOrder) (outOrders []types.SubmitOrder, errs []error) { +func (c *BasicRiskController) ProcessOrders( + session *ExchangeSession, orders ...types.SubmitOrder, +) (outOrders []types.SubmitOrder, errs []error) { balances := session.GetAccount().Balances() addError := func(err error) { @@ -310,7 +315,9 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... type OrderCallback func(order types.Order) // BatchPlaceOrder -func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, orderCallback OrderCallback, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) { +func BatchPlaceOrder( + ctx context.Context, exchange types.Exchange, orderCallback OrderCallback, submitOrders ...types.SubmitOrder, +) (types.OrderSlice, []int, error) { var createdOrders types.OrderSlice var err error @@ -335,7 +342,10 @@ func BatchPlaceOrder(ctx context.Context, exchange types.Exchange, orderCallback } // BatchRetryPlaceOrder places the orders and retries the failed orders -func BatchRetryPlaceOrder(ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, logger log.FieldLogger, submitOrders ...types.SubmitOrder) (types.OrderSlice, []int, error) { +func BatchRetryPlaceOrder( + ctx context.Context, exchange types.Exchange, errIdx []int, orderCallback OrderCallback, logger log.FieldLogger, + submitOrders ...types.SubmitOrder, +) (types.OrderSlice, []int, error) { if logger == nil { logger = log.StandardLogger() } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 6337f0980..dca4b1584 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -83,14 +83,29 @@ type ExchangeSession struct { // Deprecated: use GeneralOrderExecutor instead OrderExecutor *ExchangeOrderExecutor `json:"orderExecutor,omitempty" yaml:"orderExecutor,omitempty"` - // UserDataStream is the connection stream of the exchange - UserDataStream types.Stream `json:"-" yaml:"-"` + // UserDataStream is the user data connection stream of the exchange + // This stream is used for managing user data, such as orders, trades, balances, etc. + UserDataStream types.Stream `json:"-" yaml:"-"` + + // MarketDataStream is the market data connection stream of the exchange + // This stream is used for managing market data, such as klines, trades, order books, etc. MarketDataStream types.Stream `json:"-" yaml:"-"` - // Subscriptions - // this is a read-only field when running strategy + // UserDataConnectivity is the connectivity of the user data stream + UserDataConnectivity *types.Connectivity `json:"-" yaml:"-"` + + // MarketDataConnectivity is the connectivity of the market data stream + MarketDataConnectivity *types.Connectivity `json:"-" yaml:"-"` + + // Connectivity is the group of connectivity of the session + // This is used for managing both user data and market data connectivity + Connectivity *types.ConnectivityGroup `json:"-" yaml:"-"` + + // Subscriptions is the subscription list of the session + // This is a read-only field when running strategy Subscriptions map[types.Subscription]types.Subscription `json:"-" yaml:"-"` + // Exchange is the exchange instance, it is used for querying the exchange data or submitting orders Exchange types.Exchange `json:"-" yaml:"-"` UseHeikinAshi bool `json:"heikinAshi,omitempty" yaml:"heikinAshi,omitempty"` @@ -130,17 +145,33 @@ type ExchangeSession struct { func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession { userDataStream := exchange.NewStream() + marketDataStream := exchange.NewStream() marketDataStream.SetPublicOnly() + userDataConnectivity := types.NewConnectivity() + userDataConnectivity.Bind(userDataStream) + + marketDataConnectivity := types.NewConnectivity() + marketDataConnectivity.Bind(marketDataStream) + + connectivityGroup := types.NewConnectivityGroup(marketDataConnectivity, userDataConnectivity) + session := &ExchangeSession{ - Name: name, - Exchange: exchange, - UserDataStream: userDataStream, - MarketDataStream: marketDataStream, - Subscriptions: make(map[types.Subscription]types.Subscription), - Account: &types.Account{}, - Trades: make(map[string]*types.TradeSlice), + Name: name, + Exchange: exchange, + + UserDataStream: userDataStream, + UserDataConnectivity: userDataConnectivity, + + MarketDataStream: marketDataStream, + MarketDataConnectivity: marketDataConnectivity, + + Connectivity: connectivityGroup, + + Subscriptions: make(map[types.Subscription]types.Subscription), + Account: &types.Account{}, + Trades: make(map[string]*types.TradeSlice), orderBooks: make(map[string]*types.StreamOrderBook), markets: make(map[string]types.Market), @@ -851,6 +882,14 @@ func (session *ExchangeSession) InitExchange(name string, ex types.Exchange) err session.MarketDataStream = ex.NewStream() session.MarketDataStream.SetPublicOnly() + session.UserDataConnectivity = types.NewConnectivity() + session.UserDataConnectivity.Bind(session.UserDataStream) + + session.MarketDataConnectivity = types.NewConnectivity() + session.MarketDataConnectivity.Bind(session.MarketDataStream) + + session.Connectivity = types.NewConnectivityGroup(session.MarketDataConnectivity, session.MarketDataConnectivity) + // pointer fields session.Subscriptions = make(map[types.Subscription]types.Subscription) session.Account = &types.Account{} diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index be9106efa..b2ef73f76 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -234,7 +234,6 @@ type Strategy struct { metricsLabels prometheus.Labels sourceMarketDataConnectivity, sourceUserDataConnectivity *types.Connectivity - makerMarketDataConnectivity, makerUserDataConnectivity *types.Connectivity connectivityGroup *types.ConnectivityGroup // lastAggregatedSignal stores the last aggregated signal with mutex @@ -591,7 +590,13 @@ func (s *Strategy) updateQuote(ctx context.Context) error { return nil } - if !s.sourceMarketDataConnectivity.IsConnected() || !s.sourceUserDataConnectivity.IsConnected() { + if !s.sourceSession.Connectivity.IsConnected() { + s.logger.Warnf("source session is disconnected, skipping update quote") + return nil + } + + if !s.makerSession.Connectivity.IsConnected() { + s.logger.Warnf("maker session is disconnected, skipping update quote") return nil } @@ -1846,16 +1851,10 @@ func (s *Strategy) CrossRun( s.stopC = make(chan struct{}) - s.sourceUserDataConnectivity = types.NewConnectivity() - s.sourceUserDataConnectivity.Bind(s.sourceSession.UserDataStream) + s.sourceUserDataConnectivity = s.sourceSession.UserDataConnectivity + s.sourceMarketDataConnectivity = s.sourceSession.MarketDataConnectivity - s.makerUserDataConnectivity = types.NewConnectivity() - s.makerUserDataConnectivity.Bind(s.makerSession.UserDataStream) - - s.sourceMarketDataConnectivity = types.NewConnectivity() - s.sourceMarketDataConnectivity.Bind(s.sourceSession.MarketDataStream) - - s.connectivityGroup = types.NewConnectivityGroup(s.sourceUserDataConnectivity, s.makerUserDataConnectivity) + s.connectivityGroup = types.NewConnectivityGroup(s.sourceSession.UserDataConnectivity, s.makerSession.UserDataConnectivity) go func() { s.logger.Infof("waiting for authentication connections to be ready...") diff --git a/pkg/types/connectivitygroup.go b/pkg/types/connectivitygroup.go index 7cc177381..4761c1606 100644 --- a/pkg/types/connectivitygroup.go +++ b/pkg/types/connectivitygroup.go @@ -173,28 +173,7 @@ func (g *ConnectivityGroup) Add(con *Connectivity) { }) } -func (g *ConnectivityGroup) AnyDisconnected(ctx context.Context) bool { - g.mu.Lock() - conns := g.connections - g.mu.Unlock() - - for _, conn := range conns { - select { - case <-ctx.Done(): - return false - - case <-conn.connectedC: - continue - - case <-conn.disconnectedC: - return true - } - } - - return false -} - -func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}) { +func (g *ConnectivityGroup) waitForState(ctx context.Context, c chan struct{}, expected ConnectivityState) { for { select { case <-ctx.Done(): @@ -202,7 +181,7 @@ func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}) default: state := g.GetState() - if state == ConnectivityStateAuthed { + if state == expected { close(c) return } @@ -217,6 +196,6 @@ func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}) // and the channel can only be used once (because we can't close a channel twice) func (g *ConnectivityGroup) AllAuthedC(ctx context.Context) <-chan struct{} { c := make(chan struct{}) - go g.waitAllAuthed(ctx, c) + go g.waitForState(ctx, c, ConnectivityStateAuthed) return c }