diff --git a/pkg/bbgo/exchangeorderexecutor_callbacks.go b/pkg/bbgo/exchangeorderexecutor_callbacks.go new file mode 100644 index 000000000..dd253eb05 --- /dev/null +++ b/pkg/bbgo/exchangeorderexecutor_callbacks.go @@ -0,0 +1,27 @@ +// Code generated by "callbackgen -type ExchangeOrderExecutor"; DO NOT EDIT. + +package bbgo + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (e *ExchangeOrderExecutor) OnTradeUpdate(cb func(trade types.Trade)) { + e.tradeUpdateCallbacks = append(e.tradeUpdateCallbacks, cb) +} + +func (e *ExchangeOrderExecutor) EmitTradeUpdate(trade types.Trade) { + for _, cb := range e.tradeUpdateCallbacks { + cb(trade) + } +} + +func (e *ExchangeOrderExecutor) OnOrderUpdate(cb func(order types.Order)) { + e.orderUpdateCallbacks = append(e.orderUpdateCallbacks, cb) +} + +func (e *ExchangeOrderExecutor) EmitOrderUpdate(order types.Order) { + for _, cb := range e.orderUpdateCallbacks { + cb(order) + } +} diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index ce0fceb72..1ab856b6a 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -33,10 +33,17 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi } // ExchangeOrderExecutor is an order executor wrapper for single exchange instance. +//go:generate callbackgen -type ExchangeOrderExecutor type ExchangeOrderExecutor struct { Notifiability `json:"-"` Session *ExchangeSession + + // private trade update callbacks + tradeUpdateCallbacks []func(trade types.Trade) + + // private order update callbacks + orderUpdateCallbacks []func(order types.Order) } func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder) { diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 386c0fcdf..179127b94 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -149,6 +149,11 @@ func (trader *Trader) Run(ctx context.Context) error { Session: session, } + // forward trade updates and order updates to the order executor + session.Stream.OnTradeUpdate(baseOrderExecutor.EmitTradeUpdate) + session.Stream.OnOrderUpdate(baseOrderExecutor.EmitOrderUpdate) + + // default to base order executor var orderExecutor OrderExecutor = baseOrderExecutor @@ -252,7 +257,7 @@ func (trader *Trader) Run(ctx context.Context) error { StoreID: "default", Type: "memory", }, - Facade: trader.environment.PersistenceServiceFacade, + Facade: trader.environment.PersistenceServiceFacade, })) } else { elem := field.Elem() @@ -300,6 +305,9 @@ func (trader *Trader) ReportPnL() *PnLReporterManager { type OrderExecutor interface { SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) + + OnTradeUpdate(cb func(trade types.Trade)) + OnOrderUpdate(cb func(order types.Order)) } type OrderExecutionRouter interface { diff --git a/pkg/strategy/movingstop/strategy.go b/pkg/strategy/movingstop/strategy.go index 6e2441c29..e2c9cb707 100644 --- a/pkg/strategy/movingstop/strategy.go +++ b/pkg/strategy/movingstop/strategy.go @@ -71,10 +71,14 @@ type Strategy struct { order types.Order } +func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { + session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()}) + session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.MovingAverageInterval.String()}) +} + func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) { sourceSession := sessions[s.SourceExchangeName] - sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()}) - sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.MovingAverageInterval.String()}) + s.Subscribe(sourceSession) // make sure we have the connection alive targetSession := sessions[s.TargetExchangeName] @@ -170,6 +174,10 @@ func (s *Strategy) handleOrderUpdate(order types.Order) { } } +func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { + return nil +} + func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error { // source session sourceSession := sessions[s.SourceExchangeName]