From adeb93dc9cceb7218bb46e10368465006d98d0eb Mon Sep 17 00:00:00 2001 From: Matthias Date: Sun, 12 May 2024 15:45:55 +0200 Subject: [PATCH] ruff format: update strategy tests --- tests/strategy/test_default_strategy.py | 82 ++- tests/strategy/test_interface.py | 858 +++++++++++++----------- tests/strategy/test_strategy_helpers.py | 423 ++++++------ tests/strategy/test_strategy_loading.py | 424 ++++++------ 4 files changed, 952 insertions(+), 835 deletions(-) diff --git a/tests/strategy/test_default_strategy.py b/tests/strategy/test_default_strategy.py index afe7fc97a..494e374c4 100644 --- a/tests/strategy/test_default_strategy.py +++ b/tests/strategy/test_default_strategy.py @@ -9,22 +9,25 @@ from .strats.strategy_test_v3 import StrategyTestV3 def test_strategy_test_v3_structure(): - assert hasattr(StrategyTestV3, 'minimal_roi') - assert hasattr(StrategyTestV3, 'stoploss') - assert hasattr(StrategyTestV3, 'timeframe') - assert hasattr(StrategyTestV3, 'populate_indicators') - assert hasattr(StrategyTestV3, 'populate_entry_trend') - assert hasattr(StrategyTestV3, 'populate_exit_trend') + assert hasattr(StrategyTestV3, "minimal_roi") + assert hasattr(StrategyTestV3, "stoploss") + assert hasattr(StrategyTestV3, "timeframe") + assert hasattr(StrategyTestV3, "populate_indicators") + assert hasattr(StrategyTestV3, "populate_entry_trend") + assert hasattr(StrategyTestV3, "populate_exit_trend") -@pytest.mark.parametrize('is_short,side', [ - (True, 'short'), - (False, 'long'), -]) +@pytest.mark.parametrize( + "is_short,side", + [ + (True, "short"), + (False, "long"), + ], +) def test_strategy_test_v3(dataframe_1m, fee, is_short, side): strategy = StrategyTestV3({}) - metadata = {'pair': 'ETH/BTC'} + metadata = {"pair": "ETH/BTC"} assert isinstance(strategy.minimal_roi, dict) assert isinstance(strategy.stoploss, float) assert isinstance(strategy.timeframe, str) @@ -34,23 +37,46 @@ def test_strategy_test_v3(dataframe_1m, fee, is_short, side): assert isinstance(strategy.populate_sell_trend(indicators, metadata), DataFrame) trade = Trade( - open_rate=19_000, - amount=0.1, - pair='ETH/BTC', - fee_open=fee.return_value, - is_short=is_short + open_rate=19_000, amount=0.1, pair="ETH/BTC", fee_open=fee.return_value, is_short=is_short ) - assert strategy.confirm_trade_entry(pair='ETH/BTC', order_type='limit', amount=0.1, - rate=20000, time_in_force='gtc', - current_time=datetime.now(timezone.utc), - side=side, entry_tag=None) is True - assert strategy.confirm_trade_exit(pair='ETH/BTC', trade=trade, order_type='limit', amount=0.1, - rate=20000, time_in_force='gtc', exit_reason='roi', - sell_reason='roi', - current_time=datetime.now(timezone.utc), - side=side) is True + assert ( + strategy.confirm_trade_entry( + pair="ETH/BTC", + order_type="limit", + amount=0.1, + rate=20000, + time_in_force="gtc", + current_time=datetime.now(timezone.utc), + side=side, + entry_tag=None, + ) + is True + ) + assert ( + strategy.confirm_trade_exit( + pair="ETH/BTC", + trade=trade, + order_type="limit", + amount=0.1, + rate=20000, + time_in_force="gtc", + exit_reason="roi", + sell_reason="roi", + current_time=datetime.now(timezone.utc), + side=side, + ) + is True + ) - assert strategy.custom_stoploss(pair='ETH/BTC', trade=trade, current_time=datetime.now(), - current_rate=20_000, current_profit=0.05, after_fill=False - ) == strategy.stoploss + assert ( + strategy.custom_stoploss( + pair="ETH/BTC", + trade=trade, + current_time=datetime.now(), + current_rate=20_000, + current_profit=0.05, + after_fill=False, + ) + == strategy.stoploss + ) diff --git a/tests/strategy/test_interface.py b/tests/strategy/test_interface.py index d6ac74005..a7c258a17 100644 --- a/tests/strategy/test_interface.py +++ b/tests/strategy/test_interface.py @@ -45,167 +45,165 @@ _STRATEGY.dp = DataProvider({}, None, None) def test_returns_latest_signal(ohlcv_history): - ohlcv_history.loc[1, 'date'] = dt_now() + ohlcv_history.loc[1, "date"] = dt_now() # Take a copy to correctly modify the call mocked_history = ohlcv_history.copy() - mocked_history['enter_long'] = 0 - mocked_history['exit_long'] = 0 - mocked_history['enter_short'] = 0 - mocked_history['exit_short'] = 0 + mocked_history["enter_long"] = 0 + mocked_history["exit_long"] = 0 + mocked_history["enter_short"] = 0 + mocked_history["exit_short"] = 0 # Set tags in lines that don't matter to test nan in the sell line - mocked_history.loc[0, 'enter_tag'] = 'wrong_line' - mocked_history.loc[0, 'exit_tag'] = 'wrong_line' - mocked_history.loc[1, 'exit_long'] = 1 + mocked_history.loc[0, "enter_tag"] = "wrong_line" + mocked_history.loc[0, "exit_tag"] = "wrong_line" + mocked_history.loc[1, "exit_long"] = 1 - assert _STRATEGY.get_entry_signal('ETH/BTC', '5m', mocked_history) == (None, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history) == (False, True, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history, True) == (False, False, None) - mocked_history.loc[1, 'exit_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 1 + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == (None, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == (False, True, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == (False, False, None) + mocked_history.loc[1, "exit_long"] = 0 + mocked_history.loc[1, "enter_long"] = 1 - assert _STRATEGY.get_entry_signal( - 'ETH/BTC', '5m', mocked_history) == (SignalDirection.LONG, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history) == (True, False, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history, True) == (False, False, None) - mocked_history.loc[1, 'exit_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 0 + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == ( + SignalDirection.LONG, + None, + ) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == (True, False, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == (False, False, None) + mocked_history.loc[1, "exit_long"] = 0 + mocked_history.loc[1, "enter_long"] = 0 - assert _STRATEGY.get_entry_signal('ETH/BTC', '5m', mocked_history) == (None, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history) == (False, False, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history, True) == (False, False, None) - mocked_history.loc[1, 'exit_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 1 - mocked_history.loc[1, 'enter_tag'] = 'buy_signal_01' + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == (None, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == (False, False, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == (False, False, None) + mocked_history.loc[1, "exit_long"] = 0 + mocked_history.loc[1, "enter_long"] = 1 + mocked_history.loc[1, "enter_tag"] = "buy_signal_01" - assert _STRATEGY.get_entry_signal( - 'ETH/BTC', '5m', mocked_history) == (SignalDirection.LONG, 'buy_signal_01') - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history) == (True, False, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history, True) == (False, False, None) + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == ( + SignalDirection.LONG, + "buy_signal_01", + ) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == (True, False, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == (False, False, None) - mocked_history.loc[1, 'exit_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 0 - mocked_history.loc[1, 'enter_short'] = 1 - mocked_history.loc[1, 'exit_short'] = 0 - mocked_history.loc[1, 'enter_tag'] = 'sell_signal_01' + mocked_history.loc[1, "exit_long"] = 0 + mocked_history.loc[1, "enter_long"] = 0 + mocked_history.loc[1, "enter_short"] = 1 + mocked_history.loc[1, "exit_short"] = 0 + mocked_history.loc[1, "enter_tag"] = "sell_signal_01" # Don't provide short signal while in spot mode - assert _STRATEGY.get_entry_signal('ETH/BTC', '5m', mocked_history) == (None, None) + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == (None, None) - _STRATEGY.config['trading_mode'] = 'futures' + _STRATEGY.config["trading_mode"] = "futures" # Short signal gets ignored as can_short is not set. - assert _STRATEGY.get_entry_signal('ETH/BTC', '5m', mocked_history) == (None, None) + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == (None, None) _STRATEGY.can_short = True - assert _STRATEGY.get_entry_signal( - 'ETH/BTC', '5m', mocked_history) == (SignalDirection.SHORT, 'sell_signal_01') - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history) == (False, False, None) - assert _STRATEGY.get_exit_signal('ETH/BTC', '5m', mocked_history, True) == (True, False, None) + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == ( + SignalDirection.SHORT, + "sell_signal_01", + ) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == (False, False, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == (True, False, None) - mocked_history.loc[1, 'enter_short'] = 0 - mocked_history.loc[1, 'exit_short'] = 1 - mocked_history.loc[1, 'exit_tag'] = 'sell_signal_02' - assert _STRATEGY.get_entry_signal( - 'ETH/BTC', '5m', mocked_history) == (None, None) - assert _STRATEGY.get_exit_signal( - 'ETH/BTC', '5m', mocked_history) == (False, False, 'sell_signal_02') - assert _STRATEGY.get_exit_signal( - 'ETH/BTC', '5m', mocked_history, True) == (False, True, 'sell_signal_02') + mocked_history.loc[1, "enter_short"] = 0 + mocked_history.loc[1, "exit_short"] = 1 + mocked_history.loc[1, "exit_tag"] = "sell_signal_02" + assert _STRATEGY.get_entry_signal("ETH/BTC", "5m", mocked_history) == (None, None) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history) == ( + False, + False, + "sell_signal_02", + ) + assert _STRATEGY.get_exit_signal("ETH/BTC", "5m", mocked_history, True) == ( + False, + True, + "sell_signal_02", + ) _STRATEGY.can_short = False - _STRATEGY.config['trading_mode'] = 'spot' + _STRATEGY.config["trading_mode"] = "spot" def test_analyze_pair_empty(mocker, caplog, ohlcv_history): - mocker.patch.object(_STRATEGY.dp, 'ohlcv', return_value=ohlcv_history) - mocker.patch.object( - _STRATEGY, '_analyze_ticker_internal', - return_value=DataFrame([]) - ) - mocker.patch.object(_STRATEGY, 'assert_df') + mocker.patch.object(_STRATEGY.dp, "ohlcv", return_value=ohlcv_history) + mocker.patch.object(_STRATEGY, "_analyze_ticker_internal", return_value=DataFrame([])) + mocker.patch.object(_STRATEGY, "assert_df") - _STRATEGY.analyze_pair('ETH/BTC') + _STRATEGY.analyze_pair("ETH/BTC") - assert log_has('Empty dataframe for pair ETH/BTC', caplog) + assert log_has("Empty dataframe for pair ETH/BTC", caplog) def test_get_signal_empty(default_conf, caplog): assert (None, None) == _STRATEGY.get_latest_candle( - 'foo', default_conf['timeframe'], DataFrame() + "foo", default_conf["timeframe"], DataFrame() ) - assert log_has('Empty candle (OHLCV) data for pair foo', caplog) + assert log_has("Empty candle (OHLCV) data for pair foo", caplog) caplog.clear() - assert (None, None) == _STRATEGY.get_latest_candle('bar', default_conf['timeframe'], None) - assert log_has('Empty candle (OHLCV) data for pair bar', caplog) + assert (None, None) == _STRATEGY.get_latest_candle("bar", default_conf["timeframe"], None) + assert log_has("Empty candle (OHLCV) data for pair bar", caplog) caplog.clear() assert (None, None) == _STRATEGY.get_latest_candle( - 'baz', - default_conf['timeframe'], - DataFrame([]) + "baz", default_conf["timeframe"], DataFrame([]) ) - assert log_has('Empty candle (OHLCV) data for pair baz', caplog) + assert log_has("Empty candle (OHLCV) data for pair baz", caplog) def test_get_signal_exception_valueerror(mocker, caplog, ohlcv_history): caplog.set_level(logging.INFO) - mocker.patch.object(_STRATEGY.dp, 'ohlcv', return_value=ohlcv_history) - mocker.patch.object( - _STRATEGY, '_analyze_ticker_internal', - side_effect=ValueError('xyz') - ) - _STRATEGY.analyze_pair('foo') - assert log_has_re(r'Strategy caused the following exception: xyz.*', caplog) + mocker.patch.object(_STRATEGY.dp, "ohlcv", return_value=ohlcv_history) + mocker.patch.object(_STRATEGY, "_analyze_ticker_internal", side_effect=ValueError("xyz")) + _STRATEGY.analyze_pair("foo") + assert log_has_re(r"Strategy caused the following exception: xyz.*", caplog) caplog.clear() mocker.patch.object( - _STRATEGY, 'analyze_ticker', - side_effect=Exception('invalid ticker history ') + _STRATEGY, "analyze_ticker", side_effect=Exception("invalid ticker history ") ) - _STRATEGY.analyze_pair('foo') - assert log_has_re(r'Strategy caused the following exception: xyz.*', caplog) + _STRATEGY.analyze_pair("foo") + assert log_has_re(r"Strategy caused the following exception: xyz.*", caplog) def test_get_signal_old_dataframe(default_conf, mocker, caplog, ohlcv_history): # default_conf defines a 5m interval. we check interval * 2 + 5m # this is necessary as the last candle is removed (partial candles) by default - ohlcv_history.loc[1, 'date'] = dt_now() - timedelta(minutes=16) + ohlcv_history.loc[1, "date"] = dt_now() - timedelta(minutes=16) # Take a copy to correctly modify the call mocked_history = ohlcv_history.copy() - mocked_history['exit_long'] = 0 - mocked_history['enter_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 1 + mocked_history["exit_long"] = 0 + mocked_history["enter_long"] = 0 + mocked_history.loc[1, "enter_long"] = 1 caplog.set_level(logging.INFO) - mocker.patch.object(_STRATEGY, 'assert_df') + mocker.patch.object(_STRATEGY, "assert_df") assert (None, None) == _STRATEGY.get_latest_candle( - 'xyz', - default_conf['timeframe'], - mocked_history + "xyz", default_conf["timeframe"], mocked_history ) - assert log_has('Outdated history for pair xyz. Last tick is 16 minutes old', caplog) + assert log_has("Outdated history for pair xyz. Last tick is 16 minutes old", caplog) def test_get_signal_no_sell_column(default_conf, mocker, caplog, ohlcv_history): # default_conf defines a 5m interval. we check interval * 2 + 5m # this is necessary as the last candle is removed (partial candles) by default - ohlcv_history.loc[1, 'date'] = dt_now() + ohlcv_history.loc[1, "date"] = dt_now() # Take a copy to correctly modify the call mocked_history = ohlcv_history.copy() # Intentionally don't set sell column # mocked_history['sell'] = 0 - mocked_history['enter_long'] = 0 - mocked_history.loc[1, 'enter_long'] = 1 + mocked_history["enter_long"] = 0 + mocked_history.loc[1, "enter_long"] = 1 caplog.set_level(logging.INFO) - mocker.patch.object(_STRATEGY, 'assert_df') + mocker.patch.object(_STRATEGY, "assert_df") assert (SignalDirection.LONG, None) == _STRATEGY.get_entry_signal( - 'xyz', - default_conf['timeframe'], - mocked_history + "xyz", default_conf["timeframe"], mocked_history ) @@ -217,77 +215,102 @@ def test_ignore_expired_candle(default_conf): # Add 1 candle length as the "latest date" defines candle open. current_time = latest_date + timedelta(seconds=80 + 300) - assert strategy.ignore_expired_candle( - latest_date=latest_date, - current_time=current_time, - timeframe_seconds=300, - enter=True - ) is True + assert ( + strategy.ignore_expired_candle( + latest_date=latest_date, current_time=current_time, timeframe_seconds=300, enter=True + ) + is True + ) current_time = latest_date + timedelta(seconds=30 + 300) - assert strategy.ignore_expired_candle( - latest_date=latest_date, - current_time=current_time, - timeframe_seconds=300, - enter=True - ) is not True + assert ( + strategy.ignore_expired_candle( + latest_date=latest_date, current_time=current_time, timeframe_seconds=300, enter=True + ) + is not True + ) def test_assert_df_raise(mocker, caplog, ohlcv_history): - ohlcv_history.loc[1, 'date'] = dt_now() - timedelta(minutes=16) + ohlcv_history.loc[1, "date"] = dt_now() - timedelta(minutes=16) # Take a copy to correctly modify the call mocked_history = ohlcv_history.copy() - mocked_history['sell'] = 0 - mocked_history['buy'] = 0 - mocked_history.loc[1, 'buy'] = 1 + mocked_history["sell"] = 0 + mocked_history["buy"] = 0 + mocked_history.loc[1, "buy"] = 1 caplog.set_level(logging.INFO) - mocker.patch.object(_STRATEGY.dp, 'ohlcv', return_value=ohlcv_history) - mocker.patch.object(_STRATEGY.dp, 'get_analyzed_dataframe', return_value=(mocked_history, 0)) - mocker.patch.object( - _STRATEGY, 'assert_df', - side_effect=StrategyError('Dataframe returned...') + mocker.patch.object(_STRATEGY.dp, "ohlcv", return_value=ohlcv_history) + mocker.patch.object(_STRATEGY.dp, "get_analyzed_dataframe", return_value=(mocked_history, 0)) + mocker.patch.object(_STRATEGY, "assert_df", side_effect=StrategyError("Dataframe returned...")) + _STRATEGY.analyze_pair("xyz") + assert log_has( + "Unable to analyze candle (OHLCV) data for pair xyz: Dataframe returned...", caplog ) - _STRATEGY.analyze_pair('xyz') - assert log_has('Unable to analyze candle (OHLCV) data for pair xyz: Dataframe returned...', - caplog) def test_assert_df(ohlcv_history, caplog): df_len = len(ohlcv_history) - 1 - ohlcv_history.loc[:, 'enter_long'] = 0 - ohlcv_history.loc[:, 'exit_long'] = 0 + ohlcv_history.loc[:, "enter_long"] = 0 + ohlcv_history.loc[:, "exit_long"] = 0 # Ensure it's running when passed correctly - _STRATEGY.assert_df(ohlcv_history, len(ohlcv_history), - ohlcv_history.loc[df_len, 'close'], ohlcv_history.loc[df_len, 'date']) + _STRATEGY.assert_df( + ohlcv_history, + len(ohlcv_history), + ohlcv_history.loc[df_len, "close"], + ohlcv_history.loc[df_len, "date"], + ) with pytest.raises(StrategyError, match=r"Dataframe returned from strategy.*length\."): - _STRATEGY.assert_df(ohlcv_history, len(ohlcv_history) + 1, - ohlcv_history.loc[df_len, 'close'], ohlcv_history.loc[df_len, 'date']) + _STRATEGY.assert_df( + ohlcv_history, + len(ohlcv_history) + 1, + ohlcv_history.loc[df_len, "close"], + ohlcv_history.loc[df_len, "date"], + ) - with pytest.raises(StrategyError, - match=r"Dataframe returned from strategy.*last close price\."): - _STRATEGY.assert_df(ohlcv_history, len(ohlcv_history), - ohlcv_history.loc[df_len, 'close'] + 0.01, - ohlcv_history.loc[df_len, 'date']) - with pytest.raises(StrategyError, - match=r"Dataframe returned from strategy.*last date\."): - _STRATEGY.assert_df(ohlcv_history, len(ohlcv_history), - ohlcv_history.loc[df_len, 'close'], ohlcv_history.loc[0, 'date']) - with pytest.raises(StrategyError, - match=r"No dataframe returned \(return statement missing\?\)."): - _STRATEGY.assert_df(None, len(ohlcv_history), - ohlcv_history.loc[df_len, 'close'], ohlcv_history.loc[0, 'date']) - with pytest.raises(StrategyError, - match="enter_long/buy column not set."): - _STRATEGY.assert_df(ohlcv_history.drop('enter_long', axis=1), len(ohlcv_history), - ohlcv_history.loc[df_len, 'close'], ohlcv_history.loc[0, 'date']) + with pytest.raises( + StrategyError, match=r"Dataframe returned from strategy.*last close price\." + ): + _STRATEGY.assert_df( + ohlcv_history, + len(ohlcv_history), + ohlcv_history.loc[df_len, "close"] + 0.01, + ohlcv_history.loc[df_len, "date"], + ) + with pytest.raises(StrategyError, match=r"Dataframe returned from strategy.*last date\."): + _STRATEGY.assert_df( + ohlcv_history, + len(ohlcv_history), + ohlcv_history.loc[df_len, "close"], + ohlcv_history.loc[0, "date"], + ) + with pytest.raises( + StrategyError, match=r"No dataframe returned \(return statement missing\?\)." + ): + _STRATEGY.assert_df( + None, + len(ohlcv_history), + ohlcv_history.loc[df_len, "close"], + ohlcv_history.loc[0, "date"], + ) + with pytest.raises(StrategyError, match="enter_long/buy column not set."): + _STRATEGY.assert_df( + ohlcv_history.drop("enter_long", axis=1), + len(ohlcv_history), + ohlcv_history.loc[df_len, "close"], + ohlcv_history.loc[0, "date"], + ) _STRATEGY.disable_dataframe_checks = True caplog.clear() - _STRATEGY.assert_df(ohlcv_history, len(ohlcv_history), - ohlcv_history.loc[2, 'close'], ohlcv_history.loc[0, 'date']) + _STRATEGY.assert_df( + ohlcv_history, + len(ohlcv_history), + ohlcv_history.loc[2, "close"], + ohlcv_history.loc[0, "date"], + ) assert log_has_re(r"Dataframe returned from strategy.*last date\.", caplog) # reset to avoid problems in other tests due to test leakage _STRATEGY.disable_dataframe_checks = False @@ -296,26 +319,24 @@ def test_assert_df(ohlcv_history, caplog): def test_advise_all_indicators(default_conf, testdatadir) -> None: strategy = StrategyResolver.load_strategy(default_conf) - timerange = TimeRange.parse_timerange('1510694220-1510700340') - data = load_data(testdatadir, '1m', ['UNITTEST/BTC'], timerange=timerange, - fill_up_missing=True) + timerange = TimeRange.parse_timerange("1510694220-1510700340") + data = load_data(testdatadir, "1m", ["UNITTEST/BTC"], timerange=timerange, fill_up_missing=True) processed = strategy.advise_all_indicators(data) - assert len(processed['UNITTEST/BTC']) == 103 + assert len(processed["UNITTEST/BTC"]) == 103 def test_freqai_not_initialized(default_conf) -> None: strategy = StrategyResolver.load_strategy(default_conf) strategy.ft_bot_start() - with pytest.raises(OperationalException, match=r'freqAI is not enabled\.'): + with pytest.raises(OperationalException, match=r"freqAI is not enabled\."): strategy.freqai.start() def test_advise_all_indicators_copy(mocker, default_conf, testdatadir) -> None: strategy = StrategyResolver.load_strategy(default_conf) - aimock = mocker.patch('freqtrade.strategy.interface.IStrategy.advise_indicators') - timerange = TimeRange.parse_timerange('1510694220-1510700340') - data = load_data(testdatadir, '1m', ['UNITTEST/BTC'], timerange=timerange, - fill_up_missing=True) + aimock = mocker.patch("freqtrade.strategy.interface.IStrategy.advise_indicators") + timerange = TimeRange.parse_timerange("1510694220-1510700340") + data = load_data(testdatadir, "1m", ["UNITTEST/BTC"], timerange=timerange, fill_up_missing=True) strategy.advise_all_indicators(data) assert aimock.call_count == 1 # Ensure that a copy of the dataframe is passed to advice_indicators @@ -323,21 +344,19 @@ def test_advise_all_indicators_copy(mocker, default_conf, testdatadir) -> None: def test_min_roi_reached(default_conf, fee) -> None: - # Use list to confirm sequence does not matter - min_roi_list = [{20: 0.05, 55: 0.01, 0: 0.1}, - {0: 0.1, 20: 0.05, 55: 0.01}] + min_roi_list = [{20: 0.05, 55: 0.01, 0: 0.1}, {0: 0.1, 20: 0.05, 55: 0.01}] for roi in min_roi_list: strategy = StrategyResolver.load_strategy(default_conf) strategy.minimal_roi = roi trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.001, amount=5, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, ) @@ -352,30 +371,22 @@ def test_min_roi_reached(default_conf, fee) -> None: def test_min_roi_reached2(default_conf, fee) -> None: - # test with ROI raising after last interval - min_roi_list = [{20: 0.07, - 30: 0.05, - 55: 0.30, - 0: 0.1 - }, - {0: 0.1, - 20: 0.07, - 30: 0.05, - 55: 0.30 - }, - ] + min_roi_list = [ + {20: 0.07, 30: 0.05, 55: 0.30, 0: 0.1}, + {0: 0.1, 20: 0.07, 30: 0.05, 55: 0.30}, + ] for roi in min_roi_list: strategy = StrategyResolver.load_strategy(default_conf) strategy.minimal_roi = roi trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.001, amount=5, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, ) @@ -394,22 +405,22 @@ def test_min_roi_reached2(default_conf, fee) -> None: def test_min_roi_reached3(default_conf, fee) -> None: - # test for issue #1948 - min_roi = {20: 0.07, - 30: 0.05, - 55: 0.30, - } + min_roi = { + 20: 0.07, + 30: 0.05, + 55: 0.30, + } strategy = StrategyResolver.load_strategy(default_conf) strategy.minimal_roi = min_roi trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.001, amount=5, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, ) @@ -428,42 +439,117 @@ def test_min_roi_reached3(default_conf, fee) -> None: @pytest.mark.parametrize( - 'profit,adjusted,expected,liq,trailing,custom,profit2,adjusted2,expected2,custom_stop', [ + "profit,adjusted,expected,liq,trailing,custom,profit2,adjusted2,expected2,custom_stop", + [ # Profit, adjusted stoploss(absolute), profit for 2nd call, enable trailing, # enable custom stoploss, expected after 1st call, expected after 2nd call (0.2, 0.9, ExitType.NONE, None, False, False, 0.3, 0.9, ExitType.NONE, None), (0.2, 0.9, ExitType.NONE, None, False, False, -0.2, 0.9, ExitType.STOP_LOSS, None), (0.2, 0.9, ExitType.NONE, 0.92, False, False, -0.09, 0.9, ExitType.LIQUIDATION, None), - (0.2, 1.14, ExitType.NONE, None, True, False, 0.05, 1.14, ExitType.TRAILING_STOP_LOSS, - None), + ( + 0.2, + 1.14, + ExitType.NONE, + None, + True, + False, + 0.05, + 1.14, + ExitType.TRAILING_STOP_LOSS, + None, + ), (0.01, 0.96, ExitType.NONE, None, True, False, 0.05, 1, ExitType.NONE, None), (0.05, 1, ExitType.NONE, None, True, False, -0.01, 1, ExitType.TRAILING_STOP_LOSS, None), # Default custom case - trails with 10% (0.05, 0.95, ExitType.NONE, None, False, True, -0.02, 0.95, ExitType.NONE, None), - (0.05, 0.95, ExitType.NONE, None, False, True, -0.06, 0.95, ExitType.TRAILING_STOP_LOSS, - None), - (0.05, 1, ExitType.NONE, None, False, True, -0.06, 1, ExitType.TRAILING_STOP_LOSS, - lambda **kwargs: -0.05), - (0.05, 1, ExitType.NONE, None, False, True, 0.09, 1.04, ExitType.NONE, - lambda **kwargs: -0.05), - (0.05, 0.95, ExitType.NONE, None, False, True, 0.09, 0.98, ExitType.NONE, - lambda current_profit, **kwargs: -0.1 if current_profit < 0.6 else -(current_profit * 2)), + ( + 0.05, + 0.95, + ExitType.NONE, + None, + False, + True, + -0.06, + 0.95, + ExitType.TRAILING_STOP_LOSS, + None, + ), + ( + 0.05, + 1, + ExitType.NONE, + None, + False, + True, + -0.06, + 1, + ExitType.TRAILING_STOP_LOSS, + lambda **kwargs: -0.05, + ), + ( + 0.05, + 1, + ExitType.NONE, + None, + False, + True, + 0.09, + 1.04, + ExitType.NONE, + lambda **kwargs: -0.05, + ), + ( + 0.05, + 0.95, + ExitType.NONE, + None, + False, + True, + 0.09, + 0.98, + ExitType.NONE, + lambda current_profit, **kwargs: -0.1 + if current_profit < 0.6 + else -(current_profit * 2), + ), # Error case - static stoploss in place - (0.05, 0.9, ExitType.NONE, None, False, True, 0.09, 0.9, ExitType.NONE, - lambda **kwargs: None), - ]) -def test_ft_stoploss_reached(default_conf, fee, profit, adjusted, expected, liq, trailing, custom, - profit2, adjusted2, expected2, custom_stop) -> None: - + ( + 0.05, + 0.9, + ExitType.NONE, + None, + False, + True, + 0.09, + 0.9, + ExitType.NONE, + lambda **kwargs: None, + ), + ], +) +def test_ft_stoploss_reached( + default_conf, + fee, + profit, + adjusted, + expected, + liq, + trailing, + custom, + profit2, + adjusted2, + expected2, + custom_stop, +) -> None: strategy = StrategyResolver.load_strategy(default_conf) trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.01, amount=1, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, liquidation_price=liq, ) @@ -477,9 +563,14 @@ def test_ft_stoploss_reached(default_conf, fee, profit, adjusted, expected, liq, now = dt_now() current_rate = trade.open_rate * (1 + profit) - sl_flag = strategy.ft_stoploss_reached(current_rate=current_rate, trade=trade, - current_time=now, current_profit=profit, - force_stoploss=0, high=None) + sl_flag = strategy.ft_stoploss_reached( + current_rate=current_rate, + trade=trade, + current_time=now, + current_profit=profit, + force_stoploss=0, + high=None, + ) assert isinstance(sl_flag, ExitCheckTuple) assert sl_flag.exit_type == expected if expected == ExitType.NONE: @@ -489,9 +580,14 @@ def test_ft_stoploss_reached(default_conf, fee, profit, adjusted, expected, liq, assert round(trade.stop_loss, 2) == adjusted current_rate2 = trade.open_rate * (1 + profit2) - sl_flag = strategy.ft_stoploss_reached(current_rate=current_rate2, trade=trade, - current_time=now, current_profit=profit2, - force_stoploss=0, high=None) + sl_flag = strategy.ft_stoploss_reached( + current_rate=current_rate2, + trade=trade, + current_time=now, + current_profit=profit2, + force_stoploss=0, + high=None, + ) assert sl_flag.exit_type == expected2 if expected2 == ExitType.NONE: assert sl_flag.exit_flag is False @@ -503,159 +599,145 @@ def test_ft_stoploss_reached(default_conf, fee, profit, adjusted, expected, liq, def test_custom_exit(default_conf, fee, caplog) -> None: - strategy = StrategyResolver.load_strategy(default_conf) trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.01, amount=1, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, leverage=1.0, ) now = dt_now() - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert res == [] strategy.custom_exit = MagicMock(return_value=True) - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert res[0].exit_flag is True assert res[0].exit_type == ExitType.CUSTOM_EXIT - assert res[0].exit_reason == 'custom_exit' + assert res[0].exit_reason == "custom_exit" - strategy.custom_exit = MagicMock(return_value='hello world') + strategy.custom_exit = MagicMock(return_value="hello world") - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert res[0].exit_type == ExitType.CUSTOM_EXIT assert res[0].exit_flag is True - assert res[0].exit_reason == 'hello world' + assert res[0].exit_reason == "hello world" caplog.clear() - strategy.custom_exit = MagicMock(return_value='h' * CUSTOM_TAG_MAX_LENGTH * 2) - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + strategy.custom_exit = MagicMock(return_value="h" * CUSTOM_TAG_MAX_LENGTH * 2) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert res[0].exit_type == ExitType.CUSTOM_EXIT assert res[0].exit_flag is True - assert res[0].exit_reason == 'h' * (CUSTOM_TAG_MAX_LENGTH) - assert log_has_re('Custom exit reason returned from custom_exit is too long.*', caplog) + assert res[0].exit_reason == "h" * (CUSTOM_TAG_MAX_LENGTH) + assert log_has_re("Custom exit reason returned from custom_exit is too long.*", caplog) def test_should_sell(default_conf, fee) -> None: - strategy = StrategyResolver.load_strategy(default_conf) trade = Trade( - pair='ETH/BTC', + pair="ETH/BTC", stake_amount=0.01, amount=1, open_date=dt_now() - timedelta(hours=1), fee_open=fee.return_value, fee_close=fee.return_value, - exchange='binance', + exchange="binance", open_rate=1, leverage=1.0, ) now = dt_now() - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert res == [] strategy.min_roi_reached = MagicMock(return_value=True) - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert len(res) == 1 assert res == [ExitCheckTuple(exit_type=ExitType.ROI)] strategy.min_roi_reached = MagicMock(return_value=True) strategy.ft_stoploss_reached = MagicMock( - return_value=ExitCheckTuple(exit_type=ExitType.STOP_LOSS)) + return_value=ExitCheckTuple(exit_type=ExitType.STOP_LOSS) + ) - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert len(res) == 2 assert res == [ ExitCheckTuple(exit_type=ExitType.STOP_LOSS), ExitCheckTuple(exit_type=ExitType.ROI), - ] + ] - strategy.custom_exit = MagicMock(return_value='hello world') + strategy.custom_exit = MagicMock(return_value="hello world") # custom-exit and exit-signal is first - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=False, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=False, low=None, high=None) assert len(res) == 3 assert res == [ - ExitCheckTuple(exit_type=ExitType.CUSTOM_EXIT, exit_reason='hello world'), + ExitCheckTuple(exit_type=ExitType.CUSTOM_EXIT, exit_reason="hello world"), ExitCheckTuple(exit_type=ExitType.STOP_LOSS), ExitCheckTuple(exit_type=ExitType.ROI), - ] + ] strategy.ft_stoploss_reached = MagicMock( - return_value=ExitCheckTuple(exit_type=ExitType.TRAILING_STOP_LOSS)) + return_value=ExitCheckTuple(exit_type=ExitType.TRAILING_STOP_LOSS) + ) # Regular exit signal - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=True, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=True, low=None, high=None) assert len(res) == 3 assert res == [ ExitCheckTuple(exit_type=ExitType.EXIT_SIGNAL), ExitCheckTuple(exit_type=ExitType.ROI), ExitCheckTuple(exit_type=ExitType.TRAILING_STOP_LOSS), - ] + ] # Regular exit signal, no ROI strategy.min_roi_reached = MagicMock(return_value=False) - res = strategy.should_exit(trade, 1, now, - enter=False, exit_=True, - low=None, high=None) + res = strategy.should_exit(trade, 1, now, enter=False, exit_=True, low=None, high=None) assert len(res) == 2 assert res == [ ExitCheckTuple(exit_type=ExitType.EXIT_SIGNAL), ExitCheckTuple(exit_type=ExitType.TRAILING_STOP_LOSS), - ] + ] -@pytest.mark.parametrize('side', TRADE_SIDES) +@pytest.mark.parametrize("side", TRADE_SIDES) def test_leverage_callback(default_conf, side) -> None: - default_conf['strategy'] = 'StrategyTestV2' + default_conf["strategy"] = "StrategyTestV2" strategy = StrategyResolver.load_strategy(default_conf) - assert strategy.leverage( - pair='XRP/USDT', - current_time=datetime.now(timezone.utc), - current_rate=2.2, - proposed_leverage=1.0, - max_leverage=5.0, - side=side, - entry_tag=None, - ) == 1 + assert ( + strategy.leverage( + pair="XRP/USDT", + current_time=datetime.now(timezone.utc), + current_rate=2.2, + proposed_leverage=1.0, + max_leverage=5.0, + side=side, + entry_tag=None, + ) + == 1 + ) - default_conf['strategy'] = CURRENT_TEST_STRATEGY + default_conf["strategy"] = CURRENT_TEST_STRATEGY strategy = StrategyResolver.load_strategy(default_conf) - assert strategy.leverage( - pair='XRP/USDT', - current_time=datetime.now(timezone.utc), - current_rate=2.2, - proposed_leverage=1.0, - max_leverage=5.0, - side=side, - entry_tag='entry_tag_test', - ) == 3 + assert ( + strategy.leverage( + pair="XRP/USDT", + current_time=datetime.now(timezone.utc), + current_rate=2.2, + proposed_leverage=1.0, + max_leverage=5.0, + side=side, + entry_tag="entry_tag_test", + ) + == 3 + ) def test_analyze_ticker_default(ohlcv_history, mocker, caplog) -> None: @@ -664,29 +746,28 @@ def test_analyze_ticker_default(ohlcv_history, mocker, caplog) -> None: entry_mock = MagicMock(side_effect=lambda x, meta: x) exit_mock = MagicMock(side_effect=lambda x, meta: x) mocker.patch.multiple( - 'freqtrade.strategy.interface.IStrategy', + "freqtrade.strategy.interface.IStrategy", advise_indicators=ind_mock, advise_entry=entry_mock, advise_exit=exit_mock, - ) strategy = StrategyTestV3({}) - strategy.analyze_ticker(ohlcv_history, {'pair': 'ETH/BTC'}) + strategy.analyze_ticker(ohlcv_history, {"pair": "ETH/BTC"}) assert ind_mock.call_count == 1 assert entry_mock.call_count == 1 assert entry_mock.call_count == 1 - assert log_has('TA Analysis Launched', caplog) - assert not log_has('Skipping TA Analysis for already analyzed candle', caplog) + assert log_has("TA Analysis Launched", caplog) + assert not log_has("Skipping TA Analysis for already analyzed candle", caplog) caplog.clear() - strategy.analyze_ticker(ohlcv_history, {'pair': 'ETH/BTC'}) + strategy.analyze_ticker(ohlcv_history, {"pair": "ETH/BTC"}) # No analysis happens as process_only_new_candles is true assert ind_mock.call_count == 2 assert entry_mock.call_count == 2 assert entry_mock.call_count == 2 - assert log_has('TA Analysis Launched', caplog) - assert not log_has('Skipping TA Analysis for already analyzed candle', caplog) + assert log_has("TA Analysis Launched", caplog) + assert not log_has("Skipping TA Analysis for already analyzed candle", caplog) def test__analyze_ticker_internal_skip_analyze(ohlcv_history, mocker, caplog) -> None: @@ -695,65 +776,64 @@ def test__analyze_ticker_internal_skip_analyze(ohlcv_history, mocker, caplog) -> entry_mock = MagicMock(side_effect=lambda x, meta: x) exit_mock = MagicMock(side_effect=lambda x, meta: x) mocker.patch.multiple( - 'freqtrade.strategy.interface.IStrategy', + "freqtrade.strategy.interface.IStrategy", advise_indicators=ind_mock, advise_entry=entry_mock, advise_exit=exit_mock, - ) strategy = StrategyTestV3({}) strategy.dp = DataProvider({}, None, None) strategy.process_only_new_candles = True - ret = strategy._analyze_ticker_internal(ohlcv_history, {'pair': 'ETH/BTC'}) - assert 'high' in ret.columns - assert 'low' in ret.columns - assert 'close' in ret.columns + ret = strategy._analyze_ticker_internal(ohlcv_history, {"pair": "ETH/BTC"}) + assert "high" in ret.columns + assert "low" in ret.columns + assert "close" in ret.columns assert isinstance(ret, DataFrame) assert ind_mock.call_count == 1 assert entry_mock.call_count == 1 assert entry_mock.call_count == 1 - assert log_has('TA Analysis Launched', caplog) - assert not log_has('Skipping TA Analysis for already analyzed candle', caplog) + assert log_has("TA Analysis Launched", caplog) + assert not log_has("Skipping TA Analysis for already analyzed candle", caplog) caplog.clear() - ret = strategy._analyze_ticker_internal(ohlcv_history, {'pair': 'ETH/BTC'}) + ret = strategy._analyze_ticker_internal(ohlcv_history, {"pair": "ETH/BTC"}) # No analysis happens as process_only_new_candles is true assert ind_mock.call_count == 1 assert entry_mock.call_count == 1 assert entry_mock.call_count == 1 # only skipped analyze adds buy and sell columns, otherwise it's all mocked - assert 'enter_long' in ret.columns - assert 'exit_long' in ret.columns - assert ret['enter_long'].sum() == 0 - assert ret['exit_long'].sum() == 0 - assert not log_has('TA Analysis Launched', caplog) - assert log_has('Skipping TA Analysis for already analyzed candle', caplog) + assert "enter_long" in ret.columns + assert "exit_long" in ret.columns + assert ret["enter_long"].sum() == 0 + assert ret["exit_long"].sum() == 0 + assert not log_has("TA Analysis Launched", caplog) + assert log_has("Skipping TA Analysis for already analyzed candle", caplog) @pytest.mark.usefixtures("init_persistence") def test_is_pair_locked(default_conf): - PairLocks.timeframe = default_conf['timeframe'] + PairLocks.timeframe = default_conf["timeframe"] PairLocks.use_db = True strategy = StrategyResolver.load_strategy(default_conf) # No lock should be present assert len(PairLocks.get_pair_locks(None)) == 0 - pair = 'ETH/BTC' + pair = "ETH/BTC" assert not strategy.is_pair_locked(pair) strategy.lock_pair(pair, dt_now() + timedelta(minutes=4)) # ETH/BTC locked for 4 minutes assert strategy.is_pair_locked(pair) # XRP/BTC should not be locked now - pair = 'XRP/BTC' + pair = "XRP/BTC" assert not strategy.is_pair_locked(pair) # Unlocking a pair that's not locked should not raise an error strategy.unlock_pair(pair) # Unlock original pair - pair = 'ETH/BTC' + pair = "ETH/BTC" strategy.unlock_pair(pair) assert not strategy.is_pair_locked(pair) @@ -764,7 +844,7 @@ def test_is_pair_locked(default_conf): strategy.unlock_reason(reason) assert not strategy.is_pair_locked(pair) - pair = 'BTC/USDT' + pair = "BTC/USDT" # Lock until 14:30 lock_time = datetime(2020, 5, 1, 14, 30, 0, tzinfo=timezone.utc) # Subtract 2 seconds, as locking rounds up to the next candle. @@ -783,7 +863,7 @@ def test_is_pair_locked(default_conf): assert not strategy.is_pair_locked(pair, candle_date=lock_time + timedelta(minutes=10)) # Change timeframe to 15m - strategy.timeframe = '15m' + strategy.timeframe = "15m" # Candle from 14:14 - lock goes until 14:30 assert strategy.is_pair_locked(pair, candle_date=lock_time + timedelta(minutes=-16)) assert strategy.is_pair_locked(pair, candle_date=lock_time + timedelta(minutes=-15, seconds=-2)) @@ -792,45 +872,48 @@ def test_is_pair_locked(default_conf): def test_is_informative_pairs_callback(default_conf): - default_conf.update({'strategy': 'StrategyTestV2'}) + default_conf.update({"strategy": "StrategyTestV2"}) strategy = StrategyResolver.load_strategy(default_conf) # Should return empty # Uses fallback to base implementation assert [] == strategy.gather_informative_pairs() -@pytest.mark.parametrize('error', [ - ValueError, KeyError, Exception, -]) +@pytest.mark.parametrize( + "error", + [ + ValueError, + KeyError, + Exception, + ], +) def test_strategy_safe_wrapper_error(caplog, error): def failing_method(): - raise error('This is an error.') + raise error("This is an error.") - with pytest.raises(StrategyError, match=r'This is an error.'): - strategy_safe_wrapper(failing_method, message='DeadBeef')() + with pytest.raises(StrategyError, match=r"This is an error."): + strategy_safe_wrapper(failing_method, message="DeadBeef")() - assert log_has_re(r'DeadBeef.*', caplog) - ret = strategy_safe_wrapper(failing_method, message='DeadBeef', default_retval=True)() + assert log_has_re(r"DeadBeef.*", caplog) + ret = strategy_safe_wrapper(failing_method, message="DeadBeef", default_retval=True)() assert isinstance(ret, bool) assert ret caplog.clear() # Test suppressing error - ret = strategy_safe_wrapper(failing_method, message='DeadBeef', supress_error=True)() - assert log_has_re(r'DeadBeef.*', caplog) + ret = strategy_safe_wrapper(failing_method, message="DeadBeef", supress_error=True)() + assert log_has_re(r"DeadBeef.*", caplog) -@pytest.mark.parametrize('value', [ - 1, 22, 55, True, False, {'a': 1, 'b': '112'}, - [1, 2, 3, 4], (4, 2, 3, 6) -]) +@pytest.mark.parametrize( + "value", [1, 22, 55, True, False, {"a": 1, "b": "112"}, [1, 2, 3, 4], (4, 2, 3, 6)] +) def test_strategy_safe_wrapper(value): - def working_method(argumentpassedin): return argumentpassedin - ret = strategy_safe_wrapper(working_method, message='DeadBeef')(value) + ret = strategy_safe_wrapper(working_method, message="DeadBeef")(value) assert isinstance(ret, type(value)) assert ret == value @@ -850,7 +933,7 @@ def test_strategy_safe_wrapper_trade_copy(fee): trade = Trade.get_open_trades()[0] # Don't assert anything before strategy_wrapper. # This ensures that relationship loading works correctly. - ret = strategy_safe_wrapper(working_method, message='DeadBeef')(trade=trade) + ret = strategy_safe_wrapper(working_method, message="DeadBeef")(trade=trade) assert isinstance(ret, Trade) assert id(trade) != id(ret) # Did not modify the original order @@ -863,35 +946,35 @@ def test_hyperopt_parameters(): from skopt.space import Categorical, Integer, Real with pytest.raises(OperationalException, match=r"Name is determined.*"): - IntParameter(low=0, high=5, default=1, name='hello') + IntParameter(low=0, high=5, default=1, name="hello") with pytest.raises(OperationalException, match=r"IntParameter space must be.*"): - IntParameter(low=0, default=5, space='buy') + IntParameter(low=0, default=5, space="buy") with pytest.raises(OperationalException, match=r"RealParameter space must be.*"): - RealParameter(low=0, default=5, space='buy') + RealParameter(low=0, default=5, space="buy") with pytest.raises(OperationalException, match=r"DecimalParameter space must be.*"): - DecimalParameter(low=0, default=5, space='buy') + DecimalParameter(low=0, default=5, space="buy") with pytest.raises(OperationalException, match=r"IntParameter space invalid\."): - IntParameter([0, 10], high=7, default=5, space='buy') + IntParameter([0, 10], high=7, default=5, space="buy") with pytest.raises(OperationalException, match=r"RealParameter space invalid\."): - RealParameter([0, 10], high=7, default=5, space='buy') + RealParameter([0, 10], high=7, default=5, space="buy") with pytest.raises(OperationalException, match=r"DecimalParameter space invalid\."): - DecimalParameter([0, 10], high=7, default=5, space='buy') + DecimalParameter([0, 10], high=7, default=5, space="buy") with pytest.raises(OperationalException, match=r"CategoricalParameter space must.*"): - CategoricalParameter(['aa'], default='aa', space='buy') + CategoricalParameter(["aa"], default="aa", space="buy") with pytest.raises(TypeError): - BaseParameter(opt_range=[0, 1], default=1, space='buy') + BaseParameter(opt_range=[0, 1], default=1, space="buy") - intpar = IntParameter(low=0, high=5, default=1, space='buy') + intpar = IntParameter(low=0, high=5, default=1, space="buy") assert intpar.value == 1 - assert isinstance(intpar.get_space(''), Integer) + assert isinstance(intpar.get_space(""), Integer) assert isinstance(intpar.range, range) assert len(list(intpar.range)) == 1 # Range contains ONLY the default / value. @@ -901,13 +984,13 @@ def test_hyperopt_parameters(): assert len(list(intpar.range)) == 6 assert list(intpar.range) == [0, 1, 2, 3, 4, 5] - fltpar = RealParameter(low=0.0, high=5.5, default=1.0, space='buy') + fltpar = RealParameter(low=0.0, high=5.5, default=1.0, space="buy") assert fltpar.value == 1 - assert isinstance(fltpar.get_space(''), Real) + assert isinstance(fltpar.get_space(""), Real) - fltpar = DecimalParameter(low=0.0, high=0.5, default=0.14, decimals=1, space='buy') + fltpar = DecimalParameter(low=0.0, high=0.5, default=0.14, decimals=1, space="buy") assert fltpar.value == 0.1 - assert isinstance(fltpar.get_space(''), SKDecimal) + assert isinstance(fltpar.get_space(""), SKDecimal) assert isinstance(fltpar.range, list) assert len(list(fltpar.range)) == 1 # Range contains ONLY the default / value. @@ -916,21 +999,22 @@ def test_hyperopt_parameters(): assert len(list(fltpar.range)) == 6 assert list(fltpar.range) == [0.0, 0.1, 0.2, 0.3, 0.4, 0.5] - catpar = CategoricalParameter(['buy_rsi', 'buy_macd', 'buy_none'], - default='buy_macd', space='buy') - assert catpar.value == 'buy_macd' - assert isinstance(catpar.get_space(''), Categorical) + catpar = CategoricalParameter( + ["buy_rsi", "buy_macd", "buy_none"], default="buy_macd", space="buy" + ) + assert catpar.value == "buy_macd" + assert isinstance(catpar.get_space(""), Categorical) assert isinstance(catpar.range, list) assert len(list(catpar.range)) == 1 # Range contains ONLY the default / value. assert list(catpar.range) == [catpar.value] catpar.in_space = True assert len(list(catpar.range)) == 3 - assert list(catpar.range) == ['buy_rsi', 'buy_macd', 'buy_none'] + assert list(catpar.range) == ["buy_rsi", "buy_macd", "buy_none"] - boolpar = BooleanParameter(default=True, space='buy') + boolpar = BooleanParameter(default=True, space="buy") assert boolpar.value is True - assert isinstance(boolpar.get_space(''), Categorical) + assert isinstance(boolpar.get_space(""), Categorical) assert isinstance(boolpar.range, list) assert len(list(boolpar.range)) == 1 @@ -947,59 +1031,55 @@ def test_hyperopt_parameters(): def test_auto_hyperopt_interface(default_conf): - default_conf.update({'strategy': 'HyperoptableStrategyV2'}) - PairLocks.timeframe = default_conf['timeframe'] + default_conf.update({"strategy": "HyperoptableStrategyV2"}) + PairLocks.timeframe = default_conf["timeframe"] strategy = StrategyResolver.load_strategy(default_conf) strategy.ft_bot_start() with pytest.raises(OperationalException): - next(strategy.enumerate_parameters('deadBeef')) + next(strategy.enumerate_parameters("deadBeef")) - assert strategy.buy_rsi.value == strategy.buy_params['buy_rsi'] + assert strategy.buy_rsi.value == strategy.buy_params["buy_rsi"] # PlusDI is NOT in the buy-params, so default should be used assert strategy.buy_plusdi.value == 0.5 - assert strategy.sell_rsi.value == strategy.sell_params['sell_rsi'] + assert strategy.sell_rsi.value == strategy.sell_params["sell_rsi"] - assert repr(strategy.sell_rsi) == 'IntParameter(74)' + assert repr(strategy.sell_rsi) == "IntParameter(74)" # Parameter is disabled - so value from sell_param dict will NOT be used. assert strategy.sell_minusdi.value == 0.5 all_params = strategy.detect_all_parameters() assert isinstance(all_params, dict) # Only one buy param at class level - assert len(all_params['buy']) == 1 + assert len(all_params["buy"]) == 1 # Running detect params at instance level reveals both parameters. - assert len(list(detect_parameters(strategy, 'buy'))) == 2 - assert len(all_params['sell']) == 2 + assert len(list(detect_parameters(strategy, "buy"))) == 2 + assert len(all_params["sell"]) == 2 # Number of Hyperoptable parameters - assert all_params['count'] == 5 + assert all_params["count"] == 5 - strategy.__class__.sell_rsi = IntParameter([0, 10], default=5, space='buy') + strategy.__class__.sell_rsi = IntParameter([0, 10], default=5, space="buy") with pytest.raises(OperationalException, match=r"Inconclusive parameter.*"): - [x for x in detect_parameters(strategy, 'sell')] + [x for x in detect_parameters(strategy, "sell")] def test_auto_hyperopt_interface_loadparams(default_conf, mocker, caplog): - default_conf.update({'strategy': 'HyperoptableStrategy'}) - del default_conf['stoploss'] - del default_conf['minimal_roi'] - mocker.patch.object(Path, 'is_file', MagicMock(return_value=True)) - mocker.patch.object(Path, 'open') + default_conf.update({"strategy": "HyperoptableStrategy"}) + del default_conf["stoploss"] + del default_conf["minimal_roi"] + mocker.patch.object(Path, "is_file", MagicMock(return_value=True)) + mocker.patch.object(Path, "open") expected_result = { "strategy_name": "HyperoptableStrategy", "params": { "stoploss": { "stoploss": -0.05, }, - "roi": { - "0": 0.2, - "1200": 0.01 - } - } + "roi": {"0": 0.2, "1200": 0.01}, + }, } - mocker.patch('freqtrade.strategy.hyper.HyperoptTools.load_params', - return_value=expected_result) - PairLocks.timeframe = default_conf['timeframe'] + mocker.patch("freqtrade.strategy.hyper.HyperoptTools.load_params", return_value=expected_result) + PairLocks.timeframe = default_conf["timeframe"] strategy = StrategyResolver.load_strategy(default_conf) assert strategy.stoploss == -0.05 assert strategy.minimal_roi == {0: 0.2, 1200: 0.01} @@ -1010,47 +1090,45 @@ def test_auto_hyperopt_interface_loadparams(default_conf, mocker, caplog): "stoploss": { "stoploss": -0.05, }, - "roi": { - "0": 0.2, - "1200": 0.01 - } - } + "roi": {"0": 0.2, "1200": 0.01}, + }, } - mocker.patch('freqtrade.strategy.hyper.HyperoptTools.load_params', - return_value=expected_result) + mocker.patch("freqtrade.strategy.hyper.HyperoptTools.load_params", return_value=expected_result) with pytest.raises(OperationalException, match="Invalid parameter file provided."): StrategyResolver.load_strategy(default_conf) - mocker.patch('freqtrade.strategy.hyper.HyperoptTools.load_params', - MagicMock(side_effect=ValueError())) + mocker.patch( + "freqtrade.strategy.hyper.HyperoptTools.load_params", MagicMock(side_effect=ValueError()) + ) StrategyResolver.load_strategy(default_conf) assert log_has("Invalid parameter file format.", caplog) -@pytest.mark.parametrize('function,raises', [ - ('populate_entry_trend', False), - ('advise_entry', False), - ('populate_exit_trend', False), - ('advise_exit', False), -]) +@pytest.mark.parametrize( + "function,raises", + [ + ("populate_entry_trend", False), + ("advise_entry", False), + ("populate_exit_trend", False), + ("advise_exit", False), + ], +) def test_pandas_warning_direct(ohlcv_history, function, raises, recwarn): - - df = _STRATEGY.populate_indicators(ohlcv_history, {'pair': 'ETH/BTC'}) + df = _STRATEGY.populate_indicators(ohlcv_history, {"pair": "ETH/BTC"}) if raises: assert len(recwarn) == 1 # https://github.com/pandas-dev/pandas/issues/56503 # Fixed in 2.2.x - getattr(_STRATEGY, function)(df, {'pair': 'ETH/BTC'}) + getattr(_STRATEGY, function)(df, {"pair": "ETH/BTC"}) else: assert len(recwarn) == 0 - getattr(_STRATEGY, function)(df, {'pair': 'ETH/BTC'}) + getattr(_STRATEGY, function)(df, {"pair": "ETH/BTC"}) def test_pandas_warning_through_analyze_pair(ohlcv_history, mocker, recwarn): - - mocker.patch.object(_STRATEGY.dp, 'ohlcv', return_value=ohlcv_history) - _STRATEGY.analyze_pair('ETH/BTC') + mocker.patch.object(_STRATEGY.dp, "ohlcv", return_value=ohlcv_history) + _STRATEGY.analyze_pair("ETH/BTC") assert len(recwarn) == 0 diff --git a/tests/strategy/test_strategy_helpers.py b/tests/strategy/test_strategy_helpers.py index b7fb7dea1..aeb46a4e4 100644 --- a/tests/strategy/test_strategy_helpers.py +++ b/tests/strategy/test_strategy_helpers.py @@ -10,217 +10,221 @@ from tests.conftest import generate_test_data, get_patched_exchange def test_merge_informative_pair(): - data = generate_test_data('15m', 40) - informative = generate_test_data('1h', 40) + data = generate_test_data("15m", 40) + informative = generate_test_data("1h", 40) cols_inf = list(informative.columns) - result = merge_informative_pair(data, informative, '15m', '1h', ffill=True) + result = merge_informative_pair(data, informative, "15m", "1h", ffill=True) assert isinstance(result, pd.DataFrame) assert list(informative.columns) == cols_inf assert len(result) == len(data) - assert 'date' in result.columns - assert result['date'].equals(data['date']) - assert 'date_1h' in result.columns + assert "date" in result.columns + assert result["date"].equals(data["date"]) + assert "date_1h" in result.columns - assert 'open' in result.columns - assert 'open_1h' in result.columns - assert result['open'].equals(data['open']) + assert "open" in result.columns + assert "open_1h" in result.columns + assert result["open"].equals(data["open"]) - assert 'close' in result.columns - assert 'close_1h' in result.columns - assert result['close'].equals(data['close']) + assert "close" in result.columns + assert "close_1h" in result.columns + assert result["close"].equals(data["close"]) - assert 'volume' in result.columns - assert 'volume_1h' in result.columns - assert result['volume'].equals(data['volume']) + assert "volume" in result.columns + assert "volume_1h" in result.columns + assert result["volume"].equals(data["volume"]) # First 3 rows are empty - assert result.iloc[0]['date_1h'] is pd.NaT - assert result.iloc[1]['date_1h'] is pd.NaT - assert result.iloc[2]['date_1h'] is pd.NaT + assert result.iloc[0]["date_1h"] is pd.NaT + assert result.iloc[1]["date_1h"] is pd.NaT + assert result.iloc[2]["date_1h"] is pd.NaT # Next 4 rows contain the starting date (0:00) - assert result.iloc[3]['date_1h'] == result.iloc[0]['date'] - assert result.iloc[4]['date_1h'] == result.iloc[0]['date'] - assert result.iloc[5]['date_1h'] == result.iloc[0]['date'] - assert result.iloc[6]['date_1h'] == result.iloc[0]['date'] + assert result.iloc[3]["date_1h"] == result.iloc[0]["date"] + assert result.iloc[4]["date_1h"] == result.iloc[0]["date"] + assert result.iloc[5]["date_1h"] == result.iloc[0]["date"] + assert result.iloc[6]["date_1h"] == result.iloc[0]["date"] # Next 4 rows contain the next Hourly date original date row 4 - assert result.iloc[7]['date_1h'] == result.iloc[4]['date'] - assert result.iloc[8]['date_1h'] == result.iloc[4]['date'] + assert result.iloc[7]["date_1h"] == result.iloc[4]["date"] + assert result.iloc[8]["date_1h"] == result.iloc[4]["date"] - informative = generate_test_data('1h', 40) - result = merge_informative_pair(data, informative, '15m', '1h', ffill=False) + informative = generate_test_data("1h", 40) + result = merge_informative_pair(data, informative, "15m", "1h", ffill=False) # First 3 rows are empty - assert result.iloc[0]['date_1h'] is pd.NaT - assert result.iloc[1]['date_1h'] is pd.NaT - assert result.iloc[2]['date_1h'] is pd.NaT + assert result.iloc[0]["date_1h"] is pd.NaT + assert result.iloc[1]["date_1h"] is pd.NaT + assert result.iloc[2]["date_1h"] is pd.NaT # Next 4 rows contain the starting date (0:00) - assert result.iloc[3]['date_1h'] == result.iloc[0]['date'] - assert result.iloc[4]['date_1h'] is pd.NaT - assert result.iloc[5]['date_1h'] is pd.NaT - assert result.iloc[6]['date_1h'] is pd.NaT + assert result.iloc[3]["date_1h"] == result.iloc[0]["date"] + assert result.iloc[4]["date_1h"] is pd.NaT + assert result.iloc[5]["date_1h"] is pd.NaT + assert result.iloc[6]["date_1h"] is pd.NaT # Next 4 rows contain the next Hourly date original date row 4 - assert result.iloc[7]['date_1h'] == result.iloc[4]['date'] - assert result.iloc[8]['date_1h'] is pd.NaT + assert result.iloc[7]["date_1h"] == result.iloc[4]["date"] + assert result.iloc[8]["date_1h"] is pd.NaT def test_merge_informative_pair_weekly(): # Covers roughly 2 months - until 2023-01-10 - data = generate_test_data('1h', 1040, '2022-11-28') - informative = generate_test_data('1w', 40, '2022-11-01') - informative['day'] = informative['date'].dt.day_name() + data = generate_test_data("1h", 1040, "2022-11-28") + informative = generate_test_data("1w", 40, "2022-11-01") + informative["day"] = informative["date"].dt.day_name() - result = merge_informative_pair(data, informative, '1h', '1w', ffill=True) + result = merge_informative_pair(data, informative, "1h", "1w", ffill=True) assert isinstance(result, pd.DataFrame) # 2022-12-24 is a Saturday - candle1 = result.loc[(result['date'] == '2022-12-24T22:00:00.000Z')] - assert candle1.iloc[0]['date'] == pd.Timestamp('2022-12-24T22:00:00.000Z') - assert candle1.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') + candle1 = result.loc[(result["date"] == "2022-12-24T22:00:00.000Z")] + assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-24T22:00:00.000Z") + assert candle1.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z") - candle2 = result.loc[(result['date'] == '2022-12-24T23:00:00.000Z')] - assert candle2.iloc[0]['date'] == pd.Timestamp('2022-12-24T23:00:00.000Z') - assert candle2.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') + candle2 = result.loc[(result["date"] == "2022-12-24T23:00:00.000Z")] + assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-24T23:00:00.000Z") + assert candle2.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z") # 2022-12-25 is a Sunday - candle3 = result.loc[(result['date'] == '2022-12-25T22:00:00.000Z')] - assert candle3.iloc[0]['date'] == pd.Timestamp('2022-12-25T22:00:00.000Z') + candle3 = result.loc[(result["date"] == "2022-12-25T22:00:00.000Z")] + assert candle3.iloc[0]["date"] == pd.Timestamp("2022-12-25T22:00:00.000Z") # Still old candle - assert candle3.iloc[0]['date_1w'] == pd.Timestamp('2022-12-12T00:00:00.000Z') + assert candle3.iloc[0]["date_1w"] == pd.Timestamp("2022-12-12T00:00:00.000Z") - candle4 = result.loc[(result['date'] == '2022-12-25T23:00:00.000Z')] - assert candle4.iloc[0]['date'] == pd.Timestamp('2022-12-25T23:00:00.000Z') - assert candle4.iloc[0]['date_1w'] == pd.Timestamp('2022-12-19T00:00:00.000Z') + candle4 = result.loc[(result["date"] == "2022-12-25T23:00:00.000Z")] + assert candle4.iloc[0]["date"] == pd.Timestamp("2022-12-25T23:00:00.000Z") + assert candle4.iloc[0]["date_1w"] == pd.Timestamp("2022-12-19T00:00:00.000Z") def test_merge_informative_pair_monthly(): # Covers roughly 2 months - until 2023-01-10 - data = generate_test_data('1h', 1040, '2022-11-28') - informative = generate_test_data('1M', 40, '2022-01-01') + data = generate_test_data("1h", 1040, "2022-11-28") + informative = generate_test_data("1M", 40, "2022-01-01") - result = merge_informative_pair(data, informative, '1h', '1M', ffill=True) + result = merge_informative_pair(data, informative, "1h", "1M", ffill=True) assert isinstance(result, pd.DataFrame) - candle1 = result.loc[(result['date'] == '2022-12-31T22:00:00.000Z')] - assert candle1.iloc[0]['date'] == pd.Timestamp('2022-12-31T22:00:00.000Z') - assert candle1.iloc[0]['date_1M'] == pd.Timestamp('2022-11-01T00:00:00.000Z') + candle1 = result.loc[(result["date"] == "2022-12-31T22:00:00.000Z")] + assert candle1.iloc[0]["date"] == pd.Timestamp("2022-12-31T22:00:00.000Z") + assert candle1.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z") - candle2 = result.loc[(result['date'] == '2022-12-31T23:00:00.000Z')] - assert candle2.iloc[0]['date'] == pd.Timestamp('2022-12-31T23:00:00.000Z') - assert candle2.iloc[0]['date_1M'] == pd.Timestamp('2022-12-01T00:00:00.000Z') + candle2 = result.loc[(result["date"] == "2022-12-31T23:00:00.000Z")] + assert candle2.iloc[0]["date"] == pd.Timestamp("2022-12-31T23:00:00.000Z") + assert candle2.iloc[0]["date_1M"] == pd.Timestamp("2022-12-01T00:00:00.000Z") # Candle is empty, as the start-date did fail. - candle3 = result.loc[(result['date'] == '2022-11-30T22:00:00.000Z')] - assert candle3.iloc[0]['date'] == pd.Timestamp('2022-11-30T22:00:00.000Z') - assert candle3.iloc[0]['date_1M'] is pd.NaT + candle3 = result.loc[(result["date"] == "2022-11-30T22:00:00.000Z")] + assert candle3.iloc[0]["date"] == pd.Timestamp("2022-11-30T22:00:00.000Z") + assert candle3.iloc[0]["date_1M"] is pd.NaT # First candle with 1M data merged. - candle4 = result.loc[(result['date'] == '2022-11-30T23:00:00.000Z')] - assert candle4.iloc[0]['date'] == pd.Timestamp('2022-11-30T23:00:00.000Z') - assert candle4.iloc[0]['date_1M'] == pd.Timestamp('2022-11-01T00:00:00.000Z') + candle4 = result.loc[(result["date"] == "2022-11-30T23:00:00.000Z")] + assert candle4.iloc[0]["date"] == pd.Timestamp("2022-11-30T23:00:00.000Z") + assert candle4.iloc[0]["date_1M"] == pd.Timestamp("2022-11-01T00:00:00.000Z") def test_merge_informative_pair_same(): - data = generate_test_data('15m', 40) - informative = generate_test_data('15m', 40) + data = generate_test_data("15m", 40) + informative = generate_test_data("15m", 40) - result = merge_informative_pair(data, informative, '15m', '15m', ffill=True) + result = merge_informative_pair(data, informative, "15m", "15m", ffill=True) assert isinstance(result, pd.DataFrame) assert len(result) == len(data) - assert 'date' in result.columns - assert result['date'].equals(data['date']) - assert 'date_15m' in result.columns + assert "date" in result.columns + assert result["date"].equals(data["date"]) + assert "date_15m" in result.columns - assert 'open' in result.columns - assert 'open_15m' in result.columns - assert result['open'].equals(data['open']) + assert "open" in result.columns + assert "open_15m" in result.columns + assert result["open"].equals(data["open"]) - assert 'close' in result.columns - assert 'close_15m' in result.columns - assert result['close'].equals(data['close']) + assert "close" in result.columns + assert "close_15m" in result.columns + assert result["close"].equals(data["close"]) - assert 'volume' in result.columns - assert 'volume_15m' in result.columns - assert result['volume'].equals(data['volume']) + assert "volume" in result.columns + assert "volume_15m" in result.columns + assert result["volume"].equals(data["volume"]) # Dates match 1:1 - assert result['date_15m'].equals(result['date']) + assert result["date_15m"].equals(result["date"]) def test_merge_informative_pair_lower(): - data = generate_test_data('1h', 40) - informative = generate_test_data('15m', 40) + data = generate_test_data("1h", 40) + informative = generate_test_data("15m", 40) with pytest.raises(ValueError, match=r"Tried to merge a faster timeframe .*"): - merge_informative_pair(data, informative, '1h', '15m', ffill=True) + merge_informative_pair(data, informative, "1h", "15m", ffill=True) def test_merge_informative_pair_empty(): - data = generate_test_data('1h', 40) + data = generate_test_data("1h", 40) informative = pd.DataFrame(columns=data.columns) - result = merge_informative_pair(data, informative, '1h', '2h', ffill=True) - assert result['date'].equals(data['date']) + result = merge_informative_pair(data, informative, "1h", "2h", ffill=True) + assert result["date"].equals(data["date"]) assert list(result.columns) == [ - 'date', - 'open', - 'high', - 'low', - 'close', - 'volume', - 'date_2h', - 'open_2h', - 'high_2h', - 'low_2h', - 'close_2h', - 'volume_2h' + "date", + "open", + "high", + "low", + "close", + "volume", + "date_2h", + "open_2h", + "high_2h", + "low_2h", + "close_2h", + "volume_2h", ] # We merge an empty dataframe, so all values should be NaN - for col in ['date_2h', 'open_2h', 'high_2h', 'low_2h', 'close_2h', 'volume_2h']: + for col in ["date_2h", "open_2h", "high_2h", "low_2h", "close_2h", "volume_2h"]: assert result[col].isnull().all() def test_merge_informative_pair_suffix(): - data = generate_test_data('15m', 20) - informative = generate_test_data('1h', 20) + data = generate_test_data("15m", 20) + informative = generate_test_data("1h", 20) - result = merge_informative_pair(data, informative, '15m', '1h', - append_timeframe=False, suffix="suf") + result = merge_informative_pair( + data, informative, "15m", "1h", append_timeframe=False, suffix="suf" + ) - assert 'date' in result.columns - assert result['date'].equals(data['date']) - assert 'date_suf' in result.columns + assert "date" in result.columns + assert result["date"].equals(data["date"]) + assert "date_suf" in result.columns - assert 'open_suf' in result.columns - assert 'open_1h' not in result.columns + assert "open_suf" in result.columns + assert "open_1h" not in result.columns assert list(result.columns) == [ - 'date', - 'open', - 'high', - 'low', - 'close', - 'volume', - 'date_suf', - 'open_suf', - 'high_suf', - 'low_suf', - 'close_suf', - 'volume_suf' + "date", + "open", + "high", + "low", + "close", + "volume", + "date_suf", + "open_suf", + "high_suf", + "low_suf", + "close_suf", + "volume_suf", ] def test_merge_informative_pair_suffix_append_timeframe(): - data = generate_test_data('15m', 20) - informative = generate_test_data('1h', 20) + data = generate_test_data("15m", 20) + informative = generate_test_data("1h", 20) with pytest.raises(ValueError, match=r"You can not specify `append_timeframe` .*"): - merge_informative_pair(data, informative, '15m', '1h', suffix="suf") + merge_informative_pair(data, informative, "15m", "1h", suffix="suf") -@pytest.mark.parametrize("side,profitrange", [ - # profit range for long is [-1, inf] while for shorts is [-inf, 1] - ("long", [-0.99, 2, 30]), - ("short", [-2.0, 0.99, 30]), -]) +@pytest.mark.parametrize( + "side,profitrange", + [ + # profit range for long is [-1, inf] while for shorts is [-inf, 1] + ("long", [-0.99, 2, 30]), + ("short", [-2.0, 0.99, 30]), + ], +) def test_stoploss_from_open(side, profitrange): open_price_ranges = [ [0.01, 1.00, 30], @@ -231,8 +235,7 @@ def test_stoploss_from_open(side, profitrange): for open_range in open_price_ranges: for open_price in np.linspace(*open_range): for desired_stop in np.linspace(-0.50, 0.50, 30): - - if side == 'long': + if side == "long": # -1 is not a valid current_profit, should return 1 assert stoploss_from_open(desired_stop, -1) == 1 else: @@ -240,7 +243,7 @@ def test_stoploss_from_open(side, profitrange): assert stoploss_from_open(desired_stop, 1, True) == 1 for current_profit in np.linspace(*profitrange): - if side == 'long': + if side == "long": current_price = open_price * (1 + current_profit) expected_stop_price = open_price * (1 + desired_stop) stoploss = stoploss_from_open(desired_stop, current_profit) @@ -254,43 +257,45 @@ def test_stoploss_from_open(side, profitrange): assert stoploss >= 0 # Technically the formula can yield values greater than 1 for shorts # even though it doesn't make sense because the position would be liquidated - if side == 'long': + if side == "long": assert stoploss <= 1 # there is no correct answer if the expected stop price is above # the current price - if ((side == 'long' and expected_stop_price > current_price) - or (side == 'short' and expected_stop_price < current_price)): + if (side == "long" and expected_stop_price > current_price) or ( + side == "short" and expected_stop_price < current_price + ): assert stoploss == 0 else: assert pytest.approx(stop_price) == expected_stop_price -@pytest.mark.parametrize("side,rel_stop,curr_profit,leverage,expected", [ - # profit range for long is [-1, inf] while for shorts is [-inf, 1] - ("long", 0, -1, 1, 1), - ("long", 0, 0.1, 1, 0.09090909), - ("long", -0.1, 0.1, 1, 0.18181818), - ("long", 0.1, 0.2, 1, 0.08333333), - ("long", 0.1, 0.5, 1, 0.266666666), - ("long", 0.1, 5, 1, 0.816666666), # 500% profit, set stoploss to 10% above open price - ("long", 0, 5, 10, 3.3333333), # 500% profit, set stoploss break even - ("long", 0.1, 5, 10, 3.26666666), # 500% profit, set stoploss to 10% above open price - ("long", -0.1, 5, 10, 3.3999999), # 500% profit, set stoploss to 10% belowopen price - - ("short", 0, 0.1, 1, 0.1111111), - ("short", -0.1, 0.1, 1, 0.2222222), - ("short", 0.1, 0.2, 1, 0.125), - ("short", 0.1, 1, 1, 1), - ("short", -0.01, 5, 10, 10.01999999), # 500% profit at 10x -]) +@pytest.mark.parametrize( + "side,rel_stop,curr_profit,leverage,expected", + [ + # profit range for long is [-1, inf] while for shorts is [-inf, 1] + ("long", 0, -1, 1, 1), + ("long", 0, 0.1, 1, 0.09090909), + ("long", -0.1, 0.1, 1, 0.18181818), + ("long", 0.1, 0.2, 1, 0.08333333), + ("long", 0.1, 0.5, 1, 0.266666666), + ("long", 0.1, 5, 1, 0.816666666), # 500% profit, set stoploss to 10% above open price + ("long", 0, 5, 10, 3.3333333), # 500% profit, set stoploss break even + ("long", 0.1, 5, 10, 3.26666666), # 500% profit, set stoploss to 10% above open price + ("long", -0.1, 5, 10, 3.3999999), # 500% profit, set stoploss to 10% belowopen price + ("short", 0, 0.1, 1, 0.1111111), + ("short", -0.1, 0.1, 1, 0.2222222), + ("short", 0.1, 0.2, 1, 0.125), + ("short", 0.1, 1, 1, 1), + ("short", -0.01, 5, 10, 10.01999999), # 500% profit at 10x + ], +) def test_stoploss_from_open_leverage(side, rel_stop, curr_profit, leverage, expected): - - stoploss = stoploss_from_open(rel_stop, curr_profit, side == 'short', leverage) + stoploss = stoploss_from_open(rel_stop, curr_profit, side == "short", leverage) assert pytest.approx(stoploss) == expected open_rate = 100 if stoploss != 1: - if side == 'long': + if side == "long": current_rate = open_rate * (1 + curr_profit / leverage) stop = current_rate * (1 - stoploss / leverage) assert pytest.approx(stop) == open_rate * (1 + rel_stop / leverage) @@ -322,73 +327,79 @@ def test_stoploss_from_absolute(): assert pytest.approx(stoploss_from_absolute(100, 1, is_short=True, leverage=5)) == 5 -@pytest.mark.parametrize('trading_mode', ['futures', 'spot']) +@pytest.mark.parametrize("trading_mode", ["futures", "spot"]) def test_informative_decorator(mocker, default_conf_usdt, trading_mode): candle_def = CandleType.get_default(trading_mode) - default_conf_usdt['candle_type_def'] = candle_def - test_data_5m = generate_test_data('5m', 40) - test_data_30m = generate_test_data('30m', 40) - test_data_1h = generate_test_data('1h', 40) + default_conf_usdt["candle_type_def"] = candle_def + test_data_5m = generate_test_data("5m", 40) + test_data_30m = generate_test_data("30m", 40) + test_data_1h = generate_test_data("1h", 40) data = { - ('XRP/USDT', '5m', candle_def): test_data_5m, - ('XRP/USDT', '30m', candle_def): test_data_30m, - ('XRP/USDT', '1h', candle_def): test_data_1h, - ('XRP/BTC', '1h', candle_def): test_data_1h, # from {base}/BTC - ('LTC/USDT', '5m', candle_def): test_data_5m, - ('LTC/USDT', '30m', candle_def): test_data_30m, - ('LTC/USDT', '1h', candle_def): test_data_1h, - ('LTC/BTC', '1h', candle_def): test_data_1h, # from {base}/BTC - ('NEO/USDT', '30m', candle_def): test_data_30m, - ('NEO/USDT', '5m', CandleType.SPOT): test_data_5m, # Explicit request with '' as candletype - ('NEO/USDT', '15m', candle_def): test_data_5m, # Explicit request with '' as candletype - ('NEO/USDT', '1h', candle_def): test_data_1h, - ('ETH/USDT', '1h', candle_def): test_data_1h, - ('ETH/USDT', '30m', candle_def): test_data_30m, - ('ETH/BTC', '1h', CandleType.SPOT): test_data_1h, # Explicitly selected as spot + ("XRP/USDT", "5m", candle_def): test_data_5m, + ("XRP/USDT", "30m", candle_def): test_data_30m, + ("XRP/USDT", "1h", candle_def): test_data_1h, + ("XRP/BTC", "1h", candle_def): test_data_1h, # from {base}/BTC + ("LTC/USDT", "5m", candle_def): test_data_5m, + ("LTC/USDT", "30m", candle_def): test_data_30m, + ("LTC/USDT", "1h", candle_def): test_data_1h, + ("LTC/BTC", "1h", candle_def): test_data_1h, # from {base}/BTC + ("NEO/USDT", "30m", candle_def): test_data_30m, + ("NEO/USDT", "5m", CandleType.SPOT): test_data_5m, # Explicit request with '' as candletype + ("NEO/USDT", "15m", candle_def): test_data_5m, # Explicit request with '' as candletype + ("NEO/USDT", "1h", candle_def): test_data_1h, + ("ETH/USDT", "1h", candle_def): test_data_1h, + ("ETH/USDT", "30m", candle_def): test_data_30m, + ("ETH/BTC", "1h", CandleType.SPOT): test_data_1h, # Explicitly selected as spot } - default_conf_usdt['strategy'] = 'InformativeDecoratorTest' + default_conf_usdt["strategy"] = "InformativeDecoratorTest" strategy = StrategyResolver.load_strategy(default_conf_usdt) exchange = get_patched_exchange(mocker, default_conf_usdt) strategy.dp = DataProvider({}, exchange, None) - mocker.patch.object(strategy.dp, 'current_whitelist', return_value=[ - 'XRP/USDT', 'LTC/USDT', 'NEO/USDT' - ]) + mocker.patch.object( + strategy.dp, "current_whitelist", return_value=["XRP/USDT", "LTC/USDT", "NEO/USDT"] + ) - assert len(strategy._ft_informative) == 7 # Equal to number of decorators used + assert len(strategy._ft_informative) == 7 # Equal to number of decorators used informative_pairs = [ - ('XRP/USDT', '1h', candle_def), - ('XRP/BTC', '1h', candle_def), - ('LTC/USDT', '1h', candle_def), - ('LTC/BTC', '1h', candle_def), - ('XRP/USDT', '30m', candle_def), - ('LTC/USDT', '30m', candle_def), - ('NEO/USDT', '1h', candle_def), - ('NEO/USDT', '30m', candle_def), - ('NEO/USDT', '5m', candle_def), - ('NEO/USDT', '15m', candle_def), - ('NEO/USDT', '2h', CandleType.FUTURES), - ('ETH/BTC', '1h', CandleType.SPOT), # One candle remains as spot - ('ETH/USDT', '30m', candle_def)] + ("XRP/USDT", "1h", candle_def), + ("XRP/BTC", "1h", candle_def), + ("LTC/USDT", "1h", candle_def), + ("LTC/BTC", "1h", candle_def), + ("XRP/USDT", "30m", candle_def), + ("LTC/USDT", "30m", candle_def), + ("NEO/USDT", "1h", candle_def), + ("NEO/USDT", "30m", candle_def), + ("NEO/USDT", "5m", candle_def), + ("NEO/USDT", "15m", candle_def), + ("NEO/USDT", "2h", CandleType.FUTURES), + ("ETH/BTC", "1h", CandleType.SPOT), # One candle remains as spot + ("ETH/USDT", "30m", candle_def), + ] for inf_pair in informative_pairs: assert inf_pair in strategy.gather_informative_pairs() def test_historic_ohlcv(pair, timeframe, candle_type): return data[ - (pair, timeframe or strategy.timeframe, CandleType.from_string(candle_type))].copy() + (pair, timeframe or strategy.timeframe, CandleType.from_string(candle_type)) + ].copy() - mocker.patch('freqtrade.data.dataprovider.DataProvider.historic_ohlcv', - side_effect=test_historic_ohlcv) + mocker.patch( + "freqtrade.data.dataprovider.DataProvider.historic_ohlcv", side_effect=test_historic_ohlcv + ) analyzed = strategy.advise_all_indicators( - {p: data[(p, strategy.timeframe, candle_def)] for p in ('XRP/USDT', 'LTC/USDT')}) + {p: data[(p, strategy.timeframe, candle_def)] for p in ("XRP/USDT", "LTC/USDT")} + ) expected_columns = [ - 'rsi_1h', 'rsi_30m', # Stacked informative decorators - 'neo_usdt_rsi_1h', # NEO 1h informative - 'rsi_NEO_USDT_neo_usdt_NEO/USDT_30m', # Column formatting - 'rsi_from_callable', # Custom column formatter - 'eth_btc_rsi_1h', # Quote currency not matching stake currency - 'rsi', 'rsi_less', # Non-informative columns - 'rsi_5m', # Manual informative dataframe + "rsi_1h", + "rsi_30m", # Stacked informative decorators + "neo_usdt_rsi_1h", # NEO 1h informative + "rsi_NEO_USDT_neo_usdt_NEO/USDT_30m", # Column formatting + "rsi_from_callable", # Custom column formatter + "eth_btc_rsi_1h", # Quote currency not matching stake currency + "rsi", + "rsi_less", # Non-informative columns + "rsi_5m", # Manual informative dataframe ] for _, dataframe in analyzed.items(): for col in expected_columns: diff --git a/tests/strategy/test_strategy_loading.py b/tests/strategy/test_strategy_loading.py index 33245cc5f..523bd4a77 100644 --- a/tests/strategy/test_strategy_loading.py +++ b/tests/strategy/test_strategy_loading.py @@ -14,7 +14,7 @@ from tests.conftest import CURRENT_TEST_STRATEGY, log_has, log_has_re def test_search_strategy(): - default_location = Path(__file__).parent / 'strats' + default_location = Path(__file__).parent / "strats" s, _ = StrategyResolver._search_object( directory=default_location, @@ -25,7 +25,7 @@ def test_search_strategy(): s, _ = StrategyResolver._search_object( directory=default_location, - object_name='NotFoundStrategy', + object_name="NotFoundStrategy", add_source=True, ) assert s is None @@ -46,9 +46,9 @@ def test_search_all_strategies_with_failed(): assert len(strategies) == 14 # with enum_failed=True search_all_objects() shall find 2 good strategies # and 1 which fails to load - assert len([x for x in strategies if x['class'] is not None]) == 13 + assert len([x for x in strategies if x["class"] is not None]) == 13 - assert len([x for x in strategies if x['class'] is None]) == 1 + assert len([x for x in strategies if x["class"] is None]) == 1 directory = Path(__file__).parent / "strats_nonexistingdir" strategies = StrategyResolver._search_all_objects(directory, enum_failed=True) @@ -56,123 +56,126 @@ def test_search_all_strategies_with_failed(): def test_load_strategy(default_conf, dataframe_1m): - default_conf.update({'strategy': 'SampleStrategy', - 'strategy_path': str(Path(__file__).parents[2] / 'freqtrade/templates') - }) + default_conf.update( + { + "strategy": "SampleStrategy", + "strategy_path": str(Path(__file__).parents[2] / "freqtrade/templates"), + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert isinstance(strategy.__source__, str) - assert 'class SampleStrategy' in strategy.__source__ + assert "class SampleStrategy" in strategy.__source__ assert isinstance(strategy.__file__, str) - assert 'rsi' in strategy.advise_indicators(dataframe_1m, {'pair': 'ETH/BTC'}) + assert "rsi" in strategy.advise_indicators(dataframe_1m, {"pair": "ETH/BTC"}) def test_load_strategy_base64(dataframe_1m, caplog, default_conf): - filepath = Path(__file__).parents[2] / 'freqtrade/templates/sample_strategy.py' + filepath = Path(__file__).parents[2] / "freqtrade/templates/sample_strategy.py" encoded_string = urlsafe_b64encode(filepath.read_bytes()).decode("utf-8") - default_conf.update({'strategy': f'SampleStrategy:{encoded_string}'}) + default_conf.update({"strategy": f"SampleStrategy:{encoded_string}"}) strategy = StrategyResolver.load_strategy(default_conf) - assert 'rsi' in strategy.advise_indicators(dataframe_1m, {'pair': 'ETH/BTC'}) + assert "rsi" in strategy.advise_indicators(dataframe_1m, {"pair": "ETH/BTC"}) # Make sure strategy was loaded from base64 (using temp directory)!! - assert log_has_re(r"Using resolved strategy SampleStrategy from '" - r".*(/|\\).*(/|\\)SampleStrategy\.py'\.\.\.", caplog) + assert log_has_re( + r"Using resolved strategy SampleStrategy from '" + r".*(/|\\).*(/|\\)SampleStrategy\.py'\.\.\.", + caplog, + ) def test_load_strategy_invalid_directory(caplog, default_conf, tmp_path): - default_conf['user_data_dir'] = tmp_path + default_conf["user_data_dir"] = tmp_path - extra_dir = Path.cwd() / 'some/path' + extra_dir = Path.cwd() / "some/path" with pytest.raises(OperationalException, match=r"Impossible to load Strategy.*"): - StrategyResolver._load_strategy('StrategyTestV333', config=default_conf, - extra_dir=extra_dir) + StrategyResolver._load_strategy( + "StrategyTestV333", config=default_conf, extra_dir=extra_dir + ) - assert log_has_re(r'Path .*' + r'some.*path.*' + r'.* does not exist', caplog) + assert log_has_re(r"Path .*" + r"some.*path.*" + r".* does not exist", caplog) def test_load_not_found_strategy(default_conf, tmp_path): - default_conf['user_data_dir'] = tmp_path - default_conf['strategy'] = 'NotFoundStrategy' - with pytest.raises(OperationalException, - match=r"Impossible to load Strategy 'NotFoundStrategy'. " - r"This class does not exist or contains Python code errors."): + default_conf["user_data_dir"] = tmp_path + default_conf["strategy"] = "NotFoundStrategy" + with pytest.raises( + OperationalException, + match=r"Impossible to load Strategy 'NotFoundStrategy'. " + r"This class does not exist or contains Python code errors.", + ): StrategyResolver.load_strategy(default_conf) def test_load_strategy_noname(default_conf): - default_conf['strategy'] = '' - with pytest.raises(OperationalException, - match="No strategy set. Please use `--strategy` to specify " - "the strategy class to use."): + default_conf["strategy"] = "" + with pytest.raises( + OperationalException, + match="No strategy set. Please use `--strategy` to specify " "the strategy class to use.", + ): StrategyResolver.load_strategy(default_conf) -@ pytest.mark.filterwarnings("ignore:deprecated") -@ pytest.mark.parametrize('strategy_name', ['StrategyTestV2']) +@pytest.mark.filterwarnings("ignore:deprecated") +@pytest.mark.parametrize("strategy_name", ["StrategyTestV2"]) def test_strategy_pre_v3(dataframe_1m, default_conf, strategy_name): - default_conf.update({'strategy': strategy_name}) + default_conf.update({"strategy": strategy_name}) strategy = StrategyResolver.load_strategy(default_conf) - metadata = {'pair': 'ETH/BTC'} + metadata = {"pair": "ETH/BTC"} assert strategy.minimal_roi[0] == 0.04 - assert default_conf["minimal_roi"]['0'] == 0.04 + assert default_conf["minimal_roi"]["0"] == 0.04 assert strategy.stoploss == -0.10 - assert default_conf['stoploss'] == -0.10 + assert default_conf["stoploss"] == -0.10 - assert strategy.timeframe == '5m' - assert default_conf['timeframe'] == '5m' + assert strategy.timeframe == "5m" + assert default_conf["timeframe"] == "5m" df_indicators = strategy.advise_indicators(dataframe_1m, metadata=metadata) - assert 'adx' in df_indicators + assert "adx" in df_indicators dataframe = strategy.advise_entry(df_indicators, metadata=metadata) - assert 'buy' not in dataframe.columns - assert 'enter_long' in dataframe.columns + assert "buy" not in dataframe.columns + assert "enter_long" in dataframe.columns dataframe = strategy.advise_exit(df_indicators, metadata=metadata) - assert 'sell' not in dataframe.columns - assert 'exit_long' in dataframe.columns + assert "sell" not in dataframe.columns + assert "exit_long" in dataframe.columns def test_strategy_can_short(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + } + ) strat = StrategyResolver.load_strategy(default_conf) assert isinstance(strat, IStrategy) - default_conf['strategy'] = 'StrategyTestV3Futures' + default_conf["strategy"] = "StrategyTestV3Futures" with pytest.raises(ImportError, match=""): StrategyResolver.load_strategy(default_conf) - default_conf['trading_mode'] = 'futures' + default_conf["trading_mode"] = "futures" strat = StrategyResolver.load_strategy(default_conf) assert isinstance(strat, IStrategy) def test_strategy_override_minimal_roi(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'minimal_roi': { - "20": 0.1, - "0": 0.5 - } - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "minimal_roi": {"20": 0.1, "0": 0.5}}) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.minimal_roi[0] == 0.5 assert log_has( - "Override strategy 'minimal_roi' with value in config file: {'20': 0.1, '0': 0.5}.", - caplog) + "Override strategy 'minimal_roi' with value in config file: {'20': 0.1, '0': 0.5}.", caplog + ) def test_strategy_override_stoploss(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'stoploss': -0.5 - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "stoploss": -0.5}) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.stoploss == -0.5 @@ -181,10 +184,7 @@ def test_strategy_override_stoploss(caplog, default_conf): def test_strategy_override_max_open_trades(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'max_open_trades': 7 - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "max_open_trades": 7}) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.max_open_trades == 7 @@ -193,10 +193,7 @@ def test_strategy_override_max_open_trades(caplog, default_conf): def test_strategy_override_trailing_stop(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'trailing_stop': True - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "trailing_stop": True}) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.trailing_stop @@ -206,84 +203,81 @@ def test_strategy_override_trailing_stop(caplog, default_conf): def test_strategy_override_trailing_stop_positive(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'trailing_stop_positive': -0.1, - 'trailing_stop_positive_offset': -0.2 - - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + "trailing_stop_positive": -0.1, + "trailing_stop_positive_offset": -0.2, + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.trailing_stop_positive == -0.1 - assert log_has("Override strategy 'trailing_stop_positive' with value in config file: -0.1.", - caplog) + assert log_has( + "Override strategy 'trailing_stop_positive' with value in config file: -0.1.", caplog + ) assert strategy.trailing_stop_positive_offset == -0.2 - assert log_has("Override strategy 'trailing_stop_positive' with value in config file: -0.1.", - caplog) + assert log_has( + "Override strategy 'trailing_stop_positive' with value in config file: -0.1.", caplog + ) def test_strategy_override_timeframe(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'timeframe': 60, - 'stake_currency': 'ETH' - }) + default_conf.update( + {"strategy": CURRENT_TEST_STRATEGY, "timeframe": 60, "stake_currency": "ETH"} + ) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.timeframe == 60 - assert strategy.stake_currency == 'ETH' - assert log_has("Override strategy 'timeframe' with value in config file: 60.", - caplog) + assert strategy.stake_currency == "ETH" + assert log_has("Override strategy 'timeframe' with value in config file: 60.", caplog) def test_strategy_override_process_only_new_candles(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'process_only_new_candles': False - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "process_only_new_candles": False}) strategy = StrategyResolver.load_strategy(default_conf) assert not strategy.process_only_new_candles - assert log_has("Override strategy 'process_only_new_candles' with value in config file: False.", - caplog) + assert log_has( + "Override strategy 'process_only_new_candles' with value in config file: False.", caplog + ) def test_strategy_override_order_types(caplog, default_conf): caplog.set_level(logging.INFO) order_types = { - 'entry': 'market', - 'exit': 'limit', - 'stoploss': 'limit', - 'stoploss_on_exchange': True, + "entry": "market", + "exit": "limit", + "stoploss": "limit", + "stoploss_on_exchange": True, } - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'order_types': order_types - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "order_types": order_types}) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.order_types - for method in ['entry', 'exit', 'stoploss', 'stoploss_on_exchange']: + for method in ["entry", "exit", "stoploss", "stoploss_on_exchange"]: assert strategy.order_types[method] == order_types[method] - assert log_has("Override strategy 'order_types' with value in config file:" - " {'entry': 'market', 'exit': 'limit', 'stoploss': 'limit'," - " 'stoploss_on_exchange': True}.", caplog) + assert log_has( + "Override strategy 'order_types' with value in config file:" + " {'entry': 'market', 'exit': 'limit', 'stoploss': 'limit'," + " 'stoploss_on_exchange': True}.", + caplog, + ) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'order_types': {'exit': 'market'} - }) + default_conf.update({"strategy": CURRENT_TEST_STRATEGY, "order_types": {"exit": "market"}}) # Raise error for invalid configuration - with pytest.raises(ImportError, - match=r"Impossible to load Strategy '" + CURRENT_TEST_STRATEGY + "'. " - r"Order-types mapping is incomplete."): + with pytest.raises( + ImportError, + match=r"Impossible to load Strategy '" + CURRENT_TEST_STRATEGY + "'. " + r"Order-types mapping is incomplete.", + ): StrategyResolver.load_strategy(default_conf) @@ -291,50 +285,57 @@ def test_strategy_override_order_tif(caplog, default_conf): caplog.set_level(logging.INFO) order_time_in_force = { - 'entry': 'FOK', - 'exit': 'GTC', + "entry": "FOK", + "exit": "GTC", } - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'order_time_in_force': order_time_in_force - }) + default_conf.update( + {"strategy": CURRENT_TEST_STRATEGY, "order_time_in_force": order_time_in_force} + ) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.order_time_in_force - for method in ['entry', 'exit']: + for method in ["entry", "exit"]: assert strategy.order_time_in_force[method] == order_time_in_force[method] - assert log_has("Override strategy 'order_time_in_force' with value in config file:" - " {'entry': 'FOK', 'exit': 'GTC'}.", caplog) + assert log_has( + "Override strategy 'order_time_in_force' with value in config file:" + " {'entry': 'FOK', 'exit': 'GTC'}.", + caplog, + ) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'order_time_in_force': {'entry': 'FOK'} - }) + default_conf.update( + {"strategy": CURRENT_TEST_STRATEGY, "order_time_in_force": {"entry": "FOK"}} + ) # Raise error for invalid configuration - with pytest.raises(ImportError, - match=f"Impossible to load Strategy '{CURRENT_TEST_STRATEGY}'. " - "Order-time-in-force mapping is incomplete."): + with pytest.raises( + ImportError, + match=f"Impossible to load Strategy '{CURRENT_TEST_STRATEGY}'. " + "Order-time-in-force mapping is incomplete.", + ): StrategyResolver.load_strategy(default_conf) def test_strategy_override_use_exit_signal(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.use_exit_signal assert isinstance(strategy.use_exit_signal, bool) # must be inserted to configuration - assert 'use_exit_signal' in default_conf - assert default_conf['use_exit_signal'] + assert "use_exit_signal" in default_conf + assert default_conf["use_exit_signal"] - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'use_exit_signal': False, - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + "use_exit_signal": False, + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert not strategy.use_exit_signal @@ -344,20 +345,24 @@ def test_strategy_override_use_exit_signal(caplog, default_conf): def test_strategy_override_use_exit_profit_only(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert not strategy.exit_profit_only assert isinstance(strategy.exit_profit_only, bool) # must be inserted to configuration - assert 'exit_profit_only' in default_conf - assert not default_conf['exit_profit_only'] + assert "exit_profit_only" in default_conf + assert not default_conf["exit_profit_only"] - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'exit_profit_only': True, - }) + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + "exit_profit_only": True, + } + ) strategy = StrategyResolver.load_strategy(default_conf) assert strategy.exit_profit_only @@ -367,138 +372,135 @@ def test_strategy_override_use_exit_profit_only(caplog, default_conf): def test_strategy_max_open_trades_infinity_from_strategy(caplog, default_conf): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - }) - del default_conf['max_open_trades'] + default_conf.update( + { + "strategy": CURRENT_TEST_STRATEGY, + } + ) + del default_conf["max_open_trades"] strategy = StrategyResolver.load_strategy(default_conf) # this test assumes -1 set to 'max_open_trades' in CURRENT_TEST_STRATEGY - assert strategy.max_open_trades == float('inf') - assert default_conf['max_open_trades'] == float('inf') + assert strategy.max_open_trades == float("inf") + assert default_conf["max_open_trades"] == float("inf") def test_strategy_max_open_trades_infinity_from_config(caplog, default_conf, mocker): caplog.set_level(logging.INFO) - default_conf.update({ - 'strategy': CURRENT_TEST_STRATEGY, - 'max_open_trades': -1, - 'exchange': 'binance' - }) + default_conf.update( + {"strategy": CURRENT_TEST_STRATEGY, "max_open_trades": -1, "exchange": "binance"} + ) configuration = Configuration(args=default_conf) parsed_config = configuration.get_config() - assert parsed_config['max_open_trades'] == float('inf') + assert parsed_config["max_open_trades"] == float("inf") strategy = StrategyResolver.load_strategy(parsed_config) - assert strategy.max_open_trades == float('inf') + assert strategy.max_open_trades == float("inf") -@ pytest.mark.filterwarnings("ignore:deprecated") +@pytest.mark.filterwarnings("ignore:deprecated") def test_missing_implements(default_conf, caplog): - default_location = Path(__file__).parent / "strats" - default_conf.update({'strategy': 'StrategyTestV2', - 'strategy_path': default_location}) + default_conf.update({"strategy": "StrategyTestV2", "strategy_path": default_location}) StrategyResolver.load_strategy(default_conf) log_has_re(r"DEPRECATED: .*use_sell_signal.*use_exit_signal.", caplog) - default_conf['trading_mode'] = 'futures' - with pytest.raises(OperationalException, - match=r"DEPRECATED: .*use_sell_signal.*use_exit_signal."): + default_conf["trading_mode"] = "futures" + with pytest.raises( + OperationalException, match=r"DEPRECATED: .*use_sell_signal.*use_exit_signal." + ): StrategyResolver.load_strategy(default_conf) - default_conf['trading_mode'] = 'spot' + default_conf["trading_mode"] = "spot" default_location = Path(__file__).parent / "strats/broken_strats" - default_conf.update({'strategy': 'TestStrategyNoImplements', - 'strategy_path': default_location}) - with pytest.raises(OperationalException, - match=r"`populate_entry_trend` or `populate_buy_trend`.*"): + default_conf.update({"strategy": "TestStrategyNoImplements", "strategy_path": default_location}) + with pytest.raises( + OperationalException, match=r"`populate_entry_trend` or `populate_buy_trend`.*" + ): StrategyResolver.load_strategy(default_conf) - default_conf['strategy'] = 'TestStrategyNoImplementSell' + default_conf["strategy"] = "TestStrategyNoImplementSell" - with pytest.raises(OperationalException, - match=r"`populate_exit_trend` or `populate_sell_trend`.*"): + with pytest.raises( + OperationalException, match=r"`populate_exit_trend` or `populate_sell_trend`.*" + ): StrategyResolver.load_strategy(default_conf) # Futures mode is more strict ... - default_conf['trading_mode'] = 'futures' + default_conf["trading_mode"] = "futures" - with pytest.raises(OperationalException, - match=r"`populate_exit_trend` must be implemented.*"): + with pytest.raises(OperationalException, match=r"`populate_exit_trend` must be implemented.*"): StrategyResolver.load_strategy(default_conf) - default_conf['strategy'] = 'TestStrategyNoImplements' - with pytest.raises(OperationalException, - match=r"`populate_entry_trend` must be implemented.*"): + default_conf["strategy"] = "TestStrategyNoImplements" + with pytest.raises(OperationalException, match=r"`populate_entry_trend` must be implemented.*"): StrategyResolver.load_strategy(default_conf) - default_conf['strategy'] = 'TestStrategyImplementCustomSell' - with pytest.raises(OperationalException, - match=r"Please migrate your implementation of `custom_sell`.*"): + default_conf["strategy"] = "TestStrategyImplementCustomSell" + with pytest.raises( + OperationalException, match=r"Please migrate your implementation of `custom_sell`.*" + ): StrategyResolver.load_strategy(default_conf) - default_conf['strategy'] = 'TestStrategyImplementBuyTimeout' - with pytest.raises(OperationalException, - match=r"Please migrate your implementation of `check_buy_timeout`.*"): + default_conf["strategy"] = "TestStrategyImplementBuyTimeout" + with pytest.raises( + OperationalException, match=r"Please migrate your implementation of `check_buy_timeout`.*" + ): StrategyResolver.load_strategy(default_conf) - default_conf['strategy'] = 'TestStrategyImplementSellTimeout' - with pytest.raises(OperationalException, - match=r"Please migrate your implementation of `check_sell_timeout`.*"): + default_conf["strategy"] = "TestStrategyImplementSellTimeout" + with pytest.raises( + OperationalException, match=r"Please migrate your implementation of `check_sell_timeout`.*" + ): StrategyResolver.load_strategy(default_conf) def test_call_deprecated_function(default_conf): default_location = Path(__file__).parent / "strats/broken_strats/" - del default_conf['timeframe'] - default_conf.update({'strategy': 'TestStrategyLegacyV1', - 'strategy_path': default_location}) - with pytest.raises(OperationalException, - match=r"Strategy Interface v1 is no longer supported.*"): + del default_conf["timeframe"] + default_conf.update({"strategy": "TestStrategyLegacyV1", "strategy_path": default_location}) + with pytest.raises( + OperationalException, match=r"Strategy Interface v1 is no longer supported.*" + ): StrategyResolver.load_strategy(default_conf) def test_strategy_interface_versioning(dataframe_1m, default_conf): - default_conf.update({'strategy': 'StrategyTestV2'}) + default_conf.update({"strategy": "StrategyTestV2"}) strategy = StrategyResolver.load_strategy(default_conf) - metadata = {'pair': 'ETH/BTC'} + metadata = {"pair": "ETH/BTC"} assert strategy.INTERFACE_VERSION == 2 indicator_df = strategy.advise_indicators(dataframe_1m, metadata=metadata) assert isinstance(indicator_df, DataFrame) - assert 'adx' in indicator_df.columns + assert "adx" in indicator_df.columns enterdf = strategy.advise_entry(dataframe_1m, metadata=metadata) assert isinstance(enterdf, DataFrame) - assert 'buy' not in enterdf.columns - assert 'enter_long' in enterdf.columns + assert "buy" not in enterdf.columns + assert "enter_long" in enterdf.columns exitdf = strategy.advise_exit(dataframe_1m, metadata=metadata) assert isinstance(exitdf, DataFrame) - assert 'sell' not in exitdf - assert 'exit_long' in exitdf + assert "sell" not in exitdf + assert "exit_long" in exitdf def test_strategy_ft_load_params_from_file(mocker, default_conf): - default_conf.update({'strategy': 'StrategyTestV2'}) - del default_conf['max_open_trades'] - mocker.patch('freqtrade.strategy.hyper.HyperStrategyMixin.load_params_from_file', - return_value={ - 'params': { - 'max_open_trades': { - 'max_open_trades': -1 - } - } - }) + default_conf.update({"strategy": "StrategyTestV2"}) + del default_conf["max_open_trades"] + mocker.patch( + "freqtrade.strategy.hyper.HyperStrategyMixin.load_params_from_file", + return_value={"params": {"max_open_trades": {"max_open_trades": -1}}}, + ) strategy = StrategyResolver.load_strategy(default_conf) - assert strategy.max_open_trades == float('inf') - assert strategy.config['max_open_trades'] == float('inf') + assert strategy.max_open_trades == float("inf") + assert strategy.config["max_open_trades"] == float("inf")