diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 02e255d30..c7411262b 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -595,6 +595,21 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro return nil } +func getAuthStoreID() string { + telegramBotToken := viper.GetString("telegram-bot-token") + if len(telegramBotToken) > 0 { + tt := strings.Split(telegramBotToken, ":") + return tt[0] + } + + userEnv := os.Getenv("USER") + if userEnv != "" { + return userEnv + } + + return "default" +} + func (environ *Environment) setupInteraction(persistence service.PersistenceService) error { var otpQRCodeImagePath = fmt.Sprintf("otp.png") var key *otp.Key @@ -653,7 +668,8 @@ func (environ *Environment) setupInteraction(persistence service.PersistenceServ } func (environ *Environment) getAuthStore(persistence service.PersistenceService) service.Store { - return persistence.NewStore("bbgo", "auth") + id := getAuthStoreID() + return persistence.NewStore("bbgo", "auth", id) } func (environ *Environment) setupSlack(userConfig *Config, slackToken string) { @@ -701,33 +717,25 @@ func (environ *Environment) setupTelegram(userConfig *Config, telegramBotToken s Private: true, } - var session = interact.NewTelegramSession() + var sessions = interact.TelegramSessionMap{} var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID) - if err := sessionStore.Load(session); err != nil { - log.WithError(err).Errorf("session load error") + if err := sessionStore.Load(sessions); err != nil { - if err := sessionStore.Save(session); err != nil { - return errors.Wrap(err, "failed to save session") - } } else { - notifier.OwnerChat = session.OwnerChat - notifier.Owner = session.Owner - notifier.Subscribers = session.Subscribers + for _, session := range sessions { + if session.IsAuthorized() { + notifier.OwnerChat = session.Chat + notifier.Owner = session.User + } + } // you must restore the session after the notifier updates - messenger.RestoreSession(session) - - // right now it's only for telegram, should we share the session (?) - interact.Default().SetOriginState(interact.StateAuthenticated) - interact.Default().SetState(interact.StateAuthenticated) + messenger.RestoreSessions(sessions) } - messenger.OnAuthorized(func(a *interact.TelegramAuthorizer) { - session.Owner = a.Telegram.Owner - session.OwnerChat = a.Telegram.OwnerChat - - log.Infof("saving telegram session...") - if err := sessionStore.Save(session); err != nil { + messenger.OnAuthorized(func(userSession *interact.TelegramSession) { + log.Infof("saving telegram sessions...") + if err := sessionStore.Save(messenger.Sessions()); err != nil { log.WithError(err).Errorf("telegram session save error") } }) diff --git a/pkg/bbgo/interact.go b/pkg/bbgo/interact.go index ec0327085..9d2259d12 100644 --- a/pkg/bbgo/interact.go +++ b/pkg/bbgo/interact.go @@ -43,7 +43,40 @@ func NewCoreInteraction(environment *Environment, trader *Trader) *CoreInteracti } func (it *CoreInteraction) Commands(i *interact.Interact) { - i.PrivateCommand("/position", "show the current position of a strategy", func(reply interact.Reply) error { + i.PrivateCommand("/sessions", "List Exchange Sessions", func(reply interact.Reply) error { + message := "Your connected sessions:\n" + for name, session := range it.environment.Sessions() { + message += "- " + name + " (" + session.ExchangeName.String() + ")\n" + } + + reply.Message(message) + return nil + }) + + i.PrivateCommand("/balances", "Show balances", func(reply interact.Reply) error { + reply.Message("Please select an exchange session") + for name := range it.environment.Sessions() { + reply.AddButton(name) + } + return nil + }).Next(func(sessionName string, reply interact.Reply) error { + session, ok := it.environment.Session(sessionName) + if !ok { + reply.Message(fmt.Sprintf("Session %s not found", sessionName)) + return fmt.Errorf("session %s not found", sessionName) + } + + message := "Your balances\n" + balances := session.Account.Balances() + for _, balance := range balances { + message += "- " + balance.String() + "\n" + } + + reply.Message(message) + return nil + }) + + i.PrivateCommand("/position", "Show Position", func(reply interact.Reply) error { // it.trader.exchangeStrategies // send symbol options found := false @@ -88,7 +121,7 @@ func (it *CoreInteraction) Commands(i *interact.Interact) { return nil }) - i.PrivateCommand("/closeposition", "close the position of a strategy", func(reply interact.Reply) error { + i.PrivateCommand("/closeposition", "Close position", func(reply interact.Reply) error { // it.trader.exchangeStrategies // send symbol options found := false @@ -156,6 +189,7 @@ func (it *CoreInteraction) Commands(i *interact.Interact) { return err } + reply.Message("Done") return nil }) } diff --git a/pkg/interact/auth.go b/pkg/interact/auth.go index 823029c9b..f97592c9c 100644 --- a/pkg/interact/auth.go +++ b/pkg/interact/auth.go @@ -51,9 +51,9 @@ func (it *AuthInteract) Commands(interact *Interact) { it.OneTimePasswordKey = key } - interact.Command("/auth", "authorize", func(reply Reply, authorizer Authorizer) error { + interact.Command("/auth", "authorize", func(reply Reply, session Session) error { reply.Message("Enter your authentication token") - authorizer.StartAuthorizing() + session.SetAuthorizing(true) return nil }).Next(func(token string, reply Reply) error { if token == it.Token { @@ -71,11 +71,12 @@ func (it *AuthInteract) Commands(interact *Interact) { } return ErrAuthenticationFailed - }).NamedNext(StateAuthenticated, func(code string, reply Reply, authorizer Authorizer) error { + }).NamedNext(StateAuthenticated, func(code string, reply Reply, session Session) error { if totp.Validate(code, it.OneTimePasswordKey.Secret()) { reply.Message("Great! You're authenticated!") - interact.SetOriginState(StateAuthenticated) - return authorizer.Authorize() + session.SetOriginState(StateAuthenticated) + session.SetAuthorized() + return nil } reply.Message("Incorrect authentication code") @@ -85,20 +86,23 @@ func (it *AuthInteract) Commands(interact *Interact) { interact.Command("/auth", "authorize", func(reply Reply) error { reply.Message("Enter your authentication code") return nil - }).NamedNext(StateAuthenticated, func(code string, reply Reply, authorizer Authorizer) error { + }).NamedNext(StateAuthenticated, func(code string, reply Reply, session Session) error { switch it.Mode { case AuthModeToken: if code == it.Token { reply.Message("Great! You're authenticated!") - interact.SetOriginState(StateAuthenticated) - return authorizer.Authorize() + session.SetOriginState(StateAuthenticated) + session.SetAuthorized() + return nil } case AuthModeOTP: if totp.Validate(code, it.OneTimePasswordKey.Secret()) { reply.Message("Great! You're authenticated!") - interact.SetOriginState(StateAuthenticated) - return authorizer.Authorize() + session.SetOriginState(StateAuthenticated) + session.SetAuthorized() + session.SetAuthorized() + return nil } } diff --git a/pkg/interact/interact.go b/pkg/interact/interact.go index 5af8508f7..6822dd5a2 100644 --- a/pkg/interact/interact.go +++ b/pkg/interact/interact.go @@ -22,6 +22,17 @@ type Messenger interface { Start(ctx context.Context) } +type Session interface { + ID() string + SetOriginState(state State) + GetOriginState() State + SetState(state State) + GetState() State + IsAuthorized() bool + SetAuthorized() + SetAuthorizing(b bool) +} + // Interact implements the interaction between bot and message software. type Interact struct { startTime time.Time @@ -35,7 +46,7 @@ type Interact struct { states map[State]State statesFunc map[State]interface{} - originState, currentState State + authenticatedSessions map[string]Session customInteractions []CustomInteraction @@ -47,17 +58,11 @@ func New() *Interact { startTime: time.Now(), commands: make(map[string]*Command), privateCommands: make(map[string]*Command), - originState: StatePublic, - currentState: StatePublic, states: make(map[State]State), statesFunc: make(map[State]interface{}), } } -func (it *Interact) SetOriginState(s State) { - it.originState = s -} - func (it *Interact) AddCustomInteraction(custom CustomInteraction) { custom.Commands(it) it.customInteractions = append(it.customInteractions, custom) @@ -75,7 +80,7 @@ func (it *Interact) Command(command string, desc string, f interface{}) *Command return cmd } -func (it *Interact) getNextState(currentState State) (nextState State, final bool) { +func (it *Interact) getNextState(session Session, currentState State) (nextState State, final bool) { var ok bool final = false nextState, ok = it.states[currentState] @@ -89,17 +94,12 @@ func (it *Interact) getNextState(currentState State) (nextState State, final boo } // state not found, return to the origin state - return it.originState, final + return session.GetOriginState(), final } -func (it *Interact) SetState(s State) { - log.Infof("[interact] transiting state from %s -> %s", it.currentState, s) - it.currentState = s -} - -func (it *Interact) handleResponse(text string, ctxObjects ...interface{}) error { - // we only need response when executing a command - switch it.currentState { +func (it *Interact) handleResponse(session Session, text string, ctxObjects ...interface{}) error { + // We only need response when executing a command + switch session.GetState() { case StatePublic, StateAuthenticated: return nil @@ -107,40 +107,40 @@ func (it *Interact) handleResponse(text string, ctxObjects ...interface{}) error args := parseCommand(text) - f, ok := it.statesFunc[it.currentState] + state := session.GetState() + f, ok := it.statesFunc[state] if !ok { - return fmt.Errorf("state function of %s is not defined", it.currentState) + return fmt.Errorf("state function of %s is not defined", state) } + ctxObjects = append(ctxObjects, session) _, err := parseFuncArgsAndCall(f, args, ctxObjects...) if err != nil { return err } - nextState, end := it.getNextState(it.currentState) + nextState, end := it.getNextState(session, state) if end { - it.SetState(it.originState) + session.SetState(session.GetOriginState()) return nil } - it.SetState(nextState) + session.SetState(nextState) return nil } -func (it *Interact) getCommand(command string) (*Command, error) { - switch it.currentState { - case StateAuthenticated: +func (it *Interact) getCommand(session Session, command string) (*Command, error) { + if session.IsAuthorized() { if cmd, ok := it.privateCommands[command]; ok { return cmd, nil } - - case StatePublic: + } else { if _, ok := it.privateCommands[command]; ok { - return nil, fmt.Errorf("private command can not be executed in the public mode") + return nil, fmt.Errorf("private command can not be executed in the public mode, type /auth to get authorized") } - } + // find any public command if cmd, ok := it.commands[command]; ok { return cmd, nil } @@ -148,32 +148,34 @@ func (it *Interact) getCommand(command string) (*Command, error) { return nil, fmt.Errorf("command %s not found", command) } -func (it *Interact) runCommand(command string, args []string, ctxObjects ...interface{}) error { - cmd, err := it.getCommand(command) +func (it *Interact) runCommand(session Session, command string, args []string, ctxObjects ...interface{}) error { + cmd, err := it.getCommand(session, command) if err != nil { return err } - it.SetState(cmd.initState) + ctxObjects = append(ctxObjects, session) + session.SetState(cmd.initState) if _, err := parseFuncArgsAndCall(cmd.F, args, ctxObjects...); err != nil { return err } // if we can successfully execute the command, then we can go to the next state. - nextState, end := it.getNextState(it.currentState) + state := session.GetState() + nextState, end := it.getNextState(session, state) if end { - it.SetState(it.originState) + session.SetState(session.GetOriginState()) return nil } - it.SetState(nextState) + session.SetState(nextState) return nil } func (it *Interact) SetMessenger(messenger Messenger) { // pass Responder function - messenger.SetTextMessageResponder(func(message string, reply Reply, ctxObjects ...interface{}) error { - return it.handleResponse(message, append(ctxObjects, reply)...) + messenger.SetTextMessageResponder(func(session Session, message string, reply Reply, ctxObjects ...interface{}) error { + return it.handleResponse(session, message, append(ctxObjects, reply)...) }) it.messenger = messenger } @@ -224,9 +226,9 @@ func (it *Interact) registerCommands(commands map[string]*Command) error { } commandName := n - it.messenger.AddCommand(cmd, func(message string, reply Reply, ctxObjects ...interface{}) error { + it.messenger.AddCommand(cmd, func(session Session, message string, reply Reply, ctxObjects ...interface{}) error { args := parseCommand(message) - return it.runCommand(commandName, args, append(ctxObjects, reply)...) + return it.runCommand(session, commandName, args, append(ctxObjects, reply)...) }) } return nil diff --git a/pkg/interact/responder.go b/pkg/interact/responder.go index f89dfd7a8..26c1b4950 100644 --- a/pkg/interact/responder.go +++ b/pkg/interact/responder.go @@ -1,7 +1,7 @@ package interact // Responder defines the logic of responding the message -type Responder func(message string, reply Reply, ctxObjects ...interface{}) error +type Responder func(session Session, message string, reply Reply, ctxObjects ...interface{}) error type TextMessageResponder interface { SetTextMessageResponder(responder Responder) diff --git a/pkg/interact/session.go b/pkg/interact/session.go new file mode 100644 index 000000000..ae8f01e5e --- /dev/null +++ b/pkg/interact/session.go @@ -0,0 +1,47 @@ +package interact + +import ( + "time" + + log "github.com/sirupsen/logrus" +) + +type BaseSession struct { + OriginState State `json:"originState,omitempty"` + CurrentState State `json:"currentState,omitempty"` + Authorized bool `json:"authorized,omitempty"` + StartedTime time.Time `json:"startedTime,omitempty"` + + // authorizing -- the user started authorizing himself/herself, do not ignore the message + authorizing bool +} + +func (s *BaseSession) SetOriginState(state State) { + s.OriginState = state +} + +func (s *BaseSession) GetOriginState() State { + return s.OriginState +} + +func (s *BaseSession) SetState(state State) { + log.Infof("[interact] transiting state from %s -> %s", s.CurrentState, state) + s.CurrentState = state +} + +func (s *BaseSession) GetState() State { + return s.CurrentState +} + +func (s *BaseSession) SetAuthorized() { + s.Authorized = true + s.authorizing = false +} + +func (s *BaseSession) IsAuthorized() bool { + return s.Authorized +} + +func (s *BaseSession) SetAuthorizing(b bool) { + s.authorizing = b +} diff --git a/pkg/interact/telegram.go b/pkg/interact/telegram.go index 42eaffb89..4dc1bb0bf 100644 --- a/pkg/interact/telegram.go +++ b/pkg/interact/telegram.go @@ -10,9 +10,44 @@ import ( "gopkg.in/tucnak/telebot.v2" ) +type TelegramSessionKey struct { + UserID, ChatID int64 +} + +type TelegramSessionMap map[TelegramSessionKey]*TelegramSession + +type TelegramSession struct { + BaseSession + + telegram *Telegram + + User *telebot.User `json:"user"` + Chat *telebot.Chat `json:"chat"` +} + +func (s *TelegramSession) ID() string { + return fmt.Sprintf("telegram-%d-%d", s.User.ID, s.Chat.ID) +} + +func NewTelegramSession(telegram *Telegram, message *telebot.Message) *TelegramSession { + return &TelegramSession{ + BaseSession: BaseSession{ + OriginState: StatePublic, + CurrentState: StatePublic, + Authorized: false, + authorizing: false, + + StartedTime: time.Now(), + }, + telegram: telegram, + User: message.Sender, + Chat: message.Chat, + } +} + type TelegramReply struct { - bot *telebot.Bot - chat *telebot.Chat + bot *telebot.Bot + session *TelegramSession message string menu *telebot.ReplyMarkup @@ -21,7 +56,7 @@ type TelegramReply struct { } func (r *TelegramReply) Send(message string) { - checkSendErr(r.bot.Send(r.chat, message)) + checkSendErr(r.bot.Send(r.session.Chat, message)) } func (r *TelegramReply) Message(message string) { @@ -51,24 +86,6 @@ func (r *TelegramReply) build() { r.menu.Reply(rows...) } -type TelegramAuthorizer struct { - Telegram *Telegram - Message *telebot.Message -} - -func (a *TelegramAuthorizer) Authorize() error { - a.Telegram.Owner = a.Message.Sender - a.Telegram.OwnerChat = a.Message.Chat - a.Telegram.authorizing = false - log.Infof("[interact][telegram] authorized owner %+v and chat %+v", a.Message.Sender, a.Message.Chat) - a.Telegram.EmitAuthorized(a) - return nil -} - -func (a *TelegramAuthorizer) StartAuthorizing() { - a.Telegram.authorizing = true -} - //go:generate callbackgen -type Telegram type Telegram struct { Bot *telebot.Bot `json:"-"` @@ -78,27 +95,14 @@ type Telegram struct { authorizing bool - // Owner is the authorized bot owner - // This field is exported in order to be stored in file - Owner *telebot.User `json:"owner,omitempty"` - - // OwnerChat is the chat of the authorized bot owner - // This field is exported in order to be stored in file - OwnerChat *telebot.Chat `json:"chat,omitempty"` + sessions map[TelegramSessionKey]*TelegramSession // textMessageResponder is used for interact to register its message handler textMessageResponder Responder commands []*Command - authorizedCallbacks []func(a *TelegramAuthorizer) -} - -func (tm *Telegram) newAuthorizer(message *telebot.Message) *TelegramAuthorizer { - return &TelegramAuthorizer{ - Telegram: tm, - Message: message, - } + authorizedCallbacks []func(s *TelegramSession) } func (tm *Telegram) SetTextMessageResponder(textMessageResponder Responder) { @@ -109,30 +113,24 @@ func (tm *Telegram) Start(context.Context) { tm.Bot.Handle(telebot.OnText, func(m *telebot.Message) { log.Infof("[telegram] onText: %+v", m) - if tm.Private && !tm.authorizing { - // ignore the message directly if it's not authorized yet - if tm.Owner == nil { + session := tm.loadSession(m) + if tm.Private { + if !session.authorizing && !session.Authorized { log.Warn("[telegram] telegram is set to private mode, skipping message") return - } else if tm.Owner != nil && tm.Owner.ID != m.Sender.ID { - log.Warnf("[telegram] telegram is set to private mode, owner does not match: %d != %d", tm.Owner.ID, m.Sender.ID) - return } } - authorizer := tm.newAuthorizer(m) - reply := tm.newReply(m) + reply := tm.newReply(session) if tm.textMessageResponder != nil { - if err := tm.textMessageResponder(m.Text, reply, authorizer); err != nil { + if err := tm.textMessageResponder(session, m.Text, reply); err != nil { log.WithError(err).Errorf("[telegram] response handling error") } } if reply.set { reply.build() - if _, err := tm.Bot.Send(m.Sender, reply.message, reply.menu); err != nil { - log.WithError(err).Errorf("[telegram] message send error") - } + checkSendErr(tm.Bot.Send(m.Sender, reply.message, reply.menu)) } }) @@ -160,12 +158,28 @@ func checkSendErr(m *telebot.Message, err error) { } } +func (tm *Telegram) loadSession(m *telebot.Message) *TelegramSession { + if tm.sessions == nil { + tm.sessions = make(map[TelegramSessionKey]*TelegramSession) + } + + key := TelegramSessionKey{UserID: m.Sender.ID, ChatID: m.Chat.ID} + session, ok := tm.sessions[key] + if ok { + return session + } + + session = NewTelegramSession(tm, m) + tm.sessions[key] = session + return session +} + func (tm *Telegram) AddCommand(cmd *Command, responder Responder) { tm.commands = append(tm.commands, cmd) tm.Bot.Handle(cmd.Name, func(m *telebot.Message) { - authorizer := tm.newAuthorizer(m) - reply := tm.newReply(m) - if err := responder(m.Payload, reply, authorizer); err != nil { + session := tm.loadSession(m) + reply := tm.newReply(session) + if err := responder(session, m.Payload, reply); err != nil { log.WithError(err).Errorf("[telegram] responder error") checkSendErr(tm.Bot.Send(m.Sender, fmt.Sprintf("error: %v", err))) return @@ -179,37 +193,30 @@ func (tm *Telegram) AddCommand(cmd *Command, responder Responder) { }) } -func (tm *Telegram) newReply(m *telebot.Message) *TelegramReply { +func (tm *Telegram) newReply(session *TelegramSession) *TelegramReply { return &TelegramReply{ - bot: tm.Bot, - chat: m.Chat, - menu: &telebot.ReplyMarkup{ResizeReplyKeyboard: true}, + bot: tm.Bot, + session: session, + menu: &telebot.ReplyMarkup{ResizeReplyKeyboard: true}, } } -func (tm *Telegram) RestoreSession(session *TelegramSession) { - log.Infof("[telegram] restoring telegram session: %+v", session) - if session.OwnerChat != nil { - tm.OwnerChat = session.OwnerChat - tm.Owner = session.Owner - if _, err := tm.Bot.Send(tm.OwnerChat, fmt.Sprintf("Hi %s, I'm back. Your telegram session is restored.", tm.Owner.Username)); err != nil { - log.WithError(err).Error("[telegram] can not send telegram message") +func (tm *Telegram) Sessions() TelegramSessionMap { + return tm.sessions +} + +func (tm *Telegram) RestoreSessions(sessions TelegramSessionMap) { + if len(sessions) == 0 { + return + } + + log.Infof("[telegram] restoring telegram %d sessions", len(sessions)) + tm.sessions = sessions + for _, session := range sessions { + if session.IsAuthorized() { + if _, err := tm.Bot.Send(session.Chat, fmt.Sprintf("Hi %s, I'm back. Your telegram session is restored.", session.User.Username)); err != nil { + log.WithError(err).Error("[telegram] can not send telegram message") + } } } } - -type TelegramSession struct { - Owner *telebot.User `json:"owner"` - OwnerChat *telebot.Chat `json:"chat"` - - // Subscribers stores the Chat objects - Subscribers map[int64]time.Time `json:"chats"` -} - -func NewTelegramSession() *TelegramSession { - return &TelegramSession{ - Owner: nil, - OwnerChat: nil, - Subscribers: make(map[int64]time.Time), - } -} diff --git a/pkg/interact/telegram_callbacks.go b/pkg/interact/telegram_callbacks.go index 636c75564..bc3c15bcf 100644 --- a/pkg/interact/telegram_callbacks.go +++ b/pkg/interact/telegram_callbacks.go @@ -4,12 +4,12 @@ package interact import () -func (tm *Telegram) OnAuthorized(cb func(a *TelegramAuthorizer)) { +func (tm *Telegram) OnAuthorized(cb func(s *TelegramSession)) { tm.authorizedCallbacks = append(tm.authorizedCallbacks, cb) } -func (tm *Telegram) EmitAuthorized(a *TelegramAuthorizer) { +func (tm *Telegram) EmitAuthorized(s *TelegramSession) { for _, cb := range tm.authorizedCallbacks { - cb(a) + cb(s) } }