bbgo: when calling order cancel we should use background context

This commit is contained in:
c9s 2022-01-15 00:49:27 +08:00
parent 77e92d544a
commit 255ee40c98
4 changed files with 30 additions and 14 deletions

View File

@ -76,7 +76,9 @@ func (b *LocalActiveOrderBook) GracefulCancel(ctx context.Context, ex types.Exch
// If we cancel these orders directly, we will get an unsent order error // If we cancel these orders directly, we will get an unsent order error
// We wait here for a while for server to create these orders. // We wait here for a while for server to create these orders.
time.Sleep(SentOrderWaitTime) time.Sleep(SentOrderWaitTime)
if err := ex.CancelOrders(ctx, orders...); err != nil {
// since ctx might be canceled, we should use background context here
if err := ex.CancelOrders(context.Background(), orders...); err != nil {
log.WithError(err).Errorf("[LocalActiveOrderBook] can not cancel %s orders", b.Symbol) log.WithError(err).Errorf("[LocalActiveOrderBook] can not cancel %s orders", b.Symbol)
} }

View File

@ -246,6 +246,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
return return
} }
// Start initializes the symbols data streams
func (environ *Environment) Start(ctx context.Context) (err error) { func (environ *Environment) Start(ctx context.Context) (err error) {
for n := range environ.sessions { for n := range environ.sessions {
var session = environ.sessions[n] var session = environ.sessions[n]
@ -439,6 +440,11 @@ func (environ *Environment) SetSyncStartTime(t time.Time) *Environment {
} }
func (environ *Environment) Connect(ctx context.Context) error { func (environ *Environment) Connect(ctx context.Context) error {
log.Debugf("starting interaction...")
if err := interact.Start(ctx); err != nil {
return err
}
for n := range environ.sessions { for n := range environ.sessions {
// avoid using the placeholder variable for the session because we use that in the callbacks // avoid using the placeholder variable for the session because we use that in the callbacks
var session = environ.sessions[n] var session = environ.sessions[n]
@ -561,7 +567,7 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro
var persistence = environ.PersistenceServiceFacade.Get() var persistence = environ.PersistenceServiceFacade.Get()
err := environ.setupInteractionAuth(persistence) err := environ.setupInteraction(persistence)
if err != nil { if err != nil {
return err return err
} }
@ -586,19 +592,14 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro
} }
} }
// TODO: replace this background context later
if err := interact.Start(context.Background()); err != nil {
return err
}
return nil return nil
} }
func (environ *Environment) setupInteractionAuth(persistence service.PersistenceService) error { func (environ *Environment) setupInteraction(persistence service.PersistenceService) error {
var otpQRCodeImagePath = fmt.Sprintf("otp.png") var otpQRCodeImagePath = fmt.Sprintf("otp.png")
var key *otp.Key var key *otp.Key
var keySecret string var keySecret string
var authStore = persistence.NewStore("bbgo", "auth") var authStore = environ.getAuthStore(persistence)
if err := authStore.Load(&keySecret); err != nil { if err := authStore.Load(&keySecret); err != nil {
log.Warnf("telegram session not found, generating new one-time password key for new telegram session...") log.Warnf("telegram session not found, generating new one-time password key for new telegram session...")
@ -651,6 +652,10 @@ func (environ *Environment) setupInteractionAuth(persistence service.Persistence
return nil return nil
} }
func (environ *Environment) getAuthStore(persistence service.PersistenceService) service.Store {
return persistence.NewStore("bbgo", "auth")
}
func (environ *Environment) setupSlack(userConfig *Config, slackToken string) { func (environ *Environment) setupSlack(userConfig *Config, slackToken string) {
if conf := userConfig.Notifications.Slack; conf != nil { if conf := userConfig.Notifications.Slack; conf != nil {
if conf.ErrorChannel != "" { if conf.ErrorChannel != "" {

View File

@ -159,12 +159,21 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config, enableWebServer
} }
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
log.Infof("shutting down stratgies...") log.Infof("shutting down...")
shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
trader.Graceful.Shutdown(shutdownCtx) trader.Graceful.Shutdown(shutdownCtx)
cancelShutdown() cancelShutdown()
cancelTrading()
for _, session := range environ.Sessions() {
if err := session.MarketDataStream.Close(); err != nil {
log.WithError(err).Errorf("[%s] market data stream close error", session.Name)
}
if err := session.UserDataStream.Close(); err != nil {
log.WithError(err).Errorf("[%s] user data stream close error", session.Name)
}
}
return nil return nil
} }

View File

@ -146,8 +146,6 @@ func (s *StandardStream) SetConn(ctx context.Context, conn *websocket.Conn) (con
func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc) { func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc) {
defer func() { defer func() {
cancel() cancel()
// if we failed to read, we need to cancel the context
_ = conn.Close()
s.EmitDisconnect() s.EmitDisconnect()
}() }()
@ -360,7 +358,7 @@ func (s *StandardStream) Dial(ctx context.Context, args ...string) (*websocket.C
} }
func (s *StandardStream) Close() error { func (s *StandardStream) Close() error {
log.Infof("closing stream...") log.Debugf("closing stream...")
// close the close signal channel, so that reader and ping worker will stop // close the close signal channel, so that reader and ping worker will stop
close(s.CloseC) close(s.CloseC)
@ -382,6 +380,8 @@ func (s *StandardStream) Close() error {
return errors.Wrap(err, "websocket write close message error") return errors.Wrap(err, "websocket write close message error")
} }
log.Debugf("stream closed")
// let the reader close the connection // let the reader close the connection
<-time.After(time.Second) <-time.After(time.Second)
return nil return nil