freqtrade_origin/freqtrade/freqai/prediction_models/ReinforcementLearningTDQN.py
2022-08-24 13:00:55 +02:00

304 lines
11 KiB
Python

import logging
from typing import Any, Dict # Optional
from enum import Enum
import numpy as np
import torch as th
from stable_baselines3.common.callbacks import EvalCallback
from stable_baselines3.common.monitor import Monitor
# from stable_baselines3.common.vec_env import SubprocVecEnv
from freqtrade.freqai.RL.BaseRLEnv import BaseRLEnv
from freqtrade.freqai.RL.BaseReinforcementLearningModel import BaseReinforcementLearningModel
from freqtrade.freqai.RL.TDQNagent import TDQN
from stable_baselines3.common.buffers import ReplayBuffer
from gym import spaces
from gym.utils import seeding
logger = logging.getLogger(__name__)
class ReinforcementLearningTDQN(BaseReinforcementLearningModel):
    """
    User created Reinforcement Learning Model prediction model.

    Trains a ``TDQN`` agent on a ``MyRLEnv`` environment and evaluates it
    periodically on a second ``MyRLEnv`` built from the test features.
    """

    def fit(self, data_dictionary: Dict[str, Any], pair: str = ''):
        """
        Build the train/eval environments and train a TDQN agent.

        :param data_dictionary: must provide "train_features" and
            "test_features"; both are passed to ``MyRLEnv`` as ``df``.
        :param pair: key into ``self.dd.historic_data`` used to fetch prices.
        :return: the trained ``TDQN`` model.
        """
        agent_params = self.freqai_info['model_training_parameters']
        reward_params = self.freqai_info['model_reward_parameters']
        train_df = data_dictionary["train_features"]
        test_df = data_dictionary["test_features"]
        eval_freq = agent_params["eval_cycles"] * len(test_df)
        total_timesteps = agent_params["train_cycles"] * len(train_df)

        # price data for model training and evaluation
        # NOTE(review): both slices are taken from the tail of the same
        # frame, so the training and evaluation prices overlap - confirm
        # this matches how train/test features are split.
        history = self.dd.historic_data[pair][f"{self.config['timeframe']}"]
        price = history.tail(len(train_df.index))
        price_test = history.tail(len(test_df.index))

        # environments
        train_env = MyRLEnv(df=train_df, prices=price, window_size=self.CONV_WIDTH,
                            reward_kwargs=reward_params)
        # Named so that the builtin ``eval`` is not shadowed.
        test_env = MyRLEnv(df=test_df, prices=price_test,
                           window_size=self.CONV_WIDTH, reward_kwargs=reward_params)
        eval_env = Monitor(test_env, ".")
        eval_env.reset()

        path = self.dk.data_path
        eval_callback = EvalCallback(eval_env, best_model_save_path=f"{path}/",
                                     log_path=f"{path}/tdqn/logs/", eval_freq=int(eval_freq),
                                     deterministic=True, render=False)

        # model arch
        policy_kwargs = dict(activation_fn=th.nn.ReLU,
                             net_arch=[256, 256, 128])

        model = TDQN('TMultiInputPolicy', train_env,
                     policy_kwargs=policy_kwargs,
                     tensorboard_log=f"{path}/tdqn/tensorboard/",
                     learning_rate=0.00025, gamma=0.9,
                     target_update_interval=5000, buffer_size=50000,
                     exploration_initial_eps=1, exploration_final_eps=0.1,
                     replay_buffer_class=ReplayBuffer
                     )

        model.learn(
            total_timesteps=int(total_timesteps),
            callback=eval_callback
        )

        # Report through the module logger instead of stdout.
        logger.info('Training finished!')

        return model
class Actions(Enum):
    """
    Discrete actions available to the agent.

    The member count defines the size of the environment's action space and
    the integer values are what ``step`` receives.
    """

    # No-op / flatten request.
    Neutral = 0

    # Long side.
    Long_buy = 1
    Long_sell = 2

    # Short side.
    Short_buy = 3
    Short_sell = 4
class Positions(Enum):
    """
    Position currently held by the environment.

    ``Neutral`` sits numerically between ``Short`` and ``Long``.
    """

    Short = 0
    Long = 1
    Neutral = 0.5

    def opposite(self):
        """
        Return ``Short`` for ``Long``; every other member maps to ``Long``.
        """
        if self == Positions.Long:
            return Positions.Short
        return Positions.Long
class MyRLEnv(BaseRLEnv):
"""
User can override any function in BaseRLEnv and gym.Env. Here the user
Adds 5 actions.
"""
metadata = {'render.modes': ['human']}
def __init__(self, df, prices, reward_kwargs, window_size=10, starting_point=True, ):
    """
    Set up data, spaces and the (not yet started) episode state.

    :param df: 2-dimensional feature frame; also used as the observation source.
    :param prices: price data; its length fixes the last tick of an episode.
    :param reward_kwargs: mapping that must contain "rr" and "profit_aim".
    :param window_size: number of rows per observation and the first tick.
    :param starting_point: stored as-is and consulted by ``reset``.
    """
    assert df.ndim == 2

    self.seed()

    # --- data -----------------------------------------------------------
    self.df = self.signal_features = df
    self.prices = prices
    self.window_size = window_size
    self.starting_point = starting_point

    # --- reward configuration --------------------------------------------
    self.rr = reward_kwargs["rr"]
    self.profit_aim = reward_kwargs["profit_aim"]
    self.fee = 0.0015  # fixed fee rate applied by add_buy_fee/add_sell_fee

    # --- spaces -----------------------------------------------------------
    feature_count = self.signal_features.shape[1]
    self.shape = (window_size, feature_count)
    self.action_space = spaces.Discrete(len(Actions))
    self.observation_space = spaces.Box(
        low=-np.inf,
        high=np.inf,
        shape=self.shape,
        dtype=np.float32,
    )

    # --- episode bounds ---------------------------------------------------
    self._start_tick = self.window_size
    self._end_tick = len(self.prices) - 1

    # --- episode state, filled in by reset() ------------------------------
    self._done = self._current_tick = self._last_trade_tick = None
    self._position = Positions.Neutral
    self._position_history = None
    self.total_reward = self._total_profit = None
    self._first_rendering = None
    self.history = None
    self.trade_history = []

    # self.A_t, self.B_t = 0.000639, 0.00001954
    self.r_t_change = 0.
    self.returns_report = []
def seed(self, seed=None):
    """
    Create the environment's random generator.

    :param seed: forwarded to ``seeding.np_random``.
    :return: one-element list holding the seed reported by ``seeding.np_random``.
    """
    generator, used_seed = seeding.np_random(seed)
    self.np_random = generator
    return [used_seed]
def reset(self):
    """
    Start a new episode and return its first observation.

    Resets the tick counter to ``_start_tick``, clears all per-episode
    bookkeeping and puts the environment into the Neutral position.

    :return: the observation produced by ``_get_observation`` at the start tick.
    """
    self._done = False

    # Go Neutral *before* seeding the position history; otherwise a re-used
    # environment would start its history with the position left over from
    # the previous episode.
    self._position = Positions.Neutral

    if self.starting_point is True:
        self._position_history = (self._start_tick * [None]) + [self._position]
    else:
        self._position_history = (self.window_size * [None]) + [self._position]

    self._current_tick = self._start_tick
    self._last_trade_tick = None

    self.total_reward = 0.
    self._total_profit = 1.  # unit
    self._first_rendering = True
    self.history = {}
    self.trade_history = []
    self.portfolio_log_returns = np.zeros(len(self.prices))

    self._profits = [(self._start_tick, 1)]
    self.close_trade_profit = []
    self.r_t_change = 0.

    self.returns_report = []

    return self._get_observation()
def step(self, action):
    """
    Advance the environment by one tick and apply ``action``.

    The episode ends when the tick reaches ``_end_tick`` or when
    ``_total_profit`` drops below 0.2.

    :param action: integer value of an ``Actions`` member.
    :return: ``(observation, step_reward, done, info)`` where ``info`` holds
        the tick, total reward, total profit and the position value.
    """
    self._done = False
    self._current_tick += 1

    if self._current_tick == self._end_tick:
        self._done = True

    # Returns, profit and reward are computed before the position is
    # updated below, i.e. against the position held on entry to this step.
    self.update_portfolio_log_returns(action)
    self._update_profit(action)
    step_reward = self.calculate_reward(action)
    self.total_reward += step_reward

    trade_type = None
    if self.is_tradesignal(action):  # skip the combinations filtered out there
        # Update position:
        #   Long_buy                       -> Long
        #   Short_buy                      -> Short
        #   Neutral, Long_sell, Short_sell -> Neutral
        if action == Actions.Long_buy.value:
            self._position = Positions.Long
            trade_type = "long"
        elif action == Actions.Short_buy.value:
            self._position = Positions.Short
            trade_type = "short"
        elif action in (Actions.Neutral.value,
                        Actions.Long_sell.value,
                        Actions.Short_sell.value):
            self._position = Positions.Neutral
            trade_type = "neutral"
        else:
            # Report through the module logger instead of stdout.
            logger.warning("case not defined")

        # Update last trade tick
        self._last_trade_tick = self._current_tick

        if trade_type is not None:
            self.trade_history.append(
                {'price': self.current_price(), 'index': self._current_tick,
                 'type': trade_type})

    # Stop early once most of the starting unit of profit is lost.
    if self._total_profit < 0.2:
        self._done = True

    self._position_history.append(self._position)
    observation = self._get_observation()
    info = dict(
        tick=self._current_tick,
        total_reward=self.total_reward,
        total_profit=self._total_profit,
        position=self._position.value
    )
    self._update_history(info)

    return observation, step_reward, self._done, info
def _get_observation(self):
return self.signal_features[(self._current_tick - self.window_size):self._current_tick]
def get_unrealized_profit(self):
if self._last_trade_tick is None:
return 0.
if self._position == Positions.Neutral:
return 0.
elif self._position == Positions.Short:
current_price = self.add_buy_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_sell_fee(self.prices.iloc[self._last_trade_tick].open)
return (last_trade_price - current_price) / last_trade_price
elif self._position == Positions.Long:
current_price = self.add_sell_fee(self.prices.iloc[self._current_tick].open)
last_trade_price = self.add_buy_fee(self.prices.iloc[self._last_trade_tick].open)
return (current_price - last_trade_price) / last_trade_price
else:
return 0.
def is_tradesignal(self, action):
    """
    Tell whether ``action`` should change the position in ``step``.

    Returns False for these (action, position) combinations, True otherwise:
        Neutral while Neutral
        Long_buy / Long_sell / Short_buy / Short_sell while Long
        Long_buy / Long_sell / Short_buy / Short_sell while Short

    :param action: integer value of an ``Actions`` member.
    """
    # NOTE(review): as written, only the Neutral action passes while a
    # position is open; Long_sell/Short_sell never do - confirm intended.
    ignored_pairs = (
        (Actions.Neutral, Positions.Neutral),
        (Actions.Short_buy, Positions.Short),
        (Actions.Short_sell, Positions.Short),
        (Actions.Short_buy, Positions.Long),
        (Actions.Short_sell, Positions.Long),
        (Actions.Long_buy, Positions.Long),
        (Actions.Long_sell, Positions.Long),
        (Actions.Long_buy, Positions.Short),
        (Actions.Long_sell, Positions.Short),
    )
    for ignored_action, ignored_position in ignored_pairs:
        if action == ignored_action.value and self._position == ignored_position:
            return False
    return True
def _is_trade(self, action):
    """
    Tell whether ``action`` acts against the currently held position.

    True for: Long_buy while Short, Short_buy while Long, and Neutral while
    Long or Short.

    :param action: integer value of an ``Actions`` member.
    """
    # NOTE(review): two further clauses compared ``action`` with
    # ``Actions.Neutral.Short_sell`` (while Long) and
    # ``Actions.Neutral.Long_sell`` (while Short). Reaching a member through
    # another member raises AttributeError on some Python versions, and an
    # integer action never equals an Enum member, so they could not match.
    # They are dropped here, which keeps the effective result and agrees
    # with is_tradesignal(), where those pairs are ignored. Decide whether
    # exit actions should count as trades before re-adding them.
    return ((action == Actions.Long_buy.value and self._position == Positions.Short) or
            (action == Actions.Short_buy.value and self._position == Positions.Long) or
            (action == Actions.Neutral.value and self._position == Positions.Long) or
            (action == Actions.Neutral.value and self._position == Positions.Short)
            )
def is_hold(self, action):
    """
    Tell whether ``action`` keeps the currently held position open.

    True for Short_buy while Short and for Long_buy while Long.

    :param action: integer value of an ``Actions`` member.
    """
    # ``Actions`` has no ``Short``/``Long`` members, so the previous lookups
    # raised AttributeError; the entry actions are the pairs that
    # is_tradesignal() treats as "nothing to do" for an open position.
    return ((action == Actions.Short_buy.value and self._position == Positions.Short)
            or (action == Actions.Long_buy.value and self._position == Positions.Long))
def add_buy_fee(self, price):
return price * (1 + self.fee)
def add_sell_fee(self, price):
return price / (1 + self.fee)
def _update_history(self, info):
if not self.history:
self.history = {key: [] for key in info.keys()}
for key, value in info.items():
self.history[key].append(value)