0
点赞
收藏
分享

微信扫一扫

[Mac + Icarus Verilog + gtkwave] Mac运行Verilog及查看波形图

科牛 2024-11-03 阅读 8
交互

以下代码实现了一个基于 OpenFOAM 的强化学习环境类 OpenFoam,用于模拟流体动力学问题并与智能体进行交互。该环境类继承自 gym.Env,提供了与强化学习算法进行交互的接口,包括初始化环境、执行动作、获取状态和奖励、重置环境以及关闭环境等方法。

#完整代码下载:https://download.csdn.net/download/huanghm88/89909280

from typing import Any, Callable, List, Optional, Tuple, Union

import gym
import numpy as np
import os
import re
import shutil
from abc import ABCMeta, abstractmethod
from time import time
from scipy import signal
import numpy as np
import pandas as pd
from DRLinFluids import cfd, utils
from gym import spaces, logger
from gym.utils import seeding
from tianshou.utils import RunningMeanStd
from sklearn.preprocessing import StandardScaler


class OpenFoam(gym.Env):
	def __init__(self,
	             foam_root_path:Optional[str]= None,
	              foam_params: Optional[dict]= None,
	             agent_params: Optional[dict]= None,
	             state_params: Optional[dict]= None,
	             server=True,**kwargs):
		self.foam_params = foam_params
		self.agent_params = agent_params
		self.state_params = state_params
		self.foam_root_path = foam_root_path
		self.task = 'OpenFoam-v0'
		self.state_params['probe_info'] = utils.read_foam_file(
			'/'.join([foam_root_path, 'system', 'probes'])
		)
		self.dashboard_data = {
   }
		self.trajectory_start_time = 0
		self.trajectory_end_time = 0
		self.num_episode = 0
		self.info_list = []
		self.episode_reward_sequence = []
		self.exec_info = {
   }
		self.num_trajectory = 0
		self.trajectory_reward = np.array([])
		self.all_episode_trajectory_reward = pd.DataFrame()
		self.state_data = np.array([])
		self.episode_reward = 0
		self.decorated_actions = np.array([])
		self.actions_sequence = np.array([])
		self.start_actions = 0
		self.end_actions = 0
		self.single_step_actions = np.array([])
		self.all_episode_actions = pd.DataFrame()
		self.all_episode_decorated_actions = pd.DataFrame()
		self.all_episode_single_step_actions = pd.DataFrame()
		self.probe_velocity_df = pd.DataFrame()
		self.probe_pressure_df = pd.DataFrame()
		self.force_df = pd.DataFrame()
		self.force_Coeffs_df = pd.DataFrame()
		self.history_force_df = pd.DataFrame()
		self.initial_force_Coeffs_df = pd.DataFrame()
		self.history_force_Coeffs_df = pd.DataFrame()
		self.history_force_Coeffs_df_alltime = pd.DataFrame()
		self.history_force_Coeffs_df_stepnumber=0
		self.start_time_float=0
		self.end_time_float=0
		self.action_time = 0
		self.vortex_shedding = 0
		self.svd_rank_df=10

		self.cfd_init_time_str = str(float(foam_params['cfd_init_time'])).rstrip('0').rstrip('.')
		self.decimal = int(np.max([
			len(str(agent_params['interaction_period']).split('.')[-1]),
			len(str(foam_params['cfd_init_time']).split('.')[-1])
		]))
		self.pressure_DMD_initial_snapshot=np.array([])
		self.control_matrix_gammaDMDc=np.array([])

		if server:
			action_tocsv_list = [[0, 0, 0, 0],
				[self.foam_params['cfd_init_time'], 0, 0, 0]]
			pd.DataFrame(
					action_tocsv_list
				).to_csv(self.foam_root_path + '/system/jet.csv', index=False, header=False)
			for f_name in os.listdir(self.foam_root_path):
				if f_name == 'prosessor0':
					shutil.rmtree('/'.join([self.foam_root_path, f_name]))
				elif f_name == 'prosessor1':
					shutil.rmtree('/'.join([self.foam_root_path, f_name]))
				elif f_name == 'prosessor2':
					shutil.rmtree('/'.join([self.foam_root_path, f_name]))
				elif f_name == 'prosessor3':
					shutil.rmtree('/'.join([self.foam_root_path, f_name]))
				else:
					pass
			cfd.run_init(foam_root_path, foam_params)

			self.velocity_table_init = utils.read_foam_file(
				foam_root_path + f'/postProcessing/probes/0.000/U',
				dimension=self.foam_params['num_dimension']
			)
			cfd_init_time = int(self.foam_
举报

相关推荐

0 条评论