ruff format: rpc modules

This commit is contained in:
Matthias 2024-05-12 16:51:11 +02:00
parent cebbe0121e
commit 5f64cc8e76
23 changed files with 1994 additions and 1689 deletions

View File

@ -21,8 +21,9 @@ router_login = APIRouter()
def verify_auth(api_config, username: str, password: str):
"""Verify username/password"""
return (secrets.compare_digest(username, api_config.get('username')) and
secrets.compare_digest(password, api_config.get('password')))
return secrets.compare_digest(username, api_config.get("username")) and secrets.compare_digest(
password, api_config.get("password")
)
httpbasic = HTTPBasic(auto_error=False)
@ -38,7 +39,7 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access") -> s
)
try:
payload = jwt.decode(token, secret_key, algorithms=[ALGORITHM])
username: str = payload.get("identity", {}).get('u')
username: str = payload.get("identity", {}).get("u")
if username is None:
raise credentials_exception
if payload.get("type") != token_type:
@ -55,10 +56,10 @@ def get_user_from_token(token, secret_key: str, token_type: str = "access") -> s
async def validate_ws_token(
ws: WebSocket,
ws_token: Union[str, None] = Query(default=None, alias="token"),
api_config: Dict[str, Any] = Depends(get_api_config)
api_config: Dict[str, Any] = Depends(get_api_config),
):
secret_ws_token = api_config.get('ws_token', None)
secret_jwt_key = api_config.get('jwt_secret_key', 'super-secret')
secret_ws_token = api_config.get("ws_token", None)
secret_jwt_key = api_config.get("jwt_secret_key", "super-secret")
# Check if ws_token is/in secret_ws_token
if ws_token and secret_ws_token:
@ -66,10 +67,9 @@ async def validate_ws_token(
if isinstance(secret_ws_token, str):
is_valid_ws_token = secrets.compare_digest(secret_ws_token, ws_token)
elif isinstance(secret_ws_token, list):
is_valid_ws_token = any([
secrets.compare_digest(potential, ws_token)
for potential in secret_ws_token
])
is_valid_ws_token = any(
[secrets.compare_digest(potential, ws_token) for potential in secret_ws_token]
)
if is_valid_ws_token:
return ws_token
@ -94,20 +94,24 @@ def create_token(data: dict, secret_key: str, token_type: str = "access") -> str
expire = datetime.now(timezone.utc) + timedelta(days=30)
else:
raise ValueError()
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc),
"type": token_type,
})
to_encode.update(
{
"exp": expire,
"iat": datetime.now(timezone.utc),
"type": token_type,
}
)
encoded_jwt = jwt.encode(to_encode, secret_key, algorithm=ALGORITHM)
return encoded_jwt
def http_basic_or_jwt_token(form_data: HTTPBasicCredentials = Depends(httpbasic),
token: str = Depends(oauth2_scheme),
api_config=Depends(get_api_config)):
def http_basic_or_jwt_token(
form_data: HTTPBasicCredentials = Depends(httpbasic),
token: str = Depends(oauth2_scheme),
api_config=Depends(get_api_config),
):
if token:
return get_user_from_token(token, api_config.get('jwt_secret_key', 'super-secret'))
return get_user_from_token(token, api_config.get("jwt_secret_key", "super-secret"))
elif form_data and verify_auth(api_config, form_data.username, form_data.password):
return form_data.username
@ -117,15 +121,16 @@ def http_basic_or_jwt_token(form_data: HTTPBasicCredentials = Depends(httpbasic)
)
@router_login.post('/token/login', response_model=AccessAndRefreshToken)
def token_login(form_data: HTTPBasicCredentials = Depends(security),
api_config=Depends(get_api_config)):
@router_login.post("/token/login", response_model=AccessAndRefreshToken)
def token_login(
form_data: HTTPBasicCredentials = Depends(security), api_config=Depends(get_api_config)
):
if verify_auth(api_config, form_data.username, form_data.password):
token_data = {'identity': {'u': form_data.username}}
access_token = create_token(token_data, api_config.get('jwt_secret_key', 'super-secret'))
refresh_token = create_token(token_data, api_config.get('jwt_secret_key', 'super-secret'),
token_type="refresh")
token_data = {"identity": {"u": form_data.username}}
access_token = create_token(token_data, api_config.get("jwt_secret_key", "super-secret"))
refresh_token = create_token(
token_data, api_config.get("jwt_secret_key", "super-secret"), token_type="refresh"
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
@ -137,12 +142,12 @@ def token_login(form_data: HTTPBasicCredentials = Depends(security),
)
@router_login.post('/token/refresh', response_model=AccessToken)
@router_login.post("/token/refresh", response_model=AccessToken)
def token_refresh(token: str = Depends(oauth2_scheme), api_config=Depends(get_api_config)):
# Refresh token
u = get_user_from_token(token, api_config.get(
'jwt_secret_key', 'super-secret'), 'refresh')
token_data = {'identity': {'u': u}}
access_token = create_token(token_data, api_config.get('jwt_secret_key', 'super-secret'),
token_type="access")
return {'access_token': access_token}
u = get_user_from_token(token, api_config.get("jwt_secret_key", "super-secret"), "refresh")
token_data = {"identity": {"u": u}}
access_token = create_token(
token_data, api_config.get("jwt_secret_key", "super-secret"), token_type="access"
)
return {"access_token": access_token}

View File

@ -27,105 +27,113 @@ logger = logging.getLogger(__name__)
router = APIRouter()
@router.get('/background', response_model=List[BackgroundTaskStatus], tags=['webserver'])
@router.get("/background", response_model=List[BackgroundTaskStatus], tags=["webserver"])
def background_job_list():
return [{
'job_id': jobid,
'job_category': job['category'],
'status': job['status'],
'running': job['is_running'],
'progress': job.get('progress'),
'error': job.get('error', None),
} for jobid, job in ApiBG.jobs.items()]
return [
{
"job_id": jobid,
"job_category": job["category"],
"status": job["status"],
"running": job["is_running"],
"progress": job.get("progress"),
"error": job.get("error", None),
}
for jobid, job in ApiBG.jobs.items()
]
@router.get('/background/{jobid}', response_model=BackgroundTaskStatus, tags=['webserver'])
@router.get("/background/{jobid}", response_model=BackgroundTaskStatus, tags=["webserver"])
def background_job(jobid: str):
if not (job := ApiBG.jobs.get(jobid)):
raise HTTPException(status_code=404, detail='Job not found.')
raise HTTPException(status_code=404, detail="Job not found.")
return {
'job_id': jobid,
'job_category': job['category'],
'status': job['status'],
'running': job['is_running'],
'progress': job.get('progress'),
'error': job.get('error', None),
"job_id": jobid,
"job_category": job["category"],
"status": job["status"],
"running": job["is_running"],
"progress": job.get("progress"),
"error": job.get("error", None),
}
@router.get('/pairlists/available',
response_model=PairListsResponse, tags=['pairlists', 'webserver'])
@router.get(
"/pairlists/available", response_model=PairListsResponse, tags=["pairlists", "webserver"]
)
def list_pairlists(config=Depends(get_config)):
from freqtrade.resolvers import PairListResolver
pairlists = PairListResolver.search_all_objects(
config, False)
pairlists = sorted(pairlists, key=lambda x: x['name'])
return {'pairlists': [{
"name": x['name'],
"is_pairlist_generator": x['class'].is_pairlist_generator,
"params": x['class'].available_parameters(),
"description": x['class'].description(),
} for x in pairlists
]}
pairlists = PairListResolver.search_all_objects(config, False)
pairlists = sorted(pairlists, key=lambda x: x["name"])
return {
"pairlists": [
{
"name": x["name"],
"is_pairlist_generator": x["class"].is_pairlist_generator,
"params": x["class"].available_parameters(),
"description": x["class"].description(),
}
for x in pairlists
]
}
def __run_pairlist(job_id: str, config_loc: Config):
try:
ApiBG.jobs[job_id]['is_running'] = True
ApiBG.jobs[job_id]["is_running"] = True
from freqtrade.plugins.pairlistmanager import PairListManager
with FtNoDBContext():
exchange = get_exchange(config_loc)
pairlists = PairListManager(exchange, config_loc)
pairlists.refresh_pairlist()
ApiBG.jobs[job_id]['result'] = {
'method': pairlists.name_list,
'length': len(pairlists.whitelist),
'whitelist': pairlists.whitelist
}
ApiBG.jobs[job_id]['status'] = 'success'
ApiBG.jobs[job_id]["result"] = {
"method": pairlists.name_list,
"length": len(pairlists.whitelist),
"whitelist": pairlists.whitelist,
}
ApiBG.jobs[job_id]["status"] = "success"
except (OperationalException, Exception) as e:
logger.exception(e)
ApiBG.jobs[job_id]['error'] = str(e)
ApiBG.jobs[job_id]['status'] = 'failed'
ApiBG.jobs[job_id]["error"] = str(e)
ApiBG.jobs[job_id]["status"] = "failed"
finally:
ApiBG.jobs[job_id]['is_running'] = False
ApiBG.jobs[job_id]["is_running"] = False
ApiBG.pairlist_running = False
@router.post('/pairlists/evaluate', response_model=BgJobStarted, tags=['pairlists', 'webserver'])
def pairlists_evaluate(payload: PairListsPayload, background_tasks: BackgroundTasks,
config=Depends(get_config)):
@router.post("/pairlists/evaluate", response_model=BgJobStarted, tags=["pairlists", "webserver"])
def pairlists_evaluate(
payload: PairListsPayload, background_tasks: BackgroundTasks, config=Depends(get_config)
):
if ApiBG.pairlist_running:
raise HTTPException(status_code=400, detail='Pairlist evaluation is already running.')
raise HTTPException(status_code=400, detail="Pairlist evaluation is already running.")
config_loc = deepcopy(config)
config_loc['stake_currency'] = payload.stake_currency
config_loc['pairlists'] = payload.pairlists
config_loc["stake_currency"] = payload.stake_currency
config_loc["pairlists"] = payload.pairlists
handleExchangePayload(payload, config_loc)
# TODO: overwrite blacklist? make it optional and fall back to the one in config?
# Outcome depends on the UI approach.
config_loc['exchange']['pair_blacklist'] = payload.blacklist
config_loc["exchange"]["pair_blacklist"] = payload.blacklist
# Random job id
job_id = ApiBG.get_job_id()
ApiBG.jobs[job_id] = {
'category': 'pairlist',
'status': 'pending',
'progress': None,
'is_running': False,
'result': {},
'error': None,
"category": "pairlist",
"status": "pending",
"progress": None,
"is_running": False,
"result": {},
"error": None,
}
background_tasks.add_task(__run_pairlist, job_id, config_loc)
ApiBG.pairlist_running = True
return {
'status': 'Pairlist evaluation started in background.',
'job_id': job_id,
"status": "Pairlist evaluation started in background.",
"job_id": job_id,
}
@ -135,31 +143,35 @@ def handleExchangePayload(payload: ExchangeModePayloadMixin, config_loc: Config)
Updates the configuration with the payload values.
"""
if payload.exchange:
config_loc['exchange']['name'] = payload.exchange
config_loc["exchange"]["name"] = payload.exchange
if payload.trading_mode:
config_loc['trading_mode'] = payload.trading_mode
config_loc['candle_type_def'] = CandleType.get_default(
config_loc.get('trading_mode', 'spot') or 'spot')
config_loc["trading_mode"] = payload.trading_mode
config_loc["candle_type_def"] = CandleType.get_default(
config_loc.get("trading_mode", "spot") or "spot"
)
if payload.margin_mode:
config_loc['margin_mode'] = payload.margin_mode
config_loc["margin_mode"] = payload.margin_mode
@router.get('/pairlists/evaluate/{jobid}', response_model=WhitelistEvaluateResponse,
tags=['pairlists', 'webserver'])
@router.get(
"/pairlists/evaluate/{jobid}",
response_model=WhitelistEvaluateResponse,
tags=["pairlists", "webserver"],
)
def pairlists_evaluate_get(jobid: str):
if not (job := ApiBG.jobs.get(jobid)):
raise HTTPException(status_code=404, detail='Job not found.')
raise HTTPException(status_code=404, detail="Job not found.")
if job['is_running']:
raise HTTPException(status_code=400, detail='Job not finished yet.')
if job["is_running"]:
raise HTTPException(status_code=400, detail="Job not finished yet.")
if error := job['error']:
if error := job["error"]:
return {
'status': 'failed',
'error': error,
"status": "failed",
"error": error,
}
return {
'status': 'success',
'result': job['result'],
"status": "success",
"result": job["result"],
}

View File

@ -49,67 +49,67 @@ def __run_backtest_bg(btconfig: Config):
asyncio.set_event_loop(asyncio.new_event_loop())
try:
# Reload strategy
lastconfig = ApiBG.bt['last_config']
lastconfig = ApiBG.bt["last_config"]
strat = StrategyResolver.load_strategy(btconfig)
validate_config_consistency(btconfig)
if (
not ApiBG.bt['bt']
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timeframe_detail') != btconfig.get('timeframe_detail')
or lastconfig.get('timerange') != btconfig['timerange']
not ApiBG.bt["bt"]
or lastconfig.get("timeframe") != strat.timeframe
or lastconfig.get("timeframe_detail") != btconfig.get("timeframe_detail")
or lastconfig.get("timerange") != btconfig["timerange"]
):
from freqtrade.optimize.backtesting import Backtesting
ApiBG.bt['bt'] = Backtesting(btconfig)
ApiBG.bt['bt'].load_bt_data_detail()
ApiBG.bt["bt"] = Backtesting(btconfig)
ApiBG.bt["bt"].load_bt_data_detail()
else:
ApiBG.bt['bt'].config = btconfig
ApiBG.bt['bt'].init_backtest()
ApiBG.bt["bt"].config = btconfig
ApiBG.bt["bt"].init_backtest()
# Only reload data if timeframe changed.
if (
not ApiBG.bt['data']
or not ApiBG.bt['timerange']
or lastconfig.get('timeframe') != strat.timeframe
or lastconfig.get('timerange') != btconfig['timerange']
not ApiBG.bt["data"]
or not ApiBG.bt["timerange"]
or lastconfig.get("timeframe") != strat.timeframe
or lastconfig.get("timerange") != btconfig["timerange"]
):
ApiBG.bt['data'], ApiBG.bt['timerange'] = ApiBG.bt[
'bt'].load_bt_data()
ApiBG.bt["data"], ApiBG.bt["timerange"] = ApiBG.bt["bt"].load_bt_data()
lastconfig['timerange'] = btconfig['timerange']
lastconfig['timeframe'] = strat.timeframe
lastconfig['protections'] = btconfig.get('protections', [])
lastconfig['enable_protections'] = btconfig.get('enable_protections')
lastconfig['dry_run_wallet'] = btconfig.get('dry_run_wallet')
lastconfig["timerange"] = btconfig["timerange"]
lastconfig["timeframe"] = strat.timeframe
lastconfig["protections"] = btconfig.get("protections", [])
lastconfig["enable_protections"] = btconfig.get("enable_protections")
lastconfig["dry_run_wallet"] = btconfig.get("dry_run_wallet")
ApiBG.bt['bt'].enable_protections = btconfig.get('enable_protections', False)
ApiBG.bt['bt'].strategylist = [strat]
ApiBG.bt['bt'].results = get_BacktestResultType_default()
ApiBG.bt['bt'].load_prior_backtest()
ApiBG.bt["bt"].enable_protections = btconfig.get("enable_protections", False)
ApiBG.bt["bt"].strategylist = [strat]
ApiBG.bt["bt"].results = get_BacktestResultType_default()
ApiBG.bt["bt"].load_prior_backtest()
ApiBG.bt['bt'].abort = False
ApiBG.bt["bt"].abort = False
strategy_name = strat.get_strategy_name()
if (ApiBG.bt['bt'].results and
strategy_name in ApiBG.bt['bt'].results['strategy']):
if ApiBG.bt["bt"].results and strategy_name in ApiBG.bt["bt"].results["strategy"]:
# When previous result hash matches - reuse that result and skip backtesting.
logger.info(f'Reusing result of previous backtest for {strategy_name}')
logger.info(f"Reusing result of previous backtest for {strategy_name}")
else:
min_date, max_date = ApiBG.bt['bt'].backtest_one_strategy(
strat, ApiBG.bt['data'], ApiBG.bt['timerange'])
min_date, max_date = ApiBG.bt["bt"].backtest_one_strategy(
strat, ApiBG.bt["data"], ApiBG.bt["timerange"]
)
ApiBG.bt['bt'].results = generate_backtest_stats(
ApiBG.bt['data'], ApiBG.bt['bt'].all_results,
min_date=min_date, max_date=max_date)
ApiBG.bt["bt"].results = generate_backtest_stats(
ApiBG.bt["data"], ApiBG.bt["bt"].all_results, min_date=min_date, max_date=max_date
)
if btconfig.get('export', 'none') == 'trades':
combined_res = combined_dataframes_with_rel_mean(ApiBG.bt['data'], min_date, max_date)
if btconfig.get("export", "none") == "trades":
combined_res = combined_dataframes_with_rel_mean(ApiBG.bt["data"], min_date, max_date)
fn = store_backtest_stats(
btconfig['exportfilename'],
ApiBG.bt['bt'].results,
btconfig["exportfilename"],
ApiBG.bt["bt"].results,
datetime.now().strftime("%Y-%m-%d_%H-%M-%S"),
market_change_data=combined_res
)
ApiBG.bt['bt'].results['metadata'][strategy_name]['filename'] = str(fn.stem)
ApiBG.bt['bt'].results['metadata'][strategy_name]['strategy'] = strategy_name
market_change_data=combined_res,
)
ApiBG.bt["bt"].results["metadata"][strategy_name]["filename"] = str(fn.stem)
ApiBG.bt["bt"].results["metadata"][strategy_name]["strategy"] = strategy_name
logger.info("Backtest finished.")
@ -118,38 +118,38 @@ def __run_backtest_bg(btconfig: Config):
except (Exception, OperationalException, DependencyException) as e:
logger.exception(f"Backtesting caused an error: {e}")
ApiBG.bt['bt_error'] = str(e)
ApiBG.bt["bt_error"] = str(e)
finally:
ApiBG.bgtask_running = False
@router.post('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
@router.post("/backtest", response_model=BacktestResponse, tags=["webserver", "backtest"])
async def api_start_backtest(
bt_settings: BacktestRequest, background_tasks: BackgroundTasks,
config=Depends(get_config)):
ApiBG.bt['bt_error'] = None
bt_settings: BacktestRequest, background_tasks: BackgroundTasks, config=Depends(get_config)
):
ApiBG.bt["bt_error"] = None
"""Start backtesting if not done so already"""
if ApiBG.bgtask_running:
raise RPCException('Bot Background task already running')
raise RPCException("Bot Background task already running")
if ':' in bt_settings.strategy:
if ":" in bt_settings.strategy:
raise HTTPException(status_code=500, detail="base64 encoded strategies are not allowed.")
btconfig = deepcopy(config)
remove_exchange_credentials(btconfig['exchange'], True)
remove_exchange_credentials(btconfig["exchange"], True)
settings = dict(bt_settings)
if settings.get('freqai', None) is not None:
settings['freqai'] = dict(settings['freqai'])
if settings.get("freqai", None) is not None:
settings["freqai"] = dict(settings["freqai"])
# Pydantic models will contain all keys, but non-provided ones are None
btconfig = deep_merge_dicts(settings, btconfig, allow_null_overrides=False)
try:
btconfig['stake_amount'] = float(btconfig['stake_amount'])
btconfig["stake_amount"] = float(btconfig["stake_amount"])
except ValueError:
pass
# Force dry-run for backtesting
btconfig['dry_run'] = True
btconfig["dry_run"] = True
# Start backtesting
# Initialize backtesting object
@ -166,39 +166,41 @@ async def api_start_backtest(
}
@router.get('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
@router.get("/backtest", response_model=BacktestResponse, tags=["webserver", "backtest"])
def api_get_backtest():
"""
Get backtesting result.
Returns Result after backtesting has been ran.
"""
from freqtrade.persistence import LocalTrade
if ApiBG.bgtask_running:
return {
"status": "running",
"running": True,
"step": (ApiBG.bt['bt'].progress.action if ApiBG.bt['bt']
else str(BacktestState.STARTUP)),
"progress": ApiBG.bt['bt'].progress.progress if ApiBG.bt['bt'] else 0,
"step": (
ApiBG.bt["bt"].progress.action if ApiBG.bt["bt"] else str(BacktestState.STARTUP)
),
"progress": ApiBG.bt["bt"].progress.progress if ApiBG.bt["bt"] else 0,
"trade_count": len(LocalTrade.trades),
"status_msg": "Backtest running",
}
if not ApiBG.bt['bt']:
if not ApiBG.bt["bt"]:
return {
"status": "not_started",
"running": False,
"step": "",
"progress": 0,
"status_msg": "Backtest not yet executed"
"status_msg": "Backtest not yet executed",
}
if ApiBG.bt['bt_error']:
if ApiBG.bt["bt_error"]:
return {
"status": "error",
"running": False,
"step": "",
"progress": 0,
"status_msg": f"Backtest failed with {ApiBG.bt['bt_error']}"
"status_msg": f"Backtest failed with {ApiBG.bt['bt_error']}",
}
return {
@ -207,11 +209,11 @@ def api_get_backtest():
"status_msg": "Backtest ended",
"step": "finished",
"progress": 1,
"backtest_result": ApiBG.bt['bt'].results,
"backtest_result": ApiBG.bt["bt"].results,
}
@router.delete('/backtest', response_model=BacktestResponse, tags=['webserver', 'backtest'])
@router.delete("/backtest", response_model=BacktestResponse, tags=["webserver", "backtest"])
def api_delete_backtest():
"""Reset backtesting"""
if ApiBG.bgtask_running:
@ -222,12 +224,12 @@ def api_delete_backtest():
"progress": 0,
"status_msg": "Backtest running",
}
if ApiBG.bt['bt']:
ApiBG.bt['bt'].cleanup()
del ApiBG.bt['bt']
ApiBG.bt['bt'] = None
del ApiBG.bt['data']
ApiBG.bt['data'] = None
if ApiBG.bt["bt"]:
ApiBG.bt["bt"].cleanup()
del ApiBG.bt["bt"]
ApiBG.bt["bt"] = None
del ApiBG.bt["data"]
ApiBG.bt["data"] = None
logger.info("Backtesting reset")
return {
"status": "reset",
@ -238,7 +240,7 @@ def api_delete_backtest():
}
@router.get('/backtest/abort', response_model=BacktestResponse, tags=['webserver', 'backtest'])
@router.get("/backtest/abort", response_model=BacktestResponse, tags=["webserver", "backtest"])
def api_backtest_abort():
if not ApiBG.bgtask_running:
return {
@ -248,7 +250,7 @@ def api_backtest_abort():
"progress": 0,
"status_msg": "Backtest ended",
}
ApiBG.bt['bt'].abort = True
ApiBG.bt["bt"].abort = True
return {
"status": "stopping",
"running": False,
@ -258,24 +260,26 @@ def api_backtest_abort():
}
@router.get('/backtest/history', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
@router.get(
"/backtest/history", response_model=List[BacktestHistoryEntry], tags=["webserver", "backtest"]
)
def api_backtest_history(config=Depends(get_config)):
# Get backtest result history, read from metadata files
return get_backtest_resultlist(config['user_data_dir'] / 'backtest_results')
return get_backtest_resultlist(config["user_data_dir"] / "backtest_results")
@router.get('/backtest/history/result', response_model=BacktestResponse,
tags=['webserver', 'backtest'])
@router.get(
"/backtest/history/result", response_model=BacktestResponse, tags=["webserver", "backtest"]
)
def api_backtest_history_result(filename: str, strategy: str, config=Depends(get_config)):
# Get backtest result history, read from metadata files
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
fn = (bt_results_base / filename).with_suffix('.json')
bt_results_base: Path = config["user_data_dir"] / "backtest_results"
fn = (bt_results_base / filename).with_suffix(".json")
results: Dict[str, Any] = {
'metadata': {},
'strategy': {},
'strategy_comparison': [],
"metadata": {},
"strategy": {},
"strategy_comparison": [],
}
if not is_file_in_dir(fn, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
@ -290,33 +294,38 @@ def api_backtest_history_result(filename: str, strategy: str, config=Depends(get
}
@router.delete('/backtest/history/{file}', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
@router.delete(
"/backtest/history/{file}",
response_model=List[BacktestHistoryEntry],
tags=["webserver", "backtest"],
)
def api_delete_backtest_history_entry(file: str, config=Depends(get_config)):
# Get backtest result history, read from metadata files
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
file_abs = (bt_results_base / file).with_suffix('.json')
bt_results_base: Path = config["user_data_dir"] / "backtest_results"
file_abs = (bt_results_base / file).with_suffix(".json")
# Ensure file is in backtest_results directory
if not is_file_in_dir(file_abs, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
delete_backtest_result(file_abs)
return get_backtest_resultlist(config['user_data_dir'] / 'backtest_results')
return get_backtest_resultlist(config["user_data_dir"] / "backtest_results")
@router.patch('/backtest/history/{file}', response_model=List[BacktestHistoryEntry],
tags=['webserver', 'backtest'])
def api_update_backtest_history_entry(file: str, body: BacktestMetadataUpdate,
config=Depends(get_config)):
@router.patch(
"/backtest/history/{file}",
response_model=List[BacktestHistoryEntry],
tags=["webserver", "backtest"],
)
def api_update_backtest_history_entry(
file: str, body: BacktestMetadataUpdate, config=Depends(get_config)
):
# Get backtest result history, read from metadata files
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
file_abs = (bt_results_base / file).with_suffix('.json')
bt_results_base: Path = config["user_data_dir"] / "backtest_results"
file_abs = (bt_results_base / file).with_suffix(".json")
# Ensure file is in backtest_results directory
if not is_file_in_dir(file_abs, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
content = {
'notes': body.notes
}
content = {"notes": body.notes}
try:
update_backtest_metadata(file_abs, body.strategy, content)
except ValueError as e:
@ -325,18 +334,21 @@ def api_update_backtest_history_entry(file: str, body: BacktestMetadataUpdate,
return get_backtest_result(file_abs)
@router.get('/backtest/history/{file}/market_change', response_model=BacktestMarketChange,
tags=['webserver', 'backtest'])
@router.get(
"/backtest/history/{file}/market_change",
response_model=BacktestMarketChange,
tags=["webserver", "backtest"],
)
def api_get_backtest_market_change(file: str, config=Depends(get_config)):
bt_results_base: Path = config['user_data_dir'] / 'backtest_results'
file_abs = (bt_results_base / f"{file}_market_change").with_suffix('.feather')
bt_results_base: Path = config["user_data_dir"] / "backtest_results"
file_abs = (bt_results_base / f"{file}_market_change").with_suffix(".feather")
# Ensure file is in backtest_results directory
if not is_file_in_dir(file_abs, bt_results_base):
raise HTTPException(status_code=404, detail="File not found.")
df = get_backtest_market_change(file_abs)
return {
'columns': df.columns.tolist(),
'data': df.values.tolist(),
'length': len(df),
"columns": df.columns.tolist(),
"data": df.values.tolist(),
"length": len(df),
}

View File

@ -381,7 +381,7 @@ class Locks(BaseModel):
class LocksPayload(BaseModel):
pair: str
side: str = '*' # Default to both sides
side: str = "*" # Default to both sides
until: AwareDatetime
reason: Optional[str] = None
@ -561,7 +561,7 @@ class BacktestHistoryEntry(BaseModel):
strategy: str
run_id: str
backtest_start_time: int
notes: Optional[str] = ''
notes: Optional[str] = ""
backtest_start_ts: Optional[int] = None
backtest_end_ts: Optional[int] = None
timeframe: Optional[str] = None
@ -570,7 +570,7 @@ class BacktestHistoryEntry(BaseModel):
class BacktestMetadataUpdate(BaseModel):
strategy: str
notes: str = ''
notes: str = ""
class BacktestMarketChange(BaseModel):

View File

@ -90,80 +90,84 @@ router_public = APIRouter()
router = APIRouter()
@router_public.get('/ping', response_model=Ping)
@router_public.get("/ping", response_model=Ping)
def ping():
"""simple ping"""
return {"status": "pong"}
@router.get('/version', response_model=Version, tags=['info'])
@router.get("/version", response_model=Version, tags=["info"])
def version():
""" Bot Version info"""
"""Bot Version info"""
return {"version": __version__}
@router.get('/balance', response_model=Balances, tags=['info'])
@router.get("/balance", response_model=Balances, tags=["info"])
def balance(rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
"""Account Balances"""
return rpc._rpc_balance(config['stake_currency'], config.get('fiat_display_currency', ''),)
return rpc._rpc_balance(
config["stake_currency"],
config.get("fiat_display_currency", ""),
)
@router.get('/count', response_model=Count, tags=['info'])
@router.get("/count", response_model=Count, tags=["info"])
def count(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_count()
@router.get('/entries', response_model=List[Entry], tags=['info'])
@router.get("/entries", response_model=List[Entry], tags=["info"])
def entries(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_enter_tag_performance(pair)
@router.get('/exits', response_model=List[Exit], tags=['info'])
@router.get("/exits", response_model=List[Exit], tags=["info"])
def exits(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_exit_reason_performance(pair)
@router.get('/mix_tags', response_model=List[MixTag], tags=['info'])
@router.get("/mix_tags", response_model=List[MixTag], tags=["info"])
def mix_tags(pair: Optional[str] = None, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_mix_tag_performance(pair)
@router.get('/performance', response_model=List[PerformanceEntry], tags=['info'])
@router.get("/performance", response_model=List[PerformanceEntry], tags=["info"])
def performance(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_performance()
@router.get('/profit', response_model=Profit, tags=['info'])
@router.get("/profit", response_model=Profit, tags=["info"])
def profit(rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_trade_statistics(config['stake_currency'],
config.get('fiat_display_currency')
)
return rpc._rpc_trade_statistics(config["stake_currency"], config.get("fiat_display_currency"))
@router.get('/stats', response_model=Stats, tags=['info'])
@router.get("/stats", response_model=Stats, tags=["info"])
def stats(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stats()
@router.get('/daily', response_model=DailyWeeklyMonthly, tags=['info'])
@router.get("/daily", response_model=DailyWeeklyMonthly, tags=["info"])
def daily(timescale: int = 7, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''))
return rpc._rpc_timeunit_profit(
timescale, config["stake_currency"], config.get("fiat_display_currency", "")
)
@router.get('/weekly', response_model=DailyWeeklyMonthly, tags=['info'])
@router.get("/weekly", response_model=DailyWeeklyMonthly, tags=["info"])
def weekly(timescale: int = 4, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''), 'weeks')
return rpc._rpc_timeunit_profit(
timescale, config["stake_currency"], config.get("fiat_display_currency", ""), "weeks"
)
@router.get('/monthly', response_model=DailyWeeklyMonthly, tags=['info'])
@router.get("/monthly", response_model=DailyWeeklyMonthly, tags=["info"])
def monthly(timescale: int = 3, rpc: RPC = Depends(get_rpc), config=Depends(get_config)):
return rpc._rpc_timeunit_profit(timescale, config['stake_currency'],
config.get('fiat_display_currency', ''), 'months')
return rpc._rpc_timeunit_profit(
timescale, config["stake_currency"], config.get("fiat_display_currency", ""), "months"
)
@router.get('/status', response_model=List[OpenTradeSchema], tags=['info'])
@router.get("/status", response_model=List[OpenTradeSchema], tags=["info"])
def status(rpc: RPC = Depends(get_rpc)):
try:
return rpc._rpc_trade_status()
@ -173,274 +177,305 @@ def status(rpc: RPC = Depends(get_rpc)):
# Using the responsemodel here will cause a ~100% increase in response time (from 1s to 2s)
# on big databases. Correct response model: response_model=TradeResponse,
@router.get('/trades', tags=['info', 'trading'])
@router.get("/trades", tags=["info", "trading"])
def trades(limit: int = 500, offset: int = 0, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_trade_history(limit, offset=offset, order_by_id=True)
@router.get('/trade/{tradeid}', response_model=OpenTradeSchema, tags=['info', 'trading'])
@router.get("/trade/{tradeid}", response_model=OpenTradeSchema, tags=["info", "trading"])
def trade(tradeid: int = 0, rpc: RPC = Depends(get_rpc)):
try:
return rpc._rpc_trade_status([tradeid])[0]
except (RPCException, KeyError):
raise HTTPException(status_code=404, detail='Trade not found.')
raise HTTPException(status_code=404, detail="Trade not found.")
@router.delete('/trades/{tradeid}', response_model=DeleteTrade, tags=['info', 'trading'])
@router.delete("/trades/{tradeid}", response_model=DeleteTrade, tags=["info", "trading"])
def trades_delete(tradeid: int, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_delete(tradeid)
@router.delete('/trades/{tradeid}/open-order', response_model=OpenTradeSchema, tags=['trading'])
@router.delete("/trades/{tradeid}/open-order", response_model=OpenTradeSchema, tags=["trading"])
def trade_cancel_open_order(tradeid: int, rpc: RPC = Depends(get_rpc)):
rpc._rpc_cancel_open_order(tradeid)
return rpc._rpc_trade_status([tradeid])[0]
@router.post('/trades/{tradeid}/reload', response_model=OpenTradeSchema, tags=['trading'])
@router.post("/trades/{tradeid}/reload", response_model=OpenTradeSchema, tags=["trading"])
def trade_reload(tradeid: int, rpc: RPC = Depends(get_rpc)):
rpc._rpc_reload_trade_from_exchange(tradeid)
return rpc._rpc_trade_status([tradeid])[0]
# TODO: Missing response model
@router.get('/edge', tags=['info'])
@router.get("/edge", tags=["info"])
def edge(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_edge()
@router.get('/show_config', response_model=ShowConfig, tags=['info'])
@router.get("/show_config", response_model=ShowConfig, tags=["info"])
def show_config(rpc: Optional[RPC] = Depends(get_rpc_optional), config=Depends(get_config)):
state = ''
state = ""
strategy_version = None
if rpc:
state = rpc._freqtrade.state
strategy_version = rpc._freqtrade.strategy.version()
resp = RPC._rpc_show_config(config, state, strategy_version)
resp['api_version'] = API_VERSION
resp["api_version"] = API_VERSION
return resp
# /forcebuy is deprecated with short addition. use /forceentry instead
@router.post('/forceenter', response_model=ForceEnterResponse, tags=['trading'])
@router.post('/forcebuy', response_model=ForceEnterResponse, tags=['trading'])
@router.post("/forceenter", response_model=ForceEnterResponse, tags=["trading"])
@router.post("/forcebuy", response_model=ForceEnterResponse, tags=["trading"])
def force_entry(payload: ForceEnterPayload, rpc: RPC = Depends(get_rpc)):
ordertype = payload.ordertype.value if payload.ordertype else None
trade = rpc._rpc_force_entry(payload.pair, payload.price, order_side=payload.side,
order_type=ordertype, stake_amount=payload.stakeamount,
enter_tag=payload.entry_tag or 'force_entry',
leverage=payload.leverage)
trade = rpc._rpc_force_entry(
payload.pair,
payload.price,
order_side=payload.side,
order_type=ordertype,
stake_amount=payload.stakeamount,
enter_tag=payload.entry_tag or "force_entry",
leverage=payload.leverage,
)
if trade:
return ForceEnterResponse.model_validate(trade.to_json())
else:
return ForceEnterResponse.model_validate(
{"status": f"Error entering {payload.side} trade for pair {payload.pair}."})
{"status": f"Error entering {payload.side} trade for pair {payload.pair}."}
)
# /forcesell is deprecated with short addition. use /forceexit instead
@router.post('/forceexit', response_model=ResultMsg, tags=['trading'])
@router.post('/forcesell', response_model=ResultMsg, tags=['trading'])
@router.post("/forceexit", response_model=ResultMsg, tags=["trading"])
@router.post("/forcesell", response_model=ResultMsg, tags=["trading"])
def forceexit(payload: ForceExitPayload, rpc: RPC = Depends(get_rpc)):
ordertype = payload.ordertype.value if payload.ordertype else None
return rpc._rpc_force_exit(str(payload.tradeid), ordertype, amount=payload.amount)
@router.get('/blacklist', response_model=BlacklistResponse, tags=['info', 'pairlist'])
@router.get("/blacklist", response_model=BlacklistResponse, tags=["info", "pairlist"])
def blacklist(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_blacklist()
@router.post('/blacklist', response_model=BlacklistResponse, tags=['info', 'pairlist'])
@router.post("/blacklist", response_model=BlacklistResponse, tags=["info", "pairlist"])
def blacklist_post(payload: BlacklistPayload, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_blacklist(payload.blacklist)
@router.delete('/blacklist', response_model=BlacklistResponse, tags=['info', 'pairlist'])
@router.delete("/blacklist", response_model=BlacklistResponse, tags=["info", "pairlist"])
def blacklist_delete(pairs_to_delete: List[str] = Query([]), rpc: RPC = Depends(get_rpc)):
"""Provide a list of pairs to delete from the blacklist"""
return rpc._rpc_blacklist_delete(pairs_to_delete)
@router.get('/whitelist', response_model=WhitelistResponse, tags=['info', 'pairlist'])
@router.get("/whitelist", response_model=WhitelistResponse, tags=["info", "pairlist"])
def whitelist(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_whitelist()
@router.get('/locks', response_model=Locks, tags=['info', 'locks'])
@router.get("/locks", response_model=Locks, tags=["info", "locks"])
def locks(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_locks()
@router.delete('/locks/{lockid}', response_model=Locks, tags=['info', 'locks'])
@router.delete("/locks/{lockid}", response_model=Locks, tags=["info", "locks"])
def delete_lock(lockid: int, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_delete_lock(lockid=lockid)
@router.post('/locks/delete', response_model=Locks, tags=['info', 'locks'])
@router.post("/locks/delete", response_model=Locks, tags=["info", "locks"])
def delete_lock_pair(payload: DeleteLockRequest, rpc: RPC = Depends(get_rpc)):
return rpc._rpc_delete_lock(lockid=payload.lockid, pair=payload.pair)
@router.post('/locks', response_model=Locks, tags=['info', 'locks'])
@router.post("/locks", response_model=Locks, tags=["info", "locks"])
def add_locks(payload: List[LocksPayload], rpc: RPC = Depends(get_rpc)):
for lock in payload:
rpc._rpc_add_lock(lock.pair, lock.until, lock.reason, lock.side)
return rpc._rpc_locks()
@router.get('/logs', response_model=Logs, tags=['info'])
@router.get("/logs", response_model=Logs, tags=["info"])
def logs(limit: Optional[int] = None):
return RPC._rpc_get_logs(limit)
@router.post('/start', response_model=StatusMsg, tags=['botcontrol'])
@router.post("/start", response_model=StatusMsg, tags=["botcontrol"])
def start(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_start()
@router.post('/stop', response_model=StatusMsg, tags=['botcontrol'])
@router.post("/stop", response_model=StatusMsg, tags=["botcontrol"])
def stop(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stop()
@router.post('/stopentry', response_model=StatusMsg, tags=['botcontrol'])
@router.post('/stopbuy', response_model=StatusMsg, tags=['botcontrol'])
@router.post("/stopentry", response_model=StatusMsg, tags=["botcontrol"])
@router.post("/stopbuy", response_model=StatusMsg, tags=["botcontrol"])
def stop_buy(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_stopentry()
@router.post('/reload_config', response_model=StatusMsg, tags=['botcontrol'])
@router.post("/reload_config", response_model=StatusMsg, tags=["botcontrol"])
def reload_config(rpc: RPC = Depends(get_rpc)):
return rpc._rpc_reload_config()
@router.get('/pair_candles', response_model=PairHistory, tags=['candle data'])
@router.get("/pair_candles", response_model=PairHistory, tags=["candle data"])
def pair_candles(
pair: str, timeframe: str, limit: Optional[int] = None, rpc: RPC = Depends(get_rpc)):
pair: str, timeframe: str, limit: Optional[int] = None, rpc: RPC = Depends(get_rpc)
):
return rpc._rpc_analysed_dataframe(pair, timeframe, limit, None)
@router.post('/pair_candles', response_model=PairHistory, tags=['candle data'])
@router.post("/pair_candles", response_model=PairHistory, tags=["candle data"])
def pair_candles_filtered(payload: PairCandlesRequest, rpc: RPC = Depends(get_rpc)):
# Advanced pair_candles endpoint with column filtering
return rpc._rpc_analysed_dataframe(
payload.pair, payload.timeframe, payload.limit, payload.columns)
payload.pair, payload.timeframe, payload.limit, payload.columns
)
@router.get('/pair_history', response_model=PairHistory, tags=['candle data'])
def pair_history(pair: str, timeframe: str, timerange: str, strategy: str,
freqaimodel: Optional[str] = None,
config=Depends(get_config), exchange=Depends(get_exchange)):
@router.get("/pair_history", response_model=PairHistory, tags=["candle data"])
def pair_history(
pair: str,
timeframe: str,
timerange: str,
strategy: str,
freqaimodel: Optional[str] = None,
config=Depends(get_config),
exchange=Depends(get_exchange),
):
# The initial call to this endpoint can be slow, as it may need to initialize
# the exchange class.
config = deepcopy(config)
config.update({
'strategy': strategy,
'timerange': timerange,
'freqaimodel': freqaimodel if freqaimodel else config.get('freqaimodel'),
})
config.update(
{
"strategy": strategy,
"timerange": timerange,
"freqaimodel": freqaimodel if freqaimodel else config.get("freqaimodel"),
}
)
try:
return RPC._rpc_analysed_history_full(config, pair, timeframe, exchange, None)
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.post('/pair_history', response_model=PairHistory, tags=['candle data'])
def pair_history_filtered(payload: PairHistoryRequest,
config=Depends(get_config), exchange=Depends(get_exchange)):
@router.post("/pair_history", response_model=PairHistory, tags=["candle data"])
def pair_history_filtered(
payload: PairHistoryRequest, config=Depends(get_config), exchange=Depends(get_exchange)
):
# The initial call to this endpoint can be slow, as it may need to initialize
# the exchange class.
config = deepcopy(config)
config.update({
'strategy': payload.strategy,
'timerange': payload.timerange,
'freqaimodel': payload.freqaimodel if payload.freqaimodel else config.get('freqaimodel'),
})
config.update(
{
"strategy": payload.strategy,
"timerange": payload.timerange,
"freqaimodel": payload.freqaimodel
if payload.freqaimodel
else config.get("freqaimodel"),
}
)
try:
return RPC._rpc_analysed_history_full(
config, payload.pair, payload.timeframe, exchange, payload.columns)
config, payload.pair, payload.timeframe, exchange, payload.columns
)
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get('/plot_config', response_model=PlotConfig, tags=['candle data'])
def plot_config(strategy: Optional[str] = None, config=Depends(get_config),
rpc: Optional[RPC] = Depends(get_rpc_optional)):
@router.get("/plot_config", response_model=PlotConfig, tags=["candle data"])
def plot_config(
strategy: Optional[str] = None,
config=Depends(get_config),
rpc: Optional[RPC] = Depends(get_rpc_optional),
):
if not strategy:
if not rpc:
raise RPCException("Strategy is mandatory in webserver mode.")
return PlotConfig.model_validate(rpc._rpc_plot_config())
else:
config1 = deepcopy(config)
config1.update({
'strategy': strategy
})
config1.update({"strategy": strategy})
try:
return PlotConfig.model_validate(RPC._rpc_plot_config_with_strategy(config1))
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
@router.get('/strategies', response_model=StrategyListResponse, tags=['strategy'])
@router.get("/strategies", response_model=StrategyListResponse, tags=["strategy"])
def list_strategies(config=Depends(get_config)):
from freqtrade.resolvers.strategy_resolver import StrategyResolver
strategies = StrategyResolver.search_all_objects(
config, False, config.get('recursive_strategy_search', False))
strategies = sorted(strategies, key=lambda x: x['name'])
config, False, config.get("recursive_strategy_search", False)
)
strategies = sorted(strategies, key=lambda x: x["name"])
return {'strategies': [x['name'] for x in strategies]}
return {"strategies": [x["name"] for x in strategies]}
@router.get('/strategy/{strategy}', response_model=StrategyResponse, tags=['strategy'])
@router.get("/strategy/{strategy}", response_model=StrategyResponse, tags=["strategy"])
def get_strategy(strategy: str, config=Depends(get_config)):
if ":" in strategy:
raise HTTPException(status_code=500, detail="base64 encoded strategies are not allowed.")
config_ = deepcopy(config)
from freqtrade.resolvers.strategy_resolver import StrategyResolver
try:
strategy_obj = StrategyResolver._load_strategy(strategy, config_,
extra_dir=config_.get('strategy_path'))
strategy_obj = StrategyResolver._load_strategy(
strategy, config_, extra_dir=config_.get("strategy_path")
)
except OperationalException:
raise HTTPException(status_code=404, detail='Strategy not found')
raise HTTPException(status_code=404, detail="Strategy not found")
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
return {
'strategy': strategy_obj.get_strategy_name(),
'code': strategy_obj.__source__,
'timeframe': getattr(strategy_obj, 'timeframe', None),
"strategy": strategy_obj.get_strategy_name(),
"code": strategy_obj.__source__,
"timeframe": getattr(strategy_obj, "timeframe", None),
}
@router.get('/exchanges', response_model=ExchangeListResponse, tags=[])
@router.get("/exchanges", response_model=ExchangeListResponse, tags=[])
def list_exchanges(config=Depends(get_config)):
from freqtrade.exchange import list_available_exchanges
exchanges = list_available_exchanges(config)
return {
'exchanges': exchanges,
"exchanges": exchanges,
}
@router.get('/freqaimodels', response_model=FreqAIModelListResponse, tags=['freqai'])
@router.get("/freqaimodels", response_model=FreqAIModelListResponse, tags=["freqai"])
def list_freqaimodels(config=Depends(get_config)):
from freqtrade.resolvers.freqaimodel_resolver import FreqaiModelResolver
models = FreqaiModelResolver.search_all_objects(
config, False)
models = sorted(models, key=lambda x: x['name'])
return {'freqaimodels': [x['name'] for x in models]}
models = FreqaiModelResolver.search_all_objects(config, False)
models = sorted(models, key=lambda x: x["name"])
return {"freqaimodels": [x["name"] for x in models]}
@router.get('/available_pairs', response_model=AvailablePairs, tags=['candle data'])
def list_available_pairs(timeframe: Optional[str] = None, stake_currency: Optional[str] = None,
candletype: Optional[CandleType] = None, config=Depends(get_config)):
dh = get_datahandler(config['datadir'], config.get('dataformat_ohlcv'))
trading_mode: TradingMode = config.get('trading_mode', TradingMode.SPOT)
pair_interval = dh.ohlcv_get_available_data(config['datadir'], trading_mode)
@router.get("/available_pairs", response_model=AvailablePairs, tags=["candle data"])
def list_available_pairs(
timeframe: Optional[str] = None,
stake_currency: Optional[str] = None,
candletype: Optional[CandleType] = None,
config=Depends(get_config),
):
dh = get_datahandler(config["datadir"], config.get("dataformat_ohlcv"))
trading_mode: TradingMode = config.get("trading_mode", TradingMode.SPOT)
pair_interval = dh.ohlcv_get_available_data(config["datadir"], trading_mode)
if timeframe:
pair_interval = [pair for pair in pair_interval if pair[1] == timeframe]
@ -457,18 +492,18 @@ def list_available_pairs(timeframe: Optional[str] = None, stake_currency: Option
pairs = list({x[0] for x in pair_interval})
pairs.sort()
result = {
'length': len(pairs),
'pairs': pairs,
'pair_interval': pair_interval,
"length": len(pairs),
"pairs": pairs,
"pair_interval": pair_interval,
}
return result
@router.get('/sysinfo', response_model=SysInfo, tags=['info'])
@router.get("/sysinfo", response_model=SysInfo, tags=["info"])
def sysinfo():
return RPC._rpc_sysinfo()
@router.get('/health', response_model=Health, tags=['info'])
@router.get("/health", response_model=Health, tags=["info"])
def health(rpc: RPC = Depends(get_rpc)):
return rpc.health()

View File

@ -37,7 +37,7 @@ async def channel_reader(channel: WebSocketChannel, rpc: RPC):
await _process_consumer_request(message, channel, rpc)
except FreqtradeException:
logger.exception(f"Error processing request from {channel}")
response = WSErrorMessage(data='Error processing request')
response = WSErrorMessage(data="Error processing request")
await channel.send(response.dict(exclude_none=True))
@ -47,23 +47,21 @@ async def channel_broadcaster(channel: WebSocketChannel, message_stream: Message
Iterate over messages in the message stream and send them
"""
async for message, ts in message_stream:
if channel.subscribed_to(message.get('type')):
if channel.subscribed_to(message.get("type")):
# Log a warning if this channel is behind
# on the message stream by a lot
if (time.time() - ts) > 60:
logger.warning(f"Channel {channel} is behind MessageStream by 1 minute,"
" this can cause a memory leak if you see this message"
" often, consider reducing pair list size or amount of"
" consumers.")
logger.warning(
f"Channel {channel} is behind MessageStream by 1 minute,"
" this can cause a memory leak if you see this message"
" often, consider reducing pair list size or amount of"
" consumers."
)
await channel.send(message, timeout=True)
async def _process_consumer_request(
request: Dict[str, Any],
channel: WebSocketChannel,
rpc: RPC
):
async def _process_consumer_request(request: Dict[str, Any], channel: WebSocketChannel, rpc: RPC):
"""
Validate and handle a request from a websocket consumer
"""
@ -102,8 +100,8 @@ async def _process_consumer_request(
elif type_ == RPCRequestType.ANALYZED_DF:
# Limit the amount of candles per dataframe to 'limit' or 1500
limit = int(min(data.get('limit', 1500), 1500)) if data else None
pair = data.get('pair', None) if data else None
limit = int(min(data.get("limit", 1500), 1500)) if data else None
pair = data.get("pair", None) if data else None
# For every pair in the generator, send a separate message
for message in rpc._ws_request_analyzed_df(limit, pair):
@ -117,11 +115,10 @@ async def message_endpoint(
websocket: WebSocket,
token: str = Depends(validate_ws_token),
rpc: RPC = Depends(get_rpc),
message_stream: MessageStream = Depends(get_message_stream)
message_stream: MessageStream = Depends(get_message_stream),
):
if token:
async with create_channel(websocket) as channel:
await channel.run_channel_tasks(
channel_reader(channel, rpc),
channel_broadcaster(channel, message_stream)
channel_reader(channel, rpc), channel_broadcaster(channel, message_stream)
)

View File

@ -20,7 +20,6 @@ def get_rpc_optional() -> Optional[RPC]:
async def get_rpc() -> Optional[AsyncIterator[RPC]]:
_rpc = get_rpc_optional()
if _rpc:
request_id = str(uuid4())
@ -33,7 +32,7 @@ async def get_rpc() -> Optional[AsyncIterator[RPC]]:
_request_id_ctx_var.reset(ctx_token)
else:
raise RPCException('Bot is not in the correct state')
raise RPCException("Bot is not in the correct state")
def get_config() -> Dict[str, Any]:
@ -41,7 +40,7 @@ def get_config() -> Dict[str, Any]:
def get_api_config() -> Dict[str, Any]:
return ApiServer._config['api_server']
return ApiServer._config["api_server"]
def _generate_exchange_key(config: Config) -> str:
@ -55,8 +54,8 @@ def get_exchange(config=Depends(get_config)):
exchange_key = _generate_exchange_key(config)
if not (exchange := ApiBG.exchanges.get(exchange_key)):
from freqtrade.resolvers import ExchangeResolver
exchange = ExchangeResolver.load_exchange(
config, validate=False, load_leverage_tiers=False)
exchange = ExchangeResolver.load_exchange(config, validate=False, load_leverage_tiers=False)
ApiBG.exchanges[exchange_key] = exchange
return exchange
@ -66,7 +65,6 @@ def get_message_stream():
def is_webserver_mode(config=Depends(get_config)):
if config['runmode'] != RunMode.WEBSERVER:
raise HTTPException(status_code=503,
detail='Bot is not in the correct state.')
if config["runmode"] != RunMode.WEBSERVER:
raise HTTPException(status_code=503, detail="Bot is not in the correct state.")
return None

View File

@ -14,6 +14,7 @@ def asyncio_setup() -> None: # pragma: no cover
if sys.version_info >= (3, 8) and sys.platform == "win32":
import asyncio
import selectors
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)
@ -42,7 +43,6 @@ class UvicornServer(uvicorn.Server):
try:
import uvloop # noqa
except ImportError: # pragma: no cover
asyncio_setup()
else:
asyncio.set_event_loop(uvloop.new_event_loop())
@ -55,7 +55,7 @@ class UvicornServer(uvicorn.Server):
@contextlib.contextmanager
def run_in_thread(self):
self.thread = threading.Thread(target=self.run, name='FTUvicorn')
self.thread = threading.Thread(target=self.run, name="FTUvicorn")
self.thread.start()
while not self.started:
time.sleep(1e-3)

View File

@ -9,20 +9,21 @@ from starlette.responses import FileResponse
router_ui = APIRouter()
@router_ui.get('/favicon.ico', include_in_schema=False)
@router_ui.get("/favicon.ico", include_in_schema=False)
async def favicon():
return FileResponse(str(Path(__file__).parent / 'ui/favicon.ico'))
return FileResponse(str(Path(__file__).parent / "ui/favicon.ico"))
@router_ui.get('/fallback_file.html', include_in_schema=False)
@router_ui.get("/fallback_file.html", include_in_schema=False)
async def fallback():
return FileResponse(str(Path(__file__).parent / 'ui/fallback_file.html'))
return FileResponse(str(Path(__file__).parent / "ui/fallback_file.html"))
@router_ui.get('/ui_version', include_in_schema=False)
@router_ui.get("/ui_version", include_in_schema=False)
async def ui_version():
from freqtrade.commands.deploy_commands import read_ui_version
uibase = Path(__file__).parent / 'ui/installed/'
uibase = Path(__file__).parent / "ui/installed/"
version = read_ui_version(uibase)
return {
@ -40,26 +41,26 @@ def is_relative_to(path: Path, base: Path) -> bool:
return False
@router_ui.get('/{rest_of_path:path}', include_in_schema=False)
@router_ui.get("/{rest_of_path:path}", include_in_schema=False)
async def index_html(rest_of_path: str):
"""
Emulate path fallback to index.html.
"""
if rest_of_path.startswith('api') or rest_of_path.startswith('.'):
if rest_of_path.startswith("api") or rest_of_path.startswith("."):
raise HTTPException(status_code=404, detail="Not Found")
uibase = Path(__file__).parent / 'ui/installed/'
uibase = Path(__file__).parent / "ui/installed/"
filename = uibase / rest_of_path
# It's security relevant to check "relative_to".
# Without this, Directory-traversal is possible.
media_type: Optional[str] = None
if filename.suffix == '.js':
if filename.suffix == ".js":
# Force text/javascript for .js files - Circumvent faulty system configuration
media_type = 'application/javascript'
media_type = "application/javascript"
if filename.is_file() and is_relative_to(filename, uibase):
return FileResponse(str(filename), media_type=media_type)
index_file = uibase / 'index.html'
index_file = uibase / "index.html"
if not index_file.is_file():
return FileResponse(str(uibase.parent / 'fallback_file.html'))
return FileResponse(str(uibase.parent / "fallback_file.html"))
# Fall back to index.html, as indicated by vue router docs
return FileResponse(str(index_file))

View File

@ -32,7 +32,6 @@ class FTJSONResponse(JSONResponse):
class ApiServer(RPCHandler):
__instance = None
__initialized = False
@ -61,13 +60,14 @@ class ApiServer(RPCHandler):
ApiServer.__initialized = True
api_config = self._config['api_server']
api_config = self._config["api_server"]
self.app = FastAPI(title="Freqtrade API",
docs_url='/docs' if api_config.get('enable_openapi', False) else None,
redoc_url=None,
default_response_class=FTJSONResponse,
)
self.app = FastAPI(
title="Freqtrade API",
docs_url="/docs" if api_config.get("enable_openapi", False) else None,
redoc_url=None,
default_response_class=FTJSONResponse,
)
self.configure_app(self.app, self._config)
self.start_api()
@ -80,10 +80,10 @@ class ApiServer(RPCHandler):
ApiServer._has_rpc = True
else:
# This should not happen assuming we didn't mess up.
raise OperationalException('RPC Handler already attached.')
raise OperationalException("RPC Handler already attached.")
def cleanup(self) -> None:
""" Cleanup pending module resources """
"""Cleanup pending module resources"""
ApiServer._has_rpc = False
del ApiServer._rpc
if self._server and not self._standalone:
@ -109,8 +109,7 @@ class ApiServer(RPCHandler):
def handle_rpc_exception(self, request, exc):
logger.error(f"API Error calling: {exc}")
return JSONResponse(
status_code=502,
content={'error': f"Error querying {request.url.path}: {exc.message}"}
status_code=502, content={"error": f"Error querying {request.url.path}: {exc.message}"}
)
def configure_app(self, app: FastAPI, config):
@ -126,38 +125,36 @@ class ApiServer(RPCHandler):
app.include_router(api_v1_public, prefix="/api/v1")
app.include_router(router_login, prefix="/api/v1", tags=["auth"])
app.include_router(api_v1, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token)],
)
app.include_router(api_backtest, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token),
Depends(is_webserver_mode)],
)
app.include_router(api_bg_tasks, prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token),
Depends(is_webserver_mode)],
)
app.include_router(
api_v1,
prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token)],
)
app.include_router(
api_backtest,
prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token), Depends(is_webserver_mode)],
)
app.include_router(
api_bg_tasks,
prefix="/api/v1",
dependencies=[Depends(http_basic_or_jwt_token), Depends(is_webserver_mode)],
)
app.include_router(ws_router, prefix="/api/v1")
# UI Router MUST be last!
app.include_router(router_ui, prefix='')
app.include_router(router_ui, prefix="")
app.add_middleware(
CORSMiddleware,
allow_origins=config['api_server'].get('CORS_origins', []),
allow_origins=config["api_server"].get("CORS_origins", []),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_exception_handler(RPCException, self.handle_rpc_exception)
app.add_event_handler(
event_type="startup",
func=self._api_startup_event
)
app.add_event_handler(
event_type="shutdown",
func=self._api_shutdown_event
)
app.add_event_handler(event_type="startup", func=self._api_startup_event)
app.add_event_handler(event_type="shutdown", func=self._api_shutdown_event)
async def _api_startup_event(self):
"""
@ -179,35 +176,43 @@ class ApiServer(RPCHandler):
"""
Start API ... should be run in thread.
"""
rest_ip = self._config['api_server']['listen_ip_address']
rest_port = self._config['api_server']['listen_port']
rest_ip = self._config["api_server"]["listen_ip_address"]
rest_port = self._config["api_server"]["listen_port"]
logger.info(f'Starting HTTP Server at {rest_ip}:{rest_port}')
logger.info(f"Starting HTTP Server at {rest_ip}:{rest_port}")
if not IPv4Address(rest_ip).is_loopback and not running_in_docker():
logger.warning("SECURITY WARNING - Local Rest Server listening to external connections")
logger.warning("SECURITY WARNING - This is insecure please set to your loopback,"
"e.g 127.0.0.1 in config.json")
logger.warning(
"SECURITY WARNING - This is insecure please set to your loopback,"
"e.g 127.0.0.1 in config.json"
)
if not self._config['api_server'].get('password'):
logger.warning("SECURITY WARNING - No password for local REST Server defined. "
"Please make sure that this is intentional!")
if not self._config["api_server"].get("password"):
logger.warning(
"SECURITY WARNING - No password for local REST Server defined. "
"Please make sure that this is intentional!"
)
if (self._config['api_server'].get('jwt_secret_key', 'super-secret')
in ('super-secret, somethingrandom')):
logger.warning("SECURITY WARNING - `jwt_secret_key` seems to be default."
"Others may be able to log into your bot.")
if self._config["api_server"].get("jwt_secret_key", "super-secret") in (
"super-secret, somethingrandom"
):
logger.warning(
"SECURITY WARNING - `jwt_secret_key` seems to be default."
"Others may be able to log into your bot."
)
logger.info('Starting Local Rest Server.')
verbosity = self._config['api_server'].get('verbosity', 'error')
logger.info("Starting Local Rest Server.")
verbosity = self._config["api_server"].get("verbosity", "error")
uvconfig = uvicorn.Config(self.app,
port=rest_port,
host=rest_ip,
use_colors=False,
log_config=None,
access_log=True if verbosity != 'error' else False,
ws_ping_interval=None # We do this explicitly ourselves
)
uvconfig = uvicorn.Config(
self.app,
port=rest_port,
host=rest_ip,
use_colors=False,
log_config=None,
access_log=True if verbosity != "error" else False,
ws_ping_interval=None, # We do this explicitly ourselves
)
try:
self._server = UvicornServer(uvconfig)
if self._standalone:

View File

@ -1,4 +1,3 @@
from typing import Any, Dict, Literal, Optional, TypedDict
from uuid import uuid4
@ -6,7 +5,7 @@ from freqtrade.exchange.exchange import Exchange
class JobsContainer(TypedDict):
category: Literal['pairlist']
category: Literal["pairlist"]
is_running: bool
status: str
progress: Optional[float]
@ -17,11 +16,11 @@ class JobsContainer(TypedDict):
class ApiBG:
# Backtesting type: Backtesting
bt: Dict[str, Any] = {
'bt': None,
'data': None,
'timerange': None,
'last_config': {},
'bt_error': None,
"bt": None,
"data": None,
"timerange": None,
"last_config": {},
"bt_error": None,
}
bgtask_running: bool = False
# Exchange - only available in webserver mode.

View File

@ -25,12 +25,13 @@ class WebSocketChannel:
"""
Object to help facilitate managing a websocket connection
"""
def __init__(
self,
websocket: WebSocketType,
channel_id: Optional[str] = None,
serializer_cls: Type[WebSocketSerializer] = HybridJSONWebSocketSerializer,
send_throttle: float = 0.01
send_throttle: float = 0.01,
):
self.channel_id = channel_id if channel_id else uuid4().hex[:8]
self._websocket = WebSocketProxy(websocket)
@ -79,9 +80,7 @@ class WebSocketChannel:
self._send_high_limit = min(max(self.avg_send_time * 2, 1), 3)
async def send(
self,
message: Union[WSMessageSchemaType, Dict[str, Any]],
timeout: bool = False
self, message: Union[WSMessageSchemaType, Dict[str, Any]], timeout: bool = False
):
"""
Send a message on the wrapped websocket. If the sending
@ -97,8 +96,7 @@ class WebSocketChannel:
# a TimeoutError and bubble up to the
# message_endpoint to close the connection
await asyncio.wait_for(
self._wrapped_ws.send(message),
timeout=self._send_high_limit if timeout else None
self._wrapped_ws.send(message), timeout=self._send_high_limit if timeout else None
)
total_time = time.time() - _
self._send_times.append(total_time)
@ -207,7 +205,7 @@ class WebSocketChannel:
asyncio.TimeoutError,
WebSocketDisconnect,
ConnectionClosed,
RuntimeError
RuntimeError,
):
pass
except Exception as e:
@ -227,10 +225,7 @@ class WebSocketChannel:
@asynccontextmanager
async def create_channel(
websocket: WebSocketType,
**kwargs
) -> AsyncIterator[WebSocketChannel]:
async def create_channel(websocket: WebSocketType, **kwargs) -> AsyncIterator[WebSocketChannel]:
"""
Context manager for safely opening and closing a WebSocketChannel
"""

View File

@ -7,6 +7,7 @@ class MessageStream:
A message stream for consumers to subscribe to,
and for producers to publish to.
"""
def __init__(self):
self._loop = asyncio.get_running_loop()
self._waiter = self._loop.create_future()

View File

@ -46,15 +46,12 @@ class HybridJSONWebSocketSerializer(WebSocketSerializer):
# Support serializing pandas DataFrames
def _json_default(z):
if isinstance(z, DataFrame):
return {
'__type__': 'dataframe',
'__value__': dataframe_to_json(z)
}
return {"__type__": "dataframe", "__value__": dataframe_to_json(z)}
raise TypeError
# Support deserializing JSON to pandas DataFrames
def _json_object_hook(z):
if z.get('__type__') == 'dataframe':
return json_to_dataframe(z.get('__value__'))
if z.get("__type__") == "dataframe":
return json_to_dataframe(z.get("__value__"))
return z

View File

@ -26,7 +26,7 @@ class WSMessageSchemaType(TypedDict):
class WSMessageSchema(BaseArbitraryModel):
type: RPCMessageType
data: Optional[Any] = None
model_config = ConfigDict(extra='allow')
model_config = ConfigDict(extra="allow")
# ------------------------------ REQUEST SCHEMAS ----------------------------
@ -49,6 +49,7 @@ class WSAnalyzedDFRequest(WSRequestSchema):
# ------------------------------ MESSAGE SCHEMAS ----------------------------
class WSWhitelistMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.WHITELIST
data: List[str]
@ -68,4 +69,5 @@ class WSErrorMessage(WSMessageSchema):
type: RPCMessageType = RPCMessageType.EXCEPTION
data: str
# --------------------------------------------------------------------------

View File

@ -10,18 +10,18 @@ logger = logging.getLogger(__name__)
class Discord(Webhook):
def __init__(self, rpc: 'RPC', config: Config):
def __init__(self, rpc: "RPC", config: Config):
self._config = config
self.rpc = rpc
self.strategy = config.get('strategy', '')
self.timeframe = config.get('timeframe', '')
self.bot_name = config.get('bot_name', '')
self.strategy = config.get("strategy", "")
self.timeframe = config.get("timeframe", "")
self.bot_name = config.get("bot_name", "")
self._url = config['discord']['webhook_url']
self._format = 'json'
self._url = config["discord"]["webhook_url"]
self._format = "json"
self._retries = 1
self._retry_delay = 0.1
self._timeout = self._config['discord'].get('timeout', 10)
self._timeout = self._config["discord"].get("timeout", 10)
def cleanup(self) -> None:
"""
@ -31,32 +31,31 @@ class Discord(Webhook):
pass
def send_msg(self, msg) -> None:
if (fields := self._config['discord'].get(msg['type'].value)):
if fields := self._config["discord"].get(msg["type"].value):
logger.info(f"Sending discord message: {msg}")
msg['strategy'] = self.strategy
msg['timeframe'] = self.timeframe
msg['bot_name'] = self.bot_name
msg["strategy"] = self.strategy
msg["timeframe"] = self.timeframe
msg["bot_name"] = self.bot_name
color = 0x0000FF
if msg['type'] in (RPCMessageType.EXIT, RPCMessageType.EXIT_FILL):
profit_ratio = msg.get('profit_ratio')
color = (0x00FF00 if profit_ratio > 0 else 0xFF0000)
title = msg['type'].value
if 'pair' in msg:
if msg["type"] in (RPCMessageType.EXIT, RPCMessageType.EXIT_FILL):
profit_ratio = msg.get("profit_ratio")
color = 0x00FF00 if profit_ratio > 0 else 0xFF0000
title = msg["type"].value
if "pair" in msg:
title = f"Trade: {msg['pair']} {msg['type'].value}"
embeds = [{
'title': title,
'color': color,
'fields': [],
}]
embeds = [
{
"title": title,
"color": color,
"fields": [],
}
]
for f in fields:
for k, v in f.items():
v = v.format(**msg)
embeds[0]['fields'].append(
{'name': k, 'value': v, 'inline': True})
embeds[0]["fields"].append({"name": k, "value": v, "inline": True})
# Send the message to discord channel
payload = {'embeds': embeds}
payload = {"embeds": embeds}
self._send_msg(payload)

View File

@ -4,6 +4,7 @@ ExternalMessageConsumer module
Main purpose is to connect to external bot's message websocket to consume data
from it
"""
import asyncio
import logging
import socket
@ -55,11 +56,7 @@ class ExternalMessageConsumer:
other freqtrade bot's
"""
def __init__(
self,
config: Dict[str, Any],
dataprovider: DataProvider
):
def __init__(self, config: Dict[str, Any], dataprovider: DataProvider):
self._config = config
self._dp = dataprovider
@ -69,21 +66,21 @@ class ExternalMessageConsumer:
self._main_task = None
self._sub_tasks = None
self._emc_config = self._config.get('external_message_consumer', {})
self._emc_config = self._config.get("external_message_consumer", {})
self.enabled = self._emc_config.get('enabled', False)
self.producers: List[Producer] = self._emc_config.get('producers', [])
self.enabled = self._emc_config.get("enabled", False)
self.producers: List[Producer] = self._emc_config.get("producers", [])
self.wait_timeout = self._emc_config.get('wait_timeout', 30) # in seconds
self.ping_timeout = self._emc_config.get('ping_timeout', 10) # in seconds
self.sleep_time = self._emc_config.get('sleep_time', 10) # in seconds
self.wait_timeout = self._emc_config.get("wait_timeout", 30) # in seconds
self.ping_timeout = self._emc_config.get("ping_timeout", 10) # in seconds
self.sleep_time = self._emc_config.get("sleep_time", 10) # in seconds
# The amount of candles per dataframe on the initial request
self.initial_candle_limit = self._emc_config.get('initial_candle_limit', 1500)
self.initial_candle_limit = self._emc_config.get("initial_candle_limit", 1500)
# Message size limit, in megabytes. Default 8mb, Use bitwise operator << 20 to convert
# as the websockets client expects bytes.
self.message_size_limit = (self._emc_config.get('message_size_limit', 8) << 20)
self.message_size_limit = self._emc_config.get("message_size_limit", 8) << 20
# Setting these explicitly as they probably shouldn't be changed by a user
# Unless we somehow integrate this with the strategy to allow creating
@ -94,7 +91,7 @@ class ExternalMessageConsumer:
self._initial_requests: List[WSRequestSchema] = [
WSSubscribeRequest(data=self.topics),
WSWhitelistRequest(),
WSAnalyzedDFRequest()
WSAnalyzedDFRequest(),
]
# Specify which function to use for which RPCMessageType
@ -192,31 +189,24 @@ class ExternalMessageConsumer:
"""
while self._running:
try:
host, port = producer['host'], producer['port']
token = producer['ws_token']
name = producer['name']
scheme = 'wss' if producer.get('secure', False) else 'ws'
host, port = producer["host"], producer["port"]
token = producer["ws_token"]
name = producer["name"]
scheme = "wss" if producer.get("secure", False) else "ws"
ws_url = f"{scheme}://{host}:{port}/api/v1/message/ws?token={token}"
# This will raise InvalidURI if the url is bad
async with websockets.connect(
ws_url,
max_size=self.message_size_limit,
ping_interval=None
ws_url, max_size=self.message_size_limit, ping_interval=None
) as ws:
async with create_channel(
ws,
channel_id=name,
send_throttle=0.5
) as channel:
async with create_channel(ws, channel_id=name, send_throttle=0.5) as channel:
# Create the message stream for this channel
self._channel_streams[name] = MessageStream()
# Run the channel tasks while connected
await channel.run_channel_tasks(
self._receive_messages(channel, producer, lock),
self._send_requests(channel, self._channel_streams[name])
self._send_requests(channel, self._channel_streams[name]),
)
except (websockets.exceptions.InvalidURI, ValueError) as e:
@ -227,7 +217,7 @@ class ExternalMessageConsumer:
socket.gaierror,
ConnectionRefusedError,
websockets.exceptions.InvalidStatusCode,
websockets.exceptions.InvalidMessage
websockets.exceptions.InvalidMessage,
) as e:
logger.error(f"Connection Refused - {e} retrying in {self.sleep_time}s")
await asyncio.sleep(self.sleep_time)
@ -235,7 +225,7 @@ class ExternalMessageConsumer:
except (
websockets.exceptions.ConnectionClosedError,
websockets.exceptions.ConnectionClosedOK
websockets.exceptions.ConnectionClosedOK,
):
# Just keep trying to connect again indefinitely
await asyncio.sleep(self.sleep_time)
@ -260,10 +250,7 @@ class ExternalMessageConsumer:
await channel.send(request)
async def _receive_messages(
self,
channel: WebSocketChannel,
producer: Producer,
lock: asyncio.Lock
self, channel: WebSocketChannel, producer: Producer, lock: asyncio.Lock
):
"""
Loop to handle receiving messages from a Producer
@ -274,10 +261,7 @@ class ExternalMessageConsumer:
"""
while self._running:
try:
message = await asyncio.wait_for(
channel.recv(),
timeout=self.wait_timeout
)
message = await asyncio.wait_for(channel.recv(), timeout=self.wait_timeout)
try:
async with lock:
@ -291,7 +275,7 @@ class ExternalMessageConsumer:
try:
# ping
pong = await channel.ping()
latency = (await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000)
latency = await asyncio.wait_for(pong, timeout=self.ping_timeout) * 1000
logger.info(f"Connection to {channel} still alive, latency: {latency}ms")
continue
@ -303,9 +287,7 @@ class ExternalMessageConsumer:
raise
def send_producer_request(
self,
producer_name: str,
request: Union[WSRequestSchema, Dict[str, Any]]
self, producer_name: str, request: Union[WSRequestSchema, Dict[str, Any]]
):
"""
Publish a message to the producer's message stream to be
@ -324,7 +306,7 @@ class ExternalMessageConsumer:
"""
Handles external messages from a Producer
"""
producer_name = producer.get('name', 'default')
producer_name = producer.get("name", "default")
try:
producer_message = WSMessageSchema.model_validate(message)
@ -377,7 +359,7 @@ class ExternalMessageConsumer:
return
# If set, remove the Entry and Exit signals from the Producer
if self._emc_config.get('remove_entry_exit_signals', False):
if self._emc_config.get("remove_entry_exit_signals", False):
df = remove_entry_exit_signals(df)
logger.debug(f"Received {len(df)} candle(s) for {key}")
@ -388,8 +370,8 @@ class ExternalMessageConsumer:
last_analyzed=la,
timeframe=timeframe,
candle_type=candle_type,
producer_name=producer_name
)
producer_name=producer_name,
)
if not did_append:
# We want an overlap in candles in case some data has changed
@ -397,20 +379,17 @@ class ExternalMessageConsumer:
# Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning(f"Holes in data or no existing df, requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
logger.warning(
f"Holes in data or no existing df, requesting {n_missing} candles "
f"for {key} from `{producer_name}`"
)
self.send_producer_request(
producer_name,
WSAnalyzedDFRequest(
data={
"limit": n_missing,
"pair": pair
}
)
producer_name, WSAnalyzedDFRequest(data={"limit": n_missing, "pair": pair})
)
return
logger.debug(
f"Consumed message from `{producer_name}` "
f"of type `RPCMessageType.ANALYZED_DF` for {key}")
f"of type `RPCMessageType.ANALYZED_DF` for {key}"
)

View File

@ -21,14 +21,14 @@ logger = logging.getLogger(__name__)
# Manually map symbol to ID for some common coins
# with duplicate coingecko entries
coingecko_mapping = {
'eth': 'ethereum',
'bnb': 'binancecoin',
'sol': 'solana',
'usdt': 'tether',
'busd': 'binance-usd',
'tusd': 'true-usd',
'usdc': 'usd-coin',
'btc': 'bitcoin'
"eth": "ethereum",
"bnb": "binancecoin",
"sol": "solana",
"usdt": "tether",
"busd": "binance-usd",
"tusd": "true-usd",
"usdc": "usd-coin",
"btc": "bitcoin",
}
@ -38,6 +38,7 @@ class CryptoToFiatConverter(LoggingMixin):
This object contains a list of pair Crypto, FIAT
This object is also a Singleton
"""
__instance = None
_coingecko: CoinGeckoAPI = None
_coinlistings: List[Dict] = []
@ -71,7 +72,8 @@ class CryptoToFiatConverter(LoggingMixin):
except RequestException as request_exception:
if "429" in str(request_exception):
logger.warning(
"Too many requests for CoinGecko API, backing off and trying again later.")
"Too many requests for CoinGecko API, backing off and trying again later."
)
# Set backoff timestamp to 60 seconds in the future
self._backoff = datetime.now().timestamp() + 60
return
@ -80,9 +82,10 @@ class CryptoToFiatConverter(LoggingMixin):
"Could not load FIAT Cryptocurrency map for the following problem: "
f"{request_exception}"
)
except (Exception) as exception:
except Exception as exception:
logger.error(
f"Could not load FIAT Cryptocurrency map for the following problem: {exception}")
f"Could not load FIAT Cryptocurrency map for the following problem: {exception}"
)
def _get_gecko_id(self, crypto_symbol):
if not self._coinlistings:
@ -93,13 +96,13 @@ class CryptoToFiatConverter(LoggingMixin):
return None
else:
return None
found = [x for x in self._coinlistings if x['symbol'].lower() == crypto_symbol]
found = [x for x in self._coinlistings if x["symbol"].lower() == crypto_symbol]
if crypto_symbol in coingecko_mapping.keys():
found = [x for x in self._coinlistings if x['id'] == coingecko_mapping[crypto_symbol]]
found = [x for x in self._coinlistings if x["id"] == coingecko_mapping[crypto_symbol]]
if len(found) == 1:
return found[0]['id']
return found[0]["id"]
if len(found) > 0:
# Wrong!
@ -130,26 +133,23 @@ class CryptoToFiatConverter(LoggingMixin):
fiat_symbol = fiat_symbol.lower()
inverse = False
if crypto_symbol == 'usd':
if crypto_symbol == "usd":
# usd corresponds to "uniswap-state-dollar" for coingecko.
# We'll therefore need to "swap" the currencies
logger.info(f"reversing Rates {crypto_symbol}, {fiat_symbol}")
crypto_symbol = fiat_symbol
fiat_symbol = 'usd'
fiat_symbol = "usd"
inverse = True
symbol = f"{crypto_symbol}/{fiat_symbol}"
# Check if the fiat conversion you want is supported
if not self._is_supported_fiat(fiat=fiat_symbol):
raise ValueError(f'The fiat {fiat_symbol} is not supported.')
raise ValueError(f"The fiat {fiat_symbol} is not supported.")
price = self._pair_price.get(symbol, None)
if not price:
price = self._find_price(
crypto_symbol=crypto_symbol,
fiat_symbol=fiat_symbol
)
price = self._find_price(crypto_symbol=crypto_symbol, fiat_symbol=fiat_symbol)
if inverse and price != 0.0:
price = 1 / price
self._pair_price[symbol] = price
@ -174,7 +174,7 @@ class CryptoToFiatConverter(LoggingMixin):
"""
# Check if the fiat conversion you want is supported
if not self._is_supported_fiat(fiat=fiat_symbol):
raise ValueError(f'The fiat {fiat_symbol} is not supported.')
raise ValueError(f"The fiat {fiat_symbol} is not supported.")
# No need to convert if both crypto and fiat are the same
if crypto_symbol == fiat_symbol:
@ -185,16 +185,15 @@ class CryptoToFiatConverter(LoggingMixin):
if not _gecko_id:
# return 0 for unsupported stake currencies (fiat-convert should not break the bot)
self.log_once(
f"unsupported crypto-symbol {crypto_symbol.upper()} - returning 0.0",
logger.warning)
f"unsupported crypto-symbol {crypto_symbol.upper()} - returning 0.0", logger.warning
)
return 0.0
try:
return float(
self._coingecko.get_price(
ids=_gecko_id,
vs_currencies=fiat_symbol
)[_gecko_id][fiat_symbol]
self._coingecko.get_price(ids=_gecko_id, vs_currencies=fiat_symbol)[_gecko_id][
fiat_symbol
]
)
except Exception as exception:
logger.error("Error in _find_price: %s", exception)

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
"""
This module contains class to manage RPC communications (Telegram, API, ...)
"""
import logging
from collections import deque
from typing import List
@ -20,42 +21,46 @@ class RPCManager:
"""
def __init__(self, freqtrade) -> None:
""" Initializes all enabled rpc modules """
"""Initializes all enabled rpc modules"""
self.registered_modules: List[RPCHandler] = []
self._rpc = RPC(freqtrade)
config = freqtrade.config
# Enable telegram
if config.get('telegram', {}).get('enabled', False):
logger.info('Enabling rpc.telegram ...')
if config.get("telegram", {}).get("enabled", False):
logger.info("Enabling rpc.telegram ...")
from freqtrade.rpc.telegram import Telegram
self.registered_modules.append(Telegram(self._rpc, config))
# Enable discord
if config.get('discord', {}).get('enabled', False):
logger.info('Enabling rpc.discord ...')
if config.get("discord", {}).get("enabled", False):
logger.info("Enabling rpc.discord ...")
from freqtrade.rpc.discord import Discord
self.registered_modules.append(Discord(self._rpc, config))
# Enable Webhook
if config.get('webhook', {}).get('enabled', False):
logger.info('Enabling rpc.webhook ...')
if config.get("webhook", {}).get("enabled", False):
logger.info("Enabling rpc.webhook ...")
from freqtrade.rpc.webhook import Webhook
self.registered_modules.append(Webhook(self._rpc, config))
# Enable local rest api server for cmd line control
if config.get('api_server', {}).get('enabled', False):
logger.info('Enabling rpc.api_server')
if config.get("api_server", {}).get("enabled", False):
logger.info("Enabling rpc.api_server")
from freqtrade.rpc.api_server import ApiServer
apiserver = ApiServer(config)
apiserver.add_rpc_handler(self._rpc)
self.registered_modules.append(apiserver)
def cleanup(self) -> None:
""" Stops all enabled rpc modules """
logger.info('Cleaning up rpc modules ...')
"""Stops all enabled rpc modules"""
logger.info("Cleaning up rpc modules ...")
while self.registered_modules:
mod = self.registered_modules.pop()
logger.info('Cleaning up rpc.%s ...', mod.name)
logger.info("Cleaning up rpc.%s ...", mod.name)
mod.cleanup()
del mod
@ -68,16 +73,16 @@ class RPCManager:
'status': 'stopping bot'
}
"""
if msg.get('type') not in NO_ECHO_MESSAGES:
logger.info('Sending rpc message: %s', msg)
if msg.get("type") not in NO_ECHO_MESSAGES:
logger.info("Sending rpc message: %s", msg)
for mod in self.registered_modules:
logger.debug('Forwarding message to rpc.%s', mod.name)
logger.debug("Forwarding message to rpc.%s", mod.name)
try:
mod.send_msg(msg)
except NotImplementedError:
logger.error(f"Message type '{msg['type']}' not implemented by handler {mod.name}.")
except Exception:
logger.exception('Exception occurred within RPC module %s', mod.name)
logger.exception("Exception occurred within RPC module %s", mod.name)
def process_msg_queue(self, queue: deque) -> None:
"""
@ -85,47 +90,54 @@ class RPCManager:
"""
while queue:
msg = queue.popleft()
logger.info('Sending rpc strategy_msg: %s', msg)
logger.info("Sending rpc strategy_msg: %s", msg)
for mod in self.registered_modules:
if mod._config.get(mod.name, {}).get('allow_custom_messages', False):
mod.send_msg({
'type': RPCMessageType.STRATEGY_MSG,
'msg': msg,
})
if mod._config.get(mod.name, {}).get("allow_custom_messages", False):
mod.send_msg(
{
"type": RPCMessageType.STRATEGY_MSG,
"msg": msg,
}
)
def startup_messages(self, config: Config, pairlist, protections) -> None:
if config['dry_run']:
self.send_msg({
'type': RPCMessageType.WARNING,
'status': 'Dry run is enabled. All trades are simulated.'
})
stake_currency = config['stake_currency']
stake_amount = config['stake_amount']
minimal_roi = config['minimal_roi']
stoploss = config['stoploss']
trailing_stop = config['trailing_stop']
timeframe = config['timeframe']
exchange_name = config['exchange']['name']
strategy_name = config.get('strategy', '')
pos_adjust_enabled = 'On' if config['position_adjustment_enable'] else 'Off'
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'*Exchange:* `{exchange_name}`\n'
f'*Stake per trade:* `{stake_amount} {stake_currency}`\n'
f'*Minimum ROI:* `{minimal_roi}`\n'
f'*{"Trailing " if trailing_stop else ""}Stoploss:* `{stoploss}`\n'
f'*Position adjustment:* `{pos_adjust_enabled}`\n'
f'*Timeframe:* `{timeframe}`\n'
f'*Strategy:* `{strategy_name}`'
})
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'Searching for {stake_currency} pairs to buy and sell '
f'based on {pairlist.short_desc()}'
})
if config["dry_run"]:
self.send_msg(
{
"type": RPCMessageType.WARNING,
"status": "Dry run is enabled. All trades are simulated.",
}
)
stake_currency = config["stake_currency"]
stake_amount = config["stake_amount"]
minimal_roi = config["minimal_roi"]
stoploss = config["stoploss"]
trailing_stop = config["trailing_stop"]
timeframe = config["timeframe"]
exchange_name = config["exchange"]["name"]
strategy_name = config.get("strategy", "")
pos_adjust_enabled = "On" if config["position_adjustment_enable"] else "Off"
self.send_msg(
{
"type": RPCMessageType.STARTUP,
"status": f'*Exchange:* `{exchange_name}`\n'
f'*Stake per trade:* `{stake_amount} {stake_currency}`\n'
f'*Minimum ROI:* `{minimal_roi}`\n'
f'*{"Trailing " if trailing_stop else ""}Stoploss:* `{stoploss}`\n'
f'*Position adjustment:* `{pos_adjust_enabled}`\n'
f'*Timeframe:* `{timeframe}`\n'
f'*Strategy:* `{strategy_name}`',
}
)
self.send_msg(
{
"type": RPCMessageType.STARTUP,
"status": f"Searching for {stake_currency} pairs to buy and sell "
f"based on {pairlist.short_desc()}",
}
)
if len(protections.name_list) > 0:
prots = '\n'.join([p for prot in protections.short_desc() for k, p in prot.items()])
self.send_msg({
'type': RPCMessageType.STARTUP,
'status': f'Using Protections: \n{prots}'
})
prots = "\n".join([p for prot in protections.short_desc() for k, p in prot.items()])
self.send_msg(
{"type": RPCMessageType.STARTUP, "status": f"Using Protections: \n{prots}"}
)

View File

@ -15,12 +15,14 @@ class RPCSendMsgBase(TypedDict):
class RPCStatusMsg(RPCSendMsgBase):
"""Used for Status, Startup and Warning messages"""
type: Literal[RPCMessageType.STATUS, RPCMessageType.STARTUP, RPCMessageType.WARNING]
status: str
class RPCStrategyMsg(RPCSendMsgBase):
"""Used for Status, Startup and Warning messages"""
type: Literal[RPCMessageType.STRATEGY_MSG]
msg: str
@ -108,12 +110,14 @@ class _AnalyzedDFData(TypedDict):
class RPCAnalyzedDFMsg(RPCSendMsgBase):
"""New Analyzed dataframe message"""
type: Literal[RPCMessageType.ANALYZED_DF]
data: _AnalyzedDFData
class RPCNewCandleMsg(RPCSendMsgBase):
"""New candle ping message, issued once per new candle/pair"""
type: Literal[RPCMessageType.NEW_CANDLE]
data: PairWithTimeframe
@ -131,5 +135,5 @@ RPCSendMsg = Union[
RPCExitMsg,
RPCExitCancelMsg,
RPCAnalyzedDFMsg,
RPCNewCandleMsg
]
RPCNewCandleMsg,
]

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
"""
This module manages webhook communication
"""
import logging
import time
from typing import Any, Dict, Optional
@ -15,11 +16,11 @@ from freqtrade.rpc.rpc_types import RPCSendMsg
logger = logging.getLogger(__name__)
logger.debug('Included module rpc.webhook ...')
logger.debug("Included module rpc.webhook ...")
class Webhook(RPCHandler):
""" This class handles all webhook communication """
"""This class handles all webhook communication"""
def __init__(self, rpc: RPC, config: Config) -> None:
"""
@ -30,11 +31,11 @@ class Webhook(RPCHandler):
"""
super().__init__(rpc, config)
self._url = self._config['webhook']['url']
self._format = self._config['webhook'].get('format', 'form')
self._retries = self._config['webhook'].get('retries', 0)
self._retry_delay = self._config['webhook'].get('retry_delay', 0.1)
self._timeout = self._config['webhook'].get('timeout', 10)
self._url = self._config["webhook"]["url"]
self._format = self._config["webhook"].get("format", "form")
self._retries = self._config["webhook"].get("retries", 0)
self._retry_delay = self._config["webhook"].get("retry_delay", 0.1)
self._timeout = self._config["webhook"].get("timeout", 10)
def cleanup(self) -> None:
"""
@ -44,54 +45,59 @@ class Webhook(RPCHandler):
pass
def _get_value_dict(self, msg: RPCSendMsg) -> Optional[Dict[str, Any]]:
whconfig = self._config['webhook']
if msg['type'].value in whconfig:
whconfig = self._config["webhook"]
if msg["type"].value in whconfig:
# Explicit types should have priority
valuedict = whconfig.get(msg['type'].value)
valuedict = whconfig.get(msg["type"].value)
# Deprecated 2022.10 - only keep generic method.
elif msg['type'] in [RPCMessageType.ENTRY]:
valuedict = whconfig.get('webhookentry')
elif msg['type'] in [RPCMessageType.ENTRY_CANCEL]:
valuedict = whconfig.get('webhookentrycancel')
elif msg['type'] in [RPCMessageType.ENTRY_FILL]:
valuedict = whconfig.get('webhookentryfill')
elif msg['type'] == RPCMessageType.EXIT:
valuedict = whconfig.get('webhookexit')
elif msg['type'] == RPCMessageType.EXIT_FILL:
valuedict = whconfig.get('webhookexitfill')
elif msg['type'] == RPCMessageType.EXIT_CANCEL:
valuedict = whconfig.get('webhookexitcancel')
elif msg['type'] in (RPCMessageType.STATUS,
RPCMessageType.STARTUP,
RPCMessageType.EXCEPTION,
RPCMessageType.WARNING):
valuedict = whconfig.get('webhookstatus')
elif msg['type'] in (
RPCMessageType.PROTECTION_TRIGGER,
RPCMessageType.PROTECTION_TRIGGER_GLOBAL,
RPCMessageType.WHITELIST,
RPCMessageType.ANALYZED_DF,
RPCMessageType.NEW_CANDLE,
RPCMessageType.STRATEGY_MSG):
elif msg["type"] in [RPCMessageType.ENTRY]:
valuedict = whconfig.get("webhookentry")
elif msg["type"] in [RPCMessageType.ENTRY_CANCEL]:
valuedict = whconfig.get("webhookentrycancel")
elif msg["type"] in [RPCMessageType.ENTRY_FILL]:
valuedict = whconfig.get("webhookentryfill")
elif msg["type"] == RPCMessageType.EXIT:
valuedict = whconfig.get("webhookexit")
elif msg["type"] == RPCMessageType.EXIT_FILL:
valuedict = whconfig.get("webhookexitfill")
elif msg["type"] == RPCMessageType.EXIT_CANCEL:
valuedict = whconfig.get("webhookexitcancel")
elif msg["type"] in (
RPCMessageType.STATUS,
RPCMessageType.STARTUP,
RPCMessageType.EXCEPTION,
RPCMessageType.WARNING,
):
valuedict = whconfig.get("webhookstatus")
elif msg["type"] in (
RPCMessageType.PROTECTION_TRIGGER,
RPCMessageType.PROTECTION_TRIGGER_GLOBAL,
RPCMessageType.WHITELIST,
RPCMessageType.ANALYZED_DF,
RPCMessageType.NEW_CANDLE,
RPCMessageType.STRATEGY_MSG,
):
# Don't fail for non-implemented types
return None
return valuedict
def send_msg(self, msg: RPCSendMsg) -> None:
""" Send a message to telegram channel """
"""Send a message to telegram channel"""
try:
valuedict = self._get_value_dict(msg)
if not valuedict:
logger.debug("Message type '%s' not configured for webhooks", msg['type'])
logger.debug("Message type '%s' not configured for webhooks", msg["type"])
return
payload = {key: value.format(**msg) for (key, value) in valuedict.items()}
self._send_msg(payload)
except KeyError as exc:
logger.exception("Problem calling Webhook. Please check your webhook configuration. "
"Exception: %s", exc)
logger.exception(
"Problem calling Webhook. Please check your webhook configuration. "
"Exception: %s",
exc,
)
def _send_msg(self, payload: dict) -> None:
"""do the actual call to the webhook"""
@ -107,16 +113,19 @@ class Webhook(RPCHandler):
attempts += 1
try:
if self._format == 'form':
if self._format == "form":
response = post(self._url, data=payload, timeout=self._timeout)
elif self._format == 'json':
elif self._format == "json":
response = post(self._url, json=payload, timeout=self._timeout)
elif self._format == 'raw':
response = post(self._url, data=payload['data'],
headers={'Content-Type': 'text/plain'},
timeout=self._timeout)
elif self._format == "raw":
response = post(
self._url,
data=payload["data"],
headers={"Content-Type": "text/plain"},
timeout=self._timeout,
)
else:
raise NotImplementedError(f'Unknown format: {self._format}')
raise NotImplementedError(f"Unknown format: {self._format}")
# Throw a RequestException if the post was not successful
response.raise_for_status()