diff --git a/pkg/bbgo/active_book.go b/pkg/bbgo/active_book.go index ead041e1e..5bb78fdee 100644 --- a/pkg/bbgo/active_book.go +++ b/pkg/bbgo/active_book.go @@ -76,7 +76,9 @@ func (b *LocalActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exch // If we cancel these orders directly, we will get an unsent order error // We wait here for a while for server to create these orders. time.Sleep(SentOrderWaitTime) - if err := ex.CancelOrders(ctx, orders...); err != nil { + + // since ctx might be canceled, we should use background context here + if err := ex.CancelOrders(context.Background(), orders...); err != nil { log.WithError(err).Errorf("[LocalActiveOrderBook] can not cancel %s orders", b.Symbol) } diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index e2e3965f3..087762c6b 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -246,6 +246,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) { return } +// Start initializes the symbols data streams func (environ *Environment) Start(ctx context.Context) (err error) { for n := range environ.sessions { var session = environ.sessions[n] @@ -439,6 +440,11 @@ func (environ *Environment) SetSyncStartTime(t time.Time) *Environment { } func (environ *Environment) Connect(ctx context.Context) error { + log.Debugf("starting interaction...") + if err := interact.Start(ctx); err != nil { + return err + } + for n := range environ.sessions { // avoid using the placeholder variable for the session because we use that in the callbacks var session = environ.sessions[n] @@ -561,7 +567,7 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro var persistence = environ.PersistenceServiceFacade.Get() - err := environ.setupInteractionAuth(persistence) + err := environ.setupInteraction(persistence) if err != nil { return err } @@ -586,19 +592,14 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro } } - // TODO: replace this background context later - if err := interact.Start(context.Background()); err != nil { - return err - } - return nil } -func (environ *Environment) setupInteractionAuth(persistence service.PersistenceService) error { +func (environ *Environment) setupInteraction(persistence service.PersistenceService) error { var otpQRCodeImagePath = fmt.Sprintf("otp.png") var key *otp.Key var keySecret string - var authStore = persistence.NewStore("bbgo", "auth") + var authStore = environ.getAuthStore(persistence) if err := authStore.Load(&keySecret); err != nil { log.Warnf("telegram session not found, generating new one-time password key for new telegram session...") @@ -651,6 +652,10 @@ func (environ *Environment) setupInteractionAuth(persistence service.Persistence return nil } +func (environ *Environment) getAuthStore(persistence service.PersistenceService) service.Store { + return persistence.NewStore("bbgo", "auth") +} + func (environ *Environment) setupSlack(userConfig *Config, slackToken string) { if conf := userConfig.Notifications.Slack; conf != nil { if conf.ErrorChannel != "" { diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index d03590a35..0e3f37937 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -159,12 +159,21 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config, enableWebServer } cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + cancelTrading() - log.Infof("shutting down stratgies...") + log.Infof("shutting down...") shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) trader.Graceful.Shutdown(shutdownCtx) cancelShutdown() - cancelTrading() + + for _, session := range environ.Sessions() { + if err := session.MarketDataStream.Close(); err != nil { + log.WithError(err).Errorf("[%s] market data stream close error", session.Name) + } + if err := session.UserDataStream.Close(); err != nil { + log.WithError(err).Errorf("[%s] user data stream close error", session.Name) + } + } return nil } diff --git a/pkg/types/stream.go b/pkg/types/stream.go index e89c39372..c31ab510b 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -146,8 +146,6 @@ func (s *StandardStream) SetConn(ctx context.Context, conn *websocket.Conn) (con func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc) { defer func() { cancel() - // if we failed to read, we need to cancel the context - _ = conn.Close() s.EmitDisconnect() }() @@ -360,7 +358,7 @@ func (s *StandardStream) Dial(ctx context.Context, args ...string) (*websocket.C } func (s *StandardStream) Close() error { - log.Infof("closing stream...") + log.Debugf("closing stream...") // close the close signal channel, so that reader and ping worker will stop close(s.CloseC) @@ -382,6 +380,8 @@ func (s *StandardStream) Close() error { return errors.Wrap(err, "websocket write close message error") } + log.Debugf("stream closed") + // let the reader close the connection <-time.After(time.Second) return nil