Step 1. Create a project in ROS Development Studio (ROSDS)
We'll use ROSDS throughout this project in order to avoid setting up the environment, managing packages, etc. You can create a free account here if you don't have an account yet.
Step 2. Create package
Since this is a simulation, let’s create a package called my_moving_cube_description under the simulation_ws.
cd ~/simulation_ws/src
catkin_create_pkg my_moving_cube_description rospy
We’ll start by building the URDF description of the robot. To do that, we’ll create a new folder called urdf under the my_moving_cube_description directory and create a file called my_moving_cube.urdf inside it with the following initial content. The robot tag indicates the name of the robot – my_moving_cube.
<robot name="my_moving_cube">
...
</robot>
Then let's create the first link inside the robot. A link includes three parts:
inertial: defines the physical properties of the link. You can calculate the inertia of an object by using this tool: rosrun spawn_robot_tools_pkg inertial_calculator.py
collision: defines the collision properties used when the object interacts with other objects in the simulation.
visual: defines how the object is visually rendered in the simulation.
You also need to define the material property after the link if you want to use it in Gazebo. (Note: the gazebo material reference must have the same name as the link, as shown in the sketch below.)
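For illustration, here is a minimal sketch of what such a link could look like inside the robot tag. The link name, box size, mass and inertia values below are made-up placeholders, not the real cube's values; compute your own inertia with the tool mentioned above.
<!-- Hypothetical link: name, dimensions, mass and inertia are placeholders -->
<link name="cube_body">
    <inertial>
        <mass value="0.5"/>
        <origin xyz="0 0 0" rpy="0 0 0"/>
        <!-- Inertia of a 0.5 kg, 0.1 m cube: I = m/12 * (w^2 + h^2) -->
        <inertia ixx="0.00083" ixy="0.0" ixz="0.0" iyy="0.00083" iyz="0.0" izz="0.00083"/>
    </inertial>
    <collision>
        <origin xyz="0 0 0" rpy="0 0 0"/>
        <geometry>
            <box size="0.1 0.1 0.1"/>
        </geometry>
    </collision>
    <visual>
        <origin xyz="0 0 0" rpy="0 0 0"/>
        <geometry>
            <box size="0.1 0.1 0.1"/>
        </geometry>
    </visual>
</link>
<!-- The gazebo material reference must use the same name as the link -->
<gazebo reference="cube_body">
    <material>Gazebo/Red</material>
</gazebo>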
Then we create a moving_cube.yaml file under the my_moving_cube_description/config directory to define the parameters for the controllers:
# .yaml config file
#
# The PID gains and controller settings must be saved in a yaml file that gets loaded
# to the param server via the roslaunch file (moving_cube_control.launch).
my_moving_cube:
  # Publish all joint states -----------------------------------
  # Creates the /joint_states topic necessary in ROS
  joint_state_controller:
    type: joint_state_controller/JointStateController
    publish_rate: 30

  # Effort Controllers ---------------------------------------
  inertia_wheel_roll_joint_velocity_controller:
    type: effort_controllers/JointVelocityController
    joint: inertia_wheel_roll_joint
    pid: {p: 1.0, i: 0.0, d: 0.0}
In the end, you should create one new launch file called moving_cube_control.launch under the launch folder to load and start the controllers.
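As a hedged sketch (the file paths follow the package layout above; adapt them if yours differ), the launch file could load the yaml onto the param server and spawn the two controllers defined in it:
<launch>
    <!-- Load the controller parameters defined above onto the param server -->
    <rosparam command="load" file="$(find my_moving_cube_description)/config/moving_cube.yaml"/>
    <!-- Spawn the controllers listed in the yaml, under the robot's namespace -->
    <node pkg="controller_manager" type="spawner" name="controller_spawner"
          respawn="false" output="screen" ns="/my_moving_cube"
          args="joint_state_controller inertia_wheel_roll_joint_velocity_controller"/>
</launch>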
This is a series of posts. If you haven't been following along, you can find the previous post here. In this 5th video of the robotic manipulator series, we expand the ROS controllers to all joints of our robot using XACRO. At the end of the video, we'll have a robot fully controlled through ROS topics. We are also going to use RQT Publisher and RQT Reconfigure to do some experiments with the robot.
Step 0. Create a project in ROS Development Studio (ROSDS)
ROSDS helps you follow our tutorials at a fast pace without having to set up an environment locally. If you don't have an account yet, you can create a free account here. We'll start from the project of the previous video – manipulator_video_no4.
Step 1. Configure the controllers
In order to use controllers with our robot, the first step is to add a transmission definition for each joint in the links_joints.xacro file and limits for each joint in the mrm.xacro file. Most importantly, we need to add the gazebo_ros_control plugin in the mrm.xacro file, as sketched below.
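As a rough sketch (the macro name, the joint-name parameter and the /mrm namespace are assumptions; adapt them to your own xacro files), a transmission block for one joint and the gazebo_ros_control plugin look roughly like this:
<!-- Hypothetical xacro macro adding a transmission for one joint -->
<xacro:macro name="joint_transmission" params="joint_name">
    <transmission name="trans_${joint_name}">
        <type>transmission_interface/SimpleTransmission</type>
        <joint name="${joint_name}">
            <hardwareInterface>hardware_interface/EffortJointInterface</hardwareInterface>
        </joint>
        <actuator name="motor_${joint_name}">
            <hardwareInterface>hardware_interface/EffortJointInterface</hardwareInterface>
            <mechanicalReduction>1</mechanicalReduction>
        </actuator>
    </transmission>
</xacro:macro>

<!-- The gazebo_ros_control plugin loads the controller manager into the simulation -->
<gazebo>
    <plugin name="gazebo_ros_control" filename="libgazebo_ros_control.so">
        <robotNamespace>/mrm</robotNamespace>
    </plugin>
</gazebo>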
Did you like the video? If you did, please give us a thumbs up, and remember to subscribe to our channel and press the bell for a new video every day. Whether you liked it or not, please share your thoughts and questions in the comments area. See you!
Got a suggestion for the next steps to take in this project? We would love to hear it in the comments below :).
Part 2
Step 1. Environment setup
We'll start by explaining how to create the gym environment for training. Create a file called monoped_env.py under the my_hopper_training/src directory with the following content:
#!/usr/bin/env python
'''
By Miguel Angel Rodriguez <duckfrost@theconstructsim.com>
Visit our website at ec2-54-246-60-98.eu-west-1.compute.amazonaws.com
'''
import gym
import rospy
import numpy as np
import time
from gym import utils, spaces
from geometry_msgs.msg import Pose
from gym.utils import seeding
from gym.envs.registration import register
from gazebo_connection import GazeboConnection
from joint_publisher import JointPub
from monoped_state import MonopedState
from controllers_connection import ControllersConnection

# Register the training environment in gym as an available one
reg = register(
    id='Monoped-v0',
    entry_point='monoped_env:MonopedEnv',
    timestep_limit=50,
    )


class MonopedEnv(gym.Env):

    def __init__(self):
        # We assume that a ROS node has already been created
        # before initialising the environment

        # Gets training parameters from the param server
        self.desired_pose = Pose()
        self.desired_pose.position.x = rospy.get_param("/desired_pose/x")
        self.desired_pose.position.y = rospy.get_param("/desired_pose/y")
        self.desired_pose.position.z = rospy.get_param("/desired_pose/z")
        self.running_step = rospy.get_param("/running_step")
        self.max_incl = rospy.get_param("/max_incl")
        self.max_height = rospy.get_param("/max_height")
        self.min_height = rospy.get_param("/min_height")
        self.joint_increment_value = rospy.get_param("/joint_increment_value")
        self.done_reward = rospy.get_param("/done_reward")
        self.alive_reward = rospy.get_param("/alive_reward")
        self.desired_force = rospy.get_param("/desired_force")
        self.desired_yaw = rospy.get_param("/desired_yaw")
        self.weight_r1 = rospy.get_param("/weight_r1")
        self.weight_r2 = rospy.get_param("/weight_r2")
        self.weight_r3 = rospy.get_param("/weight_r3")
        self.weight_r4 = rospy.get_param("/weight_r4")
        self.weight_r5 = rospy.get_param("/weight_r5")

        # Establishes connection with the simulator
        self.gazebo = GazeboConnection()
        self.controllers_object = ControllersConnection(namespace="monoped")
        self.monoped_state_object = MonopedState(max_height=self.max_height,
                                                 min_height=self.min_height,
                                                 abs_max_roll=self.max_incl,
                                                 abs_max_pitch=self.max_incl,
                                                 joint_increment_value=self.joint_increment_value,
                                                 done_reward=self.done_reward,
                                                 alive_reward=self.alive_reward,
                                                 desired_force=self.desired_force,
                                                 desired_yaw=self.desired_yaw,
                                                 weight_r1=self.weight_r1,
                                                 weight_r2=self.weight_r2,
                                                 weight_r3=self.weight_r3,
                                                 weight_r4=self.weight_r4,
                                                 weight_r5=self.weight_r5)
        self.monoped_state_object.set_desired_world_point(self.desired_pose.position.x,
                                                          self.desired_pose.position.y,
                                                          self.desired_pose.position.z)
        self.monoped_joint_pubisher_object = JointPub()

        """
        For this version, we consider 6 actions
        1-2) Increment/Decrement haa_joint
        3-4) Increment/Decrement hfe_joint
        5-6) Increment/Decrement kfe_joint
        """
        self.action_space = spaces.Discrete(6)
        self.reward_range = (-np.inf, np.inf)

        self._seed()

    # A function to initialize the random generator
    def _seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    # Resets the state of the environment and returns an initial observation.
    def _reset(self):
        # 0th: We pause the simulator
        rospy.logdebug("Pausing SIM...")
        self.gazebo.pauseSim()

        # 1st: resets the simulation to initial values
        rospy.logdebug("Reset SIM...")
        self.gazebo.resetSim()

        # 2nd: We set gravity to 0.0 so that we don't fall when resetting the joints.
        # It also UNPAUSES the simulation.
        rospy.logdebug("Remove Gravity...")
        self.gazebo.change_gravity(0.0, 0.0, 0.0)

        # EXTRA: Reset the JointStateControllers because the sim reset doesn't
        # reset TFs, generating time problems
        rospy.logdebug("reset_monoped_joint_controllers...")
        self.controllers_object.reset_monoped_joint_controllers()

        # 3rd: resets the robot to initial conditions
        rospy.logdebug("set_init_pose...")
        self.monoped_joint_pubisher_object.set_init_pose()

        # 4th: Check that all subscribers work.
        # Get the state of the robot defined by its RPY orientation, distance from
        # the desired point, contact force and JointState of the three joints
        rospy.logdebug("check_all_systems_ready...")
        self.monoped_state_object.check_all_systems_ready()
        rospy.logdebug("get_observations...")
        observation = self.monoped_state_object.get_observations()

        # 5th: We restore gravity to its original value
        rospy.logdebug("Restore Gravity...")
        self.gazebo.change_gravity(0.0, 0.0, -9.81)

        # 6th: pauses the simulation
        rospy.logdebug("Pause SIM...")
        self.gazebo.pauseSim()

        # Get the discrete, stringified version of the observations
        state = self.get_state(observation)

        return state

    def _step(self, action):
        # Given the action selected by the learning algorithm,
        # we perform the corresponding movement of the robot

        # 1st, decide which action corresponds to which joint is incremented
        next_action_position = self.monoped_state_object.get_action_to_position(action)

        # We move it to that pos
        self.gazebo.unpauseSim()
        self.monoped_joint_pubisher_object.move_joints(next_action_position)
        # Then we send the command to the robot and let it go
        # for running_step seconds
        time.sleep(self.running_step)
        self.gazebo.pauseSim()

        # We now process the latest data saved in the class state to calculate
        # the state and the rewards. This way we guarantee that they work
        # with the same exact data.
        # Generate State based on observations
        observation = self.monoped_state_object.get_observations()

        # finally we get an evaluation based on what happened in the sim
        reward, done = self.monoped_state_object.process_data()

        # Get the discrete, stringified version of the observations
        state = self.get_state(observation)

        return state, reward, done, {}

    def get_state(self, observation):
        """
        We retrieve the stringified, discrete version of the given observation
        :return: state
        """
        return self.monoped_state_object.get_state_as_string(observation)
This script creates the gym environment with the following parts:
1. Register the training environment
You always need this step to inform the gym package that a new training environment has been created.
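Once registered under the id Monoped-v0, the environment can be instantiated with gym.make('Monoped-v0'), which is exactly what the training script start_training_v2.py does later in this post.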
2. Load the desired pose
Parameters related to the training are loaded from the param server in this step.
3. Connect with Gazebo
In order to connect the gym environment with the Gazebo simulation, we create a script called gazebo_connection.py in the same directory with the following content:
#!/usr/bin/env python
import rospy
from std_srvs.srv import Empty
from gazebo_msgs.msg import ODEPhysics
from gazebo_msgs.srv import SetPhysicsProperties, SetPhysicsPropertiesRequest
from std_msgs.msg import Float64
from geometry_msgs.msg import Vector3


class GazeboConnection():

    def __init__(self):
        self.unpause = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
        self.pause = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
        self.reset_proxy = rospy.ServiceProxy('/gazebo/reset_simulation', Empty)
        self.reset_world_proxy = rospy.ServiceProxy('/gazebo/reset_world', Empty)

        # Setup the gravity control system
        service_name = '/gazebo/set_physics_properties'
        rospy.logdebug("Waiting for service " + str(service_name))
        rospy.wait_for_service(service_name)
        rospy.logdebug("Service Found " + str(service_name))

        self.set_physics = rospy.ServiceProxy(service_name, SetPhysicsProperties)
        self.init_values()
        # We always pause the simulation, important for legged robots learning
        self.pauseSim()

    def pauseSim(self):
        rospy.wait_for_service('/gazebo/pause_physics')
        try:
            self.pause()
        except rospy.ServiceException as e:
            print("/gazebo/pause_physics service call failed")

    def unpauseSim(self):
        rospy.wait_for_service('/gazebo/unpause_physics')
        try:
            self.unpause()
        except rospy.ServiceException as e:
            print("/gazebo/unpause_physics service call failed")

    def resetSim(self):
        rospy.wait_for_service('/gazebo/reset_simulation')
        try:
            self.reset_proxy()
        except rospy.ServiceException as e:
            print("/gazebo/reset_simulation service call failed")

    def resetWorld(self):
        rospy.wait_for_service('/gazebo/reset_world')
        try:
            # Bug fix: this must call the reset_world service,
            # not the reset_simulation proxy
            self.reset_world_proxy()
        except rospy.ServiceException as e:
            print("/gazebo/reset_world service call failed")

    def init_values(self):
        rospy.wait_for_service('/gazebo/reset_simulation')
        try:
            self.reset_proxy()
        except rospy.ServiceException as e:
            print("/gazebo/reset_simulation service call failed")

        self._time_step = Float64(0.001)
        self._max_update_rate = Float64(1000.0)

        self._gravity = Vector3()
        self._gravity.x = 0.0
        self._gravity.y = 0.0
        self._gravity.z = 0.0

        self._ode_config = ODEPhysics()
        self._ode_config.auto_disable_bodies = False
        self._ode_config.sor_pgs_precon_iters = 0
        self._ode_config.sor_pgs_iters = 50
        self._ode_config.sor_pgs_w = 1.3
        self._ode_config.sor_pgs_rms_error_tol = 0.0
        self._ode_config.contact_surface_layer = 0.001
        self._ode_config.contact_max_correcting_vel = 0.0
        self._ode_config.cfm = 0.0
        self._ode_config.erp = 0.2
        self._ode_config.max_contacts = 20

        self.update_gravity_call()

    def update_gravity_call(self):
        self.pauseSim()

        set_physics_request = SetPhysicsPropertiesRequest()
        set_physics_request.time_step = self._time_step.data
        set_physics_request.max_update_rate = self._max_update_rate.data
        set_physics_request.gravity = self._gravity
        set_physics_request.ode_config = self._ode_config

        rospy.logdebug(str(set_physics_request.gravity))
        result = self.set_physics(set_physics_request)
        rospy.logdebug("Gravity Update Result==" + str(result.success) + ",message==" + str(result.status_message))

        self.unpauseSim()

    def change_gravity(self, x, y, z):
        self._gravity.x = x
        self._gravity.y = y
        self._gravity.z = z
        self.update_gravity_call()
This script is responsible for resetting, pausing and unpausing the simulation for the training script.
4. Connect with the controllers
Resetting the simulation does not reset TF, which generates timing problems for the controllers. We therefore have to manually reset the controllers with the controllers_connection.py script. We won't go into detail here; if you want to learn more about controllers, please check our controllers 101 course.
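We won't reproduce the full script, but as a hedged sketch of the idea: resetting controllers is typically done through the controller_manager's switch_controller service, stopping all controllers and starting them again. The controller names below are assumptions based on the monoped's joints:
#!/usr/bin/env python
# Hypothetical sketch of controllers_connection.py: stops and restarts the
# joint controllers via controller_manager so their internal state survives
# a simulation reset. Controller names here are assumptions.
import rospy
from controller_manager_msgs.srv import SwitchController, SwitchControllerRequest


class ControllersConnection():

    def __init__(self, namespace):
        self.switch_service_name = '/' + namespace + '/controller_manager/switch_controller'
        self.switch_service = rospy.ServiceProxy(self.switch_service_name, SwitchController)
        self.controllers_list = ['joint_state_controller',
                                 'haa_joint_position_controller',
                                 'hfe_joint_position_controller',
                                 'kfe_joint_position_controller']

    def switch_controllers(self, controllers_on, controllers_off):
        """Starts controllers_on and stops controllers_off in one atomic call."""
        rospy.wait_for_service(self.switch_service_name)
        try:
            switch_request = SwitchControllerRequest()
            switch_request.start_controllers = controllers_on
            switch_request.stop_controllers = controllers_off
            switch_request.strictness = SwitchControllerRequest.BEST_EFFORT
            result = self.switch_service(switch_request)
            return result.ok
        except rospy.ServiceException as e:
            print(self.switch_service_name + " service call failed")
            return None

    def reset_monoped_joint_controllers(self):
        # Switch every controller off, then on again
        self.switch_controllers(controllers_on=[], controllers_off=self.controllers_list)
        self.switch_controllers(controllers_on=self.controllers_list, controllers_off=[])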
5. Generate the state object
The state object is generated from the current robot state by the monoped_state.py script, which provides the observations, the reward computation and the done flag used by the environment.
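The full script is too long to reproduce here; below is a hedged skeleton showing only the interface that monoped_env.py relies on. The topic names and all internals are assumptions:
#!/usr/bin/env python
# Hypothetical skeleton of monoped_state.py: only the methods used by
# monoped_env.py are shown; topic names and reward maths are assumptions.
import rospy
from geometry_msgs.msg import Point
from sensor_msgs.msg import JointState, Imu
from nav_msgs.msg import Odometry


class MonopedState(object):

    def __init__(self, max_height, min_height, abs_max_roll, abs_max_pitch,
                 joint_increment_value, done_reward, alive_reward,
                 desired_force, desired_yaw,
                 weight_r1, weight_r2, weight_r3, weight_r4, weight_r5):
        self._min_height = min_height
        self._max_height = max_height
        # ... the remaining parameters are stored the same way ...
        self._desired_world_point = Point()
        # Assumed topics published by the monoped simulation
        rospy.Subscriber("/monoped/odom", Odometry, self._odom_callback)
        rospy.Subscriber("/monoped/imu/data", Imu, self._imu_callback)
        rospy.Subscriber("/monoped/joint_states", JointState, self._joints_callback)

    def set_desired_world_point(self, x, y, z):
        """Point that the reward function should drive the robot towards."""
        self._desired_world_point.x = x
        self._desired_world_point.y = y
        self._desired_world_point.z = z

    def check_all_systems_ready(self):
        """Blocks until every subscribed topic has delivered at least one message."""

    def get_observations(self):
        """Returns [distance_from_desired_point, base_roll, base_pitch, base_yaw,
        contact_force, joint_states_haa, joint_states_hfe, joint_states_kfe]."""

    def get_action_to_position(self, action):
        """Maps a discrete action (0-5) to incremented/decremented joint positions."""

    def process_data(self):
        """Computes (reward, done) from the latest observations, combining the
        five weighted reward terms with the done/alive rewards."""

    def get_state_as_string(self, observation):
        """Discretizes the observation into a string usable as a Q-table key."""

    def _odom_callback(self, msg):
        self._odom = msg

    def _imu_callback(self, msg):
        self._imu = msg

    def _joints_callback(self, msg):
        self._joints = msg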
6. Reset
The _reset function sets the environment back to its initial state and returns an initial observation. This includes steps like pausing the simulation, resetting the controllers, etc., as described above.
7. Step
In this part, an action is decided based on the current state and then published with the following joint_publisher.py script. After that, the reward is calculated based on the new observation.
#!/usr/bin/env python
import rospy
import math
from std_msgs.msg import String
from std_msgs.msg import Float64


class JointPub(object):

    def __init__(self):
        self.publishers_array = []
        self._haa_joint_pub = rospy.Publisher('/monoped/haa_joint_position_controller/command', Float64, queue_size=1)
        self._hfe_joint_pub = rospy.Publisher('/monoped/hfe_joint_position_controller/command', Float64, queue_size=1)
        self._kfe_joint_pub = rospy.Publisher('/monoped/kfe_joint_position_controller/command', Float64, queue_size=1)
        self.publishers_array.append(self._haa_joint_pub)
        self.publishers_array.append(self._hfe_joint_pub)
        self.publishers_array.append(self._kfe_joint_pub)
        self.init_pos = [0.0, 0.0, 0.0]

    def set_init_pose(self):
        """
        Sets joints to the initial position [0, 0, 0]
        :return:
        """
        self.check_publishers_connection()
        self.move_joints(self.init_pos)

    def check_publishers_connection(self):
        """
        Checks that all the publishers are working
        :return:
        """
        rate = rospy.Rate(10)  # 10 Hz
        while self._haa_joint_pub.get_num_connections() == 0:
            rospy.logdebug("No subscribers to _haa_joint_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid errors when the world is reset; time can go backwards.
                pass
        rospy.logdebug("_haa_joint_pub Publisher Connected")

        while self._hfe_joint_pub.get_num_connections() == 0:
            rospy.logdebug("No subscribers to _hfe_joint_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid errors when the world is reset; time can go backwards.
                pass
        rospy.logdebug("_hfe_joint_pub Publisher Connected")

        while self._kfe_joint_pub.get_num_connections() == 0:
            rospy.logdebug("No subscribers to _kfe_joint_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid errors when the world is reset; time can go backwards.
                pass
        rospy.logdebug("_kfe_joint_pub Publisher Connected")

        rospy.logdebug("All Publishers READY")

    def joint_mono_des_callback(self, msg):
        rospy.logdebug(str(msg.joint_state.position))
        self.move_joints(msg.joint_state.position)

    def move_joints(self, joints_array):
        i = 0
        for publisher_object in self.publishers_array:
            joint_value = Float64()
            joint_value.data = joints_array[i]
            rospy.logdebug("JointsPos>>" + str(joint_value))
            publisher_object.publish(joint_value)
            i += 1

    def start_loop(self, rate_value=2.0):
        rospy.logdebug("Start Loop")
        pos1 = [0.0, 0.0, 1.6]
        pos2 = [0.0, 0.0, -1.6]
        position = "pos1"
        rate = rospy.Rate(rate_value)
        while not rospy.is_shutdown():
            if position == "pos1":
                self.move_joints(pos1)
                position = "pos2"
            else:
                self.move_joints(pos2)
                position = "pos1"
            rate.sleep()

    def start_sinus_loop(self, rate_value=2.0):
        rospy.logdebug("Start Loop")
        w = 0.0
        x = 2.0 * math.sin(w)
        # pos_x = [0.0, 0.0, x]
        # pos_x = [x, 0.0, 0.0]
        pos_x = [0.0, x, 0.0]
        rate = rospy.Rate(rate_value)
        while not rospy.is_shutdown():
            self.move_joints(pos_x)
            w += 0.05
            x = 2.0 * math.sin(w)
            # pos_x = [0.0, 0.0, x]
            # pos_x = [x, 0.0, 0.0]
            pos_x = [0.0, x, 0.0]
            rate.sleep()


if __name__ == "__main__":
    rospy.init_node('joint_publisher_node')
    joint_publisher = JointPub()
    rate_value = 50.0
    # joint_publisher.start_loop(rate_value)
    joint_publisher.start_sinus_loop(rate_value)
Step 2. Training
Now you have all the code you need to start training. Let's run the simulation first. Go to Simulations -> Select launch file -> my_legged_robot_sims -> main.launch to launch the hopper robot simulation in Gazebo.
Then you can run the training with
roslaunch my_hopper_training main.launch
The algorithm works and starts to train the robot to perform the task we want; however, it still requires a lot of tuning to work properly.
You can run multiple instances and train robots with different parameters in parallel with our paid program in ROSDS. Please check it out here if you are interested.
In this video, we are going to set up a new URDF element in our robotic manipulator: transmissions. After that, we are able to integrate ROS and the Gazebo simulation using ROS controllers. We'll use a simplified model of the robot to see how it works. At the end of the video, we'll be able to send joint position commands to the robot by publishing to ROS topics.
Find a complete course about ROS controllers and URDF
Robot Ignite Academy:
https://goo.gl/93gmxz
In this series, we are going to show you how to build a hopper robot in ROS and make it learn to hop using a reinforcement learning algorithm. The hopper robot simulation was built in the previous post. In case you didn't follow it, you can find the post here.
Part 1
Use OpenAI to make a hopper robot learn in the Gazebo simulator, using the ROS Development Studio. We will use Q-learning and Gym for that.
Step 1. Create a training package
Let’s create a package for training
cd ~/simulation_ws/src/loco_motion
catkin_create_pkg my_hopper_training rospy
Then we create a launch file called main.launch inside the my_hopper_training/launch directory with the following content:
<!--
    Date of creation: 5/II/2018
    Application created by: Miguel Angel Rodriguez <duckfrost@theconstructsim.com>
    The Construct https://www.theconstruct.ai
    License LGPLV3 << Basically means you can do whatever you want with this!
-->
<launch>
    <!-- Load the parameters for the algorithm -->
    <rosparam command="load" file="$(find my_hopper_training)/config/qlearn_params.yaml" />

    <!-- Launch the training system -->
    <node pkg="my_hopper_training" name="monoped_gym" type="start_training_v2.py" output="screen"/>
</launch>
To implement reinforcement learning, we'll use an algorithm called Q-learning. We'll save the parameters for the Q-learning algorithm as qlearn_params.yaml under the my_hopper_training/config directory with the following content:
# Algorithm Parameters
alpha: 0.1
gamma: 0.8
epsilon: 0.9
epsilon_discount: 0.999 # 1098 eps to reach 0.1
nepisodes: 100000
nsteps: 1000

# Environment Parameters
desired_pose:
  x: 0.0
  y: 0.0
  z: 1.0
desired_force: 7.08 # In Newtons, normal contact force when standing still with 9.81 gravity
desired_yaw: 0.0 # Desired yaw in radians for the hopper to stay
max_height: 3.0 # in meters
min_height: 0.5 # in meters
max_incl: 1.57 # in rads
running_step: 0.001 # in seconds
joint_increment_value: 0.05 # in radians
done_reward: -1000.0 # reward
alive_reward: 100.0 # reward
weight_r1: 1.0 # Weight for joint positions ( joints in the zero is perfect )
weight_r2: 0.0 # Weight for joint efforts ( no efforts is perfect )
weight_r3: 1.0 # Weight for contact force similar to desired ( weight of monoped )
weight_r4: 1.0 # Weight for orientation ( vertical is perfect )
weight_r5: 1.0 # Weight for distance from desired point ( on the point is perfect )
In this post, we'll focus on explaining the training script. Let's create it under the my_hopper_training/src directory and call it start_training_v2.py with the following content:
#!/usr/bin/env python
'''
Original training code made by Ricardo Tellez <rtellez@theconstructsim.com>
Modified by Miguel Angel Rodriguez <duckfrost@theconstructsim.com>
Visit our website at ec2-54-246-60-98.eu-west-1.compute.amazonaws.com
'''
import gym
import time
import numpy
import random
import qlearn
from functools import reduce  # makes reduce available in Python 3 as well
from gym import wrappers
from std_msgs.msg import Float64

# ROS packages required
import rospy
import rospkg

# import our training environment (registers Monoped-v0 as a side effect)
import monoped_env


if __name__ == '__main__':

    rospy.init_node('monoped_gym', anonymous=True, log_level=rospy.INFO)

    # Create the Gym environment
    env = gym.make('Monoped-v0')
    rospy.logdebug("Gym environment done")
    reward_pub = rospy.Publisher('/monoped/reward', Float64, queue_size=1)
    episode_reward_pub = rospy.Publisher('/monoped/episode_reward', Float64, queue_size=1)

    # Set the logging system
    rospack = rospkg.RosPack()
    pkg_path = rospack.get_path('my_hopper_training')
    outdir = pkg_path + '/training_results'
    env = wrappers.Monitor(env, outdir, force=True)
    rospy.logdebug("Monitor Wrapper started")

    last_time_steps = numpy.ndarray(0)

    # Loads parameters from the ROS param server.
    # Parameters are stored in a yaml file inside the config directory.
    # They are loaded at runtime by the launch file.
    Alpha = rospy.get_param("/alpha")
    Epsilon = rospy.get_param("/epsilon")
    Gamma = rospy.get_param("/gamma")
    epsilon_discount = rospy.get_param("/epsilon_discount")
    nepisodes = rospy.get_param("/nepisodes")
    nsteps = rospy.get_param("/nsteps")

    # Initialises the algorithm that we are going to use for learning
    qlearn = qlearn.QLearn(actions=range(env.action_space.n),
                           alpha=Alpha, gamma=Gamma, epsilon=Epsilon)
    initial_epsilon = qlearn.epsilon

    start_time = time.time()
    highest_reward = 0

    # Starts the main training loop: the one about the episodes to do
    for x in range(nepisodes):
        rospy.loginfo("STARTING Episode #" + str(x))

        cumulated_reward = 0
        cumulated_reward_msg = Float64()
        episode_reward_msg = Float64()
        done = False
        if qlearn.epsilon > 0.05:
            qlearn.epsilon *= epsilon_discount

        # Initialize the environment and get the first state of the robot
        rospy.logdebug("env.reset...")
        # Now we return directly the stringified observations, called state
        state = env.reset()

        rospy.logdebug("env.get_state...==>" + str(state))

        # for each episode, we test the robot for nsteps
        for i in range(nsteps):

            # Pick an action based on the current state
            action = qlearn.chooseAction(state)

            # Execute the action in the environment and get feedback
            rospy.logdebug("###################### Start Step...[" + str(i) + "]")
            rospy.logdebug("haa+,haa-,hfe+,hfe-,kfe+,kfe- >> [0,1,2,3,4,5]")
            rospy.logdebug("Action to Perform >> " + str(action))
            nextState, reward, done, info = env.step(action)
            rospy.logdebug("END Step...")
            rospy.logdebug("Reward ==> " + str(reward))
            cumulated_reward += reward
            if highest_reward < cumulated_reward:
                highest_reward = cumulated_reward

            rospy.logdebug("env.get_state...[distance_from_desired_point,base_roll,base_pitch,base_yaw,contact_force,joint_states_haa,joint_states_hfe,joint_states_kfe]==>" + str(nextState))

            # Make the algorithm learn based on the results
            qlearn.learn(state, action, reward, nextState)

            # We publish the cumulated reward
            cumulated_reward_msg.data = cumulated_reward
            reward_pub.publish(cumulated_reward_msg)

            if not done:
                state = nextState
            else:
                rospy.logdebug("DONE")
                last_time_steps = numpy.append(last_time_steps, [int(i + 1)])
                break

            rospy.logdebug("###################### END Step...[" + str(i) + "]")

        m, s = divmod(int(time.time() - start_time), 60)
        h, m = divmod(m, 60)
        episode_reward_msg.data = cumulated_reward
        episode_reward_pub.publish(episode_reward_msg)
        rospy.loginfo(("EP: " + str(x + 1) + " - [alpha: " + str(round(qlearn.alpha, 2)) + " - gamma: " + str(round(qlearn.gamma, 2)) + " - epsilon: " + str(round(qlearn.epsilon, 2)) + "] - Reward: " + str(cumulated_reward) + " Time: %d:%02d:%02d" % (h, m, s)))

    rospy.loginfo(("\n|" + str(nepisodes) + "|" + str(qlearn.alpha) + "|" + str(qlearn.gamma) + "|" + str(initial_epsilon) + "*" + str(epsilon_discount) + "|" + str(highest_reward) + "| PICTURE |"))

    l = last_time_steps.tolist()
    l.sort()

    rospy.loginfo("Overall score: {:0.2f}".format(last_time_steps.mean()))
    rospy.loginfo("Best 100 score: {:0.2f}".format(reduce(lambda x, y: x + y, l[-100:]) / len(l[-100:])))

    env.close()
We won't go into detail explaining the Q-learning algorithm; you can find a tutorial here if you are interested. You can simply copy and paste the following code into a file called qlearn.py and put it under the my_hopper_training/src directory:
'''
Q-learning approach for different RL problems
as part of the basic series on reinforcement learning @