interact: separate telegram user sessions

This commit is contained in:
c9s 2022-01-16 00:25:11 +08:00
parent 8a3f1c4dba
commit 2088234b44
8 changed files with 256 additions and 154 deletions

View File

@ -595,6 +595,21 @@ func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) erro
return nil
}
func getAuthStoreID() string {
telegramBotToken := viper.GetString("telegram-bot-token")
if len(telegramBotToken) > 0 {
tt := strings.Split(telegramBotToken, ":")
return tt[0]
}
userEnv := os.Getenv("USER")
if userEnv != "" {
return userEnv
}
return "default"
}
func (environ *Environment) setupInteraction(persistence service.PersistenceService) error {
var otpQRCodeImagePath = fmt.Sprintf("otp.png")
var key *otp.Key
@ -653,7 +668,8 @@ func (environ *Environment) setupInteraction(persistence service.PersistenceServ
}
func (environ *Environment) getAuthStore(persistence service.PersistenceService) service.Store {
return persistence.NewStore("bbgo", "auth")
id := getAuthStoreID()
return persistence.NewStore("bbgo", "auth", id)
}
func (environ *Environment) setupSlack(userConfig *Config, slackToken string) {
@ -701,33 +717,25 @@ func (environ *Environment) setupTelegram(userConfig *Config, telegramBotToken s
Private: true,
}
var session = interact.NewTelegramSession()
var sessions = interact.TelegramSessionMap{}
var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID)
if err := sessionStore.Load(session); err != nil {
log.WithError(err).Errorf("session load error")
if err := sessionStore.Load(sessions); err != nil {
if err := sessionStore.Save(session); err != nil {
return errors.Wrap(err, "failed to save session")
}
} else {
notifier.OwnerChat = session.OwnerChat
notifier.Owner = session.Owner
notifier.Subscribers = session.Subscribers
for _, session := range sessions {
if session.IsAuthorized() {
notifier.OwnerChat = session.Chat
notifier.Owner = session.User
}
}
// you must restore the session after the notifier updates
messenger.RestoreSession(session)
// right now it's only for telegram, should we share the session (?)
interact.Default().SetOriginState(interact.StateAuthenticated)
interact.Default().SetState(interact.StateAuthenticated)
messenger.RestoreSessions(sessions)
}
messenger.OnAuthorized(func(a *interact.TelegramAuthorizer) {
session.Owner = a.Telegram.Owner
session.OwnerChat = a.Telegram.OwnerChat
log.Infof("saving telegram session...")
if err := sessionStore.Save(session); err != nil {
messenger.OnAuthorized(func(userSession *interact.TelegramSession) {
log.Infof("saving telegram sessions...")
if err := sessionStore.Save(messenger.Sessions()); err != nil {
log.WithError(err).Errorf("telegram session save error")
}
})

View File

@ -43,7 +43,40 @@ func NewCoreInteraction(environment *Environment, trader *Trader) *CoreInteracti
}
func (it *CoreInteraction) Commands(i *interact.Interact) {
i.PrivateCommand("/position", "show the current position of a strategy", func(reply interact.Reply) error {
i.PrivateCommand("/sessions", "List Exchange Sessions", func(reply interact.Reply) error {
message := "Your connected sessions:\n"
for name, session := range it.environment.Sessions() {
message += "- " + name + " (" + session.ExchangeName.String() + ")\n"
}
reply.Message(message)
return nil
})
i.PrivateCommand("/balances", "Show balances", func(reply interact.Reply) error {
reply.Message("Please select an exchange session")
for name := range it.environment.Sessions() {
reply.AddButton(name)
}
return nil
}).Next(func(sessionName string, reply interact.Reply) error {
session, ok := it.environment.Session(sessionName)
if !ok {
reply.Message(fmt.Sprintf("Session %s not found", sessionName))
return fmt.Errorf("session %s not found", sessionName)
}
message := "Your balances\n"
balances := session.Account.Balances()
for _, balance := range balances {
message += "- " + balance.String() + "\n"
}
reply.Message(message)
return nil
})
i.PrivateCommand("/position", "Show Position", func(reply interact.Reply) error {
// it.trader.exchangeStrategies
// send symbol options
found := false
@ -88,7 +121,7 @@ func (it *CoreInteraction) Commands(i *interact.Interact) {
return nil
})
i.PrivateCommand("/closeposition", "close the position of a strategy", func(reply interact.Reply) error {
i.PrivateCommand("/closeposition", "Close position", func(reply interact.Reply) error {
// it.trader.exchangeStrategies
// send symbol options
found := false
@ -156,6 +189,7 @@ func (it *CoreInteraction) Commands(i *interact.Interact) {
return err
}
reply.Message("Done")
return nil
})
}

View File

@ -51,9 +51,9 @@ func (it *AuthInteract) Commands(interact *Interact) {
it.OneTimePasswordKey = key
}
interact.Command("/auth", "authorize", func(reply Reply, authorizer Authorizer) error {
interact.Command("/auth", "authorize", func(reply Reply, session Session) error {
reply.Message("Enter your authentication token")
authorizer.StartAuthorizing()
session.SetAuthorizing(true)
return nil
}).Next(func(token string, reply Reply) error {
if token == it.Token {
@ -71,11 +71,12 @@ func (it *AuthInteract) Commands(interact *Interact) {
}
return ErrAuthenticationFailed
}).NamedNext(StateAuthenticated, func(code string, reply Reply, authorizer Authorizer) error {
}).NamedNext(StateAuthenticated, func(code string, reply Reply, session Session) error {
if totp.Validate(code, it.OneTimePasswordKey.Secret()) {
reply.Message("Great! You're authenticated!")
interact.SetOriginState(StateAuthenticated)
return authorizer.Authorize()
session.SetOriginState(StateAuthenticated)
session.SetAuthorized()
return nil
}
reply.Message("Incorrect authentication code")
@ -85,20 +86,23 @@ func (it *AuthInteract) Commands(interact *Interact) {
interact.Command("/auth", "authorize", func(reply Reply) error {
reply.Message("Enter your authentication code")
return nil
}).NamedNext(StateAuthenticated, func(code string, reply Reply, authorizer Authorizer) error {
}).NamedNext(StateAuthenticated, func(code string, reply Reply, session Session) error {
switch it.Mode {
case AuthModeToken:
if code == it.Token {
reply.Message("Great! You're authenticated!")
interact.SetOriginState(StateAuthenticated)
return authorizer.Authorize()
session.SetOriginState(StateAuthenticated)
session.SetAuthorized()
return nil
}
case AuthModeOTP:
if totp.Validate(code, it.OneTimePasswordKey.Secret()) {
reply.Message("Great! You're authenticated!")
interact.SetOriginState(StateAuthenticated)
return authorizer.Authorize()
session.SetOriginState(StateAuthenticated)
session.SetAuthorized()
session.SetAuthorized()
return nil
}
}

View File

@ -22,6 +22,17 @@ type Messenger interface {
Start(ctx context.Context)
}
type Session interface {
ID() string
SetOriginState(state State)
GetOriginState() State
SetState(state State)
GetState() State
IsAuthorized() bool
SetAuthorized()
SetAuthorizing(b bool)
}
// Interact implements the interaction between bot and message software.
type Interact struct {
startTime time.Time
@ -35,7 +46,7 @@ type Interact struct {
states map[State]State
statesFunc map[State]interface{}
originState, currentState State
authenticatedSessions map[string]Session
customInteractions []CustomInteraction
@ -47,17 +58,11 @@ func New() *Interact {
startTime: time.Now(),
commands: make(map[string]*Command),
privateCommands: make(map[string]*Command),
originState: StatePublic,
currentState: StatePublic,
states: make(map[State]State),
statesFunc: make(map[State]interface{}),
}
}
func (it *Interact) SetOriginState(s State) {
it.originState = s
}
func (it *Interact) AddCustomInteraction(custom CustomInteraction) {
custom.Commands(it)
it.customInteractions = append(it.customInteractions, custom)
@ -75,7 +80,7 @@ func (it *Interact) Command(command string, desc string, f interface{}) *Command
return cmd
}
func (it *Interact) getNextState(currentState State) (nextState State, final bool) {
func (it *Interact) getNextState(session Session, currentState State) (nextState State, final bool) {
var ok bool
final = false
nextState, ok = it.states[currentState]
@ -89,17 +94,12 @@ func (it *Interact) getNextState(currentState State) (nextState State, final boo
}
// state not found, return to the origin state
return it.originState, final
return session.GetOriginState(), final
}
func (it *Interact) SetState(s State) {
log.Infof("[interact] transiting state from %s -> %s", it.currentState, s)
it.currentState = s
}
func (it *Interact) handleResponse(text string, ctxObjects ...interface{}) error {
// we only need response when executing a command
switch it.currentState {
func (it *Interact) handleResponse(session Session, text string, ctxObjects ...interface{}) error {
// We only need response when executing a command
switch session.GetState() {
case StatePublic, StateAuthenticated:
return nil
@ -107,40 +107,40 @@ func (it *Interact) handleResponse(text string, ctxObjects ...interface{}) error
args := parseCommand(text)
f, ok := it.statesFunc[it.currentState]
state := session.GetState()
f, ok := it.statesFunc[state]
if !ok {
return fmt.Errorf("state function of %s is not defined", it.currentState)
return fmt.Errorf("state function of %s is not defined", state)
}
ctxObjects = append(ctxObjects, session)
_, err := parseFuncArgsAndCall(f, args, ctxObjects...)
if err != nil {
return err
}
nextState, end := it.getNextState(it.currentState)
nextState, end := it.getNextState(session, state)
if end {
it.SetState(it.originState)
session.SetState(session.GetOriginState())
return nil
}
it.SetState(nextState)
session.SetState(nextState)
return nil
}
func (it *Interact) getCommand(command string) (*Command, error) {
switch it.currentState {
case StateAuthenticated:
func (it *Interact) getCommand(session Session, command string) (*Command, error) {
if session.IsAuthorized() {
if cmd, ok := it.privateCommands[command]; ok {
return cmd, nil
}
case StatePublic:
} else {
if _, ok := it.privateCommands[command]; ok {
return nil, fmt.Errorf("private command can not be executed in the public mode")
return nil, fmt.Errorf("private command can not be executed in the public mode, type /auth to get authorized")
}
}
// find any public command
if cmd, ok := it.commands[command]; ok {
return cmd, nil
}
@ -148,32 +148,34 @@ func (it *Interact) getCommand(command string) (*Command, error) {
return nil, fmt.Errorf("command %s not found", command)
}
func (it *Interact) runCommand(command string, args []string, ctxObjects ...interface{}) error {
cmd, err := it.getCommand(command)
func (it *Interact) runCommand(session Session, command string, args []string, ctxObjects ...interface{}) error {
cmd, err := it.getCommand(session, command)
if err != nil {
return err
}
it.SetState(cmd.initState)
ctxObjects = append(ctxObjects, session)
session.SetState(cmd.initState)
if _, err := parseFuncArgsAndCall(cmd.F, args, ctxObjects...); err != nil {
return err
}
// if we can successfully execute the command, then we can go to the next state.
nextState, end := it.getNextState(it.currentState)
state := session.GetState()
nextState, end := it.getNextState(session, state)
if end {
it.SetState(it.originState)
session.SetState(session.GetOriginState())
return nil
}
it.SetState(nextState)
session.SetState(nextState)
return nil
}
func (it *Interact) SetMessenger(messenger Messenger) {
// pass Responder function
messenger.SetTextMessageResponder(func(message string, reply Reply, ctxObjects ...interface{}) error {
return it.handleResponse(message, append(ctxObjects, reply)...)
messenger.SetTextMessageResponder(func(session Session, message string, reply Reply, ctxObjects ...interface{}) error {
return it.handleResponse(session, message, append(ctxObjects, reply)...)
})
it.messenger = messenger
}
@ -224,9 +226,9 @@ func (it *Interact) registerCommands(commands map[string]*Command) error {
}
commandName := n
it.messenger.AddCommand(cmd, func(message string, reply Reply, ctxObjects ...interface{}) error {
it.messenger.AddCommand(cmd, func(session Session, message string, reply Reply, ctxObjects ...interface{}) error {
args := parseCommand(message)
return it.runCommand(commandName, args, append(ctxObjects, reply)...)
return it.runCommand(session, commandName, args, append(ctxObjects, reply)...)
})
}
return nil

View File

@ -1,7 +1,7 @@
package interact
// Responder defines the logic of responding the message
type Responder func(message string, reply Reply, ctxObjects ...interface{}) error
type Responder func(session Session, message string, reply Reply, ctxObjects ...interface{}) error
type TextMessageResponder interface {
SetTextMessageResponder(responder Responder)

47
pkg/interact/session.go Normal file
View File

@ -0,0 +1,47 @@
package interact
import (
"time"
log "github.com/sirupsen/logrus"
)
type BaseSession struct {
OriginState State `json:"originState,omitempty"`
CurrentState State `json:"currentState,omitempty"`
Authorized bool `json:"authorized,omitempty"`
StartedTime time.Time `json:"startedTime,omitempty"`
// authorizing -- the user started authorizing himself/herself, do not ignore the message
authorizing bool
}
func (s *BaseSession) SetOriginState(state State) {
s.OriginState = state
}
func (s *BaseSession) GetOriginState() State {
return s.OriginState
}
func (s *BaseSession) SetState(state State) {
log.Infof("[interact] transiting state from %s -> %s", s.CurrentState, state)
s.CurrentState = state
}
func (s *BaseSession) GetState() State {
return s.CurrentState
}
func (s *BaseSession) SetAuthorized() {
s.Authorized = true
s.authorizing = false
}
func (s *BaseSession) IsAuthorized() bool {
return s.Authorized
}
func (s *BaseSession) SetAuthorizing(b bool) {
s.authorizing = b
}

View File

@ -10,9 +10,44 @@ import (
"gopkg.in/tucnak/telebot.v2"
)
type TelegramSessionKey struct {
UserID, ChatID int64
}
type TelegramSessionMap map[TelegramSessionKey]*TelegramSession
type TelegramSession struct {
BaseSession
telegram *Telegram
User *telebot.User `json:"user"`
Chat *telebot.Chat `json:"chat"`
}
func (s *TelegramSession) ID() string {
return fmt.Sprintf("telegram-%d-%d", s.User.ID, s.Chat.ID)
}
func NewTelegramSession(telegram *Telegram, message *telebot.Message) *TelegramSession {
return &TelegramSession{
BaseSession: BaseSession{
OriginState: StatePublic,
CurrentState: StatePublic,
Authorized: false,
authorizing: false,
StartedTime: time.Now(),
},
telegram: telegram,
User: message.Sender,
Chat: message.Chat,
}
}
type TelegramReply struct {
bot *telebot.Bot
chat *telebot.Chat
bot *telebot.Bot
session *TelegramSession
message string
menu *telebot.ReplyMarkup
@ -21,7 +56,7 @@ type TelegramReply struct {
}
func (r *TelegramReply) Send(message string) {
checkSendErr(r.bot.Send(r.chat, message))
checkSendErr(r.bot.Send(r.session.Chat, message))
}
func (r *TelegramReply) Message(message string) {
@ -51,24 +86,6 @@ func (r *TelegramReply) build() {
r.menu.Reply(rows...)
}
type TelegramAuthorizer struct {
Telegram *Telegram
Message *telebot.Message
}
func (a *TelegramAuthorizer) Authorize() error {
a.Telegram.Owner = a.Message.Sender
a.Telegram.OwnerChat = a.Message.Chat
a.Telegram.authorizing = false
log.Infof("[interact][telegram] authorized owner %+v and chat %+v", a.Message.Sender, a.Message.Chat)
a.Telegram.EmitAuthorized(a)
return nil
}
func (a *TelegramAuthorizer) StartAuthorizing() {
a.Telegram.authorizing = true
}
//go:generate callbackgen -type Telegram
type Telegram struct {
Bot *telebot.Bot `json:"-"`
@ -78,27 +95,14 @@ type Telegram struct {
authorizing bool
// Owner is the authorized bot owner
// This field is exported in order to be stored in file
Owner *telebot.User `json:"owner,omitempty"`
// OwnerChat is the chat of the authorized bot owner
// This field is exported in order to be stored in file
OwnerChat *telebot.Chat `json:"chat,omitempty"`
sessions map[TelegramSessionKey]*TelegramSession
// textMessageResponder is used for interact to register its message handler
textMessageResponder Responder
commands []*Command
authorizedCallbacks []func(a *TelegramAuthorizer)
}
func (tm *Telegram) newAuthorizer(message *telebot.Message) *TelegramAuthorizer {
return &TelegramAuthorizer{
Telegram: tm,
Message: message,
}
authorizedCallbacks []func(s *TelegramSession)
}
func (tm *Telegram) SetTextMessageResponder(textMessageResponder Responder) {
@ -109,30 +113,24 @@ func (tm *Telegram) Start(context.Context) {
tm.Bot.Handle(telebot.OnText, func(m *telebot.Message) {
log.Infof("[telegram] onText: %+v", m)
if tm.Private && !tm.authorizing {
// ignore the message directly if it's not authorized yet
if tm.Owner == nil {
session := tm.loadSession(m)
if tm.Private {
if !session.authorizing && !session.Authorized {
log.Warn("[telegram] telegram is set to private mode, skipping message")
return
} else if tm.Owner != nil && tm.Owner.ID != m.Sender.ID {
log.Warnf("[telegram] telegram is set to private mode, owner does not match: %d != %d", tm.Owner.ID, m.Sender.ID)
return
}
}
authorizer := tm.newAuthorizer(m)
reply := tm.newReply(m)
reply := tm.newReply(session)
if tm.textMessageResponder != nil {
if err := tm.textMessageResponder(m.Text, reply, authorizer); err != nil {
if err := tm.textMessageResponder(session, m.Text, reply); err != nil {
log.WithError(err).Errorf("[telegram] response handling error")
}
}
if reply.set {
reply.build()
if _, err := tm.Bot.Send(m.Sender, reply.message, reply.menu); err != nil {
log.WithError(err).Errorf("[telegram] message send error")
}
checkSendErr(tm.Bot.Send(m.Sender, reply.message, reply.menu))
}
})
@ -160,12 +158,28 @@ func checkSendErr(m *telebot.Message, err error) {
}
}
func (tm *Telegram) loadSession(m *telebot.Message) *TelegramSession {
if tm.sessions == nil {
tm.sessions = make(map[TelegramSessionKey]*TelegramSession)
}
key := TelegramSessionKey{UserID: m.Sender.ID, ChatID: m.Chat.ID}
session, ok := tm.sessions[key]
if ok {
return session
}
session = NewTelegramSession(tm, m)
tm.sessions[key] = session
return session
}
func (tm *Telegram) AddCommand(cmd *Command, responder Responder) {
tm.commands = append(tm.commands, cmd)
tm.Bot.Handle(cmd.Name, func(m *telebot.Message) {
authorizer := tm.newAuthorizer(m)
reply := tm.newReply(m)
if err := responder(m.Payload, reply, authorizer); err != nil {
session := tm.loadSession(m)
reply := tm.newReply(session)
if err := responder(session, m.Payload, reply); err != nil {
log.WithError(err).Errorf("[telegram] responder error")
checkSendErr(tm.Bot.Send(m.Sender, fmt.Sprintf("error: %v", err)))
return
@ -179,37 +193,30 @@ func (tm *Telegram) AddCommand(cmd *Command, responder Responder) {
})
}
func (tm *Telegram) newReply(m *telebot.Message) *TelegramReply {
func (tm *Telegram) newReply(session *TelegramSession) *TelegramReply {
return &TelegramReply{
bot: tm.Bot,
chat: m.Chat,
menu: &telebot.ReplyMarkup{ResizeReplyKeyboard: true},
bot: tm.Bot,
session: session,
menu: &telebot.ReplyMarkup{ResizeReplyKeyboard: true},
}
}
func (tm *Telegram) RestoreSession(session *TelegramSession) {
log.Infof("[telegram] restoring telegram session: %+v", session)
if session.OwnerChat != nil {
tm.OwnerChat = session.OwnerChat
tm.Owner = session.Owner
if _, err := tm.Bot.Send(tm.OwnerChat, fmt.Sprintf("Hi %s, I'm back. Your telegram session is restored.", tm.Owner.Username)); err != nil {
log.WithError(err).Error("[telegram] can not send telegram message")
func (tm *Telegram) Sessions() TelegramSessionMap {
return tm.sessions
}
func (tm *Telegram) RestoreSessions(sessions TelegramSessionMap) {
if len(sessions) == 0 {
return
}
log.Infof("[telegram] restoring telegram %d sessions", len(sessions))
tm.sessions = sessions
for _, session := range sessions {
if session.IsAuthorized() {
if _, err := tm.Bot.Send(session.Chat, fmt.Sprintf("Hi %s, I'm back. Your telegram session is restored.", session.User.Username)); err != nil {
log.WithError(err).Error("[telegram] can not send telegram message")
}
}
}
}
type TelegramSession struct {
Owner *telebot.User `json:"owner"`
OwnerChat *telebot.Chat `json:"chat"`
// Subscribers stores the Chat objects
Subscribers map[int64]time.Time `json:"chats"`
}
func NewTelegramSession() *TelegramSession {
return &TelegramSession{
Owner: nil,
OwnerChat: nil,
Subscribers: make(map[int64]time.Time),
}
}

View File

@ -4,12 +4,12 @@ package interact
import ()
func (tm *Telegram) OnAuthorized(cb func(a *TelegramAuthorizer)) {
func (tm *Telegram) OnAuthorized(cb func(s *TelegramSession)) {
tm.authorizedCallbacks = append(tm.authorizedCallbacks, cb)
}
func (tm *Telegram) EmitAuthorized(a *TelegramAuthorizer) {
func (tm *Telegram) EmitAuthorized(s *TelegramSession) {
for _, cb := range tm.authorizedCallbacks {
cb(a)
cb(s)
}
}