Unverified Commit 7407ee71 authored by Zharkov Kirill's avatar Zharkov Kirill Committed by GitHub
Browse files

Merge pull request #115 from aimclub/new_criteria

New criteria and events
Showing with 757 additions and 209 deletions
+757 -209
......@@ -15,7 +15,7 @@ from rostok.library.rule_sets.ruleset_old_style import create_rules
# create rule vocabulary
rule_vocabul = create_rules()
# create blueprint for object to grasp
grasp_object_blueprint = get_object_parametrized_sphere(0.2, 1)
grasp_object_blueprint = get_object_parametrized_sphere(0.5, 1)
# create reward counter using run setup function
control_optimizer = config_with_standard(grasp_object_blueprint)
# Initialize MCTS
......
MAX_NUMBER_RULES = 10
MAX_NUMBER_RULES = 15
BASE_ITERATION_LIMIT = 20
BASE_ITERATION_LIMIT = 3
BASE_ITERATION_LIMIT_GRAPH = 200
ITERATION_REDUCTION_TIME = 0.7
TIME_CRITERION_WEIGHT = 3
TIME_CRITERION_WEIGHT = 1
FORCE_CRITERION_WEIGHT = 1
OBJECT_COG_CRITERION_WEIGHT = 2
LATE_FORCE_CRITERION_WEIGHT = 4
LATE_FORCE_AMOUNT_CRITERION_WEIGHT = 5
INSTANT_CONTACTING_LINK_CRITERION_WEIGHT = 2
CONTACTING_LINK_CALCULATION_TIME = 2
OBJECT_COG_CRITERION_WEIGHT = 1
INSTANT_FORCE_CRITERION_WEIGHT = 1
INSTANT_CONTACTING_LINK_CRITERION_WEIGHT = 1
GRASP_TIME_CRITERION_WEIGHT = 1
FINAL_POSITION_CRITERION_WEIGHT = 10
REFERENCE_DISTANCE = 20
CONTROL_OPTIMIZATION_BOUNDS = (6, 15)
CONTROL_OPTIMIZATION_ITERATION = 3
TIME_STEP_SIMULATION = 0.0025
TIME_SIMULATION = 3
GRASP_TIME = 5
FORCE_TEST_TIME = 5
TIME_SIMULATION = 10
FLAG_TIME_NO_CONTACT = 1
FLAG_TIME_SLIPOUT = 0.8
FLAG_TIME_SLIPOUT = 0.5
FLAG_FLYING_APART = 10
\ No newline at end of file
......@@ -5,10 +5,12 @@ import hyperparameters as hp
from rostok.block_builder_chrono.block_builder_chrono_api import \
ChronoBlockCreatorInterface as creator
from rostok.criterion.criterion_calculation import (ForceCriterion, InstantContactingLinkCriterion,
LateForceAmountCriterion, LateForceCriterion,
ObjectCOGCriterion, SimulationReward,
TimeCriterion)
from rostok.criterion.simulation_flags import (FlagContactTimeOut, FlagFlyingApart, FlagSlipout)
InstantForceCriterion,
InstantObjectCOGCriterion, SimulationReward,
TimeCriterion, FinalPositionCriterion,
GraspTimeCriterion)
from rostok.criterion.simulation_flags import (EventContact, EventContactTimeOut, EventFlyingApart,
EventGrasp, EventSlipOut, EventStopExternalForce)
from rostok.simulation_chrono.simulation_scenario import ConstTorqueGrasp
from rostok.trajectory_optimizer.control_optimizer import (CalculatorWithGraphOptimization,
CalculatorWithOptimizationDirect)
......@@ -19,21 +21,43 @@ def config_with_standard(grasp_object_blueprint):
simulation_manager = ConstTorqueGrasp(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION)
simulation_manager.grasp_object_callback = lambda: creator.create_environment_body(
grasp_object_blueprint)
simulation_manager.add_flag(FlagContactTimeOut(hp.FLAG_TIME_NO_CONTACT))
simulation_manager.add_flag(FlagFlyingApart(hp.FLAG_FLYING_APART))
simulation_manager.add_flag(FlagSlipout(hp.FLAG_TIME_SLIPOUT))
event_contact = EventContact()
simulation_manager.add_event(event_contact)
event_timeout = EventContactTimeOut(hp.FLAG_TIME_NO_CONTACT, event_contact)
simulation_manager.add_event(event_timeout)
event_flying_apart = EventFlyingApart(hp.FLAG_FLYING_APART)
simulation_manager.add_event(event_flying_apart)
event_slipout = EventSlipOut(hp.FLAG_TIME_SLIPOUT)
simulation_manager.add_event(event_slipout)
event_grasp = EventGrasp(
grasp_limit_time=hp.GRASP_TIME,
contact_event=event_contact,
verbosity=0,
)
simulation_manager.add_event(event_grasp)
event_stop_external_force = EventStopExternalForce(grasp_event=event_grasp,
force_test_time=hp.FORCE_TEST_TIME)
simulation_manager.add_event(event_stop_external_force)
#create criterion manager
simulation_rewarder = SimulationReward(1)
simulation_rewarder = SimulationReward(verbosity=0)
#create criterions and add them to manager
simulation_rewarder.add_criterion(TimeCriterion(hp.TIME_SIMULATION), hp.TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ForceCriterion(), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ObjectCOGCriterion(), hp.OBJECT_COG_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(LateForceCriterion(0.5, 3), hp.LATE_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(LateForceAmountCriterion(0.5),
hp.LATE_FORCE_AMOUNT_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(TimeCriterion(hp.TIME_SIMULATION, event_timeout, event_grasp),
hp.TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ForceCriterion(event_timeout), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantContactingLinkCriterion(event_grasp),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantForceCriterion(event_grasp),
hp.INSTANT_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantObjectCOGCriterion(event_grasp),
hp.OBJECT_COG_CRITERION_WEIGHT)
n_steps = int(hp.TIME_SIMULATION / hp.TIME_STEP_SIMULATION)
print(n_steps)
simulation_rewarder.add_criterion(GraspTimeCriterion(event_grasp, n_steps),
hp.GRASP_TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(
InstantContactingLinkCriterion(hp.CONTACTING_LINK_CALCULATION_TIME),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
FinalPositionCriterion(hp.REFERENCE_DISTANCE, event_grasp, event_slipout),
hp.FINAL_POSITION_CRITERION_WEIGHT)
control_optimizer = CalculatorWithOptimizationDirect(simulation_manager, simulation_rewarder,
hp.CONTROL_OPTIMIZATION_BOUNDS,
......@@ -47,21 +71,43 @@ def config_with_standard_graph(grasp_object_blueprint, torque_dict):
simulation_manager = ConstTorqueGrasp(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION)
simulation_manager.grasp_object_callback = lambda: creator.create_environment_body(
grasp_object_blueprint)
simulation_manager.add_flag(FlagContactTimeOut(hp.FLAG_TIME_NO_CONTACT))
simulation_manager.add_flag(FlagFlyingApart(hp.FLAG_FLYING_APART))
simulation_manager.add_flag(FlagSlipout(hp.FLAG_TIME_SLIPOUT))
event_contact = EventContact()
simulation_manager.add_event(event_contact)
event_timeout = EventContactTimeOut(hp.FLAG_TIME_NO_CONTACT, event_contact)
simulation_manager.add_event(event_timeout)
event_flying_apart = EventFlyingApart(hp.FLAG_FLYING_APART)
simulation_manager.add_event(event_flying_apart)
event_slipout = EventSlipOut(hp.FLAG_TIME_SLIPOUT)
simulation_manager.add_event(event_slipout)
event_grasp = EventGrasp(
grasp_limit_time=hp.GRASP_TIME,
contact_event=event_contact,
verbosity=0,
)
simulation_manager.add_event(event_grasp)
event_stop_external_force = EventStopExternalForce(grasp_event=event_grasp,
force_test_time=hp.FORCE_TEST_TIME)
simulation_manager.add_event(event_stop_external_force)
#create criterion manager
simulation_rewarder = SimulationReward()
simulation_rewarder = SimulationReward(verbosity=0)
#create criterions and add them to manager
simulation_rewarder.add_criterion(TimeCriterion(hp.TIME_SIMULATION), hp.TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ForceCriterion(), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ObjectCOGCriterion(), hp.OBJECT_COG_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(LateForceCriterion(0.5, 3), hp.LATE_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(LateForceAmountCriterion(0.5),
hp.LATE_FORCE_AMOUNT_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(TimeCriterion(hp.TIME_SIMULATION, event_timeout, event_grasp),
hp.TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ForceCriterion(event_timeout), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantContactingLinkCriterion(event_grasp),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantForceCriterion(event_grasp),
hp.INSTANT_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantObjectCOGCriterion(event_grasp),
hp.OBJECT_COG_CRITERION_WEIGHT)
n_steps = int(hp.TIME_SIMULATION / hp.TIME_STEP_SIMULATION)
print(n_steps)
simulation_rewarder.add_criterion(GraspTimeCriterion(event_grasp, n_steps),
hp.GRASP_TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(
InstantContactingLinkCriterion(hp.CONTACTING_LINK_CALCULATION_TIME),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
FinalPositionCriterion(hp.REFERENCE_DISTANCE, event_grasp, event_slipout),
hp.FINAL_POSITION_CRITERION_WEIGHT)
control_optimizer = CalculatorWithGraphOptimization(simulation_manager, simulation_rewarder,
torque_dict)
......@@ -71,30 +117,55 @@ def config_with_standard_graph(grasp_object_blueprint, torque_dict):
def config_with_standard_multiobject(grasp_object_blueprint, weights):
# configurate the simulation manager
simulation_manager = ConstTorqueGrasp(0.001, 3)
simulation_manager.add_flag(FlagContactTimeOut(1))
simulation_manager.add_flag(FlagFlyingApart(10))
simulation_manager.add_flag(FlagSlipout(0.8))
simulation_managers = []
simulation_manager = ConstTorqueGrasp(hp.TIME_STEP_SIMULATION, hp.TIME_SIMULATION)
object_callback = [
(lambda obj=obj: creator.create_environment_body(obj)) for obj in grasp_object_blueprint
]
simulation_managers = []
for k in range(len(grasp_object_blueprint)):
simulation_managers.append((deepcopy(simulation_manager), weights[k]))
simulation_managers[-1][0].grasp_object_callback = object_callback[k]
event_contact = EventContact()
event_timeout = EventContactTimeOut(hp.FLAG_TIME_NO_CONTACT, event_contact)
event_flying_apart = EventFlyingApart(hp.FLAG_FLYING_APART)
event_slipout = EventSlipOut(hp.FLAG_TIME_SLIPOUT)
event_grasp = EventGrasp(
grasp_limit_time=hp.GRASP_TIME,
contact_event=event_contact,
verbosity=0,
)
event_stop_external_force = EventStopExternalForce(grasp_event=event_grasp,
force_test_time=hp.FORCE_TEST_TIME)
for manager in simulation_managers:
manager[0].add_event(event_contact)
manager[0].add_event(event_timeout)
manager[0].add_event(event_flying_apart)
manager[0].add_event(event_grasp)
manager[0].add_event(event_stop_external_force)
#create criterion manager
simulation_rewarder = SimulationReward()
simulation_rewarder = SimulationReward(verbosity=0)
#create criterions and add them to manager
simulation_rewarder.add_criterion(TimeCriterion(3), 1)
simulation_rewarder.add_criterion(ForceCriterion(), 1)
simulation_rewarder.add_criterion(ObjectCOGCriterion(), 1)
simulation_rewarder.add_criterion(LateForceCriterion(0.5, 3), 1)
simulation_rewarder.add_criterion(LateForceAmountCriterion(0.5), 1)
simulation_rewarder.add_criterion(TimeCriterion(hp.TIME_SIMULATION, event_timeout, event_grasp),
hp.TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(ForceCriterion(event_timeout), hp.FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantContactingLinkCriterion(event_grasp),
hp.INSTANT_CONTACTING_LINK_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantForceCriterion(event_grasp),
hp.INSTANT_FORCE_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(InstantObjectCOGCriterion(event_grasp),
hp.OBJECT_COG_CRITERION_WEIGHT)
n_steps = int(hp.TIME_SIMULATION / hp.TIME_STEP_SIMULATION)
print(n_steps)
simulation_rewarder.add_criterion(GraspTimeCriterion(event_grasp, n_steps),
hp.GRASP_TIME_CRITERION_WEIGHT)
simulation_rewarder.add_criterion(
FinalPositionCriterion(hp.REFERENCE_DISTANCE, event_grasp, event_slipout),
hp.FINAL_POSITION_CRITERION_WEIGHT)
control_optimizer = CalculatorWithOptimizationDirect(simulation_managers, simulation_rewarder,
(3, 15), 1)
hp.CONTROL_OPTIMIZATION_BOUNDS,
hp.CONTROL_OPTIMIZATION_ITERATION)
return control_optimizer
\ No newline at end of file
......@@ -15,7 +15,38 @@ from rostok.library.obj_grasp.objects import (get_obj_hard_mesh_piramida,
from rostok.utils.pickle_save import load_saveable
def vis_top_n_mechs(report: MCTSSaveable, n: int, object: EnvironmentBodyBlueprint):
def vis_top_n_mechs(n: int, object: EnvironmentBodyBlueprint):
root = Tk()
root.geometry("400x300")
root.title("Report loader")
label = ttk.Label(text="Choose path to report!")
label.pack()
label_file = ttk.Label(text="Enter path to report:")
label_file.pack(anchor=NW, padx=8)
entry_browse = ttk.Entry(width=30, font=12)
entry_browse.pack(anchor=NW, padx=8)
entry_browse.place(x=8, y=40)
report_path = None
def func_browse():
path = filedialog.askopenfilename()
nonlocal entry_browse
entry_browse.delete(first=0, last=END)
entry_browse.insert(0, path)
def func_add():
nonlocal report_path
report_path = Path(entry_browse.get())
nonlocal root
root.destroy()
btn_browse = ttk.Button(text="browse", command=func_browse) # создаем кнопку из пакета ttk
btn_browse.place(x=300, y=40)
btn_add = ttk.Button(text="add report", command=func_add) # создаем кнопку из пакета ttk
btn_add.place(x=300, y=85)
root.mainloop()
report = load_saveable(report_path)
graph_report = report.seen_graphs
control_optimizer = config_with_standard(grasp_object_blueprint)
simulation_rewarder = control_optimizer.rewarder
......@@ -86,6 +117,7 @@ def save_svg_mean_reward(name: str,
btn_finish.place(x=150, y=250)
root.mainloop()
reporter = []
for report_path in report_paths:
report = load_saveable(Path(report_path))
reporter.append(report)
......@@ -124,9 +156,7 @@ def save_svg_mean_reward(name: str,
if __name__ == "__main__":
grasp_object_blueprint = get_object_parametrized_sphere(0.4, 1)
grasp_object_blueprint = get_obj_hard_mesh_piramida()
report: OptimizedGraphReport = load_saveable(
Path(r"results\Reports_23y_06m_15d_03H_13M\MCTS_data.pickle"))
vis_top_n_mechs(report, 3, grasp_object_blueprint)
#grasp_object_blueprint = get_object_parametrized_sphere(0.4, 1)
grasp_object_blueprint = get_object_parametrized_sphere(0.5, 1)
vis_top_n_mechs(3, grasp_object_blueprint)
# save_svg_mean_reward( name = 'kek', objecy_name='sphere')
\ No newline at end of file
......@@ -64,7 +64,7 @@ class PrimitiveBodyBlueprint(BodyBlueprintType):
@dataclass
class EnvironmentBodyBlueprint(BodyBlueprintType):
shape: easy_body_shapes.ShapeTypes = easy_body_shapes.Box()
density: float = 1000.0
density: float = 100.0
material: Material = Material()
is_collide: bool = True
color: Optional[list[int]] = None
......
......@@ -7,146 +7,280 @@ import pychrono.core as chrono
from scipy.spatial import distance
from rostok.simulation_chrono.basic_simulation import SimulationResult
from rostok.criterion.simulation_flags import SimulationSingleEvent, EventContactTimeOut, EventGrasp, EventSlipOut
#Interface for criterions
class Criterion(ABC):
def calculate_reward(self, simulation_output:SimulationResult):
def calculate_reward(self, simulation_output: SimulationResult):
"""The function that returns the reward as a result of the simulation.
Args:
simulation_output (SimulationResult): the result of the simulation
"""
pass
class ForceCriterion(Criterion):
class TimeCriterion(Criterion):
"""Reward based on the time simulation if the grasp doesn't happen.
Attributes:
max_simulation_time (float): maximum simulation time
event_timeout (EventContactTimeOut): event of contact time out
event_grasp (EventGrasp): event of object grasping
"""
def __init__(self, time: float, event_timeout: EventContactTimeOut, event_grasp: EventGrasp):
self.max_simulation_time = time
self.event_timeout = event_timeout
self.event_grasp = event_grasp
def calculate_reward(self, simulation_output: SimulationResult):
"""Return 1 for all robots that can grasp the object.
Return 0 if there is no contact. If the robot only touches the object and
than lose contact, return reward that increases with time of the contact
def calculate_reward(self, simulation_output:SimulationResult) -> float:
env_data = simulation_output.environment_final_ds
body_contacts: List[np.ndarray] = env_data.get_data("forces")[0]
force_modules = []
for data in body_contacts:
if data.size > 0:
total_force = np.zeros(3)
for force in data:
total_force += force[1]
force_module = np.linalg.norm(total_force)
# Cut the steps with huge forces
if force_module<100:
force_modules.append(force_module)
if len(force_modules) > 0:
return 1 / (1 + np.mean(np.array(force_modules)))
Returns:
float: reward of the criterion
"""
if self.event_grasp.state:
return 1
if self.event_timeout.state:
return 0
else:
time = simulation_output.time
if (time) > 0:
return (time)**2 / (self.max_simulation_time)**2
else:
return 0
class ForceCriterion(Criterion):
"""Reward based on the mean force of the robot and object contact.
Attributes:
event_timeout (EventContactTimeOut): event of contact time out
"""
def __init__(self, event_timeout: EventContactTimeOut):
self.event_timeout = event_timeout
def calculate_reward(self, simulation_output: SimulationResult) -> float:
"""Return 0 if there is no contact. For every step where object is in contact with robot
the total force is added to a list and the final reward is calculated using the mean
value of that list
Returns:
float: reward of the criterion
"""
if self.event_timeout.state:
return 0
else:
env_data = simulation_output.environment_final_ds
body_contacts: List[np.ndarray] = env_data.get_data("forces")[0]
force_modules = []
for data in body_contacts:
if data.size > 0:
total_force = np.zeros(3)
for force in data:
total_force += force[1]
force_module = np.linalg.norm(total_force)
# Cut the steps with huge forces
if force_module < 100:
force_modules.append(force_module)
class TimeCriterion(Criterion):
if len(force_modules) > 0:
return 1 / (1 + np.mean(np.array(force_modules)))
else:
return 0
def __init__(self, time):
self.max_simulation_time = time
def calculate_reward(self, simulation_output:SimulationResult):
time = simulation_output.time
if (time) > 0:
return (time)**2 / (self.max_simulation_time)**2
class InstantObjectCOGCriterion(Criterion):
"""Reward based on the distance between object COG and force centroid.
Attributes:
event_grasp (EventGrasp): event of object grasping
"""
def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def calculate_reward(self, simulation_output: SimulationResult):
"""Calculate the reward based on distance between object COG and force centroid in the moment of grasp.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
env_data = simulation_output.environment_final_ds
body_COG = env_data.get_data("COG")[0][self.grasp_event.step_n + 1]
body_outer_force_center = env_data.get_data("force_center")[0][self.grasp_event.step_n +
1]
if body_outer_force_center is np.nan:
print(body_COG, body_outer_force_center)
dist = distance.euclidean(body_COG, body_outer_force_center)
return 1 / (1 + dist)
else:
return 0
class ObjectCOGCriterion(Criterion):
class InstantForceCriterion(Criterion):
"""Criterion based on the std of force modules.
def calculate_reward(self, simulation_output:SimulationResult):
dist_list = []
env_data = simulation_output.environment_final_ds
body_COG = env_data.get_data("COG")[0] # List[Tuple[step_n, List[x,y,z]]]
body_outer_force_center = env_data.get_data("force_center")[0]
dist_list = []
for cog, force in zip(body_COG, body_outer_force_center):
if not force is np.nan:
dist_list.append(distance.euclidean(cog, force))
Attributes:
event_grasp (EventGrasp): event of object grasping
"""
if np.size(dist_list) > 0:
cog_crit = 1 / (1 + np.mean(dist_list))
def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def calculate_reward(self, simulation_output: SimulationResult):
"""Calculate std of force modules in the grasp moment and calculate reward using it.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
env_data = simulation_output.environment_final_ds
body_contacts: np.ndarray = env_data.get_data("forces")[0][self.grasp_event.step_n + 1]
if len(body_contacts) > 0:
forces = []
for force in body_contacts:
forces.append(np.linalg.norm(force))
return 1 / (1 + np.std(forces))
else:
return 0
else:
cog_crit = 0
return 0
return cog_crit
class InstantContactingLinkCriterion(Criterion):
"""Criterion based on the percentage of contacting links.
class LateForceCriterion(Criterion):
Attributes:
event_grasp (EventGrasp): event of object grasping
"""
def __init__(self, cut_off, force_threshold):
self.cut_off = cut_off
self.force_threshold = force_threshold
def __init__(self, grasp_event: EventGrasp):
self.grasp_event = grasp_event
def calculate_reward(self, simulation_output:SimulationResult):
env_data = simulation_output.environment_final_ds
body_contacts = env_data.get_data("forces")[0]
step_cutoff = int(simulation_output.n_steps * self.cut_off)
body_contacts_cut = body_contacts[step_cutoff::]
counter = 0
for data in body_contacts_cut:
total_force_module = 0
for contact in data:
total_force_module += np.linalg.norm(contact[1])
def calculate_reward(self, simulation_output: SimulationResult):
"""The reward is the fraction of the links that contacts with object in the grasp moment.
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
robot_data = simulation_output.robot_final_ds
robot_contacts = robot_data.get_data("n_contacts")
n_bodies = len(robot_contacts.keys())
contacting_bodies = 0
for _, contacts in robot_contacts.items():
if contacts[self.grasp_event.step_n + 1] > 0:
contacting_bodies += 1
return contacting_bodies / n_bodies
else:
return 0
if total_force_module > self.force_threshold:
counter += 1
if len(body_contacts_cut) > 0:
return counter / (len(body_contacts_cut))
else: return 0
class GraspTimeCriterion(Criterion):
"""Criterion based on the time before grasp.
Attributes:
event_grasp (EventGrasp): event of object grasping
total_steps (total_steps): the amount of the possible steps
"""
class LateForceAmountCriterion(Criterion):
def __init__(self, grasp_event: EventGrasp, total_steps: int):
self.grasp_event = grasp_event
self.total_steps = total_steps
def __init__(self, cut_off):
self.cut_off = cut_off
def calculate_reward(self, simulation_output: SimulationResult):
"""Reward depends on the speed of the grasp.
def calculate_reward(self, simulation_output:SimulationResult):
env_data = simulation_output.environment_final_ds
body_contacts = env_data.get_data("n_contacts")[0]
step_cutoff = int(simulation_output.n_steps * self.cut_off)
counter = 0
body_contacts_cut = body_contacts[step_cutoff::]
for data in body_contacts_cut:
if not data is np.nan:
counter += data
Returns:
float: reward of the criterion
"""
if self.grasp_event.state:
return (self.total_steps - self.grasp_event.step_n) / self.total_steps
else:
return 0
if len(body_contacts_cut) > 0:
return counter / (len(body_contacts_cut))
else: return 0
class InstantContactingLinkCriterion(Criterion):
def __init__(self, time):
self.reference_time = time
class FinalPositionCriterion(Criterion):
"""Criterion based on the position change after applying testing force.
Attributes:
reference_distance (float): reference distance for the criterion
grasp_event (EventGrasp): event of object grasping
slipout_event (EventSlipOut): event of object slip out
"""
def __init__(self, reference_distance: float, grasp_event: EventGrasp,
slipout_event: EventSlipOut):
self.reference_distance = reference_distance
self.grasp_event = grasp_event
self.slipout_event = slipout_event
def calculate_reward(self, simulation_output: SimulationResult):
time_vector = simulation_output.time_vector
pos = bisect_left(time_vector, self.reference_time)
if pos == len(time_vector):
"""The reward is 1 - dist(position in the grasp moment, position in the final moment)/(reference distance)
Returns:
float: reward of the criterion
"""
if self.grasp_event.state and not self.slipout_event.state:
env_data = simulation_output.environment_final_ds
grasp_pos = env_data.get_data("COG")[0][self.grasp_event.step_n + 1]
final_pos = env_data.get_data("COG")[0][-1]
dist = distance.euclidean(grasp_pos, final_pos)
if dist <= self.reference_distance:
return 1 - dist / self.reference_distance
else:
return 0
else:
return 0
robot_data = simulation_output.robot_final_ds
body_contacts = robot_data.get_data("n_contacts")
n_bodies = len(body_contacts.keys())
contacting_bodies = 0
for body, contacts in body_contacts.items():
if contacts[pos] > 0:
contacting_bodies += 1
return contacting_bodies / n_bodies
class SimulationReward:
"""Aggregate criterions and weights to calculate reward.
Attributes:
criteria (List[Criterion]): list of criterions
weights (List[float]): criterion weights
verbosity (int): parameter to control console output
"""
def __init__(self, verbosity = 0) -> None:
def __init__(self, verbosity=0) -> None:
self.criteria: List[Criterion] = []
self.weights: List[float] = []
self.verbosity = verbosity
def add_criterion(self, citerion: Criterion, weight: float):
"""Add criterion and weight to the lists.
Args:
citerion (Criterion): new criterion
weight (float): weight of the new criterion
"""
self.criteria.append(citerion)
self.weights.append(weight)
def calculate_reward(self, simulation_output):
"""Calculate all rewards and return weighted sum of them.
Args:
simulation_output (_type_): the results of the simulation
Returns:
float: total reward
"""
partial_rewards = []
for criterion in self.criteria:
partial_rewards.append(criterion.calculate_reward(simulation_output))
......
from abc import ABC
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple, Union
from enum import Enum
import numpy as np
import pychrono.core as chrono
from rostok.virtual_experiment.sensors import DataStorage, Sensor
class FlagStopSimualtions(ABC):
def __init__(self):
class EventCommands(Enum):
"""Commands available for the events. The simulation class handles these commands"""
STOP = 0
CONTINUE = 1
ACTIVATE = 2
class SimulationSingleEvent(ABC):
""" The abstract class for the event that can occur during the simulation only once.
At each step of the simulation the event is checked using the current states of the sensors.
step_n is a single value because the event can occur only once in simulation.
Attributes:
state (bool): event occurrence flag
step_n (int): the step of the event occurrence
verbosity (int): the parameter that controls the console output of the class methods
"""
def __init__(self, verbosity=0):
self.state = False
self.step_n = None
self.verbosity = verbosity
def reset(self):
"""Reset the values of the attributes for the new simulation."""
def reset_flag(self):
self.state = False
self.step_n = None
@abstractmethod
def event_check(self, current_time: float, step_n: int, robot_data, env_data):
"""Simulation calls that method each step to check if the event occurred.
Args:
current_time (float): time from the start of the simulation
step_n (int): step number of the simulation
robot_data (_type_): current state of the robot sensors
env_data (_type_): current state of the environment sensors
"""
class EventContact(SimulationSingleEvent):
"""Event of contact between robot and object
"""
def event_check(self, current_time: float, step_n: int, robot_data, env_data):
if env_data.get_amount_contacts()[0] > 0:
self.state = True
self.step_n = step_n
return EventCommands.CONTINUE
return EventCommands.CONTINUE
def update_state(self, current_time, robot_data, env_data):
pass
class EventContactTimeOut(SimulationSingleEvent):
"""Event that occurs if the robot doesn't contact with body during the reference time from the start of the simulation.
class FlagFlyingApart(FlagStopSimualtions):
def __init__(self, max_distance:float):
Attributes:
reference_time (float): the moment of time where the simulation is interrupted
if there is no contact with the body
contact (bool): the flag that determines if there was a contact with body
"""
def __init__(self, ref_time: float, contact_event: EventContact):
super().__init__()
self.reference_time = ref_time
self.contact_event: EventContact = contact_event
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Return STOP if the time exceeds the reference time and there was no contact with body.
Returns:
EventCommands: return a command for simulation
"""
if self.contact_event.state:
return EventCommands.CONTINUE
if current_time > self.reference_time:
self.state = True
self.step_n = step_n
return EventCommands.STOP
return EventCommands.CONTINUE
class EventFlyingApart(SimulationSingleEvent):
"""The event that stops simulation if the robot parts have flown apart.
Attributes::
max_distance (float): the max distance for robot parts
"""
def __init__(self, max_distance: float):
super().__init__()
self.max_distance = max_distance
def update_state(self, current_time, robot_data:Sensor, env_data:Sensor):
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Return STOP if the current position of a part is max_distance away from the robot base body.
Returns:
EventCommands: return a command for simulation
"""
trajectory_points = robot_data.get_body_trajectory_point()
# It takes the position of the first block in the list, that should be the base body
base_position = trajectory_points[next(iter(trajectory_points))]
for block in trajectory_points.values():
position = block
if np.linalg.norm(np.array(base_position) - np.array(position)) > self.max_distance:
self.state = True
break
self.step_n = step_n
return EventCommands.STOP
return EventCommands.CONTINUE
class EventSlipOut(SimulationSingleEvent):
"""The event that stops simulation if the body slips out from the grasp after the contact.
Attributes:
time_last_contact (float): time of last contact of robot and body
reference_time (float): time of contact loss until the stop of the simulation
"""
class FlagSlipout(FlagStopSimualtions):
def __init__(self, ref_time):
super().__init__()
self.time_last_contact = None
self.reference_time = ref_time
def reset_flag(self):
def reset(self):
self.time_last_contact = None
super().reset_flag()
super().reset()
def update_state(self, current_time, robot_data:Sensor, env_data:Sensor):
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Return STOP if the body and mech lose contact for the reference time.
Returns:
EventCommands: return a command for simulation
"""
contact = env_data.get_amount_contacts()[0] > 0
if contact:
self.time_last_contact = current_time
self.state = False
return EventCommands.CONTINUE
if (not self.time_last_contact is None):
if current_time - self.time_last_contact > self.reference_time:
self.step_n = step_n
self.state = True
return EventCommands.STOP
return EventCommands.CONTINUE
class EventGrasp(SimulationSingleEvent):
"""Event that activates the force if
Attributes:
grasp_steps (int): the amount of consecutive steps body is not moving
grasp_time (float): the moment of the grasp event
contact (bool): the flag of body and object first contact
activation_code (int): the activation code for ACTIVATE command
grasp_limit_time (float): the time limit for the grasp event
force_test_time (float): the time period of the force test of the grasp
"""
def __init__(self, grasp_limit_time: float, contact_event: EventContact, verbosity: int = 0):
super().__init__(verbosity)
self.grasp_steps: int = 0
self.grasp_time: Optional[float] = None
self.grasp_limit_time = grasp_limit_time
self.contact_event: EventContact = contact_event
def reset(self):
super().reset()
self.grasp_steps = 0
self.grasp_time = None
def check_grasp_timeout(self, current_time):
if current_time > self.grasp_limit_time:
return True
else:
if self.time_last_contact is None:
self.state = False
else:
if current_time - self.time_last_contact > self.reference_time:
self.state = True
else:
self.state = False
class FlagContactTimeOut(FlagStopSimualtions):
def __init__(self, ref_time):
super().__init__()
self.reference_time = ref_time
self.contact = False
return False
def reset_flag(self):
super().reset_flag()
self.contact = False
def check_grasp_current_step(self, env_data: Sensor):
obj_velocity = np.linalg.norm(np.array(env_data.get_velocity()[0]))
if obj_velocity <= 0.01 and env_data.get_amount_contacts()[0] >= 2:
self.grasp_steps += 1
else:
self.grasp_steps = 0
def event_check(self, current_time: float, step_n: int, robot_data: Sensor, env_data: Sensor):
"""Return ACTIVATE if the body was in contact with the robot and after that at some
point doesn't move for at least 10 steps. Return STOP if the grasp didn't occur during grasp_limit_time.
Returns:
EventCommands: return a command for simulation
"""
if self.check_grasp_timeout(current_time):
return EventCommands.STOP
if self.contact_event.state:
self.check_grasp_current_step(env_data)
if self.grasp_steps == 10:
self.state = True
self.step_n = step_n
self.grasp_time = current_time
if self.verbosity > 0:
print('Grasp event!', current_time)
def update_state(self, current_time, robot_data:Sensor, env_data:Sensor):
if not self.contact:
self.contact = env_data.get_amount_contacts()[0] > 0
return EventCommands.ACTIVATE
if not (self.contact or self.state):
if current_time > self.reference_time:
return EventCommands.CONTINUE
class EventStopExternalForce(SimulationSingleEvent):
def __init__(self, grasp_event: EventGrasp, force_test_time: float):
super().__init__()
self.grasp_event = grasp_event
self.force_test_time = force_test_time
def event_check(self, current_time: float, step_n: int, robot_data, env_data):
"""STOP simulation in force_test_time after the grasp."""
# self.grasp_event.grasp_time is None until the grasp event have been occurred.
# Therefore we use nested if operators.
if self.grasp_event.state:
if current_time > self.force_test_time + self.grasp_event.grasp_time:
self.state = True
self.step_n = step_n
return EventCommands.STOP
return EventCommands.CONTINUE
......@@ -16,6 +16,7 @@ def create_rules():
super_flat = PrimitiveBodyBlueprint(Box(3, 0.1, 3))
link = list(map(lambda x: PrimitiveBodyBlueprint(Box(0.1, x, 0.3)), length_link))
radial_move_values = [0.9, 1.05, 1.2]
radial_move_values = [0.65, 0.85, 1.05 ]
RADIAL_MOVES = list(map(lambda x: FrameTransform([x, 0, 0], [1, 0, 0, 0]), radial_move_values))
tan_move_values = [0.4, 0.6, 0.8]
MOVES_POSITIVE = list(map(lambda x: FrameTransform([0, 0, x], [1, 0, 0, 0]), tan_move_values))
......
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pychrono as chrono
......@@ -11,6 +11,8 @@ from rostok.control_chrono.controller import ConstController, ForceControllerTem
from rostok.graph_grammar.node import GraphGrammar
from rostok.virtual_experiment.robot_new import BuiltGraphChrono, RobotChrono
from rostok.virtual_experiment.sensors import DataStorage, Sensor
from rostok.criterion.simulation_flags import SimulationSingleEvent, EventCommands
import time
class SystemPreviewChrono:
......@@ -86,12 +88,39 @@ class SystemPreviewChrono:
@dataclass
class SimulationResult:
"""Data class to aggregate the output of the simulation.
Attributes:
time (float): the total simulation time
time_vector (List[float]): the vector of time steps
n_steps (int): the maximum possible number of steps
robot_final_ds (Optional[DataStorage]): final data store of the robot
environment_final_ds (Optional[DataStorage]): final data store of the environment"""
time: float = 0
time_vector: List[float] = field(default_factory=list)
n_steps = 0
robot_final_ds: Optional[DataStorage] = None
environment_final_ds: Optional[DataStorage] = None
def reduce_ending(self, step_n):
if self.robot_final_ds:
storage = self.robot_final_ds.main_storage
for key in storage:
key_storage = storage[key]
for key_2 in key_storage:
value = key_storage[key_2]
new_value = value[:step_n + 2:]
key_storage[key_2] = new_value
if self.environment_final_ds:
storage = self.environment_final_ds.main_storage
for key in storage:
key_storage = storage[key]
for key_2 in key_storage:
value = key_storage[key_2]
new_value = value[:step_n + 2:]
key_storage[key_2] = new_value
def reduce_nan(self):
if self.robot_final_ds:
storage = self.robot_final_ds.main_storage
......@@ -254,7 +283,7 @@ class RobotSimulationChrono():
number_of_steps: int,
step_length: float,
frame_update: int,
flag_container=None,
event_container: List[SimulationSingleEvent] = None,
visualize=False,
):
"""Execute a simulation.
......@@ -282,19 +311,21 @@ class RobotSimulationChrono():
for i in range(number_of_steps):
current_time = self.chrono_system.GetChTime()
self.simulate_step(step_length, current_time, i)
self.result.time_vector.append(self.chrono_system.GetChTime()) # TODO: timevector can be constructed from number_of_steps and time_step
self.result.time_vector.append(self.chrono_system.GetChTime(
)) # TODO: timevector can be constructed from number_of_steps and time_step
if vis:
vis.Run()
if i % frame_update == 0:
vis.BeginScene(True, True, chrono.ChColor(0.1, 0.1, 0.1))
vis.Render()
vis.EndScene()
if flag_container:
for flag in flag_container:
flag.update_state(current_time, self.robot.sensor, self.data_storage.sensor)
if flag_container:
stop_flag = sum([flag.state for flag in flag_container])
if event_container:
for event in event_container:
event_command = event.event_check(current_time, self.robot.sensor,
self.data_storage.sensor)
if event_command == EventCommands.STOP:
stop_flag = True
break
if stop_flag:
break
......@@ -308,3 +339,88 @@ class RobotSimulationChrono():
self.n_steps = number_of_steps
self.result.reduce_nan()
return self.result
class RobotSimulationWithForceTest(RobotSimulationChrono):
def __init__(self, delay=False, object_list: List[Tuple[ChronoEasyShapeObject, bool]] = []):
super().__init__(object_list)
self.delay_flag = delay
def activate(self, current_time):
self.force_torque_container.controller_list[0].start_time = current_time
def handle_single_events(self, event_container, current_time, step_n):
if event_container is None:
return False
for event in event_container:
if not event.state:
event_command = event.event_check(current_time, step_n, self.robot.sensor,
self.data_storage.sensor)
if event_command == EventCommands.STOP:
return True
elif event_command == EventCommands.ACTIVATE:
self.activate(current_time)
return False
def simulate(
self,
number_of_steps: int,
step_length: float,
frame_update: int,
event_container=None,
visualize=False,
):
"""Execute a simulation.
Args:
number_of_steps(int): total number of steps in the simulation
step_length (float): the time length of a step
frame_update (int): rate of visualization update
flag_container: container of flags that controls simulation
visualize (bool): determine if run the visualization """
self.initialize(number_of_steps)
vis = None
if visualize:
vis = chronoirr.ChVisualSystemIrrlicht()
vis.AttachSystem(self.chrono_system)
vis.SetWindowSize(1024, 768)
vis.SetWindowTitle('Grab demo')
vis.Initialize()
vis.AddCamera(chrono.ChVectorD(1.5, 3, -4))
vis.AddTypicalLights()
vis.EnableCollisionShapeDrawing(True)
stop_flag = False
self.result.time_vector = [0]
for i in range(number_of_steps):
current_time = self.chrono_system.GetChTime()
self.simulate_step(step_length, current_time, i)
self.result.time_vector.append(self.chrono_system.GetChTime())
if vis:
vis.Run()
if i % frame_update == 0:
vis.BeginScene(True, True, chrono.ChColor(0.1, 0.1, 0.1))
vis.Render()
vis.EndScene()
stop_flag = self.handle_single_events(event_container, current_time, i)
if stop_flag:
break
# just to slow down the simulation
if self.delay_flag:
time.sleep(0.0001)
if visualize:
vis.GetDevice().closeDevice()
self.result.environment_final_ds = self.data_storage
self.result.robot_final_ds = self.robot.data_storage
self.result.time = self.chrono_system.GetChTime()
self.n_steps = number_of_steps
self.result.reduce_ending(i)
return self.result
\ No newline at end of file
......@@ -4,11 +4,12 @@ from typing import Dict, List, Optional, Tuple
import pychrono as chrono
import numpy as np
from rostok.criterion.simulation_flags import FlagStopSimualtions
from rostok.criterion.simulation_flags import SimulationSingleEvent
from rostok.graph_grammar.node import GraphGrammar
from rostok.simulation_chrono.basic_simulation import RobotSimulationChrono
from rostok.simulation_chrono.basic_simulation import RobotSimulationChrono, RobotSimulationWithForceTest
from rostok.virtual_experiment.sensors import (SensorCalls, SensorObjectClassification)
from rostok.simulation_chrono.simulation_utils import set_covering_sphere_based_position
from rostok.control_chrono.controller import ConstController, SinControllerChrono, YaxisShaker
class ParametrizedSimulation:
......@@ -26,23 +27,25 @@ class ConstTorqueGrasp(ParametrizedSimulation):
def __init__(self, step_length, simulation_length) -> None:
super().__init__(step_length, simulation_length)
self.grasp_object_callback = None
self.flag_container: List[FlagStopSimualtions] = []
self.event_container: List[SimulationSingleEvent] = []
def add_flag(self, flag):
self.flag_container.append(flag)
def add_event(self, event):
self.event_container.append(event)
def reset_flags(self):
for flag in self.flag_container:
flag.reset_flag()
def reset_events(self):
for event in self.event_container:
event.reset()
def run_simulation(self, graph: GraphGrammar, data, vis=True):
self.reset_flags()
simulation = RobotSimulationChrono([])
def run_simulation(self, graph: GraphGrammar, data, vis=False):
self.reset_events()
#simulation = RobotSimulationChrono([])
simulation = RobotSimulationWithForceTest(False, [])
simulation.add_design(graph, data)
grasp_object = self.grasp_object_callback()
shake = YaxisShaker(100, 1, 0.5, float("inf"))
set_covering_sphere_based_position(grasp_object,
reference_point=chrono.ChVectorD(0, 0.05, 0))
simulation.add_object(grasp_object, read_data=True)
simulation.add_object(grasp_object, read_data=True, force_torque_controller=shake)
n_steps = int(self.simulation_length / self.step_length)
env_data_dict = {
"n_contacts": (SensorCalls.AMOUNT_FORCE, SensorObjectClassification.BODY),
......@@ -56,4 +59,4 @@ class ConstTorqueGrasp(ParametrizedSimulation):
"n_contacts": (SensorCalls.AMOUNT_FORCE, SensorObjectClassification.BODY)
}
simulation.add_robot_data_type_dict(robot_data_dict)
return simulation.simulate(n_steps, self.step_length, 10, self.flag_container, vis)
\ No newline at end of file
return simulation.simulate(n_steps, self.step_length, 10, self.event_container, vis)
\ No newline at end of file
......@@ -112,6 +112,23 @@ class Sensor:
]
return output
def get_velocity(self):
output = {}
for idx, body in self.body_map_ordered.items():
output[idx] = [
round(body.body.GetPos_dt().x, 3),
round(body.body.GetPos_dt().y, 3),
round(body.body.GetPos_dt().z, 3)
]
return output
def get_rotation_velocity(self):
output = {}
for idx, body in self.body_map_ordered.items():
mat = body.body.GetA_dt()
output[idx] = [[mat.Get_A_Xaxis.x, mat.Get_A_Yaxis.x, mat.Get_A_Zaxis.x], [mat.Get_A_Xaxis.y, mat.Get_A_Yaxis.y, mat.Get_A_Zaxis.y], [mat.Get_A_Xaxis.z, mat.Get_A_Yaxis.z, mat.Get_A_Zaxis.z]]
return output
def get_joint_trajectory_point(self):
output = {}
for idx, joint in self.joint_map_ordered.items():
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment