diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 1c66ad708..bc7b074b9 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -46,8 +46,7 @@ type Environment struct { // note that, for back tests, we don't need notification. Notifiability - PersistenceServiceFacade *PersistenceServiceFacade - + PersistenceServiceFacade *service.PersistenceServiceFacade DatabaseService *service.DatabaseService OrderService *service.OrderService TradeService *service.TradeService @@ -236,8 +235,8 @@ func (environ *Environment) Init(ctx context.Context) (err error) { } func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error { - var facade = &PersistenceServiceFacade{ - Memory: NewMemoryService(), + var facade = &service.PersistenceServiceFacade{ + Memory: service.NewMemoryService(), } if conf.Redis != nil { @@ -245,7 +244,7 @@ func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error return err } - facade.Redis = NewRedisPersistenceService(conf.Redis) + facade.Redis = service.NewRedisPersistenceService(conf.Redis) } if conf.Json != nil { @@ -256,7 +255,7 @@ func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error } } - facade.Json = &JsonPersistenceService{Directory: conf.Json.Directory} + facade.Json = &service.JsonPersistenceService{Directory: conf.Json.Directory} } environ.PersistenceServiceFacade = facade diff --git a/pkg/bbgo/persistence.go b/pkg/bbgo/persistence.go index 3f6d5d0e9..3d669e337 100644 --- a/pkg/bbgo/persistence.go +++ b/pkg/bbgo/persistence.go @@ -1,6 +1,10 @@ package bbgo -import "fmt" +import ( + "fmt" + + "github.com/c9s/bbgo/pkg/service" +) type PersistenceSelector struct { // StoreID is the store you want to use. @@ -14,7 +18,7 @@ type PersistenceSelector struct { type Persistence struct { PersistenceSelector *PersistenceSelector `json:"persistence,omitempty" yaml:"persistence,omitempty"` - Facade *PersistenceServiceFacade `json:"-" yaml:"-"` + Facade *service.PersistenceServiceFacade `json:"-" yaml:"-"` } func (p *Persistence) backendService(t string) (service PersistenceService, err error) { @@ -63,8 +67,13 @@ func (p *Persistence) Save(val interface{}, subIDs ...string) error { return store.Save(val) } -type PersistenceServiceFacade struct { - Redis *RedisPersistenceService - Json *JsonPersistenceService - Memory *MemoryService +type PersistenceService interface { + NewStore(id string, subIDs ...string) Store } + +type Store interface { + Load(val interface{}) error + Save(val interface{}) error + Reset() error +} + diff --git a/pkg/bbgo/redis_persistence.go b/pkg/bbgo/redis_persistence.go deleted file mode 100644 index 53f80d4fd..000000000 --- a/pkg/bbgo/redis_persistence.go +++ /dev/null @@ -1,209 +0,0 @@ -package bbgo - -import ( - "context" - "encoding/json" - "io/ioutil" - "net" - "os" - "path/filepath" - "reflect" - "strings" - - "github.com/go-redis/redis/v8" - "github.com/pkg/errors" -) - -var ErrPersistenceNotExists = errors.New("persistent data does not exists") - -type PersistenceService interface { - NewStore(id string, subIDs ...string) Store -} - -type Store interface { - Load(val interface{}) error - Save(val interface{}) error - Reset() error -} - -type MemoryService struct { - Slots map[string]interface{} -} - -func NewMemoryService() *MemoryService { - return &MemoryService{ - Slots: make(map[string]interface{}), - } -} - -func (s *MemoryService) NewStore(id string, subIDs ...string) Store { - key := strings.Join(append([]string{id}, subIDs...), ":") - return &MemoryStore{ - Key: key, - memory: s, - } -} - -type MemoryStore struct { - Key string - memory *MemoryService -} - -func (store *MemoryStore) Save(val interface{}) error { - store.memory.Slots[store.Key] = val - return nil -} - -func (store *MemoryStore) Load(val interface{}) error { - v := reflect.ValueOf(val) - if data, ok := store.memory.Slots[store.Key]; ok { - v.Elem().Set(reflect.ValueOf(data).Elem()) - } else { - return ErrPersistenceNotExists - } - - return nil -} - -func (store *MemoryStore) Reset() error { - delete(store.memory.Slots, store.Key) - return nil -} - -type JsonPersistenceService struct { - Directory string -} - -func (s *JsonPersistenceService) NewStore(id string, subIDs ...string) Store { - return &JsonStore{ - ID: id, - Directory: filepath.Join(append([]string{s.Directory}, subIDs...)...), - } -} - -type JsonStore struct { - ID string - Directory string -} - -func (store JsonStore) Reset() error { - if _, err := os.Stat(store.Directory); os.IsNotExist(err) { - return nil - } - - p := filepath.Join(store.Directory, store.ID) + ".json" - if _, err := os.Stat(p); os.IsNotExist(err) { - return nil - } - - return os.Remove(p) -} - -func (store JsonStore) Load(val interface{}) error { - if _, err := os.Stat(store.Directory); os.IsNotExist(err) { - if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { - return err2 - } - } - - p := filepath.Join(store.Directory, store.ID) + ".json" - - if _, err := os.Stat(p); os.IsNotExist(err) { - return ErrPersistenceNotExists - } - - data, err := ioutil.ReadFile(p) - if err != nil { - return err - } - - if len(data) == 0 { - return ErrPersistenceNotExists - } - - return json.Unmarshal(data, val) -} - -func (store JsonStore) Save(val interface{}) error { - if _, err := os.Stat(store.Directory); os.IsNotExist(err) { - if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { - return err2 - } - } - - data, err := json.Marshal(val) - if err != nil { - return err - } - - p := filepath.Join(store.Directory, store.ID) + ".json" - return ioutil.WriteFile(p, data, 0666) -} - -type RedisPersistenceService struct { - redis *redis.Client -} - -func NewRedisPersistenceService(config *RedisPersistenceConfig) *RedisPersistenceService { - client := redis.NewClient(&redis.Options{ - Addr: net.JoinHostPort(config.Host, config.Port), - // Username: "", // username is only for redis 6.0 - Password: config.Password, // no password set - DB: config.DB, // use default DB - }) - - return &RedisPersistenceService{ - redis: client, - } -} - -func (s *RedisPersistenceService) NewStore(id string, subIDs ...string) Store { - if len(subIDs) > 0 { - id += ":" + strings.Join(subIDs, ":") - } - - return &RedisStore{ - redis: s.redis, - ID: id, - } -} - -type RedisStore struct { - redis *redis.Client - - ID string -} - -func (store *RedisStore) Load(val interface{}) error { - cmd := store.redis.Get(context.Background(), store.ID) - data, err := cmd.Result() - if err != nil { - if err == redis.Nil { - return ErrPersistenceNotExists - } - - return err - } - - if len(data) == 0 { - return ErrPersistenceNotExists - } - - return json.Unmarshal([]byte(data), val) -} - -func (store *RedisStore) Save(val interface{}) error { - data, err := json.Marshal(val) - if err != nil { - return err - } - - cmd := store.redis.Set(context.Background(), store.ID, data, 0) - _, err = cmd.Result() - return err -} - -func (store *RedisStore) Reset() error { - _, err := store.redis.Del(context.Background(), store.ID).Result() - return err -} diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 0364dfb2e..c440d7e6b 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -88,8 +88,8 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer return nil } -func newNotificationSystem(userConfig *bbgo.Config) bbgo.Notifiability { - notification := bbgo.Notifiability{ +func newNotificationSystem(userConfig *bbgo.Config, persistence bbgo.PersistenceService) (*bbgo.Notifiability, error) { + notification := &bbgo.Notifiability{ SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil), SessionChannelRouter: bbgo.NewPatternChannelRouter(nil), ObjectChannelRouter: bbgo.NewObjectChannelRouter(), @@ -109,7 +109,59 @@ func newNotificationSystem(userConfig *bbgo.Config) bbgo.Notifiability { } } - return notification + telegramBotToken := viper.GetString("telegram-bot-token") + if len(telegramBotToken) > 0 { + tt := strings.Split(telegramBotToken, ":") + telegramID := tt[0] + + bot, err := tb.NewBot(tb.Settings{ + // You can also set custom API URL. + // If field is empty it equals to "https://api.telegram.org". + // URL: "http://195.129.111.17:8012", + Token: telegramBotToken, + Poller: &tb.LongPoller{Timeout: 10 * time.Second}, + }) + + if err != nil { + return nil, err + } + + // allocate a store, so that we can save the chatID for the owner + var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID) + var interaction = telegramnotifier.NewInteraction(bot, sessionStore) + + authToken := viper.GetString("telegram-bot-auth-token") + if len(authToken) > 0 { + interaction.SetAuthToken(authToken) + + log.Info("telegram bot auth token is set, using fixed token for authorization...") + + printTelegramAuthTokenGuide(authToken) + } + + var session telegramnotifier.Session + if err := sessionStore.Load(&session); err != nil || session.Owner == nil { + log.Warnf("session not found, generating new one-time password key for new session...") + + qrcodeImagePath := fmt.Sprintf("otp-%s.png", telegramID) + key, err := setupNewOTPKey(qrcodeImagePath) + if err != nil { + return nil, errors.Wrapf(err, "failed to setup totp (time-based one time password) key") + } + + session = telegramnotifier.NewSession(key) + if err := sessionStore.Save(&session); err != nil { + return nil, errors.Wrap(err, "failed to save session") + } + } + + go interaction.Start(session) + + var notifier = telegramnotifier.New(interaction) + notification.AddNotifier(notifier) + } + + return notification, nil } func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userConfig *bbgo.Config) error { @@ -128,76 +180,19 @@ func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userCo } // configure persistence service, by default we will use memory service - var persistence bbgo.PersistenceService = bbgo.NewMemoryService() + var persistence bbgo.PersistenceService = service.NewMemoryService() if environ.PersistenceServiceFacade != nil { if environ.PersistenceServiceFacade.Redis != nil { persistence = environ.PersistenceServiceFacade.Redis } } - - notification := newNotificationSystem(userConfig) - - // for telegram - telegramBotToken := viper.GetString("telegram-bot-token") - telegramBotAuthToken := viper.GetString("telegram-bot-auth-token") - if len(telegramBotToken) > 0 { - log.Infof("initializing telegram bot...") - - tt := strings.Split(telegramBotToken, ":") - telegramID := tt[0] - - bot, err := tb.NewBot(tb.Settings{ - // You can also set custom API URL. - // If field is empty it equals to "https://api.telegram.org". - // URL: "http://195.129.111.17:8012", - Token: telegramBotToken, - Poller: &tb.LongPoller{Timeout: 10 * time.Second}, - }) - - if err != nil { - return err - } - - // allocate a store, so that we can save the chatID for the owner - var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID) - var interaction = telegramnotifier.NewInteraction(bot, sessionStore) - - if len(telegramBotAuthToken) > 0 { - log.Infof("telegram bot auth token is set, using fixed token for authorization...") - interaction.SetAuthToken(telegramBotAuthToken) - - log.Infof("send the following command to the bbgo bot you created to enable the notification") - log.Infof("") - log.Infof("") - log.Infof(" /auth %s", telegramBotAuthToken) - log.Infof("") - log.Infof("") - } - - var session telegramnotifier.Session - if err := sessionStore.Load(&session); err != nil || session.Owner == nil { - log.Warnf("session not found, generating new one-time password key for new session...") - - qrcodeImagePath := fmt.Sprintf("otp-%s.png", telegramID) - key, err := setupNewOTPKey(qrcodeImagePath) - if err != nil { - return errors.Wrapf(err, "failed to setup totp (time-based one time password) key") - } - - session = telegramnotifier.NewSession(key) - if err := sessionStore.Save(&session); err != nil { - return errors.Wrap(err, "failed to save session") - } - } - - go interaction.Start(session) - - var notifier = telegramnotifier.New(interaction) - notification.AddNotifier(notifier) + notification, err := newNotificationSystem(userConfig, persistence) + if err != nil { + return err } - environ.Notifiability = notification + environ.Notifiability = *notification if userConfig.Notifications != nil { if err := environ.ConfigureNotification(userConfig.Notifications); err != nil { @@ -387,29 +382,39 @@ func writeOTPKeyAsQRCodePNG(key *otp.Key, imagePath string) error { return nil } -func displayOTPKey(key *otp.Key) { - log.Infof("") - log.Infof("====================PLEASE STORE YOUR OTP KEY=======================") - log.Infof("") - log.Infof("Issuer: %s", key.Issuer()) - log.Infof("AccountName: %s", key.AccountName()) - log.Infof("Secret: %s", key.Secret()) - log.Infof("Key URL: %s", key.URL()) - log.Infof("") - log.Infof("====================================================================") - log.Infof("") -} - +// setupNewOTPKey generates a new otp key and save the secret as a qrcode image func setupNewOTPKey(qrcodeImagePath string) (*otp.Key, error) { key, err := service.NewDefaultTotpKey() if err != nil { return nil, errors.Wrapf(err, "failed to setup totp (time-based one time password) key") } - displayOTPKey(key) + printOtpKey(key) - err = writeOTPKeyAsQRCodePNG(key, qrcodeImagePath) + if err := writeOTPKeyAsQRCodePNG(key, qrcodeImagePath) ; err != nil { + return nil, err + } + + printTelegramOtpAuthGuide(qrcodeImagePath) + + return key, nil +} + +func printOtpKey(key *otp.Key) { + fmt.Println("") + fmt.Println("====================PLEASE STORE YOUR OTP KEY=======================") + fmt.Println("") + fmt.Printf("Issuer: %s\n", key.Issuer()) + fmt.Printf("AccountName: %s\n", key.AccountName()) + fmt.Printf("Secret: %s\n", key.Secret()) + fmt.Printf("Key URL: %s\n", key.URL()) + fmt.Println("") + fmt.Println("====================================================================") + fmt.Println("") +} + +func printTelegramOtpAuthGuide(qrcodeImagePath string) { log.Infof("To scan your OTP QR code, please run the following command:") log.Infof("") log.Infof("") @@ -422,6 +427,13 @@ func setupNewOTPKey(qrcodeImagePath string) (*otp.Key, error) { log.Infof(" /auth {code}") log.Infof("") log.Infof("") - - return key, nil +} + +func printTelegramAuthTokenGuide(token string) { + fmt.Println("send the following command to the bbgo bot you created to enable the notification") + fmt.Println("") + fmt.Println("") + fmt.Printf(" /auth %s\n", token) + fmt.Println("") + fmt.Println("") } diff --git a/pkg/service/errors.go b/pkg/service/errors.go new file mode 100644 index 000000000..516301d5b --- /dev/null +++ b/pkg/service/errors.go @@ -0,0 +1,5 @@ +package service + +import "github.com/pkg/errors" + +var ErrPersistenceNotExists = errors.New("persistent data does not exists") diff --git a/pkg/service/memory.go b/pkg/service/memory.go new file mode 100644 index 000000000..1ff5056f9 --- /dev/null +++ b/pkg/service/memory.go @@ -0,0 +1,52 @@ +package service + +import ( + "reflect" + "strings" + + "github.com/c9s/bbgo/pkg/bbgo" +) + +type MemoryService struct { + Slots map[string]interface{} +} + +func NewMemoryService() *MemoryService { + return &MemoryService{ + Slots: make(map[string]interface{}), + } +} + +func (s *MemoryService) NewStore(id string, subIDs ...string) bbgo.Store { + key := strings.Join(append([]string{id}, subIDs...), ":") + return &MemoryStore{ + Key: key, + memory: s, + } +} + +type MemoryStore struct { + Key string + memory *MemoryService +} + +func (store *MemoryStore) Save(val interface{}) error { + store.memory.Slots[store.Key] = val + return nil +} + +func (store *MemoryStore) Load(val interface{}) error { + v := reflect.ValueOf(val) + if data, ok := store.memory.Slots[store.Key]; ok { + v.Elem().Set(reflect.ValueOf(data).Elem()) + } else { + return ErrPersistenceNotExists + } + + return nil +} + +func (store *MemoryStore) Reset() error { + delete(store.memory.Slots, store.Key) + return nil +} diff --git a/pkg/service/memory_test.go b/pkg/service/memory_test.go new file mode 100644 index 000000000..3accd9608 --- /dev/null +++ b/pkg/service/memory_test.go @@ -0,0 +1,33 @@ +package service + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMemoryService(t *testing.T) { + t.Run("load_empty", func(t *testing.T) { + service := NewMemoryService() + store := service.NewStore("test") + + j := 0 + err := store.Load(&j) + assert.Error(t, err) + }) + + t.Run("save_and_load", func(t *testing.T) { + service := NewMemoryService() + store := service.NewStore("test") + + i := 3 + err := store.Save(&i) + + assert.NoError(t, err) + + var j = 0 + err = store.Load(&j) + assert.NoError(t, err) + assert.Equal(t, i, j) + }) +} diff --git a/pkg/service/persistence_facade.go b/pkg/service/persistence_facade.go new file mode 100644 index 000000000..cc8f8b2f1 --- /dev/null +++ b/pkg/service/persistence_facade.go @@ -0,0 +1,7 @@ +package service + +type PersistenceServiceFacade struct { + Redis *RedisPersistenceService + Json *JsonPersistenceService + Memory *MemoryService +} diff --git a/pkg/service/persistence_json.go b/pkg/service/persistence_json.go new file mode 100644 index 000000000..3ac5c95f6 --- /dev/null +++ b/pkg/service/persistence_json.go @@ -0,0 +1,81 @@ +package service + +import ( + "encoding/json" + "io/ioutil" + "os" + "path/filepath" + + "github.com/c9s/bbgo/pkg/bbgo" +) + +type JsonPersistenceService struct { + Directory string +} + +func (s *JsonPersistenceService) NewStore(id string, subIDs ...string) bbgo.Store { + return &JsonStore{ + ID: id, + Directory: filepath.Join(append([]string{s.Directory}, subIDs...)...), + } +} + +type JsonStore struct { + ID string + Directory string +} + +func (store JsonStore) Reset() error { + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + return nil + } + + p := filepath.Join(store.Directory, store.ID) + ".json" + if _, err := os.Stat(p); os.IsNotExist(err) { + return nil + } + + return os.Remove(p) +} + +func (store JsonStore) Load(val interface{}) error { + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { + return err2 + } + } + + p := filepath.Join(store.Directory, store.ID) + ".json" + + if _, err := os.Stat(p); os.IsNotExist(err) { + return ErrPersistenceNotExists + } + + data, err := ioutil.ReadFile(p) + if err != nil { + return err + } + + if len(data) == 0 { + return ErrPersistenceNotExists + } + + return json.Unmarshal(data, val) +} + +func (store JsonStore) Save(val interface{}) error { + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { + return err2 + } + } + + data, err := json.Marshal(val) + if err != nil { + return err + } + + p := filepath.Join(store.Directory, store.ID) + ".json" + return ioutil.WriteFile(p, data, 0666) +} + diff --git a/pkg/service/persistence_redis.go b/pkg/service/persistence_redis.go new file mode 100644 index 000000000..e3ea08f2d --- /dev/null +++ b/pkg/service/persistence_redis.go @@ -0,0 +1,80 @@ +package service + +import ( + "context" + "encoding/json" + "net" + "strings" + + "github.com/go-redis/redis/v8" + + "github.com/c9s/bbgo/pkg/bbgo" +) + +type RedisPersistenceService struct { + redis *redis.Client +} + +func NewRedisPersistenceService(config *bbgo.RedisPersistenceConfig) *RedisPersistenceService { + client := redis.NewClient(&redis.Options{ + Addr: net.JoinHostPort(config.Host, config.Port), + // Username: "", // username is only for redis 6.0 + Password: config.Password, // no password set + DB: config.DB, // use default DB + }) + + return &RedisPersistenceService{ + redis: client, + } +} + +func (s *RedisPersistenceService) NewStore(id string, subIDs ...string) bbgo.Store { + if len(subIDs) > 0 { + id += ":" + strings.Join(subIDs, ":") + } + + return &RedisStore{ + redis: s.redis, + ID: id, + } +} + +type RedisStore struct { + redis *redis.Client + + ID string +} + +func (store *RedisStore) Load(val interface{}) error { + cmd := store.redis.Get(context.Background(), store.ID) + data, err := cmd.Result() + if err != nil { + if err == redis.Nil { + return ErrPersistenceNotExists + } + + return err + } + + if len(data) == 0 { + return ErrPersistenceNotExists + } + + return json.Unmarshal([]byte(data), val) +} + +func (store *RedisStore) Save(val interface{}) error { + data, err := json.Marshal(val) + if err != nil { + return err + } + + cmd := store.redis.Set(context.Background(), store.ID, data, 0) + _, err = cmd.Result() + return err +} + +func (store *RedisStore) Reset() error { + _, err := store.redis.Del(context.Background(), store.ID).Result() + return err +} diff --git a/pkg/bbgo/redis_persistence_test.go b/pkg/service/persistence_redis_test.go similarity index 58% rename from pkg/bbgo/redis_persistence_test.go rename to pkg/service/persistence_redis_test.go index 4c8b0679b..fbd719a1a 100644 --- a/pkg/bbgo/redis_persistence_test.go +++ b/pkg/service/persistence_redis_test.go @@ -1,15 +1,16 @@ -package bbgo +package service import ( "testing" "github.com/stretchr/testify/assert" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" ) func TestRedisPersistentService(t *testing.T) { - redisService := NewRedisPersistenceService(&RedisPersistenceConfig{ + redisService := NewRedisPersistenceService(&bbgo.RedisPersistenceConfig{ Host: "127.0.0.1", Port: "6379", DB: 0, @@ -39,29 +40,3 @@ func TestRedisPersistentService(t *testing.T) { err = store.Reset() assert.NoError(t, err) } - -func TestMemoryService(t *testing.T) { - t.Run("load_empty", func(t *testing.T) { - service := NewMemoryService() - store := service.NewStore("test") - - j := 0 - err := store.Load(&j) - assert.Error(t, err) - }) - - t.Run("save_and_load", func(t *testing.T) { - service := NewMemoryService() - store := service.NewStore("test") - - i := 3 - err := store.Save(&i) - - assert.NoError(t, err) - - var j = 0 - err = store.Load(&j) - assert.NoError(t, err) - assert.Equal(t, i, j) - }) -}