In this new ROS Project you are going to learn Step-by-Step how to create a robot cube that moves and that it learns to move using OpenAI environment.
In this fifth and last video of the cube series, we create the Robot environment for OpenAI Gym for the moving cube. You will learn all the details on why we do it like this and we finally execute the script to make the robot cube learn how to walk in one direction.
For any suggestion on the next AI project that we could do, please leave us a comment, we will be happy to hear your ideas :).
Moving Cube Git:
https://bitbucket.org/theconstructcore/moving_cube/src/master/
Moving Cube Training AI Git:
https://bitbucket.org/theconstructcore/moving_cube_ai/src/master/
Step 0. If you didn’t follow up
This is a series posts using ROS Development Studio(ROSDS). In case you haven’t had an account yet, you can create one for free here.
You can find the last post down below:
https://www.theconstruct.ai/ros-projects-openai-moving-cube-robot-gazebo-step-step-part4/
Step 1. Create gym environment file
We’ll start by creating a file called old_way_moving_cube_env.py under the my_moving_cube_training_pkg/script directory with the following content
import gym import rospy import time import numpy as np import math import copy from gym import utils, spaces import numpy from std_msgs.msg import Float64 from sensor_msgs.msg import JointState from rosgraph_msgs.msg import Clock from nav_msgs.msg import Odometry from gazebo_connection import GazeboConnection from controllers_connection import ControllersConnection from gym.utils import seeding from gym.envs.registration import register from geometry_msgs.msg import Point from tf.transformations import euler_from_quaternion reg = register( id='MyOldMovingCube-v0', entry_point='old_way_moving_cube_env:MyOldMovingCubeEnv', timestep_limit=1000, ) class MyOldMovingCubeEnv(gym.Env): def __init__(self): number_actions = rospy.get_param('/moving_cube/n_actions') self.action_space = spaces.Discrete(number_actions) self._seed() #get configuration parameters self.init_roll_vel = rospy.get_param('/moving_cube/init_roll_vel') # Actions self.roll_speed_fixed_value = rospy.get_param('/moving_cube/roll_speed_fixed_value') self.roll_speed_increment_value = rospy.get_param('/moving_cube/roll_speed_increment_value') self.start_point = Point() self.start_point.x = rospy.get_param("/moving_cube/init_cube_pose/x") self.start_point.y = rospy.get_param("/moving_cube/init_cube_pose/y") self.start_point.z = rospy.get_param("/moving_cube/init_cube_pose/z") # Done self.max_pitch_angle = rospy.get_param('/moving_cube/max_pitch_angle') # Rewards self.move_distance_reward_weight = rospy.get_param("/moving_cube/move_distance_reward_weight") self.y_linear_speed_reward_weight = rospy.get_param("/moving_cube/y_linear_speed_reward_weight") self.y_axis_angle_reward_weight = rospy.get_param("/moving_cube/y_axis_angle_reward_weight") self.end_episode_points = rospy.get_param("/moving_cube/end_episode_points") # stablishes connection with simulator self.gazebo = GazeboConnection() self.controllers_list = ['joint_state_controller', 'inertia_wheel_roll_joint_velocity_controller' ] self.controllers_object = ControllersConnection(namespace="moving_cube", controllers_list=self.controllers_list) self.gazebo.unpauseSim() self.controllers_object.reset_controllers() self.check_all_sensors_ready() rospy.Subscriber("/moving_cube/joint_states", JointState, self.joints_callback) rospy.Subscriber("/moving_cube/odom", Odometry, self.odom_callback) self._roll_vel_pub = rospy.Publisher('/moving_cube/inertia_wheel_roll_joint_velocity_controller/command', Float64, queue_size=1) self.check_publishers_connection() self.gazebo.pauseSim() def _seed(self, seed=None): #overriden function self.np_random, seed = seeding.np_random(seed) return [seed] def _step(self, action):#overriden function self.gazebo.unpauseSim() self.set_action(action) self.gazebo.pauseSim() obs = self._get_obs() done = self._is_done(obs) info = {} reward = self.compute_reward(obs, done) simplified_obs = self.convert_obs_to_state(obs) return simplified_obs, reward, done, info def _reset(self): self.gazebo.unpauseSim() self.controllers_object.reset_controllers() self.check_all_sensors_ready() self.set_init_pose() self.gazebo.pauseSim() self.gazebo.resetSim() self.gazebo.unpauseSim() self.controllers_object.reset_controllers() self.check_all_sensors_ready() self.gazebo.pauseSim() self.init_env_variables() obs = self._get_obs() simplified_obs = self.convert_obs_to_state(obs) return simplified_obs def init_env_variables(self): """ Inits variables needed to be initialised each time we reset at the start of an episode. :return: """ self.total_distance_moved = 0.0 self.current_y_distance = self.get_y_dir_distance_from_start_point(self.start_point) self.roll_turn_speed = rospy.get_param('/moving_cube/init_roll_vel') def _is_done(self, observations): pitch_angle = observations[3] if abs(pitch_angle) > self.max_pitch_angle: rospy.logerr("WRONG Cube Pitch Orientation==>" + str(pitch_angle)) done = True else: rospy.logdebug("Cube Pitch Orientation Ok==>" + str(pitch_angle)) done = False return done def set_action(self, action): # We convert the actions to speed movements to send to the parent class CubeSingleDiskEnv if action == 0:# Move Speed Wheel Forwards self.roll_turn_speed = self.roll_speed_fixed_value elif action == 1:# Move Speed Wheel Backwards self.roll_turn_speed = self.roll_speed_fixed_value elif action == 2:# Stop Speed Wheel self.roll_turn_speed = 0.0 elif action == 3:# Increment Speed self.roll_turn_speed += self.roll_speed_increment_value elif action == 4:# Decrement Speed self.roll_turn_speed -= self.roll_speed_increment_value # We clamp Values to maximum rospy.logdebug("roll_turn_speed before clamp=="+str(self.roll_turn_speed)) self.roll_turn_speed = numpy.clip(self.roll_turn_speed, -1*self.roll_speed_fixed_value, self.roll_speed_fixed_value) rospy.logdebug("roll_turn_speed after clamp==" + str(self.roll_turn_speed)) # We tell the OneDiskCube to spin the RollDisk at the selected speed self.move_joints(self.roll_turn_speed) def _get_obs(self): """ Here we define what sensor data defines our robots observations To know which Variables we have acces to, we need to read the MyCubeSingleDiskEnv API DOCS :return: """ # We get the orientation of the cube in RPY roll, pitch, yaw = self.get_orientation_euler() # We get the distance from the origin y_distance = self.get_y_dir_distance_from_start_point(self.start_point) # We get the current speed of the Roll Disk current_disk_roll_vel = self.get_roll_velocity() # We get the linear speed in the y axis y_linear_speed = self.get_y_linear_speed() cube_observations = [ round(current_disk_roll_vel, 0), round(y_distance, 1), round(roll, 1), round(pitch, 1), round(y_linear_speed,1), round(yaw, 1), ] return cube_observations def get_orientation_euler(self): # We convert from quaternions to euler orientation_list = [self.odom.pose.pose.orientation.x, self.odom.pose.pose.orientation.y, self.odom.pose.pose.orientation.z, self.odom.pose.pose.orientation.w] roll, pitch, yaw = euler_from_quaternion(orientation_list) return roll, pitch, yaw def get_roll_velocity(self): # We get the current joint roll velocity roll_vel = self.joints.velocity[0] return roll_vel def get_y_linear_speed(self): # We get the current joint roll velocity y_linear_speed = self.odom.twist.twist.linear.y return y_linear_speed def get_y_dir_distance_from_start_point(self, start_point): """ Calculates the distance from the given point and the current position given by odometry. In this case the increase or decrease in y. :param start_point: :return: """ y_dist_dir = self.odom.pose.pose.position.y - start_point.y return y_dist_dir def compute_reward(self, observations, done): if not done: y_distance_now = observations[1] delta_distance = y_distance_now - self.current_y_distance rospy.logdebug("y_distance_now=" + str(y_distance_now)+", current_y_distance=" + str(self.current_y_distance)) rospy.logdebug("delta_distance=" + str(delta_distance)) reward_distance = delta_distance * self.move_distance_reward_weight self.current_y_distance = y_distance_now y_linear_speed = observations[4] rospy.logdebug("y_linear_speed=" + str(y_linear_speed)) reward_y_axis_speed = y_linear_speed * self.y_linear_speed_reward_weight # Negative Reward for yaw different from zero. yaw_angle = observations[5] rospy.logdebug("yaw_angle=" + str(yaw_angle)) # Worst yaw is 90 and 270 degrees, best 0 and 180. We use sin function for giving reward. sin_yaw_angle = math.sin(yaw_angle) rospy.logdebug("sin_yaw_angle=" + str(sin_yaw_angle)) reward_y_axis_angle = -1 * abs(sin_yaw_angle) * self.y_axis_angle_reward_weight # We are not intereseted in decimals of the reward, doesnt give any advatage. reward = round(reward_distance, 0) + round(reward_y_axis_speed, 0) + round(reward_y_axis_angle, 0) rospy.logdebug("reward_distance=" + str(reward_distance)) rospy.logdebug("reward_y_axis_speed=" + str(reward_y_axis_speed)) rospy.logdebug("reward_y_axis_angle=" + str(reward_y_axis_angle)) rospy.logdebug("reward=" + str(reward)) else: reward = -1*self.end_episode_points return reward def joints_callback(self, data): self.joints = data def odom_callback(self, data): self.odom = data def check_all_sensors_ready(self): self.check_joint_states_ready() self.check_odom_ready() rospy.logdebug("ALL SENSORS READY") def check_joint_states_ready(self): self.joints = None while self.joints is None and not rospy.is_shutdown(): try: self.joints = rospy.wait_for_message("/moving_cube/joint_states", JointState, timeout=1.0) rospy.logdebug("Current moving_cube/joint_states READY=>" + str(self.joints)) except: rospy.logerr("Current moving_cube/joint_states not ready yet, retrying for getting joint_states") return self.joints def check_odom_ready(self): self.odom = None while self.odom is None and not rospy.is_shutdown(): try: self.odom = rospy.wait_for_message("/moving_cube/odom", Odometry, timeout=1.0) rospy.logdebug("Current /moving_cube/odom READY=>" + str(self.odom)) except: rospy.logerr("Current /moving_cube/odom not ready yet, retrying for getting odom") return self.odom def check_publishers_connection(self): """ Checks that all the publishers are working :return: """ rate = rospy.Rate(10) # 10hz while (self._roll_vel_pub.get_num_connections() == 0 and not rospy.is_shutdown()): rospy.logdebug("No susbribers to _roll_vel_pub yet so we wait and try again") try: rate.sleep() except rospy.ROSInterruptException: # This is to avoid error when world is rested, time when backwards. pass rospy.logdebug("_base_pub Publisher Connected") rospy.logdebug("All Publishers READY") def move_joints(self, roll_speed): joint_speed_value = Float64() joint_speed_value.data = roll_speed rospy.logdebug("Single Disk Roll Velocity>>" + str(joint_speed_value)) self._roll_vel_pub.publish(joint_speed_value) self.wait_until_roll_is_in_vel(joint_speed_value.data) def wait_until_roll_is_in_vel(self, velocity): rate = rospy.Rate(10) start_wait_time = rospy.get_rostime().to_sec() end_wait_time = 0.0 epsilon = 0.1 v_plus = velocity + epsilon v_minus = velocity - epsilon while not rospy.is_shutdown(): joint_data = self.check_joint_states_ready() roll_vel = joint_data.velocity[0] rospy.logdebug("VEL=" + str(roll_vel) + ", ?RANGE=[" + str(v_minus) + ","+str(v_plus)+"]") are_close = (roll_vel <= v_plus) and (roll_vel > v_minus) if are_close: rospy.logdebug("Reached Velocity!") end_wait_time = rospy.get_rostime().to_sec() break rospy.logdebug("Not there yet, keep waiting...") rate.sleep() delta_time = end_wait_time- start_wait_time rospy.logdebug("[Wait Time=" + str(delta_time)+"]") return delta_time def set_init_pose(self): """Sets the Robot in its init pose """ self.move_joints(self.init_roll_vel) return True def convert_obs_to_state(self,observations): """ Converts the observations used for reward and so on to the essentials for the robot state In this case we only need the orientation of the cube and the speed of the disc. The distance doesnt condition at all the actions """ disk_roll_vel = observations[0] y_linear_speed = observations[4] yaw_angle = observations[5] state_converted = [disk_roll_vel, y_linear_speed, yaw_angle] return state_converted
From line 22 to line 26 is the most important part of the file. You have to register your environment to OpenAI gym.
In the _get_obs() function in line 169 to line 198, the observation is measured from the simulation and will be used to generate a state.
The set_action() function in line 144 to line 166 executes the action decided by the q learning algorithm.
From line 88 to 188, the code defines what should the environment do when the algorithm runs into step or reset state.
The reward and checking if the training session is done or not is done in the __step() function.
In _reset() function, the environment will at first pause the gazebo simulation and reset the controller. In order to connect to gazebo and controller, we create a file called gazebo_connection.py under the same folder with the following content
#!/usr/bin/env python import rospy from std_srvs.srv import Empty from gazebo_msgs.msg import ODEPhysics from gazebo_msgs.srv import SetPhysicsProperties, SetPhysicsPropertiesRequest from std_msgs.msg import Float64 from geometry_msgs.msg import Vector3 class GazeboConnection(): def __init__(self): self.unpause = rospy.ServiceProxy('/gazebo/unpause_physics', Empty) self.pause = rospy.ServiceProxy('/gazebo/pause_physics', Empty) self.reset_proxy = rospy.ServiceProxy('/gazebo/reset_simulation', Empty) # Setup the Gravity Controle system service_name = '/gazebo/set_physics_properties' rospy.logdebug("Waiting for service " + str(service_name)) rospy.wait_for_service(service_name) rospy.logdebug("Service Found " + str(service_name)) self.set_physics = rospy.ServiceProxy(service_name, SetPhysicsProperties) self.init_values() # We always pause the simulation, important for legged robots learning self.pauseSim() def pauseSim(self): rospy.wait_for_service('/gazebo/pause_physics') try: self.pause() except rospy.ServiceException as e: print ("/gazebo/pause_physics service call failed") def unpauseSim(self): rospy.wait_for_service('/gazebo/unpause_physics') try: self.unpause() except rospy.ServiceException as e: print ("/gazebo/unpause_physics service call failed") def resetSim(self): rospy.wait_for_service('/gazebo/reset_simulation') try: self.reset_proxy() except rospy.ServiceException as e: print ("/gazebo/reset_simulation service call failed") def resetWorld(self): rospy.wait_for_service('/gazebo/reset_world') try: self.reset_proxy() except rospy.ServiceException as e: print ("/gazebo/reset_world service call failed") def init_values(self): rospy.wait_for_service('/gazebo/reset_simulation') try: # reset_proxy.call() self.reset_proxy() except rospy.ServiceException as e: print ("/gazebo/reset_simulation service call failed") self._time_step = Float64(0.001) self._max_update_rate = Float64(1000.0) self._gravity = Vector3() self._gravity.x = 0.0 self._gravity.y = 0.0 self._gravity.z = -9.81 self._ode_config = ODEPhysics() self._ode_config.auto_disable_bodies = False self._ode_config.sor_pgs_precon_iters = 0 self._ode_config.sor_pgs_iters = 50 self._ode_config.sor_pgs_w = 1.3 self._ode_config.sor_pgs_rms_error_tol = 0.0 self._ode_config.contact_surface_layer = 0.001 self._ode_config.contact_max_correcting_vel = 0.0 self._ode_config.cfm = 0.0 self._ode_config.erp = 0.2 self._ode_config.max_contacts = 20 self.update_gravity_call() def update_gravity_call(self): self.pauseSim() set_physics_request = SetPhysicsPropertiesRequest() set_physics_request.time_step = self._time_step.data set_physics_request.max_update_rate = self._max_update_rate.data set_physics_request.gravity = self._gravity set_physics_request.ode_config = self._ode_config rospy.logdebug(str(set_physics_request.gravity)) result = self.set_physics(set_physics_request) rospy.logdebug("Gravity Update Result==" + str(result.success) + ",message==" + str(result.status_message)) self.unpauseSim() def change_gravity(self, x, y, z): self._gravity.x = x self._gravity.y = y self._gravity.z = z self.update_gravity_call()
Then we create another file called controllers_connection.py with the following content
#!/usr/bin/env python import rospy import time from controller_manager_msgs.srv import SwitchController, SwitchControllerRequest, SwitchControllerResponse class ControllersConnection(): def __init__(self, namespace, controllers_list): self.controllers_list = controllers_list self.switch_service_name = '/'+namespace+'/controller_manager/switch_controller' self.switch_service = rospy.ServiceProxy(self.switch_service_name, SwitchController) def switch_controllers(self, controllers_on, controllers_off, strictness=1): """ Give the controllers you want to switch on or off. :param controllers_on: ["name_controler_1", "name_controller2",...,"name_controller_n"] :param controllers_off: ["name_controler_1", "name_controller2",...,"name_controller_n"] :return: """ rospy.wait_for_service(self.switch_service_name) try: switch_request_object = SwitchControllerRequest() switch_request_object.start_controllers = controllers_on switch_request_object.start_controllers = controllers_off switch_request_object.strictness = strictness switch_result = self.switch_service(switch_request_object) """ [controller_manager_msgs/SwitchController] int32 BEST_EFFORT=1 int32 STRICT=2 string[] start_controllers string[] stop_controllers int32 strictness --- bool ok """ rospy.logdebug("Switch Result==>"+str(switch_result.ok)) return switch_result.ok except rospy.ServiceException as e: print (self.switch_service_name+" service call failed") return None def reset_controllers(self): """ We turn on and off the given controllers :param controllers_reset: ["name_controler_1", "name_controller2",...,"name_controller_n"] :return: """ reset_result = False result_off_ok = self.switch_controllers(controllers_on = [], controllers_off = self.controllers_list) rospy.logdebug("Deactivated Controlers") if result_off_ok: rospy.logdebug("Activating Controlers") result_on_ok = self.switch_controllers(controllers_on=self.controllers_list, controllers_off=[]) if result_on_ok: rospy.logdebug("Controllers Reseted==>"+str(self.controllers_list)) reset_result = True else: rospy.logdebug("result_on_ok==>" + str(result_on_ok)) else: rospy.logdebug("result_off_ok==>" + str(result_off_ok)) return reset_result def update_controllers_list(self, new_controllers_list): self.controllers_list = new_controllers_list
Step 2. Start training
Now you have all the script you need for training, let’s create a launch file to launch the training under my_moving_cube_training_pkg/launch with the name start_training.launch
<launch> <rosparam command="load" file="$(find my_moving_cube_training_pkg)/config/one_disk_walk_openai_params.yaml" /> <!-- Launch the training system --> <node pkg="my_moving_cube_training_pkg" name="movingcube_gym" type="oldway_start_training.py" output="screen"/> </launch>
If you already closed the simulation, please start it again from Simulations->select launch file->main.launch
Then run the following command to launch the training.
cd ~/catkin_ws source devel/setup.bash roslaunch my_moving_cube_training_pkg start_training.launch
You should see the cube robot now moving around to find the best way to move, you can play with different parameters to optimize the training.
If you are interested in this topic, please do not forget to check our OpenAI course at Robot Ignite Academy where you can learn how to create the gym environment for different robots!
Edit by: Tony Huang
0 Comments
Trackbacks/Pingbacks