freqtrade_origin/freqtrade/rpc/rpc_manager.py

58 lines
1.5 KiB
Python
Raw Normal View History

2018-02-13 03:45:59 +00:00
"""
This module contains class to manage RPC communications (Telegram, Slack, ...)
"""
2018-05-31 18:55:26 +00:00
from typing import Any, Optional, List
2018-03-25 19:37:14 +00:00
import logging
2018-02-13 03:45:59 +00:00
from freqtrade.rpc.telegram import Telegram
2018-03-25 19:37:14 +00:00
logger = logging.getLogger(__name__)
2018-02-13 03:45:59 +00:00
class RPCManager(object):
"""
Class to manage RPC objects (Telegram, Slack, ...)
"""
def __init__(self, freqtrade) -> None:
"""
Initializes all enabled rpc modules
:param config: config to use
:return: None
"""
self.freqtrade = freqtrade
2018-05-31 18:55:26 +00:00
self.registered_modules: List[str] = []
self.telegram: Any = None
2018-02-13 03:45:59 +00:00
self._init()
def _init(self) -> None:
2018-02-13 03:45:59 +00:00
"""
Init RPC modules
:return:
"""
if self.freqtrade.config['telegram'].get('enabled', False):
2018-03-25 19:37:14 +00:00
logger.info('Enabling rpc.telegram ...')
2018-02-13 03:45:59 +00:00
self.registered_modules.append('telegram')
self.telegram = Telegram(self.freqtrade)
def cleanup(self) -> None:
"""
Stops all enabled rpc modules
:return: None
"""
if 'telegram' in self.registered_modules:
2018-03-25 19:37:14 +00:00
logger.info('Cleaning up rpc.telegram ...')
2018-02-13 03:45:59 +00:00
self.registered_modules.remove('telegram')
self.telegram.cleanup()
def send_msg(self, msg: str) -> None:
"""
Send given markdown message to all registered rpc modules
:param msg: message
:return: None
"""
2018-03-25 19:37:14 +00:00
logger.info(msg)
2018-02-13 03:45:59 +00:00
if 'telegram' in self.registered_modules:
self.telegram.send_msg(msg)