import math
import os
import platform
import re
import subprocess
from xml.dom import minidom
import xml.etree.ElementTree as et
from enum import Enum, unique
from typing import Dict, List, Union, Tuple
from datetime import datetime
import numpy as np
import vehiclemodels.parameters_vehicle1 as p1
import vehiclemodels.parameters_vehicle2 as p2
import vehiclemodels.parameters_vehicle3 as p3
import vehiclemodels.parameters_vehicle4 as p4
from commonroad.common.validity import is_positive, is_real_number
from commonroad.geometry.shape import Rectangle
from commonroad.prediction.prediction import TrajectoryPrediction
from commonroad.scenario.obstacle import DynamicObstacle, ObstacleType
from commonroad.scenario.scenario import ScenarioID
from commonroad.scenario.state import TraceState, PMState, MBState, KSState, STState, PMInputState, InputState
from commonroad.scenario.trajectory import Trajectory
[docs]class SolutionException(Exception):
"""
Main exception class for solution related exceptions.
"""
pass
[docs]class StateTypeException(SolutionException):
"""
Main exception class for StateType related exceptions.
"""
pass
[docs]class SolutionReaderException(Exception):
"""
Main exception class for solution reader related exceptions.
"""
pass
@unique
class VehicleType(Enum):
FORD_ESCORT = 1
BMW_320i = 2
VW_VANAGON = 3
TRUCK = 4
vehicle_parameters = {VehicleType.FORD_ESCORT: p1.parameters_vehicle1(),
VehicleType.BMW_320i: p2.parameters_vehicle2(),
VehicleType.VW_VANAGON: p3.parameters_vehicle3(),
VehicleType.TRUCK: p4.parameters_vehicle4()}
@unique
class VehicleModel(Enum):
PM = 0
ST = 1
KS = 2
MB = 3
KST = 4
@unique
class CostFunction(Enum):
JB1 = 0
SA1 = 1
WX1 = 2
SM1 = 3
SM2 = 4
SM3 = 5
MW1 = 6
TR1 = 7
@unique
class StateFields(Enum):
"""
State Fields enum class for defining the state fields for vehicle models for different trajectory types.
PM | ST | KS | KST | MB -> Corresponding state fields for trajectory states
Input -> Input fields for ST, KS, and MB vehicle models
PMInput -> Input fields for PM vehicle model.
Note: If you change the order of field names, don't forget to change the order on the XMLStateFields enum as well,
because the indexes have to match.
"""
PM = ['position', 'velocity', 'velocity_y', 'time_step']
ST = ['position', 'steering_angle', 'velocity', 'orientation', 'yaw_rate', 'slip_angle', 'time_step']
KS = ['position', 'steering_angle', 'velocity', 'orientation', 'time_step']
KST = ['position', 'steering_angle', 'velocity', 'orientation', 'hitch_angle', 'time_step']
MB = ['position', 'steering_angle', 'velocity', 'orientation', 'yaw_rate', 'roll_angle', 'roll_rate', 'pitch_angle',
'pitch_rate', 'velocity_y', 'position_z', 'velocity_z', 'roll_angle_front', 'roll_rate_front',
'velocity_y_front', 'position_z_front', 'velocity_z_front', 'roll_angle_rear', 'roll_rate_rear',
'velocity_y_rear', 'position_z_rear', 'velocity_z_rear', 'left_front_wheel_angular_speed',
'right_front_wheel_angular_speed', 'left_rear_wheel_angular_speed', 'right_rear_wheel_angular_speed',
'delta_y_f', 'delta_y_r', 'time_step']
Input = ['steering_angle_speed', 'acceleration', 'time_step']
PMInput = ['acceleration', 'acceleration_y', 'time_step']
@unique
class XMLStateFields(Enum):
"""
XML names of the state fields for vehicle models for different trajectory types.
PM | ST | KS | KST | MB -> Corresponding xml names of the state fields for trajectory states
Input -> XML names of the input fields for ST, KS, and MB vehicle models
PMInput -> XML names of the input fields for PM vehicle model.
Note: If you change the order of xml names, don't forget to change the order on the StateFields enum as well,
because the indexes have to match.
"""
PM = [('x', 'y'), 'xVelocity', 'yVelocity', 'time']
ST = [('x', 'y'), 'steeringAngle', 'velocity', 'orientation', 'yawRate', 'slipAngle', 'time']
KS = [('x', 'y'), 'steeringAngle', 'velocity', 'orientation', 'time']
KST = [('x', 'y'), 'steeringAngle', 'velocity', 'orientation', 'hitch_angle', 'time']
MB = [('x', 'y'), 'steeringAngle', 'velocity', 'orientation', 'yawRate', 'rollAngle', 'rollRate', 'pitchAngle',
'pitchRate', 'yVelocity', 'zPosition', 'zVelocity', 'rollAngleFront', 'rollRateFront',
'yVelocityFront', 'zPositionFront', 'zVelocityFront', 'rollAngleRear', 'rollRateRear',
'yVelocityRear', 'zPositionRear', 'zVelocityRear', 'leftFrontWheelAngularSpeed',
'rightFrontWheelAngularSpeed', 'leftRearWheelAngularSpeed', 'rightRearWheelAngularSpeed',
'deltaYf', 'deltaYr', 'time']
Input = ['steeringAngleSpeed', 'acceleration', 'time']
PMInput = ['xAcceleration', 'yAcceleration', 'time']
@unique
class StateType(Enum):
"""
State Type enum class.
PM | ST | KS | KST | MB -> Corresponding state type for trajectory states
Input -> Input type for ST, KS, and MB vehicle models
PMInput -> Input type for PM vehicle model.
"""
MB = 'mbState'
ST = 'stState'
KS = 'ksState'
KST = 'kstState'
PM = 'pmState'
Input = 'input'
PMInput = 'pmInput'
@property
def fields(self) -> List[str]:
"""
Returns the state fields for the state type.
:return: State fields as list
"""
return StateFields[self.name].value
@property
def xml_fields(self) -> List[str]:
"""
Returns the xml state fields for the state type.
:return: XML names of the state fields as list
"""
return XMLStateFields[self.name].value
@classmethod
def get_state_type(cls, state: TraceState, desired_vehicle_model: VehicleModel = None) -> 'StateType':
"""
Returns the corresponding StateType for the given State object by matching State object's attributes
to the state fields.
:param state: CommonRoad State object
:param desired_vehicle_model: check if given vehicle_model is supported first
:return: corresponding StateType
"""
# put desired_vehicle_model first
attrs = state.attributes
if desired_vehicle_model is not None:
state_fields_all = [StateFields[str(desired_vehicle_model.name)], StateFields.Input, StateFields.PMInput]
state_fields_add = []
for sf in StateFields:
if sf not in state_fields_all:
state_fields_add.append(sf)
state_fields_all += state_fields_add
for state_fields in state_fields_all:
if not len(attrs) >= len(state_fields.value):
continue # >=
if not all([sf in attrs for sf in state_fields.value]):
continue
return cls[state_fields.name]
else:
state_fields_all = StateFields
for state_fields in state_fields_all:
if not len(attrs) == len(state_fields.value):
continue # ==
if not all([sf in attrs for sf in state_fields.value]):
continue
return cls[state_fields.name]
raise StateTypeException('Given state is not valid!')
@classmethod
def check_state_type(cls, vehicle_model: VehicleModel) -> None:
"""
Checks whether vehicle model can be supported by trajectory.
:param vehicle_model: vehicle model enum
:return: bool
"""
StateFields(vehicle_model.name)
@unique
class TrajectoryType(Enum):
"""
Trajectory Type enum class.
PM | ST | KS | KST | MB -> Corresponding trajectory type for the vehicle models
Input -> InputVector type for ST, KS, and MB vehicle models
PMInput -> InputVector type for PM vehicle model.
"""
MB = 'mbTrajectory'
ST = 'stTrajectory'
KS = 'ksTrajectory'
KST = 'kstTrajectory'
PM = 'pmTrajectory'
Input = 'inputVector'
PMInput = 'pmInputVector'
@property
def state_type(self) -> StateType:
"""
Returns the StateType corresponding to the TrajectoryType
:return: StateType
"""
return StateType[self.name]
@classmethod
def get_trajectory_type(cls, trajectory: Trajectory,
desired_vehicle_model: VehicleModel = None) -> 'TrajectoryType':
"""
Returns the corresponding TrajectoryType for the given Trajectory object based on the StateType of its states.
:param trajectory: CommonRoad Trajectory object
:param desired_vehicle_model: check if given vehicle_model is supported first
:return: corresponding TrajectoryType
"""
state_type = StateType.get_state_type(trajectory.state_list[0], desired_vehicle_model)
return cls[state_type.name]
def valid_vehicle_model(self, vehicle_model: VehicleModel) -> bool:
"""
Checks whether given vehicle model is valid for the TrajectoryType.
:param vehicle_model: CommonRoad enum for vehicle models
:return: True if the vehicle model is valid for the TrajectoryType
"""
return any([
self.name == 'Input' and vehicle_model in [VehicleModel.KS, VehicleModel.ST, VehicleModel.MB],
self.name == 'PMInput' and vehicle_model == VehicleModel.PM,
self.name == vehicle_model.name
])
class SupportedCostFunctions(Enum):
"""
Enum class for specifying which cost functions are supported for which vehicle model
"""
PM = [CostFunction.JB1, CostFunction.WX1, CostFunction.MW1]
ST = [cost_function for cost_function in CostFunction] # Supports all cost functions
KS = [cost_function for cost_function in CostFunction] # Supports all cost functions
MB = [cost_function for cost_function in CostFunction] # Supports all cost functions
KST = [cost_function for cost_function in CostFunction] # Supports all cost functions
class PlanningProblemSolution:
def __init__(self,
planning_problem_id: int,
vehicle_model: VehicleModel,
vehicle_type: VehicleType,
cost_function: CostFunction,
trajectory: Trajectory):
"""
Constructor for the PlanningProblemSolution class.
:param planning_problem_id: ID of the planning problem
:param vehicle_model: VehicleModel used for the solution
:param vehicle_type: VehicleType used for the solution
:param cost_function: CostFunction the solution will be evaluated with
:param trajectory: Ego vehicle's trajectory for the solution.
"""
self.planning_problem_id = planning_problem_id
self._vehicle_model = vehicle_model
self.vehicle_type = vehicle_type
self._cost_function = cost_function
self._trajectory = trajectory
self._trajectory_type = TrajectoryType.get_trajectory_type(self._trajectory, self.vehicle_model)
self._check_trajectory_supported(self._vehicle_model, self._trajectory_type)
self._check_cost_supported(self._vehicle_model, self._cost_function)
@staticmethod
def _check_cost_supported(vehicle_model: VehicleModel, cost_function: CostFunction) -> bool:
"""
Checks whether given cost function is supported by the given vehicle model.
:param vehicle_model: VehicleModel
:param cost_function: CostFunction
:return: True if supported.
"""
supported_costs = SupportedCostFunctions[vehicle_model.name].value
if cost_function not in supported_costs:
raise SolutionException("Cost function %s isn't supported for %s model!" % (cost_function.name,
vehicle_model.name))
return True
def _check_trajectory_supported(self, vehicle_model: VehicleModel, trajectory_type: TrajectoryType) -> bool:
"""
Checks whether given vehicle model is valid for the given trajectory type.
:param vehicle_model: VehicleModel
:param trajectory_type: TrajectoryType
:return: True if valid.
"""
if self._vehicle_model == VehicleModel.PM and self._trajectory_type == TrajectoryType.PM:
for state in self._trajectory.state_list:
if isinstance(state, PMState) and not hasattr(state, 'orientation'):
state.orientation = math.atan2(state.velocity_y, state.velocity)
if not trajectory_type.valid_vehicle_model(vehicle_model):
raise SolutionException('Vehicle model %s is not valid for the trajectory type %s!'
% (vehicle_model.name, trajectory_type.name))
return True
@property
def vehicle_model(self) -> VehicleModel:
""" VehicleModel of the PlanningProblemSolution """
return self._vehicle_model
@vehicle_model.setter
def vehicle_model(self, vehicle_model: VehicleModel):
self._check_trajectory_supported(vehicle_model, self._trajectory_type)
self._check_cost_supported(vehicle_model, self.cost_function)
self._vehicle_model = vehicle_model
@property
def cost_function(self) -> CostFunction:
""" CostFunction of the PlanningProblemSolution """
return self._cost_function
@cost_function.setter
def cost_function(self, cost_function: CostFunction):
self._check_cost_supported(self.vehicle_model, cost_function)
self._cost_function = cost_function
@property
def trajectory(self) -> Trajectory:
""" Trajectory of the PlanningProblemSolution """
return self._trajectory
@trajectory.setter
def trajectory(self, trajectory: Trajectory):
trajectory_type = TrajectoryType.get_trajectory_type(trajectory)
self._check_trajectory_supported(self.vehicle_model, trajectory_type)
self._trajectory = trajectory
self._trajectory_type = trajectory_type
@property
def trajectory_type(self) -> TrajectoryType:
"""
TrajectoryType of the PlanningProblemSolution.
Dynamically assigned when there is a change of trajectory.
"""
return self._trajectory_type
@property
def vehicle_id(self) -> str:
"""
Returns the vehicle id as string.
Example:
VehicleModel = PM
VehicleType = FORD_ESCORT
Vehicle ID = PM1
:return: Vehicle model ID
"""
return self.vehicle_model.name + str(self.vehicle_type.value)
@property
def cost_id(self) -> str:
"""
Returns cost function id as str.
Example:
CostFunction = JB1
Cost ID = JB1
:return: Cost function ID
"""
return self.cost_function.name
class Solution:
"""Stores a solution to a CommonRoad benchmark and additional meta data."""
def __init__(self,
scenario_id: ScenarioID,
planning_problem_solutions: List[PlanningProblemSolution],
date: datetime = datetime.today(),
computation_time: Union[float, None] = None,
processor_name: Union[str, None] = None):
"""
:param scenario_id: Scenario ID of the Solution
:param planning_problem_solutions: List of PlanningProblemSolution for corresponding
to the planning problems of the scenario
:param date: The date solution was produced. Default=datetime.today()
:param computation_time: The computation time measured in seconds for the Solution. Default=None
:param processor_name: The processor model used for the Solution. Determined automatically if set to 'auto'.
Default=None.
"""
self.scenario_id = scenario_id
self._planning_problem_solutions: Dict[int, PlanningProblemSolution] = {}
self.planning_problem_solutions = planning_problem_solutions
self.date = date
self._computation_time = None
self.computation_time = computation_time
self.processor_name = processor_name
@property
def planning_problem_solutions(self) -> List[PlanningProblemSolution]:
return list(self._planning_problem_solutions.values())
@planning_problem_solutions.setter
def planning_problem_solutions(self, planning_problem_solutions: List[PlanningProblemSolution]):
self._planning_problem_solutions = {s.planning_problem_id: s for s in planning_problem_solutions}
@property
def benchmark_id(self) -> str:
"""
Returns the benchmark id of the solution as string.
Example:
Scenario ID = TEST
VehicleModel = PM
VehicleType = FORD_ESCORT
CostFunction = JB1
Version = 2020a
Benchmark ID = PM1:JB1:TEST:2020a
Collaborative Solution Example:
Scenario ID = TEST
1st VehicleModel = PM
1st VehicleType = FORD_ESCORT
1st CostFunction = JB1
2nd VehicleModel = PM
2nd VehicleType = VW_VANAGON
2nd CostFunction = SA1
Version = 2020a
Benchmark ID = [PM1,PM3]:[JB1,SA1]:TEST:2020a
:return: Benchmark ID
"""
vehicle_ids = self.vehicle_ids
cost_ids = self.cost_ids
vehicles_str = vehicle_ids[0] if len(vehicle_ids) == 1 else '[%s]' % ','.join(vehicle_ids)
costs_str = cost_ids[0] if len(cost_ids) == 1 else '[%s]' % ','.join(cost_ids)
return '%s:%s:%s:%s' % (vehicles_str, costs_str, str(self.scenario_id), self.scenario_id.scenario_version)
@property
def vehicle_ids(self) -> List[str]:
"""
Returns the list of vehicle ids of all PlanningProblemSolutions of the Solution
Example:
1st PlanningProblemSolution Vehicle ID = PM1
2nd PlanningProblemSolution Vehicle ID = PM3
Vehicle IDS = [PM1, PM3]
:return: List of vehicle IDs
"""
return [pp_solution.vehicle_id for pp_solution in self.planning_problem_solutions]
@property
def cost_ids(self) -> List[str]:
"""
Returns the list of cost ids of all PlanningProblemSolutions of the Solution
Example:
1st PlanningProblemSolution Cost ID = JB1
2nd PlanningProblemSolution Cost ID = SA1
Cost IDS = [JB1, SA1]
:return: List of cost function IDs
"""
return [pp_solution.cost_id for pp_solution in self.planning_problem_solutions]
@property
def planning_problem_ids(self) -> List[int]:
"""
Returns the list of planning problem ids of all PlanningProblemSolutions of the Solution
Example:
1st PlanningProblemSolution planning_problem_id = 0
2nd PlanningProblemSolution planning_problem_id = 1
planning_problem_ids = [0, 1]
:return: List of planning problem ids
"""
return [pp_solution.planning_problem_id for pp_solution in self.planning_problem_solutions]
@property
def trajectory_types(self) -> List[TrajectoryType]:
"""
Returns the list of trajectory types of all PlanningProblemSolutions of the Solution
Example:
1st PlanningProblemSolution trajectory_type = TrajectoryType.PM
2nd PlanningProblemSolution trajectory_type = TrajectoryType.KS
trajectory_types = [TrajectoryType.PM, TrajectoryType.KS]
:return: List of trajectory types
"""
return [pp_solution.trajectory_type for pp_solution in self.planning_problem_solutions]
@property
def computation_time(self) -> Union[None, float]:
"""
Return the computation time [s] for the trajectory.
:return:
"""
return self._computation_time
@computation_time.setter
def computation_time(self, computation_time):
if computation_time is not None:
assert is_real_number(computation_time), "<Solution> computation_time provided as type {}," \
"but expected type float," \
"measured in seconds!".format(type(computation_time))
assert is_positive(computation_time), "<Solution> computation_time needs to be positive!"\
.format(type(computation_time))
self._computation_time = computation_time
def create_dynamic_obstacle(self) -> Dict[int, DynamicObstacle]:
"""
Creates dynamic obstacle(s) from solution(s) for every planning problem.
:return:
"""
obs = {}
for pp_id, solution in self._planning_problem_solutions.items():
shape = Rectangle(length=vehicle_parameters[solution.vehicle_type].l,
width=vehicle_parameters[solution.vehicle_type].w)
trajectory = Trajectory(initial_time_step=solution.trajectory.initial_time_step + 1,
state_list=solution.trajectory.state_list[1:])
prediction = TrajectoryPrediction(trajectory, shape=shape)
obs[pp_id] = DynamicObstacle(obstacle_id=pp_id,
obstacle_type=ObstacleType.CAR,
obstacle_shape=shape,
initial_state=solution.trajectory.state_list[0],
prediction=prediction)
return obs
class CommonRoadSolutionReader:
"""Reads solution xml files created with the CommonRoadSolutionWriter"""
# TODO Add parser check
@classmethod
def open(cls, filepath: str) -> Solution:
"""
Opens and parses the Solution XML file located on the given path.
:param filepath: Path to the file.
:return: Solution
"""
tree = et.parse(filepath)
root_node = tree.getroot()
return cls._parse_solution(root_node)
@classmethod
def fromstring(cls, file: str) -> Solution:
"""
Parses the given Solution XML string.
:param file: xml file as string
:return: Solution
"""
root_node = et.fromstring(file)
return cls._parse_solution(root_node)
@classmethod
def _parse_solution(cls, root_node: et.Element) -> Solution:
""" Parses the Solution XML root node. """ # TODO
benchmark_id, date, computation_time, processor_name = cls._parse_header(root_node)
vehicle_ids, cost_ids, scenario_id = cls._parse_benchmark_id(benchmark_id)
pp_solutions = [cls._parse_planning_problem_solution(vehicle_ids[idx], cost_ids[idx], trajectory_node)
for idx, trajectory_node in enumerate(root_node)]
return Solution(scenario_id, pp_solutions, date, computation_time, processor_name)
@staticmethod
def _parse_header(root_node: et.Element) -> Tuple[str, Union[None, datetime], Union[None, float], Union[None, str]]:
""" Parses the header attributes for the given Solution XML root node. """
benchmark_id = root_node.get('benchmark_id')
if not benchmark_id:
SolutionException("Solution xml does not have a benchmark id!")
date = root_node.attrib.get('date', None) # None if not found
if date is not None:
try:
date = datetime.strptime(date, '%Y-%m-%dT%H:%M:%S')
except ValueError:
# backward compatibility with old solution files
date = datetime.strptime(date, '%Y-%m-%d')
computation_time = root_node.attrib.get('computation_time', None)
if computation_time is not None:
computation_time = float(computation_time)
processor_name = root_node.attrib.get('processor_name', None)
return benchmark_id, date, computation_time, processor_name
@classmethod
def _parse_planning_problem_solution(cls, vehicle_id: str, cost_id: str,
trajectory_node: et.Element) -> PlanningProblemSolution:
""" Parses PlanningProblemSolution from the given XML node. """
vehicle_model, vehicle_type = cls._parse_vehicle_id(vehicle_id)
if cost_id not in [cfunc.name for cfunc in CostFunction]:
raise SolutionReaderException("Invalid Cost ID: " + cost_id)
cost_function = CostFunction[cost_id]
pp_id, trajectory = cls._parse_trajectory(trajectory_node)
return PlanningProblemSolution(pp_id, vehicle_model, vehicle_type, cost_function, trajectory)
@classmethod
def _parse_trajectory(cls, trajectory_node: et.Element) -> Tuple[int, Trajectory]:
""" Parses Trajectory and planning problem id from the given XML node. """
if trajectory_node.tag not in [ttype.value for ttype in TrajectoryType]:
raise SolutionReaderException("Invalid Trajectory Type: " + trajectory_node.tag)
trajectory_type = TrajectoryType(trajectory_node.tag)
planning_problem_id = int(trajectory_node.get('planningProblem'))
state_list = [cls._parse_state(trajectory_type.state_type, state_node) for state_node in trajectory_node]
state_list = sorted(state_list, key=lambda state: state.time_step)
return planning_problem_id, Trajectory(initial_time_step=state_list[0].time_step, state_list=state_list)
@classmethod
def _parse_sub_element(cls, state_node: et.Element, name: str, as_float: bool = True) -> Union[float, int]:
""" Parses the sub elements from the given XML node. """
elem = state_node.find(name)
if elem is None:
raise SolutionReaderException("Element '%s' couldn't be found in the xml node!" % name)
value = float(elem.text) if as_float else int(elem.text)
return value
@classmethod
def _parse_state(cls, state_type: StateType, state_node: et.Element) -> TraceState:
""" Parses State from the given XML node. """
if not state_node.tag == state_type.value:
raise SolutionReaderException("Given xml node is not a '%s' node!" % state_type.value)
state_types = {StateType.MB: MBState, StateType.KS: KSState, StateType.PM: PMState, StateType.ST: STState,
StateType.Input: InputState, StateType.PMInput: PMInputState}
state_vals = {}
for mapping in list(zip(state_type.xml_fields, state_type.fields)):
xml_name = mapping[0]
field_name = mapping[1]
if isinstance(xml_name, tuple):
state_vals[field_name] = np.array([cls._parse_sub_element(state_node, name) for name in xml_name])
else:
state_vals[field_name] = cls._parse_sub_element(state_node, xml_name, as_float=(not xml_name == 'time'))
return state_types[state_type](**state_vals)
@staticmethod
def _parse_benchmark_id(benchmark_id: str) -> (List[str], List[str], str):
""" Parses the given benchmark id string. """
segments = benchmark_id.replace(' ', '').split(':')
if len(segments) != 4:
raise SolutionReaderException("Invalid Benchmark ID: " + benchmark_id)
vehicle_model_ids = re.sub(r'[\[\]]', '', segments[0]).split(',')
cost_function_ids = re.sub(r'[\[\]]', '', segments[1]).split(',')
scenario_id = ScenarioID.from_benchmark_id(segments[2], segments[3])
return vehicle_model_ids, cost_function_ids, scenario_id
@staticmethod
def _parse_vehicle_id(vehicle_id: str) -> Tuple[VehicleModel, VehicleType]:
""" Parses the given vehicle id string. """
if not len(vehicle_id) == 3 and not len(vehicle_id) == 4:
raise SolutionReaderException("Invalid Vehicle ID: " + vehicle_id)
if not vehicle_id[:-1] in [vmodel.name for vmodel in VehicleModel]:
raise SolutionReaderException("Invalid Vehicle ID: " + vehicle_id)
if not int(vehicle_id[-1]) in [vtype.value for vtype in VehicleType]:
raise SolutionReaderException("Invalid Vehicle ID: " + vehicle_id)
return VehicleModel[vehicle_id[:-1]], VehicleType(int(vehicle_id[-1]))
class CommonRoadSolutionWriter:
def __init__(self, solution: Solution):
"""
Creates the xml file for the given solution that can be dumped as string, or written to file later on.
:param solution: Solution.
"""
assert isinstance(solution, Solution)
self.solution = solution
self._solution_root = self._serialize_solution(self.solution)
@staticmethod
def _get_processor_name() -> Union[str, None]:
# TODO: compare cpu names with the list of cpu names used on web server
delete_from_cpu_name = ['(R)', '(TM)']
def strip_substrings(string: str):
for del_string in delete_from_cpu_name:
string = string.replace(del_string, '')
return string
if platform.system() == "Windows":
name_tmp = platform.processor()
for del_str in delete_from_cpu_name:
name_tmp.replace(del_str, '')
return strip_substrings(name_tmp)
elif platform.system() == "Darwin":
os.environ['PATH'] = os.environ['PATH'] + os.pathsep + '/usr/sbin'
command = "sysctl -n machdep.cpu.brand_string"
return str(subprocess.check_output(command, shell=True).strip())
elif platform.system() == "Linux":
command = "cat /proc/cpuinfo"
all_info = str(subprocess.check_output(command, shell=True).strip())
for line in all_info.split("\\n"):
if "model name" in line:
name_tmp = re.sub(".*model name.*: ", "", line, 1)
return strip_substrings(name_tmp)
return None
@classmethod
def _serialize_solution(cls, solution: Solution) -> et.Element:
""" Serializes the given solution. """
root_node = cls._create_root_node(solution)
for pp_solution in solution.planning_problem_solutions:
trajectory_node = cls._create_trajectory_node(pp_solution.trajectory_type,
pp_solution.planning_problem_id,
pp_solution.trajectory)
root_node.append(trajectory_node)
return root_node
@classmethod
def _create_root_node(cls, solution: Solution) -> et.Element:
""" Creates the root node of the Solution XML. """
root_node = et.Element('CommonRoadSolution')
root_node.set('benchmark_id', solution.benchmark_id)
if solution.computation_time is not None:
root_node.set('computation_time', str(solution.computation_time))
if solution.date is not None:
root_node.set('date', solution.date.strftime('%Y-%m-%dT%H:%M:%S'))
processor_name = cls._get_processor_name() if solution.processor_name == 'auto' else solution.processor_name
if processor_name is not None:
root_node.set('processor_name', processor_name)
return root_node
@classmethod
def _create_trajectory_node(cls, trajectory_type: TrajectoryType, pp_id: int, trajectory: Trajectory) -> et.Element:
""" Creates the Trajectory XML Node for the given trajectory. """
trajectory_node = et.Element(trajectory_type.value)
trajectory_node.set('planningProblem', str(pp_id))
for state in trajectory.state_list:
state_node = cls._create_state_node(trajectory_type.state_type, state)
trajectory_node.append(state_node)
return trajectory_node
@classmethod
def _create_sub_element(cls, name: str, value: Union[float, int]) -> et.Element:
""" Creates an XML element for the given value. """
element = et.Element(name)
element.text = str(np.float64(value) if isinstance(value, float) else value)
return element
@classmethod
def _create_state_node(cls, state_type: StateType, state: TraceState) -> et.Element:
""" Creates XML nodes for the States of the Trajectory. """
state_node = et.Element(state_type.value)
for mapping in list(zip(state_type.xml_fields, state_type.fields)):
xml_name = mapping[0]
state_val = getattr(state, mapping[1])
if isinstance(xml_name, tuple):
for idx, name in enumerate(xml_name):
state_node.append(cls._create_sub_element(name, state_val[idx]))
else:
state_node.append(cls._create_sub_element(xml_name, state_val))
return state_node
def dump(self, pretty: bool = True) -> str:
"""
Dumps the Solution XML as string.
:param pretty: If set to true, prettifies the xml string.
:return: string - Solution XML as string.
"""
rough_string = et.tostring(self._solution_root, encoding='utf-8')
if not pretty:
return rough_string
parsed = minidom.parseString(rough_string)
return parsed.toprettyxml(indent=" ")
def write_to_file(self, output_path: str = './', filename: str = None,
overwrite: bool = False, pretty: bool = True):
"""
Writes the Solution XML to a file.
:param output_path: Output dir where the Solution XML file should be written to. \
Writes to the same folder where it is called from if not specified.
:param filename: Name of the Solution XML file. If not specified, sets the name as 'solution_BENCHMARKID.xml' \
where the BENCHMARKID is the benchmark_id of the solution.
:param overwrite: If set to True, overwrites the file if it already exists.
:param pretty: If set to True, prettifies the Solution XML string before writing to file.
"""
filename = filename if filename is not None else 'solution_%s.xml' % self.solution.benchmark_id
fullpath = os.path.join(output_path, filename) if filename is not None else os.path.join(output_path, filename)
if not os.path.exists(os.path.dirname(fullpath)):
raise NotADirectoryError("Directory %s does not exist." % os.path.dirname(fullpath))
if os.path.exists(fullpath) and not overwrite:
raise FileExistsError("File %s already exists. If you want to overwrite it set overwrite=True." % fullpath)
with open(fullpath, 'w') as f:
f.write(self.dump(pretty))