mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-16 05:03:55 +00:00
89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
import logging
|
|
from functools import reduce
|
|
from typing import Dict
|
|
|
|
import talib.abstract as ta
|
|
from pandas import DataFrame
|
|
|
|
from freqtrade.strategy import IStrategy
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class freqai_rl_test_strat(IStrategy):
    """
    Test strategy - used for testing freqAI functionalities.

    Do not use in production.

    The value found in the ``&-action`` column is translated into trade
    signals as follows (only on rows where ``do_predict`` equals 1):

    * 1 -> ``enter_long`` (tagged "long")
    * 2 -> ``exit_long``
    * 3 -> ``enter_short`` (tagged "short")
    * 4 -> ``exit_short``
    """

    # Class-level strategy settings; they are not read anywhere in this class
    # itself, so their interpretation is left to the IStrategy base / framework.
    minimal_roi = {"0": 0.1, "240": -1}
    stoploss = -0.05
    use_exit_signal = True
    process_only_new_candles = True
    startup_candle_count: int = 300
    can_short = False

    def feature_engineering_expand_all(
        self, dataframe: DataFrame, period: int, metadata: Dict, **kwargs
    ):
        """
        Add an RSI feature column to the dataframe.

        :param dataframe: frame handed to TA-Lib's ``RSI`` and extended in place
        :param period: forwarded to ``RSI`` as ``timeperiod``
        :param metadata: not used by this method
        :return: the same dataframe, with ``%-rsi-period`` set
        """
        rsi = ta.RSI(dataframe, timeperiod=period)
        dataframe["%-rsi-period"] = rsi
        return dataframe

    def feature_engineering_expand_basic(self, dataframe: DataFrame, metadata: Dict, **kwargs):
        """
        Add the close-price percentage change and the raw volume as features.

        :param dataframe: frame providing ``close`` and ``volume`` columns
        :param metadata: not used by this method
        :return: the same dataframe, with ``%-pct-change`` and ``%-raw_volume`` set
        """
        close = dataframe["close"]
        dataframe["%-pct-change"] = close.pct_change()
        dataframe["%-raw_volume"] = dataframe["volume"]
        return dataframe

    def feature_engineering_standard(self, dataframe: DataFrame, metadata: Dict, **kwargs):
        """
        Add calendar features derived from ``date`` plus raw OHLC copies.

        :param dataframe: frame providing ``date``, ``close``, ``open``,
            ``high`` and ``low`` columns
        :param metadata: not used by this method
        :return: the same dataframe, extended in place
        """
        when = dataframe["date"].dt
        dataframe["%-day_of_week"] = when.dayofweek
        dataframe["%-hour_of_day"] = when.hour

        # Feature column -> source column; insertion order fixes column order.
        raw_copies = {
            "%-raw_close": "close",
            "%-raw_open": "open",
            "%-raw_high": "high",
            "%-raw_low": "low",
        }
        for feature, source in raw_copies.items():
            dataframe[feature] = dataframe[source]

        return dataframe

    def set_freqai_targets(self, dataframe: DataFrame, metadata: Dict, **kwargs):
        """
        Initialise the ``&-action`` column to 0 on every row.

        :param dataframe: frame extended in place
        :param metadata: not used by this method
        :return: the same dataframe
        """
        dataframe["&-action"] = 0
        return dataframe

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        """
        Hand the dataframe to ``self.freqai.start`` and return its result.

        :param dataframe: frame passed through to freqAI
        :param metadata: passed through to freqAI unchanged
        :return: whatever ``self.freqai.start`` returns
        """
        return self.freqai.start(dataframe, metadata, self)

    def populate_entry_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """
        Flag entry signals from the predicted action.

        Rows with ``do_predict == 1`` get ``enter_long`` = 1 / tag "long" when
        ``&-action`` is 1, and ``enter_short`` = 1 / tag "short" when it is 3.

        :param df: frame providing ``do_predict`` and ``&-action`` columns
        :param metadata: not used by this method
        :return: the same dataframe, updated in place
        """
        # (action value, signal column, entry tag) - long is applied before short.
        entry_rules = (
            (1, "enter_long", "long"),
            (3, "enter_short", "short"),
        )
        for action, signal_column, tag in entry_rules:
            selected = (df["do_predict"] == 1) & (df["&-action"] == action)
            df.loc[selected, [signal_column, "enter_tag"]] = (1, tag)

        return df

    def populate_exit_trend(self, df: DataFrame, metadata: dict) -> DataFrame:
        """
        Flag exit signals from the predicted action.

        Rows with ``do_predict == 1`` get ``exit_long`` = 1 when ``&-action``
        is 2, and ``exit_short`` = 1 when it is 4.

        :param df: frame providing ``do_predict`` and ``&-action`` columns
        :param metadata: not used by this method
        :return: the same dataframe, updated in place
        """
        # (action value, signal column) - long is applied before short.
        exit_rules = (
            (2, "exit_long"),
            (4, "exit_short"),
        )
        for action, signal_column in exit_rules:
            selected = (df["do_predict"] == 1) & (df["&-action"] == action)
            df.loc[selected, signal_column] = 1

        return df
|