diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index f5dfac234..e6708932d 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -151,6 +151,9 @@ func (e *Exchange) NewStream() types.Stream { } func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { + if e.userDataStream == nil { + return createdOrders, fmt.Errorf("SubmitOrders should be called after userDataStream been initialized") + } for _, order := range orders { symbol := order.Symbol matching, ok := e.matchingBook(symbol) @@ -198,6 +201,9 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, } func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error { + if e.userDataStream == nil { + return fmt.Errorf("CancelOrders should be called after userDataStream been initialized") + } for _, order := range orders { matching, ok := e.matchingBook(order.Symbol) if !ok { diff --git a/pkg/strategy/flashcrash/strategy.go b/pkg/strategy/flashcrash/strategy.go index bfc797ae9..dce180f84 100644 --- a/pkg/strategy/flashcrash/strategy.go +++ b/pkg/strategy/flashcrash/strategy.go @@ -129,11 +129,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se Window: 25, }) + session.UserDataStream.OnStart(func() { + s.updateOrders(orderExecutor, session) + }) + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { s.updateOrders(orderExecutor, session) }) - // TODO: move this to the stream onConnect handler - s.updateOrders(orderExecutor, session) return nil }