Skip to content

Reactive Planner Integration

Content

This example shows how to integrate the velocity planner into the reactive planner. This is, for example, useful in scenarios in which there are sharp curves after longer straight parts with high velocity, as the reactive planner might not break early enough.

from copy import deepcopy
from pathlib import Path
import logging

# commonroad
import commonroad_route_planner.fast_api.fast_api as rfapi
from commonroad_route_planner.reference_path import ReferencePath
from commonroad_rp.reactive_planner import ReactivePlanner
from commonroad_rp.utility.visualization import visualize_planner_at_timestep
from commonroad_rp.utility.evaluation import run_evaluation
from commonroad_rp.utility.config import ReactivePlannerConfiguration
from commonroad_rp.utility.logger import initialize_logger

# own code base
from commonroad_velocity_planner.velocity_planner_interface import IVelocityPlanner
from commonroad_velocity_planner.configuration.configuration_builder import ConfigurationBuilder
from commonroad_velocity_planner.velocity_planning_problem import VppBuilder

path_scenario = Path(__file__).parents[1] / "scenarios" / YOUR_FILENAME
path_config = Path(__file__).parents[0] / "artifacts" / "rp_config.yaml"

# Build config object
config = ReactivePlannerConfiguration.load(path_config, path_scenario)
config.update()

initialize_logger(config)
logger = logging.getLogger("RP_LOGGER")
logger.setLevel(logging.ERROR)

# Route Planner
reference_path = rfapi.generate_reference_path_from_scenario_and_planning_problem(
    scenario=config.scenario,
    planning_problem=config.planning_problem
)
# Velocity Planner
global_trajectory = IVelocityPlanner().plan_velocity(
    reference_path=reference_path,
    planner_config=ConfigurationBuilder().get_predefined_configuration(),
    velocity_planning_problem=VppBuilder().build_vpp(
        reference_path=reference_path,
        planning_problem=config.planning_problem,
        default_goal_velocity=config.planning_problem.initial_state.velocity
    )
)

planner = ReactivePlanner(config)

# set reference path for curvilinear coordinate system
planner.set_reference_path(global_trajectory.reference_path)

# **************************
# Run Planning
# **************************
# Add first state to recorded state and input list
planner.record_state_and_input(planner.x_0)

SAMPLING_ITERATION_IN_PLANNER = True

while not planner.goal_reached():
    current_count = len(planner.record_state_list) - 1

    # check if planning cycle or not
    plan_new_trajectory = current_count % config.planning.replanning_frequency == 0
    if plan_new_trajectory:

        # new planning cycle -> plan a new optimal trajectory
        current_position = planner.record_state_list[-1].position if planner.record_state_list else planner.x_0.position
        desired_speed: float = global_trajectory.get_velocity_at_position_with_lookahead(
            position=current_position,
            lookahead_s=2.0
        )

        # Change here to with or without desired velocity
        planner.set_desired_velocity(desired_velocity=desired_speed)

        if SAMPLING_ITERATION_IN_PLANNER:
            optimal = planner.plan()
        else:
            optimal = None
            i = 1
            while optimal is None and i <= planner.sampling_level:
                optimal = planner.plan(i)

        if not optimal:
            break

        # record state and input
        planner.record_state_and_input(optimal[0].state_list[1])

        # reset planner state for re-planning
        planner.reset(initial_state_cart=planner.record_state_list[-1],
                      initial_state_curv=(optimal[2][1], optimal[3][1]),
                      collision_checker=planner.collision_checker, coordinate_system=planner.coordinate_system)

        # visualization: create ego Vehicle for planned trajectory and store sampled trajectory set
        if config.debug.show_plots or config.debug.save_plots:
            ego_vehicle = planner.convert_state_list_to_commonroad_object(optimal[0].state_list)
            sampled_trajectory_bundle = None
            if config.debug.draw_traj_set:
                sampled_trajectory_bundle = deepcopy(planner.stored_trajectories)
    else:
        # simulate scenario one step forward with planned trajectory
        sampled_trajectory_bundle = None

        # continue on optimal trajectory
        temp = current_count % config.planning.replanning_frequency

        # record state and input
        planner.record_state_and_input(optimal[0].state_list[1 + temp])

        # reset planner state for re-planning
        planner.reset(initial_state_cart=planner.record_state_list[-1],
                      initial_state_curv=(optimal[2][1 + temp], optimal[3][1 + temp]),
                      collision_checker=planner.collision_checker, coordinate_system=planner.coordinate_system)

    # print(f"current time step: {current_count}")

    # visualize the current time step of the simulation
    if config.debug.show_plots or config.debug.save_plots:
        visualize_planner_at_timestep(scenario=config.scenario, planning_problem=config.planning_problem,
                                      ego=ego_vehicle, traj_set=sampled_trajectory_bundle,
                                      ref_path=planner.reference_path, timestep=current_count, config=config)

# make gif
# make_gif(config, range(0, planner.record_state_list[-1].time_step))

# **************************
# Evaluate results
# **************************
evaluate = True
if evaluate:
    cr_solution, feasibility_list = run_evaluation(planner.config, planner.record_state_list,
                                                   planner.record_input_list)