ruff format: update strategy tests

This commit is contained in:
Matthias 2024-05-12 15:45:55 +02:00
parent 1cbd49fd4e
commit adeb93dc9c
4 changed files with 952 additions and 835 deletions

View File

@ -9,22 +9,25 @@ from .strats.strategy_test_v3 import StrategyTestV3
def test_strategy_test_v3_structure(): def test_strategy_test_v3_structure():
assert hasattr(StrategyTestV3, 'minimal_roi') assert hasattr(StrategyTestV3, "minimal_roi")
assert hasattr(StrategyTestV3, 'stoploss') assert hasattr(StrategyTestV3, "stoploss")
assert hasattr(StrategyTestV3, 'timeframe') assert hasattr(StrategyTestV3, "timeframe")
assert hasattr(StrategyTestV3, 'populate_indicators') assert hasattr(StrategyTestV3, "populate_indicators")
assert hasattr(StrategyTestV3, 'populate_entry_trend') assert hasattr(StrategyTestV3, "populate_entry_trend")
assert hasattr(StrategyTestV3, 'populate_exit_trend') assert hasattr(StrategyTestV3, "populate_exit_trend")
@pytest.mark.parametrize('is_short,side', [ @pytest.mark.parametrize(
(True, 'short'), "is_short,side",
(False, 'long'), [
]) (True, "short"),
(False, "long"),
],
)
def test_strategy_test_v3(dataframe_1m, fee, is_short, side): def test_strategy_test_v3(dataframe_1m, fee, is_short, side):
strategy = StrategyTestV3({}) strategy = StrategyTestV3({})
metadata = {'pair': 'ETH/BTC'} metadata = {"pair": "ETH/BTC"}
assert isinstance(strategy.minimal_roi, dict) assert isinstance(strategy.minimal_roi, dict)
assert isinstance(strategy.stoploss, float) assert isinstance(strategy.stoploss, float)
assert isinstance(strategy.timeframe, str) assert isinstance(strategy.timeframe, str)
@ -34,23 +37,46 @@ def test_strategy_test_v3(dataframe_1m, fee, is_short, side):
assert isinstance(strategy.populate_sell_trend(indicators, metadata), DataFrame) assert isinstance(strategy.populate_sell_trend(indicators, metadata), DataFrame)
trade = Trade( trade = Trade(
open_rate=19_000, open_rate=19_000, amount=0.1, pair="ETH/BTC", fee_open=fee.return_value, is_short=is_short
amount=0.1,
pair='ETH/BTC',
fee_open=fee.return_value,
is_short=is_short
) )
assert strategy.confirm_trade_entry(pair='ETH/BTC', order_type='limit', amount=0.1, assert (
rate=20000, time_in_force='gtc', strategy.confirm_trade_entry(
pair="ETH/BTC",
order_type="limit",
amount=0.1,
rate=20000,
time_in_force="gtc",
current_time=datetime.now(timezone.utc), current_time=datetime.now(timezone.utc),
side=side, entry_tag=None) is True side=side,
assert strategy.confirm_trade_exit(pair='ETH/BTC', trade=trade, order_type='limit', amount=0.1, entry_tag=None,
rate=20000, time_in_force='gtc', exit_reason='roi', )
sell_reason='roi', is True
)
assert (
strategy.confirm_trade_exit(
pair="ETH/BTC",
trade=trade,
order_type="limit",
amount=0.1,
rate=20000,
time_in_force="gtc",
exit_reason="roi",
sell_reason="roi",
current_time=datetime.now(timezone.utc), current_time=datetime.now(timezone.utc),
side=side) is True side=side,
)
is True
)
assert strategy.custom_stoploss(pair='ETH/BTC', trade=trade, current_time=datetime.now(), assert (
current_rate=20_000, current_profit=0.05, after_fill=False strategy.custom_stoploss(
) == strategy.stoploss pair="ETH/BTC",
trade=trade,
current_time=datetime.now(),
current_rate=20_000,
current_profit=0.05,
after_fill=False,
)
== strategy.stoploss
)

File diff suppressed because it is too large Load Diff

View File

@ -10,217 +10,221 @@ from tests.conftest import generate_test_data, get_patched_exchange
def test_merge_informative_pair(): def test_merge_informative_pair():
data = generate_test_data('15m', 40) data = generate_test_data("15m", 40)
informative = generate_test_data('1h', 40) informative = generate_test_data("1h", 40)
cols_inf = list(informative.columns) cols_inf = list(informative.columns)
result = merge_informative_pair(data, informative, '15m', '1h', ffill=True) result = merge_informative_pair(data, informative, "15m", "1h", ffill=True)
assert isinstance(result, pd.DataFrame) assert isinstance(result, pd.DataFrame)
assert list(informative.columns) == cols_inf assert list(informative.columns) == cols_inf
assert len(result) == len(data) assert len(result) == len(data)
assert 'date' in result.columns assert "date" in result.columns
assert result['date'].equals(data['date']) assert result["date"].equals(data["date"])
assert 'date_1h' in result.columns assert "date_1h" in result.columns
assert 'open' in result.columns assert "open" in result.columns
assert 'open_1h' in result.columns assert "open_1h" in result.columns
assert result['open'].equals(data['open']) assert result["open"].equals(data["open"])
assert 'close' in result.columns assert "close" in result.columns
assert 'close_1h' in result.columns assert "close_1h" in result.columns
assert result['close'].equals(data['close']) assert result["close"].equals(data["close"])
assert 'volume' in result.columns assert "volume" in result.columns
assert 'volume_1h' in result.columns assert "volume_1h" in result.columns
assert result['volume'].equals(data['volume']) assert result["volume"].equals(data["volume"])
# First 3 rows are empty # First 3 rows are empty
assert result.iloc[0]['date_1h'] is pd.NaT assert result.iloc[0]["date_1h"] is pd.NaT
assert result.iloc[1]['date_1h'] is pd.NaT assert result.iloc[1]["date_1h"] is pd.NaT
assert result.iloc[2]['date_1h'] is pd.NaT assert result.iloc[2]["date_1h"] is pd.NaT
# Next 4 rows contain the starting date (0:00) # Next 4 rows contain the starting date (0:00)
assert result.iloc[3]['date_1h'] == result.iloc[0]['date'] assert result.iloc[3]["date_1h"] == result.iloc[0]["date"]
assert result.iloc[4]['date_1h'] == result.iloc[0]['date'] assert result.iloc[4]["date_1h"] == result.iloc[0]["date"]
assert result.iloc[5]['date_1h'] == result.iloc[0]['date'] assert result.iloc[5]["date_1h"] == result.iloc[0]["date"]
assert result.iloc[6]['date_1h'] == result.iloc[0]['date'] assert result.iloc[6]["date_1h"] == result.iloc[0]["date"]
# Next 4 rows contain the next Hourly date original date row 4 # Next 4 rows contain the next Hourly date original date row 4
assert result.iloc[7]['date_1h'] == result.iloc[4]['date'] assert result.iloc[7]["date_1h"] == result.iloc[4]["date"]
assert result.iloc[8]['date_1h'] == result.iloc[4]['date'] assert result.iloc[8]["date_1h"] == result.iloc[4]["date"]
informative = generate_test_data('1h', 40) informative = generate_test_data("1h", 40)
result = merge_informative_pair(data, informative, '15m', '1h', ffill=False) result = merge_informative_pair(data, informative, "15m", "1h", ffill=False)
# First 3 rows are empty # First 3 rows are empty
assert result.iloc[0]['date_1h'] is pd.NaT assert result.iloc[0]["date_1h"] is pd.NaT
assert result.iloc[1]['date_1h'] is pd.NaT assert result.iloc[1]["date_1h"] is pd.NaT
assert result.iloc[2]['date_1h'] is pd.NaT assert result.iloc[2]["date_1h"] is pd.NaT
# Next 4 rows contain the starting date (0:00) # Next 4 rows contain the starting date (0:00)
assert result.iloc[3]['date_1h'] == result.iloc[0]['date'] assert result.iloc[3]["date_1h"] == result.iloc[0]["date"]
assert result.iloc[4]['date_1h'] is pd.NaT assert result.iloc[4]["date_1h"] is pd.NaT
assert result.iloc[5]['date_1h'] is pd.NaT assert result.iloc[5]["date_1h"] is pd.NaT
assert result.iloc[6]['date_1h'] is pd.NaT assert result.iloc[6]["date_1h"] is pd.NaT
# Next 4 rows contain the next Hourly date original date row 4 # Next 4 rows contain the next Hourly date original date row 4
assert result.iloc[7]['date_1h'] == result.iloc[4]['date'] assert result.iloc[7]["date_1h"] == result.iloc[4]["date"]
assert result.iloc[8]['date_1h'] is pd.NaT assert result.iloc[8]["date_1h"] is pd.NaT
def test_merge_informative_pair_weekly(): def test_merge_informative_pair_weekly():
# Covers roughly 2 months - until 2023-01-10 # Covers roughly 2 months - until 2023-01-10
data = generate_test_data('1h', 1040, '2022-11-28') data = generate_test_data("1h", 1040, "2022-11-28")
informative = generate_test_data('1w', 40, '2022-11-01') informative = generate_test_data("1w", 40, "2022-11-01")
informative['day'] = informative['date'].dt.day_name() informative["day"] = informative["date"].dt.day_name()
result = merge_informative_pair(data, informative, '1h', '1w', ffill=True) result = merge_informative_pair(data, informative, "1h", "1w", ffill=True)
assert isinstance(result, pd.DataFrame) assert isinstance(result, pd.DataFrame)
# 2022-12-24 is a Saturday # 2022-12-24 is a Saturday
candle1 = result.loc[(result['date'] == '2022-12-24T22:00:00.000Z')] candle1 = result.loc[(result["date"] == "2022-12-24T22:00:00.000Z")]
assert candle1.iloc[0]['date'] == pd.Timestamp('2022-12-24T22:00:00.000Z') assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-24T22:00:00.000Z")
assert candle1.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') assert candle1.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")
candle2 = result.loc[(result['date'] == '2022-12-24T23:00:00.000Z')] candle2 = result.loc[(result["date"] == "2022-12-24T23:00:00.000Z")]
assert candle2.iloc[0]['date'] == pd.Timestamp('2022-12-24T23:00:00.000Z') assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-24T23:00:00.000Z")
assert candle2.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') assert candle2.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")
# 2022-12-25 is a Sunday # 2022-12-25 is a Sunday
candle3 = result.loc[(result['date'] == '2022-12-25T22:00:00.000Z')] candle3 = result.loc[(result["date"] == "2022-12-25T22:00:00.000Z")]
assert candle3.iloc[0]['date'] == pd.Timestamp('2022-12-25T22:00:00.000Z') assert candle3.iloc[0]["date"] == pd.Timestamp("2022-12-25T22:00:00.000Z")
# Still old candle # Still old candle
assert candle3.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') assert candle3.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z")
candle4 = result.loc[(result['date'] == '2022-12-25T23:00:00.000Z')] candle4 = result.loc[(result["date"] == "2022-12-25T23:00:00.000Z")]
assert candle4.iloc[0]['date'] == pd.Timestamp('2022-12-25T23:00:00.000Z') assert candle4.iloc[0]["date"] == pd.Timestamp("2022-12-25T23:00:00.000Z")
assert candle4.iloc[0]['date_1w'] == pd.Timestamp('2022-12-19T00:00:00.000Z') assert candle4.iloc[0]["date_1w"] == pd.Timestamp("2022-12-19T00:00:00.000Z")
def test_merge_informative_pair_monthly(): def test_merge_informative_pair_monthly():
# Covers roughly 2 months - until 2023-01-10 # Covers roughly 2 months - until 2023-01-10
data = generate_test_data('1h', 1040, '2022-11-28') data = generate_test_data("1h", 1040, "2022-11-28")
informative = generate_test_data('1M', 40, '2022-01-01') informative = generate_test_data("1M", 40, "2022-01-01")
result = merge_informative_pair(data, informative, '1h', '1M', ffill=True) result = merge_informative_pair(data, informative, "1h", "1M", ffill=True)
assert isinstance(result, pd.DataFrame) assert isinstance(result, pd.DataFrame)
candle1 = result.loc[(result['date'] == '2022-12-31T22:00:00.000Z')] candle1 = result.loc[(result["date"] == "2022-12-31T22:00:00.000Z")]
assert candle1.iloc[0]['date'] == pd.Timestamp('2022-12-31T22:00:00.000Z') assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-31T22:00:00.000Z")
assert candle1.iloc[0]['date_1M'] == pd.Timestamp('2022-11-01T00:00:00.000Z') assert candle1.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z")
candle2 = result.loc[(result['date'] == '2022-12-31T23:00:00.000Z')] candle2 = result.loc[(result["date"] == "2022-12-31T23:00:00.000Z")]
assert candle2.iloc[0]['date'] == pd.Timestamp('2022-12-31T23:00:00.000Z') assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-31T23:00:00.000Z")
assert candle2.iloc[0]['date_1M'] == pd.Timestamp('2022-12-01T00:00:00.000Z') assert candle2.iloc[0]["date_1M"] == pd.Timestamp("2022-12-01T00:00:00.000Z")
# Candle is empty, as the start-date did fail. # Candle is empty, as the start-date did fail.
candle3 = result.loc[(result['date'] == '2022-11-30T22:00:00.000Z')] candle3 = result.loc[(result["date"] == "2022-11-30T22:00:00.000Z")]
assert candle3.iloc[0]['date'] == pd.Timestamp('2022-11-30T22:00:00.000Z') assert candle3.iloc[0]["date"] == pd.Timestamp("2022-11-30T22:00:00.000Z")
assert candle3.iloc[0]['date_1M'] is pd.NaT assert candle3.iloc[0]["date_1M"] is pd.NaT
# First candle with 1M data merged. # First candle with 1M data merged.
candle4 = result.loc[(result['date'] == '2022-11-30T23:00:00.000Z')] candle4 = result.loc[(result["date"] == "2022-11-30T23:00:00.000Z")]
assert candle4.iloc[0]['date'] == pd.Timestamp('2022-11-30T23:00:00.000Z') assert candle4.iloc[0]["date"] == pd.Timestamp("2022-11-30T23:00:00.000Z")
assert candle4.iloc[0]['date_1M'] == pd.Timestamp('2022-11-01T00:00:00.000Z') assert candle4.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z")
def test_merge_informative_pair_same(): def test_merge_informative_pair_same():
data = generate_test_data('15m', 40) data = generate_test_data("15m", 40)
informative = generate_test_data('15m', 40) informative = generate_test_data("15m", 40)
result = merge_informative_pair(data, informative, '15m', '15m', ffill=True) result = merge_informative_pair(data, informative, "15m", "15m", ffill=True)
assert isinstance(result, pd.DataFrame) assert isinstance(result, pd.DataFrame)
assert len(result) == len(data) assert len(result) == len(data)
assert 'date' in result.columns assert "date" in result.columns
assert result['date'].equals(data['date']) assert result["date"].equals(data["date"])
assert 'date_15m' in result.columns assert "date_15m" in result.columns
assert 'open' in result.columns assert "open" in result.columns
assert 'open_15m' in result.columns assert "open_15m" in result.columns
assert result['open'].equals(data['open']) assert result["open"].equals(data["open"])
assert 'close' in result.columns assert "close" in result.columns
assert 'close_15m' in result.columns assert "close_15m" in result.columns
assert result['close'].equals(data['close']) assert result["close"].equals(data["close"])
assert 'volume' in result.columns assert "volume" in result.columns
assert 'volume_15m' in result.columns assert "volume_15m" in result.columns
assert result['volume'].equals(data['volume']) assert result["volume"].equals(data["volume"])
# Dates match 1:1 # Dates match 1:1
assert result['date_15m'].equals(result['date']) assert result["date_15m"].equals(result["date"])
def test_merge_informative_pair_lower(): def test_merge_informative_pair_lower():
data = generate_test_data('1h', 40) data = generate_test_data("1h", 40)
informative = generate_test_data('15m', 40) informative = generate_test_data("15m", 40)
with pytest.raises(ValueError, match=r"Tried to merge a faster timeframe .*"): with pytest.raises(ValueError, match=r"Tried to merge a faster timeframe .*"):
merge_informative_pair(data, informative, '1h', '15m', ffill=True) merge_informative_pair(data, informative, "1h", "15m", ffill=True)
def test_merge_informative_pair_empty(): def test_merge_informative_pair_empty():
data = generate_test_data('1h', 40) data = generate_test_data("1h", 40)
informative = pd.DataFrame(columns=data.columns) informative = pd.DataFrame(columns=data.columns)
result = merge_informative_pair(data, informative, '1h', '2h', ffill=True) result = merge_informative_pair(data, informative, "1h", "2h", ffill=True)
assert result['date'].equals(data['date']) assert result["date"].equals(data["date"])
assert list(result.columns) == [ assert list(result.columns) == [
'date', "date",
'open', "open",
'high', "high",
'low', "low",
'close', "close",
'volume', "volume",
'date_2h', "date_2h",
'open_2h', "open_2h",
'high_2h', "high_2h",
'low_2h', "low_2h",
'close_2h', "close_2h",
'volume_2h' "volume_2h",
] ]
# We merge an empty dataframe, so all values should be NaN # We merge an empty dataframe, so all values should be NaN
for col in ['date_2h', 'open_2h', 'high_2h', 'low_2h', 'close_2h', 'volume_2h']: for col in ["date_2h", "open_2h", "high_2h", "low_2h", "close_2h", "volume_2h"]:
assert result[col].isnull().all() assert result[col].isnull().all()
def test_merge_informative_pair_suffix(): def test_merge_informative_pair_suffix():
data = generate_test_data('15m', 20) data = generate_test_data("15m", 20)
informative = generate_test_data('1h', 20) informative = generate_test_data("1h", 20)
result = merge_informative_pair(data, informative, '15m', '1h', result = merge_informative_pair(
append_timeframe=False, suffix="suf") data, informative, "15m", "1h", append_timeframe=False, suffix="suf"
)
assert 'date' in result.columns assert "date" in result.columns
assert result['date'].equals(data['date']) assert result["date"].equals(data["date"])
assert 'date_suf' in result.columns assert "date_suf" in result.columns
assert 'open_suf' in result.columns assert "open_suf" in result.columns
assert 'open_1h' not in result.columns assert "open_1h" not in result.columns
assert list(result.columns) == [ assert list(result.columns) == [
'date', "date",
'open', "open",
'high', "high",
'low', "low",
'close', "close",
'volume', "volume",
'date_suf', "date_suf",
'open_suf', "open_suf",
'high_suf', "high_suf",
'low_suf', "low_suf",
'close_suf', "close_suf",
'volume_suf' "volume_suf",
] ]
def test_merge_informative_pair_suffix_append_timeframe(): def test_merge_informative_pair_suffix_append_timeframe():
data = generate_test_data('15m', 20) data = generate_test_data("15m", 20)
informative = generate_test_data('1h', 20) informative = generate_test_data("1h", 20)
with pytest.raises(ValueError, match=r"You can not specify `append_timeframe` .*"): with pytest.raises(ValueError, match=r"You can not specify `append_timeframe` .*"):
merge_informative_pair(data, informative, '15m', '1h', suffix="suf") merge_informative_pair(data, informative, "15m", "1h", suffix="suf")
@pytest.mark.parametrize("side,profitrange", [ @pytest.mark.parametrize(
"side,profitrange",
[
# profit range for long is [-1, inf] while for shorts is [-inf, 1] # profit range for long is [-1, inf] while for shorts is [-inf, 1]
("long", [-0.99, 2, 30]), ("long", [-0.99, 2, 30]),
("short", [-2.0, 0.99, 30]), ("short", [-2.0, 0.99, 30]),
]) ],
)
def test_stoploss_from_open(side, profitrange): def test_stoploss_from_open(side, profitrange):
open_price_ranges = [ open_price_ranges = [
[0.01, 1.00, 30], [0.01, 1.00, 30],
@ -231,8 +235,7 @@ def test_stoploss_from_open(side, profitrange):
for open_range in open_price_ranges: for open_range in open_price_ranges:
for open_price in np.linspace(*open_range): for open_price in np.linspace(*open_range):
for desired_stop in np.linspace(-0.50, 0.50, 30): for desired_stop in np.linspace(-0.50, 0.50, 30):
if side == "long":
if side == 'long':
# -1 is not a valid current_profit, should return 1 # -1 is not a valid current_profit, should return 1
assert stoploss_from_open(desired_stop, -1) == 1 assert stoploss_from_open(desired_stop, -1) == 1
else: else:
@ -240,7 +243,7 @@ def test_stoploss_from_open(side, profitrange):
assert stoploss_from_open(desired_stop, 1, True) == 1 assert stoploss_from_open(desired_stop, 1, True) == 1
for current_profit in np.linspace(*profitrange): for current_profit in np.linspace(*profitrange):
if side == 'long': if side == "long":
current_price = open_price * (1 + current_profit) current_price = open_price * (1 + current_profit)
expected_stop_price = open_price * (1 + desired_stop) expected_stop_price = open_price * (1 + desired_stop)
stoploss = stoploss_from_open(desired_stop, current_profit) stoploss = stoploss_from_open(desired_stop, current_profit)
@ -254,19 +257,22 @@ def test_stoploss_from_open(side, profitrange):
assert stoploss >= 0 assert stoploss >= 0
# Technically the formula can yield values greater than 1 for shorts # Technically the formula can yield values greater than 1 for shorts
# even though it doesn't make sense because the position would be liquidated # even though it doesn't make sense because the position would be liquidated
if side == 'long': if side == "long":
assert stoploss <= 1 assert stoploss <= 1
# there is no correct answer if the expected stop price is above # there is no correct answer if the expected stop price is above
# the current price # the current price
if ((side == 'long' and expected_stop_price > current_price) if (side == "long" and expected_stop_price > current_price) or (
or (side == 'short' and expected_stop_price < current_price)): side == "short" and expected_stop_price < current_price
):
assert stoploss == 0 assert stoploss == 0
else: else:
assert pytest.approx(stop_price) == expected_stop_price assert pytest.approx(stop_price) == expected_stop_price
@pytest.mark.parametrize("side,rel_stop,curr_profit,leverage,expected", [ @pytest.mark.parametrize(
"side,rel_stop,curr_profit,leverage,expected",
[
# profit range for long is [-1, inf] while for shorts is [-inf, 1] # profit range for long is [-1, inf] while for shorts is [-inf, 1]
("long", 0, -1, 1, 1), ("long", 0, -1, 1, 1),
("long", 0, 0.1, 1, 0.09090909), ("long", 0, 0.1, 1, 0.09090909),
@ -277,20 +283,19 @@ def test_stoploss_from_open(side, profitrange):
("long", 0, 5, 10, 3.3333333), # 500% profit, set stoploss break even ("long", 0, 5, 10, 3.3333333), # 500% profit, set stoploss break even
("long", 0.1, 5, 10, 3.26666666), # 500% profit, set stoploss to 10% above open price ("long", 0.1, 5, 10, 3.26666666), # 500% profit, set stoploss to 10% above open price
("long", -0.1, 5, 10, 3.3999999), # 500% profit, set stoploss to 10% belowopen price ("long", -0.1, 5, 10, 3.3999999), # 500% profit, set stoploss to 10% belowopen price
("short", 0, 0.1, 1, 0.1111111), ("short", 0, 0.1, 1, 0.1111111),
("short", -0.1, 0.1, 1, 0.2222222), ("short", -0.1, 0.1, 1, 0.2222222),
("short", 0.1, 0.2, 1, 0.125), ("short", 0.1, 0.2, 1, 0.125),
("short", 0.1, 1, 1, 1), ("short", 0.1, 1, 1, 1),
("short", -0.01, 5, 10, 10.01999999), # 500% profit at 10x ("short", -0.01, 5, 10, 10.01999999), # 500% profit at 10x
]) ],
)
def test_stoploss_from_open_leverage(side, rel_stop, curr_profit, leverage, expected): def test_stoploss_from_open_leverage(side, rel_stop, curr_profit, leverage, expected):
stoploss = stoploss_from_open(rel_stop, curr_profit, side == "short", leverage)
stoploss = stoploss_from_open(rel_stop, curr_profit, side == 'short', leverage)
assert pytest.approx(stoploss) == expected assert pytest.approx(stoploss) == expected
open_rate = 100 open_rate = 100
if stoploss != 1: if stoploss != 1:
if side == 'long': if side == "long":
current_rate = open_rate * (1 + curr_profit / leverage) current_rate = open_rate * (1 + curr_profit / leverage)
stop = current_rate * (1 - stoploss / leverage) stop = current_rate * (1 - stoploss / leverage)
assert pytest.approx(stop) == open_rate * (1 + rel_stop / leverage) assert pytest.approx(stop) == open_rate * (1 + rel_stop / leverage)
@ -322,73 +327,79 @@ def test_stoploss_from_absolute():
assert pytest.approx(stoploss_from_absolute(100, 1, is_short=True, leverage=5)) == 5 assert pytest.approx(stoploss_from_absolute(100, 1, is_short=True, leverage=5)) == 5
@pytest.mark.parametrize('trading_mode', ['futures', 'spot']) @pytest.mark.parametrize("trading_mode", ["futures", "spot"])
def test_informative_decorator(mocker, default_conf_usdt, trading_mode): def test_informative_decorator(mocker, default_conf_usdt, trading_mode):
candle_def = CandleType.get_default(trading_mode) candle_def = CandleType.get_default(trading_mode)
default_conf_usdt['candle_type_def'] = candle_def default_conf_usdt["candle_type_def"] = candle_def
test_data_5m = generate_test_data('5m', 40) test_data_5m = generate_test_data("5m", 40)
test_data_30m = generate_test_data('30m', 40) test_data_30m = generate_test_data("30m", 40)
test_data_1h = generate_test_data('1h', 40) test_data_1h = generate_test_data("1h", 40)
data = { data = {
('XRP/USDT', '5m', candle_def): test_data_5m, ("XRP/USDT", "5m", candle_def): test_data_5m,
('XRP/USDT', '30m', candle_def): test_data_30m, ("XRP/USDT", "30m", candle_def): test_data_30m,
('XRP/USDT', '1h', candle_def): test_data_1h, ("XRP/USDT", "1h", candle_def): test_data_1h,
('XRP/BTC', '1h', candle_def): test_data_1h, # from {base}/BTC ("XRP/BTC", "1h", candle_def): test_data_1h, # from {base}/BTC
('LTC/USDT', '5m', candle_def): test_data_5m, ("LTC/USDT", "5m", candle_def): test_data_5m,
('LTC/USDT', '30m', candle_def): test_data_30m, ("LTC/USDT", "30m", candle_def): test_data_30m,
('LTC/USDT', '1h', candle_def): test_data_1h, ("LTC/USDT", "1h", candle_def): test_data_1h,
('LTC/BTC', '1h', candle_def): test_data_1h, # from {base}/BTC ("LTC/BTC", "1h", candle_def): test_data_1h, # from {base}/BTC
('NEO/USDT', '30m', candle_def): test_data_30m, ("NEO/USDT", "30m", candle_def): test_data_30m,
('NEO/USDT', '5m', CandleType.SPOT): test_data_5m, # Explicit request with '' as candletype ("NEO/USDT", "5m", CandleType.SPOT): test_data_5m, # Explicit request with '' as candletype
('NEO/USDT', '15m', candle_def): test_data_5m, # Explicit request with '' as candletype ("NEO/USDT", "15m", candle_def): test_data_5m, # Explicit request with '' as candletype
('NEO/USDT', '1h', candle_def): test_data_1h, ("NEO/USDT", "1h", candle_def): test_data_1h,
('ETH/USDT', '1h', candle_def): test_data_1h, ("ETH/USDT", "1h", candle_def): test_data_1h,
('ETH/USDT', '30m', candle_def): test_data_30m, ("ETH/USDT", "30m", candle_def): test_data_30m,
('ETH/BTC', '1h', CandleType.SPOT): test_data_1h, # Explicitly selected as spot ("ETH/BTC", "1h", CandleType.SPOT): test_data_1h, # Explicitly selected as spot
} }
default_conf_usdt['strategy'] = 'InformativeDecoratorTest' default_conf_usdt["strategy"] = "InformativeDecoratorTest"
strategy = StrategyResolver.load_strategy(default_conf_usdt) strategy = StrategyResolver.load_strategy(default_conf_usdt)
exchange = get_patched_exchange(mocker, default_conf_usdt) exchange = get_patched_exchange(mocker, default_conf_usdt)
strategy.dp = DataProvider({}, exchange, None) strategy.dp = DataProvider({}, exchange, None)
mocker.patch.object(strategy.dp, 'current_whitelist', return_value=[ mocker.patch.object(
'XRP/USDT', 'LTC/USDT', 'NEO/USDT' strategy.dp, "current_whitelist", return_value=["XRP/USDT", "LTC/USDT", "NEO/USDT"]
]) )
assert len(strategy._ft_informative) == 7 # Equal to number of decorators used assert len(strategy._ft_informative) == 7 # Equal to number of decorators used
informative_pairs = [ informative_pairs = [
('XRP/USDT', '1h', candle_def), ("XRP/USDT", "1h", candle_def),
('XRP/BTC', '1h', candle_def), ("XRP/BTC", "1h", candle_def),
('LTC/USDT', '1h', candle_def), ("LTC/USDT", "1h", candle_def),
('LTC/BTC', '1h', candle_def), ("LTC/BTC", "1h", candle_def),
('XRP/USDT', '30m', candle_def), ("XRP/USDT", "30m", candle_def),
('LTC/USDT', '30m', candle_def), ("LTC/USDT", "30m", candle_def),
('NEO/USDT', '1h', candle_def), ("NEO/USDT", "1h", candle_def),
('NEO/USDT', '30m', candle_def), ("NEO/USDT", "30m", candle_def),
('NEO/USDT', '5m', candle_def), ("NEO/USDT", "5m", candle_def),
('NEO/USDT', '15m', candle_def), ("NEO/USDT", "15m", candle_def),
('NEO/USDT', '2h', CandleType.FUTURES), ("NEO/USDT", "2h", CandleType.FUTURES),
('ETH/BTC', '1h', CandleType.SPOT), # One candle remains as spot ("ETH/BTC", "1h", CandleType.SPOT), # One candle remains as spot
('ETH/USDT', '30m', candle_def)] ("ETH/USDT", "30m", candle_def),
]
for inf_pair in informative_pairs: for inf_pair in informative_pairs:
assert inf_pair in strategy.gather_informative_pairs() assert inf_pair in strategy.gather_informative_pairs()
def test_historic_ohlcv(pair, timeframe, candle_type): def test_historic_ohlcv(pair, timeframe, candle_type):
return data[ return data[
(pair, timeframe or strategy.timeframe, CandleType.from_string(candle_type))].copy() (pair, timeframe or strategy.timeframe, CandleType.from_string(candle_type))
].copy()
mocker.patch('freqtrade.data.dataprovider.DataProvider.historic_ohlcv', mocker.patch(
side_effect=test_historic_ohlcv) "freqtrade.data.dataprovider.DataProvider.historic_ohlcv", side_effect=test_historic_ohlcv
)
analyzed = strategy.advise_all_indicators( analyzed = strategy.advise_all_indicators(
{p: data[(p, strategy.timeframe, candle_def)] for p in ('XRP/USDT', 'LTC/USDT')}) {p: data[(p, strategy.timeframe, candle_def)] for p in ("XRP/USDT", "LTC/USDT")}
)
expected_columns = [ expected_columns = [
'rsi_1h', 'rsi_30m', # Stacked informative decorators "rsi_1h",
'neo_usdt_rsi_1h', # NEO 1h informative "rsi_30m", # Stacked informative decorators
'rsi_NEO_USDT_neo_usdt_NEO/USDT_30m', # Column formatting "neo_usdt_rsi_1h", # NEO 1h informative
'rsi_from_callable', # Custom column formatter "rsi_NEO_USDT_neo_usdt_NEO/USDT_30m", # Column formatting
'eth_btc_rsi_1h', # Quote currency not matching stake currency "rsi_from_callable", # Custom column formatter
'rsi', 'rsi_less', # Non-informative columns "eth_btc_rsi_1h", # Quote currency not matching stake currency
'rsi_5m', # Manual informative dataframe "rsi",
"rsi_less", # Non-informative columns
"rsi_5m", # Manual informative dataframe
] ]
for _, dataframe in analyzed.items(): for _, dataframe in analyzed.items():
for col in expected_columns: for col in expected_columns:

View File

@ -14,7 +14,7 @@ from tests.conftest import CURRENT_TEST_STRATEGY, log_has, log_has_re
def test_search_strategy(): def test_search_strategy():
default_location = Path(__file__).parent / 'strats' default_location = Path(__file__).parent / "strats"
s, _ = StrategyResolver._search_object( s, _ = StrategyResolver._search_object(
directory=default_location, directory=default_location,
@ -25,7 +25,7 @@ def test_search_strategy():
s, _ = StrategyResolver._search_object( s, _ = StrategyResolver._search_object(
directory=default_location, directory=default_location,
object_name='NotFoundStrategy', object_name="NotFoundStrategy",
add_source=True, add_source=True,
) )
assert s is None assert s is None
@ -46,9 +46,9 @@ def test_search_all_strategies_with_failed():
assert len(strategies) == 14 assert len(strategies) == 14
# with enum_failed=True search_all_objects() shall find 2 good strategies # with enum_failed=True search_all_objects() shall find 2 good strategies
# and 1 which fails to load # and 1 which fails to load
assert len([x for x in strategies if x['class'] is not None]) == 13 assert len([x for x in strategies if x["class"] is not None]) == 13
assert len([x for x in strategies if x['class'] is None]) == 1 assert len([x for x in strategies if x["class"] is None]) == 1
directory = Path(__file__).parent / "strats_nonexistingdir" directory = Path(__file__).parent / "strats_nonexistingdir"
strategies = StrategyResolver._search_all_objects(directory, enum_failed=True) strategies = StrategyResolver._search_all_objects(directory, enum_failed=True)
@ -56,123 +56,126 @@ def test_search_all_strategies_with_failed():
def test_load_strategy(default_conf, dataframe_1m): def test_load_strategy(default_conf, dataframe_1m):
default_conf.update({'strategy': 'SampleStrategy', default_conf.update(
'strategy_path': str(Path(__file__).parents[2] / 'freqtrade/templates') {
}) "strategy": "SampleStrategy",
"strategy_path": str(Path(__file__).parents[2] / "freqtrade/templates"),
}
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert isinstance(strategy.__source__, str) assert isinstance(strategy.__source__, str)
assert 'class SampleStrategy' in strategy.__source__ assert "class SampleStrategy" in strategy.__source__
assert isinstance(strategy.__file__, str) assert isinstance(strategy.__file__, str)
assert 'rsi' in strategy.advise_indicators(dataframe_1m, {'pair': 'ETH/BTC'}) assert "rsi" in strategy.advise_indicators(dataframe_1m, {"pair": "ETH/BTC"})
def test_load_strategy_base64(dataframe_1m, caplog, default_conf): def test_load_strategy_base64(dataframe_1m, caplog, default_conf):
filepath = Path(__file__).parents[2] / 'freqtrade/templates/sample_strategy.py' filepath = Path(__file__).parents[2] / "freqtrade/templates/sample_strategy.py"
encoded_string = urlsafe_b64encode(filepath.read_bytes()).decode("utf-8") encoded_string = urlsafe_b64encode(filepath.read_bytes()).decode("utf-8")
default_conf.update({'strategy': f'SampleStrategy:{encoded_string}'}) default_conf.update({"strategy": f"SampleStrategy:{encoded_string}"})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert 'rsi' in strategy.advise_indicators(dataframe_1m, {'pair': 'ETH/BTC'}) assert "rsi" in strategy.advise_indicators(dataframe_1m, {"pair": "ETH/BTC"})
# Make sure strategy was loaded from base64 (using temp directory)!! # Make sure strategy was loaded from base64 (using temp directory)!!
assert log_has_re(r"Using resolved strategy SampleStrategy from '" assert log_has_re(
r".*(/|\\).*(/|\\)SampleStrategy\.py'\.\.\.", caplog) r"Using resolved strategy SampleStrategy from '"
r".*(/|\\).*(/|\\)SampleStrategy\.py'\.\.\.",
caplog,
)
def test_load_strategy_invalid_directory(caplog, default_conf, tmp_path): def test_load_strategy_invalid_directory(caplog, default_conf, tmp_path):
default_conf['user_data_dir'] = tmp_path default_conf["user_data_dir"] = tmp_path
extra_dir = Path.cwd() / 'some/path' extra_dir = Path.cwd() / "some/path"
with pytest.raises(OperationalException, match=r"Impossible to load Strategy.*"): with pytest.raises(OperationalException, match=r"Impossible to load Strategy.*"):
StrategyResolver._load_strategy('StrategyTestV333', config=default_conf, StrategyResolver._load_strategy(
extra_dir=extra_dir) "StrategyTestV333", config=default_conf, extra_dir=extra_dir
)
assert log_has_re(r'Path .*' + r'some.*path.*' + r'.* does not exist', caplog) assert log_has_re(r"Path .*" + r"some.*path.*" + r".* does not exist", caplog)
def test_load_not_found_strategy(default_conf, tmp_path): def test_load_not_found_strategy(default_conf, tmp_path):
default_conf['user_data_dir'] = tmp_path default_conf["user_data_dir"] = tmp_path
default_conf['strategy'] = 'NotFoundStrategy' default_conf["strategy"] = "NotFoundStrategy"
with pytest.raises(OperationalException, with pytest.raises(
OperationalException,
match=r"Impossible to load Strategy 'NotFoundStrategy'. " match=r"Impossible to load Strategy 'NotFoundStrategy'. "
r"This class does not exist or contains Python code errors."): r"This class does not exist or contains Python code errors.",
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
def test_load_strategy_noname(default_conf): def test_load_strategy_noname(default_conf):
default_conf['strategy'] = '' default_conf["strategy"] = ""
with pytest.raises(OperationalException, with pytest.raises(
match="No strategy set. Please use `--strategy` to specify " OperationalException,
"the strategy class to use."): match="No strategy set. Please use `--strategy` to specify " "the strategy class to use.",
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
@pytest.mark.filterwarnings("ignore:deprecated") @pytest.mark.filterwarnings("ignore:deprecated")
@ pytest.mark.parametrize('strategy_name', ['StrategyTestV2']) @pytest.mark.parametrize("strategy_name", ["StrategyTestV2"])
def test_strategy_pre_v3(dataframe_1m, default_conf, strategy_name): def test_strategy_pre_v3(dataframe_1m, default_conf, strategy_name):
default_conf.update({'strategy': strategy_name}) default_conf.update({"strategy": strategy_name})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
metadata = {'pair': 'ETH/BTC'} metadata = {"pair": "ETH/BTC"}
assert strategy.minimal_roi[0] == 0.04 assert strategy.minimal_roi[0] == 0.04
assert default_conf["minimal_roi"]['0'] == 0.04 assert default_conf["minimal_roi"]["0"] == 0.04
assert strategy.stoploss == -0.10 assert strategy.stoploss == -0.10
assert default_conf['stoploss'] == -0.10 assert default_conf["stoploss"] == -0.10
assert strategy.timeframe == '5m' assert strategy.timeframe == "5m"
assert default_conf['timeframe'] == '5m' assert default_conf["timeframe"] == "5m"
df_indicators = strategy.advise_indicators(dataframe_1m, metadata=metadata) df_indicators = strategy.advise_indicators(dataframe_1m, metadata=metadata)
assert 'adx' in df_indicators assert "adx" in df_indicators
dataframe = strategy.advise_entry(df_indicators, metadata=metadata) dataframe = strategy.advise_entry(df_indicators, metadata=metadata)
assert 'buy' not in dataframe.columns assert "buy" not in dataframe.columns
assert 'enter_long' in dataframe.columns assert "enter_long" in dataframe.columns
dataframe = strategy.advise_exit(df_indicators, metadata=metadata) dataframe = strategy.advise_exit(df_indicators, metadata=metadata)
assert 'sell' not in dataframe.columns assert "sell" not in dataframe.columns
assert 'exit_long' in dataframe.columns assert "exit_long" in dataframe.columns
def test_strategy_can_short(caplog, default_conf): def test_strategy_can_short(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
}) "strategy": CURRENT_TEST_STRATEGY,
}
)
strat = StrategyResolver.load_strategy(default_conf) strat = StrategyResolver.load_strategy(default_conf)
assert isinstance(strat, IStrategy) assert isinstance(strat, IStrategy)
default_conf['strategy'] = 'StrategyTestV3Futures' default_conf["strategy"] = "StrategyTestV3Futures"
with pytest.raises(ImportError, match=""): with pytest.raises(ImportError, match=""):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['trading_mode'] = 'futures' default_conf["trading_mode"] = "futures"
strat = StrategyResolver.load_strategy(default_conf) strat = StrategyResolver.load_strategy(default_conf)
assert isinstance(strat, IStrategy) assert isinstance(strat, IStrategy)
def test_strategy_override_minimal_roi(caplog, default_conf): def test_strategy_override_minimal_roi(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "minimal_roi": {"20": 0.1, "0": 0.5}})
'strategy': CURRENT_TEST_STRATEGY,
'minimal_roi': {
"20": 0.1,
"0": 0.5
}
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.minimal_roi[0] == 0.5 assert strategy.minimal_roi[0] == 0.5
assert log_has( assert log_has(
"Override strategy 'minimal_roi' with value in config file: {'20': 0.1, '0': 0.5}.", "Override strategy 'minimal_roi' with value in config file: {'20': 0.1, '0': 0.5}.", caplog
caplog) )
def test_strategy_override_stoploss(caplog, default_conf): def test_strategy_override_stoploss(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "stoploss": -0.5})
'strategy': CURRENT_TEST_STRATEGY,
'stoploss': -0.5
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.stoploss == -0.5 assert strategy.stoploss == -0.5
@ -181,10 +184,7 @@ def test_strategy_override_stoploss(caplog, default_conf):
def test_strategy_override_max_open_trades(caplog, default_conf): def test_strategy_override_max_open_trades(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "max_open_trades": 7})
'strategy': CURRENT_TEST_STRATEGY,
'max_open_trades': 7
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.max_open_trades == 7 assert strategy.max_open_trades == 7
@ -193,10 +193,7 @@ def test_strategy_override_max_open_trades(caplog, default_conf):
def test_strategy_override_trailing_stop(caplog, default_conf): def test_strategy_override_trailing_stop(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "trailing_stop": True})
'strategy': CURRENT_TEST_STRATEGY,
'trailing_stop': True
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.trailing_stop assert strategy.trailing_stop
@ -206,84 +203,81 @@ def test_strategy_override_trailing_stop(caplog, default_conf):
def test_strategy_override_trailing_stop_positive(caplog, default_conf): def test_strategy_override_trailing_stop_positive(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
'trailing_stop_positive': -0.1, "strategy": CURRENT_TEST_STRATEGY,
'trailing_stop_positive_offset': -0.2 "trailing_stop_positive": -0.1,
"trailing_stop_positive_offset": -0.2,
}) }
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.trailing_stop_positive == -0.1 assert strategy.trailing_stop_positive == -0.1
assert log_has("Override strategy 'trailing_stop_positive' with value in config file: -0.1.", assert log_has(
caplog) "Override strategy 'trailing_stop_positive' with value in config file: -0.1.", caplog
)
assert strategy.trailing_stop_positive_offset == -0.2 assert strategy.trailing_stop_positive_offset == -0.2
assert log_has("Override strategy 'trailing_stop_positive' with value in config file: -0.1.", assert log_has(
caplog) "Override strategy 'trailing_stop_positive' with value in config file: -0.1.", caplog
)
def test_strategy_override_timeframe(caplog, default_conf): def test_strategy_override_timeframe(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {"strategy": CURRENT_TEST_STRATEGY, "timeframe": 60, "stake_currency": "ETH"}
'timeframe': 60, )
'stake_currency': 'ETH'
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.timeframe == 60 assert strategy.timeframe == 60
assert strategy.stake_currency == 'ETH' assert strategy.stake_currency == "ETH"
assert log_has("Override strategy 'timeframe' with value in config file: 60.", assert log_has("Override strategy 'timeframe' with value in config file: 60.", caplog)
caplog)
def test_strategy_override_process_only_new_candles(caplog, default_conf): def test_strategy_override_process_only_new_candles(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "process_only_new_candles": False})
'strategy': CURRENT_TEST_STRATEGY,
'process_only_new_candles': False
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert not strategy.process_only_new_candles assert not strategy.process_only_new_candles
assert log_has("Override strategy 'process_only_new_candles' with value in config file: False.", assert log_has(
caplog) "Override strategy 'process_only_new_candles' with value in config file: False.", caplog
)
def test_strategy_override_order_types(caplog, default_conf): def test_strategy_override_order_types(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
order_types = { order_types = {
'entry': 'market', "entry": "market",
'exit': 'limit', "exit": "limit",
'stoploss': 'limit', "stoploss": "limit",
'stoploss_on_exchange': True, "stoploss_on_exchange": True,
} }
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "order_types": order_types})
'strategy': CURRENT_TEST_STRATEGY,
'order_types': order_types
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.order_types assert strategy.order_types
for method in ['entry', 'exit', 'stoploss', 'stoploss_on_exchange']: for method in ["entry", "exit", "stoploss", "stoploss_on_exchange"]:
assert strategy.order_types[method] == order_types[method] assert strategy.order_types[method] == order_types[method]
assert log_has("Override strategy 'order_types' with value in config file:" assert log_has(
"Override strategy 'order_types' with value in config file:"
" {'entry': 'market', 'exit': 'limit', 'stoploss': 'limit'," " {'entry': 'market', 'exit': 'limit', 'stoploss': 'limit',"
" 'stoploss_on_exchange': True}.", caplog) " 'stoploss_on_exchange': True}.",
caplog,
)
default_conf.update({ default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "order_types": {"exit": "market"}})
'strategy': CURRENT_TEST_STRATEGY,
'order_types': {'exit': 'market'}
})
# Raise error for invalid configuration # Raise error for invalid configuration
with pytest.raises(ImportError, with pytest.raises(
ImportError,
match=r"Impossible to load Strategy '" + CURRENT_TEST_STRATEGY + "'. " match=r"Impossible to load Strategy '" + CURRENT_TEST_STRATEGY + "'. "
r"Order-types mapping is incomplete."): r"Order-types mapping is incomplete.",
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
@ -291,50 +285,57 @@ def test_strategy_override_order_tif(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
order_time_in_force = { order_time_in_force = {
'entry': 'FOK', "entry": "FOK",
'exit': 'GTC', "exit": "GTC",
} }
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {"strategy": CURRENT_TEST_STRATEGY, "order_time_in_force": order_time_in_force}
'order_time_in_force': order_time_in_force )
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.order_time_in_force assert strategy.order_time_in_force
for method in ['entry', 'exit']: for method in ["entry", "exit"]:
assert strategy.order_time_in_force[method] == order_time_in_force[method] assert strategy.order_time_in_force[method] == order_time_in_force[method]
assert log_has("Override strategy 'order_time_in_force' with value in config file:" assert log_has(
" {'entry': 'FOK', 'exit': 'GTC'}.", caplog) "Override strategy 'order_time_in_force' with value in config file:"
" {'entry': 'FOK', 'exit': 'GTC'}.",
caplog,
)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {"strategy": CURRENT_TEST_STRATEGY, "order_time_in_force": {"entry": "FOK"}}
'order_time_in_force': {'entry': 'FOK'} )
})
# Raise error for invalid configuration # Raise error for invalid configuration
with pytest.raises(ImportError, with pytest.raises(
ImportError,
match=f"Impossible to load Strategy '{CURRENT_TEST_STRATEGY}'. " match=f"Impossible to load Strategy '{CURRENT_TEST_STRATEGY}'. "
"Order-time-in-force mapping is incomplete."): "Order-time-in-force mapping is incomplete.",
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
def test_strategy_override_use_exit_signal(caplog, default_conf): def test_strategy_override_use_exit_signal(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
}) "strategy": CURRENT_TEST_STRATEGY,
}
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.use_exit_signal assert strategy.use_exit_signal
assert isinstance(strategy.use_exit_signal, bool) assert isinstance(strategy.use_exit_signal, bool)
# must be inserted to configuration # must be inserted to configuration
assert 'use_exit_signal' in default_conf assert "use_exit_signal" in default_conf
assert default_conf['use_exit_signal'] assert default_conf["use_exit_signal"]
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
'use_exit_signal': False, "strategy": CURRENT_TEST_STRATEGY,
}) "use_exit_signal": False,
}
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert not strategy.use_exit_signal assert not strategy.use_exit_signal
@ -344,20 +345,24 @@ def test_strategy_override_use_exit_signal(caplog, default_conf):
def test_strategy_override_use_exit_profit_only(caplog, default_conf): def test_strategy_override_use_exit_profit_only(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
}) "strategy": CURRENT_TEST_STRATEGY,
}
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert not strategy.exit_profit_only assert not strategy.exit_profit_only
assert isinstance(strategy.exit_profit_only, bool) assert isinstance(strategy.exit_profit_only, bool)
# must be inserted to configuration # must be inserted to configuration
assert 'exit_profit_only' in default_conf assert "exit_profit_only" in default_conf
assert not default_conf['exit_profit_only'] assert not default_conf["exit_profit_only"]
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
'exit_profit_only': True, "strategy": CURRENT_TEST_STRATEGY,
}) "exit_profit_only": True,
}
)
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.exit_profit_only assert strategy.exit_profit_only
@ -367,138 +372,135 @@ def test_strategy_override_use_exit_profit_only(caplog, default_conf):
def test_strategy_max_open_trades_infinity_from_strategy(caplog, default_conf): def test_strategy_max_open_trades_infinity_from_strategy(caplog, default_conf):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {
}) "strategy": CURRENT_TEST_STRATEGY,
del default_conf['max_open_trades'] }
)
del default_conf["max_open_trades"]
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
# this test assumes -1 set to 'max_open_trades' in CURRENT_TEST_STRATEGY # this test assumes -1 set to 'max_open_trades' in CURRENT_TEST_STRATEGY
assert strategy.max_open_trades == float('inf') assert strategy.max_open_trades == float("inf")
assert default_conf['max_open_trades'] == float('inf') assert default_conf["max_open_trades"] == float("inf")
def test_strategy_max_open_trades_infinity_from_config(caplog, default_conf, mocker): def test_strategy_max_open_trades_infinity_from_config(caplog, default_conf, mocker):
caplog.set_level(logging.INFO) caplog.set_level(logging.INFO)
default_conf.update({ default_conf.update(
'strategy': CURRENT_TEST_STRATEGY, {"strategy": CURRENT_TEST_STRATEGY, "max_open_trades": -1, "exchange": "binance"}
'max_open_trades': -1, )
'exchange': 'binance'
})
configuration = Configuration(args=default_conf) configuration = Configuration(args=default_conf)
parsed_config = configuration.get_config() parsed_config = configuration.get_config()
assert parsed_config['max_open_trades'] == float('inf') assert parsed_config["max_open_trades"] == float("inf")
strategy = StrategyResolver.load_strategy(parsed_config) strategy = StrategyResolver.load_strategy(parsed_config)
assert strategy.max_open_trades == float('inf') assert strategy.max_open_trades == float("inf")
@pytest.mark.filterwarnings("ignore:deprecated") @pytest.mark.filterwarnings("ignore:deprecated")
def test_missing_implements(default_conf, caplog): def test_missing_implements(default_conf, caplog):
default_location = Path(__file__).parent / "strats" default_location = Path(__file__).parent / "strats"
default_conf.update({'strategy': 'StrategyTestV2', default_conf.update({"strategy": "StrategyTestV2", "strategy_path": default_location})
'strategy_path': default_location})
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
log_has_re(r"DEPRECATED: .*use_sell_signal.*use_exit_signal.", caplog) log_has_re(r"DEPRECATED: .*use_sell_signal.*use_exit_signal.", caplog)
default_conf['trading_mode'] = 'futures' default_conf["trading_mode"] = "futures"
with pytest.raises(OperationalException, with pytest.raises(
match=r"DEPRECATED: .*use_sell_signal.*use_exit_signal."): OperationalException, match=r"DEPRECATED: .*use_sell_signal.*use_exit_signal."
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['trading_mode'] = 'spot' default_conf["trading_mode"] = "spot"
default_location = Path(__file__).parent / "strats/broken_strats" default_location = Path(__file__).parent / "strats/broken_strats"
default_conf.update({'strategy': 'TestStrategyNoImplements', default_conf.update({"strategy": "TestStrategyNoImplements", "strategy_path": default_location})
'strategy_path': default_location}) with pytest.raises(
with pytest.raises(OperationalException, OperationalException, match=r"`populate_entry_trend` or `populate_buy_trend`.*"
match=r"`populate_entry_trend` or `populate_buy_trend`.*"): ):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['strategy'] = 'TestStrategyNoImplementSell' default_conf["strategy"] = "TestStrategyNoImplementSell"
with pytest.raises(OperationalException, with pytest.raises(
match=r"`populate_exit_trend` or `populate_sell_trend`.*"): OperationalException, match=r"`populate_exit_trend` or `populate_sell_trend`.*"
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
# Futures mode is more strict ... # Futures mode is more strict ...
default_conf['trading_mode'] = 'futures' default_conf["trading_mode"] = "futures"
with pytest.raises(OperationalException, with pytest.raises(OperationalException, match=r"`populate_exit_trend` must be implemented.*"):
match=r"`populate_exit_trend` must be implemented.*"):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['strategy'] = 'TestStrategyNoImplements' default_conf["strategy"] = "TestStrategyNoImplements"
with pytest.raises(OperationalException, with pytest.raises(OperationalException, match=r"`populate_entry_trend` must be implemented.*"):
match=r"`populate_entry_trend` must be implemented.*"):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['strategy'] = 'TestStrategyImplementCustomSell' default_conf["strategy"] = "TestStrategyImplementCustomSell"
with pytest.raises(OperationalException, with pytest.raises(
match=r"Please migrate your implementation of `custom_sell`.*"): OperationalException, match=r"Please migrate your implementation of `custom_sell`.*"
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['strategy'] = 'TestStrategyImplementBuyTimeout' default_conf["strategy"] = "TestStrategyImplementBuyTimeout"
with pytest.raises(OperationalException, with pytest.raises(
match=r"Please migrate your implementation of `check_buy_timeout`.*"): OperationalException, match=r"Please migrate your implementation of `check_buy_timeout`.*"
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
default_conf['strategy'] = 'TestStrategyImplementSellTimeout' default_conf["strategy"] = "TestStrategyImplementSellTimeout"
with pytest.raises(OperationalException, with pytest.raises(
match=r"Please migrate your implementation of `check_sell_timeout`.*"): OperationalException, match=r"Please migrate your implementation of `check_sell_timeout`.*"
):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
def test_call_deprecated_function(default_conf): def test_call_deprecated_function(default_conf):
default_location = Path(__file__).parent / "strats/broken_strats/" default_location = Path(__file__).parent / "strats/broken_strats/"
del default_conf['timeframe'] del default_conf["timeframe"]
default_conf.update({'strategy': 'TestStrategyLegacyV1', default_conf.update({"strategy": "TestStrategyLegacyV1", "strategy_path": default_location})
'strategy_path': default_location}) with pytest.raises(
with pytest.raises(OperationalException, OperationalException, match=r"Strategy Interface v1 is no longer supported.*"
match=r"Strategy Interface v1 is no longer supported.*"): ):
StrategyResolver.load_strategy(default_conf) StrategyResolver.load_strategy(default_conf)
def test_strategy_interface_versioning(dataframe_1m, default_conf): def test_strategy_interface_versioning(dataframe_1m, default_conf):
default_conf.update({'strategy': 'StrategyTestV2'}) default_conf.update({"strategy": "StrategyTestV2"})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
metadata = {'pair': 'ETH/BTC'} metadata = {"pair": "ETH/BTC"}
assert strategy.INTERFACE_VERSION == 2 assert strategy.INTERFACE_VERSION == 2
indicator_df = strategy.advise_indicators(dataframe_1m, metadata=metadata) indicator_df = strategy.advise_indicators(dataframe_1m, metadata=metadata)
assert isinstance(indicator_df, DataFrame) assert isinstance(indicator_df, DataFrame)
assert 'adx' in indicator_df.columns assert "adx" in indicator_df.columns
enterdf = strategy.advise_entry(dataframe_1m, metadata=metadata) enterdf = strategy.advise_entry(dataframe_1m, metadata=metadata)
assert isinstance(enterdf, DataFrame) assert isinstance(enterdf, DataFrame)
assert 'buy' not in enterdf.columns assert "buy" not in enterdf.columns
assert 'enter_long' in enterdf.columns assert "enter_long" in enterdf.columns
exitdf = strategy.advise_exit(dataframe_1m, metadata=metadata) exitdf = strategy.advise_exit(dataframe_1m, metadata=metadata)
assert isinstance(exitdf, DataFrame) assert isinstance(exitdf, DataFrame)
assert 'sell' not in exitdf assert "sell" not in exitdf
assert 'exit_long' in exitdf assert "exit_long" in exitdf
def test_strategy_ft_load_params_from_file(mocker, default_conf): def test_strategy_ft_load_params_from_file(mocker, default_conf):
default_conf.update({'strategy': 'StrategyTestV2'}) default_conf.update({"strategy": "StrategyTestV2"})
del default_conf['max_open_trades'] del default_conf["max_open_trades"]
mocker.patch('freqtrade.strategy.hyper.HyperStrategyMixin.load_params_from_file', mocker.patch(
return_value={ "freqtrade.strategy.hyper.HyperStrategyMixin.load_params_from_file",
'params': { return_value={"params": {"max_open_trades": {"max_open_trades": -1}}},
'max_open_trades': { )
'max_open_trades': -1
}
}
})
strategy = StrategyResolver.load_strategy(default_conf) strategy = StrategyResolver.load_strategy(default_conf)
assert strategy.max_open_trades == float('inf') assert strategy.max_open_trades == float("inf")
assert strategy.config['max_open_trades'] == float('inf') assert strategy.config["max_open_trades"] == float("inf")