In this ROS project you are going to learn, step by step, how to create a robot cube that moves and learns to move using an OpenAI Gym environment.
In this fourth video we talk about the first script we need for doing reinforcement learning with OpenAI: the main training script, where we import the robot environment that we will define in the next video and use the Q-learn algorithm.
#!/usr/bin/env python
'''
Training code made by Ricardo Tellez <rtellez@theconstructsim.com>
Based on many other examples around Internet
Visit our website at www.theconstruct.ai
'''
import gym
import numpy
import time
import qlearn
from gym import wrappers
# ROS packages required
import rospy
import rospkg
# import our training environment
import old_way_moving_cube_env

if __name__ == '__main__':

    rospy.init_node('movingcube_gym', anonymous=True, log_level=rospy.WARN)

    # Create the Gym environment
    env = gym.make('OldMovingCube-v0')
    rospy.loginfo("Gym environment done")

    # Set the logging system
    rospack = rospkg.RosPack()
    pkg_path = rospack.get_path('moving_cube_training_pkg')
    outdir = pkg_path + '/training_results'
    env = wrappers.Monitor(env, outdir, force=True)
    rospy.loginfo("Monitor Wrapper started")

    last_time_steps = numpy.ndarray(0)

    # Loads parameters from the ROS param server
    # Parameters are stored in a yaml file inside the config directory
    # They are loaded at runtime by the launch file
    Alpha = rospy.get_param("/moving_cube/alpha")
    Epsilon = rospy.get_param("/moving_cube/epsilon")
    Gamma = rospy.get_param("/moving_cube/gamma")
    epsilon_discount = rospy.get_param("/moving_cube/epsilon_discount")
    nepisodes = rospy.get_param("/moving_cube/nepisodes")
    nsteps = rospy.get_param("/moving_cube/nsteps")
    running_step = rospy.get_param("/moving_cube/running_step")

    # Initialises the algorithm that we are going to use for learning
    qlearn = qlearn.QLearn(actions=range(env.action_space.n),
                           alpha=Alpha, gamma=Gamma, epsilon=Epsilon)
    initial_epsilon = qlearn.epsilon

    start_time = time.time()
    highest_reward = 0

    # Starts the main training loop: the one about the episodes to do
    for x in range(nepisodes):
        rospy.logdebug("############### START EPISODE=>" + str(x))

        cumulated_reward = 0
        done = False
        if qlearn.epsilon > 0.05:
            qlearn.epsilon *= epsilon_discount

        # Initialize the environment and get first state of the robot
        observation = env.reset()
        state = ''.join(map(str, observation))

        episode_time = rospy.get_rostime().to_sec()
        # for each episode, we test the robot for nsteps
        for i in range(nsteps):
            rospy.loginfo("############### Start Step=>" + str(i))
            # Pick an action based on the current state
            action = qlearn.chooseAction(state)
            rospy.loginfo("Next action is:%d", action)
            # Execute the action in the environment and get feedback
            observation, reward, done, info = env.step(action)
            rospy.loginfo(str(observation) + " " + str(reward))
            cumulated_reward += reward
            if highest_reward < cumulated_reward:
                highest_reward = cumulated_reward

            nextState = ''.join(map(str, observation))

            # Make the algorithm learn based on the results
            rospy.logwarn("############### state we were=>" + str(state))
            rospy.logwarn("############### action that we took=>" + str(action))
            rospy.logwarn("############### reward that action gave=>" + str(reward))
            rospy.logwarn("############### state in which we will start the next step=>" + str(nextState))
            qlearn.learn(state, action, reward, nextState)

            if not (done):
                state = nextState
            else:
                rospy.loginfo("DONE")
                last_time_steps = numpy.append(last_time_steps, [int(i + 1)])
                break
            rospy.loginfo("############### END Step=>" + str(i))
            # raw_input("Next Step...PRESS KEY")
            # rospy.sleep(2.0)

        m, s = divmod(int(time.time() - start_time), 60)
        h, m = divmod(m, 60)
        rospy.logwarn("EP: " + str(x + 1) + " - [alpha: " + str(round(qlearn.alpha, 2)) + " - gamma: " + str(round(qlearn.gamma, 2)) + " - epsilon: " + str(round(qlearn.epsilon, 2)) + "] - Reward: " + str(cumulated_reward) + " Time: %d:%02d:%02d" % (h, m, s))

    rospy.loginfo("\n|" + str(nepisodes) + "|" + str(qlearn.alpha) + "|" + str(qlearn.gamma) + "|" + str(initial_epsilon) + "*" + str(epsilon_discount) + "|" + str(highest_reward) + "| PICTURE |")

    l = last_time_steps.tolist()
    l.sort()

    # print("Parameters: a="+str)
    rospy.loginfo("Overall score: {:0.2f}".format(last_time_steps.mean()))
    rospy.loginfo("Best 100 score: {:0.2f}".format(reduce(lambda x, y: x + y, l[-100:]) / len(l[-100:])))

    env.close()
This is the training script.
Right after creating the Gym environment and the Monitor wrapper, it reads the parameters for the Q-learn algorithm from the ROS parameter server; we'll discuss those parameters later.
The training itself happens in the two nested loops: for each of the nepisodes episodes, and for each of the nsteps steps inside an episode, the algorithm picks an action based on the current state, executes it in the environment, reads the resulting observation, checks whether the episode is done and computes the reward. By feeding that experience back through qlearn.learn(), it keeps improving its policy to maximize the reward it gets.
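Note that gym.make('OldMovingCube-v0') only works because importing old_way_moving_cube_env registers that environment id with Gym. The environment file itself is written in the next video; the snippet below is only a minimal sketch of what that registration typically looks like (the class name OldMovingCubeEnv is an assumption, not code from the project):

# Minimal sketch of the registration inside old_way_moving_cube_env.py (assumed)
from gym.envs.registration import register

register(
    id='OldMovingCube-v0',                                    # the id used by gym.make() above
    entry_point='old_way_moving_cube_env:OldMovingCubeEnv',   # module:class, class name assumed
)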
The implementation of the Q-learn algorithm itself was done by Victor Mayoral Vilches:
'''
Q-learning approach for different RL problems
as part of the basic series on reinforcement learning @
Inspired by https://gym.openai.com/evaluations/eval_kWknKOkPQ7izrixdhriurA
@author: Victor Mayoral Vilches <victor@erlerobotics.com>
'''
import random


class QLearn:
    def __init__(self, actions, epsilon, alpha, gamma):
        self.q = {}
        self.epsilon = epsilon  # exploration constant
        self.alpha = alpha      # learning rate
        self.gamma = gamma      # discount factor
        self.actions = actions

    def getQ(self, state, action):
        return self.q.get((state, action), 0.0)

    def learnQ(self, state, action, reward, value):
        '''
        Q-learning:
            Q(s, a) += alpha * (reward(s, a) + gamma * max(Q(s', a')) - Q(s, a))
        '''
        oldv = self.q.get((state, action), None)
        if oldv is None:
            self.q[(state, action)] = reward
        else:
            self.q[(state, action)] = oldv + self.alpha * (value - oldv)

    def chooseAction(self, state, return_q=False):
        q = [self.getQ(state, a) for a in self.actions]
        maxQ = max(q)

        if random.random() < self.epsilon:
            minQ = min(q)
            mag = max(abs(minQ), abs(maxQ))
            # add random values to all the actions, recalculate maxQ
            q = [q[i] + random.random() * mag - .5 * mag for i in range(len(self.actions))]
            maxQ = max(q)

        count = q.count(maxQ)
        # In case there are several state-action max values
        # we select a random one among them
        if count > 1:
            best = [i for i in range(len(self.actions)) if q[i] == maxQ]
            i = random.choice(best)
        else:
            i = q.index(maxQ)

        action = self.actions[i]
        if return_q:  # if they want it, give it!
            return action, q
        return action

    def learn(self, state1, action1, reward, state2):
        maxqnew = max([self.getQ(state2, a) for a in self.actions])
        self.learnQ(state1, action1, reward, reward + self.gamma * maxqnew)
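To see the class working in isolation, here is a small, hypothetical usage sketch on a made-up two-state problem. The state names and rewards below are invented for illustration and are not part of the cube project, but the QLearn API is exactly the one shown above:

# Hypothetical toy usage of QLearn outside ROS, assuming the class above is saved as qlearn.py
from qlearn import QLearn

agent = QLearn(actions=range(2), epsilon=0.1, alpha=0.2, gamma=0.9)

for episode in range(100):
    state = 'start'
    action = agent.chooseAction(state)          # epsilon-greedy choice
    reward = 1.0 if action == 1 else 0.0        # invented reward: action 1 is the good one
    agent.learn(state, action, reward, 'end')   # update Q(start, action)

# After training, Q(start, 1) should clearly dominate Q(start, 0)
print(agent.getQ('start', 0), agent.getQ('start', 1))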
We put the parameters for Q-learn in a separate file called one_disk_walk_openai_params.yaml under the config folder, so you can tweak them more easily.
moving_cube: #namespace

  running_step: 0.04 # amount of time the control will be executed
  pos_step: 0.016 # increment in position for each command

  #qlearn parameters
  alpha: 0.1
  gamma: 0.7
  epsilon: 0.9
  epsilon_discount: 0.999
  nepisodes: 500
  nsteps: 1000

  number_splits: 10 #set to change the number of state splits for the continuous problem and also the number of env_variable splits

  running_step: 0.06 # Time for each step
  wait_time: 0.1 # Time to wait in the reset phases

  n_actions: 5 # Number of actions available to the agent
  speed_step: 1.0 # Step used when changing the roll speed

  init_roll_vel: 0.0 # Initial speed of the Roll Disk

  roll_speed_fixed_value: 100.0 # Speed at which it will move forwards or backwards
  roll_speed_increment_value: 10.0 # Increment that could be done in each step
  max_distance: 2.0 # Maximum distance allowed for the RobotCube
  max_pitch_angle: 0.2 # Maximum Angle radians in Pitch that we allow before terminating episode
  max_yaw_angle: 0.1 # Maximum yaw angle deviation, after that it starts getting negative rewards

  init_cube_pose:
    x: 0.0
    y: 0.0
    z: 0.0

  end_episode_points: 1000 # Points given when ending an episode
  move_distance_reward_weight: 1000.0 # Multiplier for the moved distance reward, Ex: inc_d = 0.1 --> 100points
  y_linear_speed_reward_weight: 1000.0 # Multiplier for moving fast in the y Axis
  y_axis_angle_reward_weight: 1000.0 # Multiplier of angle of yaw, to keep it straight
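A quick way to get a feeling for epsilon and epsilon_discount is to simulate the decay that the training loop applies once per episode (the qlearn.epsilon *= epsilon_discount line, applied while epsilon is above 0.05):

# Pure-arithmetic check of the exploration decay defined by the yaml above
epsilon = 0.9             # initial epsilon
epsilon_discount = 0.999
nepisodes = 500

for episode in range(nepisodes):
    if epsilon > 0.05:
        epsilon *= epsilon_discount

print(round(epsilon, 3))  # ~0.546, so after 500 episodes the agent still explores a lot

If you want the agent to end up greedier by the end of training, you can lower epsilon_discount or increase nepisodes.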
It might take a lot of time to tune the parameters. In ROSDS, we offer the gym computer feature to help you run training with different parameters in parallel. If you are interested, please check our paid program.
If you want to learn more applications of OpenAI with ROS, please check our OpenAI course in the Robot Ignite Academy.
Edit by Tony Huang.
[irp posts="10198" name="ROS Projects OpenAI with Moving Cube Robot in Gazebo Step-by-Step #Part5"]
In this video we are going to attach a .STL mesh file to one of the links of our robot through its URDF code. Starting from the URDF model used in the previous videos, we will define a new XML macro (xacro) that uses mesh files for a given link. By the end of the video, we will be able to see the mesh in RViz and in the Gazebo simulator.
Please make sure the mesh scale is correct for your .stl file. The simulation works in meters, so we set the mesh scale to 0.001 because the .stl file was modeled in millimeters.
Step 2. Modify the launch file
Since we only use one joint, we only need a controller for that joint. Let's change this part in /launch/spawn.launch.
In this ROS project you are going to learn, step by step, how to create a moving cube that learns to move using an OpenAI Gym environment.
In this third video we continue from the previous one, setting up all the basics needed for moving the cube and getting the sensor data used by the reward and done functions.
In the last post, we briefly walked through the last part of cube_rl_utils.py, which tests the robot. Today, let's dive deeper into the code.
#!/usr/bin/env python
import time
import rospy
import math
import copy
import numpy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion


class CubeRLUtils(object):

    def __init__(self):
        self.check_all_sensors_ready()
        rospy.Subscriber("/moving_cube/joint_states", JointState, self.joints_callback)
        rospy.Subscriber("/moving_cube/odom", Odometry, self.odom_callback)
        self._roll_vel_pub = rospy.Publisher('/moving_cube/inertia_wheel_roll_joint_velocity_controller/command', Float64, queue_size=1)
        self.check_publishers_connection()

    def check_all_sensors_ready(self):
        self.disk_joints_data = None
        while self.disk_joints_data is None and not rospy.is_shutdown():
            try:
                self.disk_joints_data = rospy.wait_for_message("/moving_cube/joint_states", JointState, timeout=1.0)
                rospy.loginfo("Current moving_cube/joint_states READY=>" + str(self.disk_joints_data))
            except:
                rospy.logerr("Current moving_cube/joint_states not ready yet, retrying for getting joint_states")

        self.cube_odom_data = None
        while self.cube_odom_data is None and not rospy.is_shutdown():
            try:
                self.cube_odom_data = rospy.wait_for_message("/moving_cube/odom", Odometry, timeout=1.0)
                rospy.loginfo("Current /moving_cube/odom READY=>" + str(self.cube_odom_data))
            except:
                rospy.logerr("Current /moving_cube/odom not ready yet, retrying for getting odom")

        rospy.loginfo("ALL SENSORS READY")

    def check_publishers_connection(self):
        """
        Checks that all the publishers are working
        :return:
        """
        rate = rospy.Rate(10)  # 10hz
        while (self._roll_vel_pub.get_num_connections() == 0 and not rospy.is_shutdown()):
            rospy.loginfo("No subscribers to _roll_vel_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid errors when the world is reset and time goes backwards.
                pass
        rospy.loginfo("_roll_vel_pub Publisher Connected")
        rospy.loginfo("All Publishers READY")

    def joints_callback(self, data):
        self.joints = data

    def odom_callback(self, data):
        self.odom = data

    ...
The first part of the script does nothing special: it creates the publisher and the subscribers and checks that all the sensors and publishers are ready.
    # Reinforcement Learning Utility Code
    def move_joints(self, roll_speed):
        joint_speed_value = Float64()
        joint_speed_value.data = roll_speed
        rospy.loginfo("Single Disk Roll Velocity>>" + str(joint_speed_value))
        self._roll_vel_pub.publish(joint_speed_value)

    def get_cube_state(self):
        # We convert from quaternions to euler
        orientation_list = [self.odom.pose.pose.orientation.x,
                            self.odom.pose.pose.orientation.y,
                            self.odom.pose.pose.orientation.z,
                            self.odom.pose.pose.orientation.w]
        roll, pitch, yaw = euler_from_quaternion(orientation_list)

        # We get the distance from the origin
        start_position = Point()
        start_position.x = 0.0
        start_position.y = 0.0
        start_position.z = 0.0
        distance = self.get_distance_from_point(start_position,
                                                self.odom.pose.pose.position)

        cube_state = [
            round(self.joints.velocity[0], 1),
            round(distance, 1),
            round(roll, 1),
            round(pitch, 1),
            round(yaw, 1)
        ]
        return cube_state

    def observation_checks(self, cube_state):
        # Maximum distance to travel permitted in meters from origin
        max_distance = 2.0
        if (cube_state[1] > max_distance):
            rospy.logerr("Cube Too Far==>" + str(cube_state[1]))
            done = True
        else:
            rospy.loginfo("Cube NOT Too Far==>" + str(cube_state[1]))
            done = False
        return done

    def get_distance_from_point(self, pstart, p_end):
        """
        Given a Vector3 Object, get distance from current position
        :param p_end:
        :return:
        """
        a = numpy.array((pstart.x, pstart.y, pstart.z))
        b = numpy.array((p_end.x, p_end.y, p_end.z))
        distance = numpy.linalg.norm(a - b)
        return distance

    def get_reward_for_observations(self, state):
        # We reward distance traveled and penalise high speeds
        speed = state[0]
        distance = state[1]

        # Positive Reinforcement
        reward_distance = distance * 10.0
        # Negative Reinforcement for magnitude of speed
        reward_for_efective_movement = -1 * abs(speed)

        reward = reward_distance + reward_for_efective_movement

        rospy.loginfo("Reward_distance=" + str(reward_distance))
        rospy.loginfo("Reward_for_efective_movement= " + str(reward_for_efective_movement))
        return reward
The second part is much more important: it prepares the elements needed by the reinforcement learning algorithm.
In the get_cube_state() function we convert the sensor readings into the cube state, choosing the wheel joint velocity, the distance from the origin, and the roll, pitch and yaw angles. To get roll, pitch and yaw, we have to convert the orientation in the odom message from a quaternion to Euler angles.
We check whether the episode is finished in the observation_checks() function and calculate the reward in the get_reward_for_observations() function, based on the distance the robot has moved and the wheel speed.
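As a concrete example of how state, done and reward fit together, suppose the cube has rolled 1.2 m from the origin while the wheel is spinning at 3.0 rad/s. The snippet below reuses only the arithmetic from observation_checks() and get_reward_for_observations(), so it runs without ROS (the numbers are made up for illustration):

# Offline check of the reward arithmetic used above
state = [3.0, 1.2, 0.0, 0.1, 0.0]    # [wheel speed, distance, roll, pitch, yaw] - example values

speed = state[0]
distance = state[1]
reward = distance * 10.0 - abs(speed)   # positive for distance, negative for speed magnitude

done = distance > 2.0                   # same check as observation_checks()

print(reward, done)                     # 9.0 False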
You can play with the different values in these functions to shape a better reward. In the future, we will automate the learning process with the reinforcement learning algorithm.
Edit by: Tony Huang
[irp posts="10079" name="ROS Projects OpenAI with Moving Cube Robot in Gazebo Step-by-Step #Part4"]
In this ROS project you are going to learn, step by step, how to create a moving cube that learns to move using an OpenAI Gym environment.
This second video covers the basics of the reinforcement learning setup: how to connect to the different systems of the robot to get the state, perform actions and calculate rewards.
If you haven't been following along, please check the link below for the previous post.
[irp posts="9744" name="[ROS Projects] OpenAI with Moving Cube Robot in Gazebo Step-by-Step Part1"]
Step 1. Clone the simulation
To make sure we have the same project, please run the following commands:
cd ~/simulation_ws/src
git clone https://bitbucket.org/theconstructcore/moving_cube.git
NOTICE: please delete the previous version of the code if you have problems compiling it.
In the /moving_cube/moving_cube_description/urdf/moving_cube.urdf file, please uncomment the following part. We’ll need this part to publish the odom topic.
To train the robot, let's create a package for training under ~/catkin_ws/src:
cd ~/catkin_ws/src
catkin_create_pkg my_moving_cube_training_pkg rospy
Then we’ll create a script folder inside the package and put a file called cube_rl_utils.py inside it with the following content
#!/usr/bin/env python
import time
import rospy
import math
import copy
import numpy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion


class CubeRLUtils(object):

    def __init__(self):
        self.check_all_sensors_ready()
        rospy.Subscriber("/moving_cube/joint_states", JointState, self.joints_callback)
        rospy.Subscriber("/moving_cube/odom", Odometry, self.odom_callback)
        self._roll_vel_pub = rospy.Publisher('/moving_cube/inertia_wheel_roll_joint_velocity_controller/command', Float64, queue_size=1)
        self.check_publishers_connection()

    def check_all_sensors_ready(self):
        self.disk_joints_data = None
        while self.disk_joints_data is None and not rospy.is_shutdown():
            try:
                self.disk_joints_data = rospy.wait_for_message("/moving_cube/joint_states", JointState, timeout=1.0)
                rospy.loginfo("Current moving_cube/joint_states READY=>" + str(self.disk_joints_data))
            except:
                rospy.logerr("Current moving_cube/joint_states not ready yet, retrying for getting joint_states")

        self.cube_odom_data = None
        while self.cube_odom_data is None and not rospy.is_shutdown():
            try:
                self.cube_odom_data = rospy.wait_for_message("/moving_cube/odom", Odometry, timeout=1.0)
                rospy.loginfo("Current /moving_cube/odom READY=>" + str(self.cube_odom_data))
            except:
                rospy.logerr("Current /moving_cube/odom not ready yet, retrying for getting odom")

        rospy.loginfo("ALL SENSORS READY")

    def check_publishers_connection(self):
        """
        Checks that all the publishers are working
        :return:
        """
        rate = rospy.Rate(10)  # 10hz
        while (self._roll_vel_pub.get_num_connections() == 0 and not rospy.is_shutdown()):
            rospy.loginfo("No subscribers to _roll_vel_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid errors when the world is reset and time goes backwards.
                pass
        rospy.loginfo("_roll_vel_pub Publisher Connected")
        rospy.loginfo("All Publishers READY")

    def joints_callback(self, data):
        self.joints = data

    def odom_callback(self, data):
        self.odom = data

    # Reinforcement Learning Utility Code
    def move_joints(self, roll_speed):
        joint_speed_value = Float64()
        joint_speed_value.data = roll_speed
        rospy.loginfo("Single Disk Roll Velocity>>" + str(joint_speed_value))
        self._roll_vel_pub.publish(joint_speed_value)

    def get_cube_state(self):
        # We convert from quaternions to euler
        orientation_list = [self.odom.pose.pose.orientation.x,
                            self.odom.pose.pose.orientation.y,
                            self.odom.pose.pose.orientation.z,
                            self.odom.pose.pose.orientation.w]
        roll, pitch, yaw = euler_from_quaternion(orientation_list)

        # We get the distance from the origin
        start_position = Point()
        start_position.x = 0.0
        start_position.y = 0.0
        start_position.z = 0.0
        distance = self.get_distance_from_point(start_position,
                                                self.odom.pose.pose.position)

        cube_state = [
            round(self.joints.velocity[0], 1),
            round(distance, 1),
            round(roll, 1),
            round(pitch, 1),
            round(yaw, 1)
        ]
        return cube_state

    def observation_checks(self, cube_state):
        # Maximum distance to travel permitted in meters from origin
        max_distance = 2.0
        if (cube_state[1] > max_distance):
            rospy.logerr("Cube Too Far==>" + str(cube_state[1]))
            done = True
        else:
            rospy.loginfo("Cube NOT Too Far==>" + str(cube_state[1]))
            done = False
        return done

    def get_distance_from_point(self, pstart, p_end):
        """
        Given a Vector3 Object, get distance from current position
        :param p_end:
        :return:
        """
        a = numpy.array((pstart.x, pstart.y, pstart.z))
        b = numpy.array((p_end.x, p_end.y, p_end.z))
        distance = numpy.linalg.norm(a - b)
        return distance

    def get_reward_for_observations(self, state):
        # We reward distance traveled and penalise high speeds
        speed = state[0]
        distance = state[1]

        # Positive Reinforcement
        reward_distance = distance * 10.0
        # Negative Reinforcement for magnitude of speed
        reward_for_efective_movement = -1 * abs(speed)

        reward = reward_distance + reward_for_efective_movement

        rospy.loginfo("Reward_distance=" + str(reward_distance))
        rospy.loginfo("Reward_for_efective_movement= " + str(reward_for_efective_movement))
        return reward


def cube_rl_systems_test():
    rospy.init_node('cube_rl_systems_test_node', anonymous=True, log_level=rospy.INFO)
    cube_rl_utils_object = CubeRLUtils()

    rospy.loginfo("Moving to Speed==>80")
    cube_rl_utils_object.move_joints(roll_speed=80.0)
    time.sleep(2)
    rospy.loginfo("Moving to Speed==>-80")
    cube_rl_utils_object.move_joints(roll_speed=-80.0)
    time.sleep(2)
    rospy.loginfo("Moving to Speed==>0.0")
    cube_rl_utils_object.move_joints(roll_speed=0.0)
    time.sleep(2)

    cube_state = cube_rl_utils_object.get_cube_state()
    done = cube_rl_utils_object.observation_checks(cube_state)
    reward = cube_rl_utils_object.get_reward_for_observations(cube_state)

    rospy.loginfo("Done==>" + str(done))
    rospy.loginfo("Reward==>" + str(reward))


if __name__ == "__main__":
    cube_rl_systems_test()
In this post, we'll focus on the cube_rl_systems_test() function. It uses the class to move the cube, get the observation, calculate the reward and check whether the episode is done. To run it, you first have to start the simulation: go to Simulations -> Select launch file -> main.launch.
NOTICE: You have to unpause the simulation by clicking the arrow (play) button in the simulation window.
Then you can run the following commands to execute the script:
cd ~/catkin_ws/src/my_moving_cube_training_pkg/script
chmod +x cube_rl_utils.py
cd ~/catkin_ws
source devel/setup.bash
rosrun my_moving_cube_training_pkg cube_rl_utils.py
You should see the cube moving around, with the reward and the done state being calculated.
Edit by: Tony Huang
[irp posts="9976" name="ROS Projects OpenAI with Moving Cube Robot in Gazebo Step-by-Step Part3"]
Step 1. Create a project in ROS Development Studio (ROSDS)
We'll use ROSDS throughout this project in order to avoid setting up the environment, managing packages, etc. You can create a free account here if you don't have one yet.
Step 2. Create package
Since this is a simulation, let’s create a package called my_moving_cube_description under the simulation_ws.
cd ~/simulation_ws/src
catkin_create_pkg my_moving_cube_description rospy
We'll start by building the URDF description of the robot. To do that, we'll create a new folder called urdf under the my_moving_cube_description directory and, inside it, a file called my_moving_cube.urdf with the following initial content. The robot tag defines the name of the robot, my_moving_cube.
<robot name="my_moving_cube">
...
</robot>
Then let's create the first link inside the robot. A link includes 3 parts:
inertial: defines the physical properties of the link. You can calculate the inertia of an object with this tool: rosrun spawn_robot_tools_pkg inertial_calculator.py (see the sketch after this list for the formulas behind it).
collision: defines the collision properties used when the object interacts with other objects in the simulation.
visual: defines the visual properties, i.e. how the object is rendered in the simulation.
You also need to define a material property after the link if you want to use it in Gazebo. (NOTICE: the reference of the material property must have the same name as the link.)
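If you prefer to compute the inertia values by hand instead of using the tool, the standard solid-box formulas are enough for a cube. The mass and side length below are placeholders for illustration, not the values used in the video:

# Inertia of a solid box: the results go into the <inertia> tag of the inertial block
# Placeholder values - replace with your own cube's mass and dimensions (assumed here)
m = 0.5            # mass in kg
x = y = z = 0.2    # box side lengths in meters (a cube)

ixx = (m / 12.0) * (y ** 2 + z ** 2)
iyy = (m / 12.0) * (x ** 2 + z ** 2)
izz = (m / 12.0) * (x ** 2 + y ** 2)

print(ixx, iyy, izz)   # for a cube of side s, all three reduce to m * s**2 / 6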
Then we create a moving_cube.yaml file under my_moving_cube_description/config to define the parameters for the controllers:
# .yaml config file
#
# The PID gains and controller settings must be saved in a yaml file that gets loaded
# to the param server via the roslaunch file (moving_cube_control.launch).
my_moving_cube:
  # Publish all joint states -----------------------------------
  # Creates the /joint_states topic necessary in ROS
  joint_state_controller:
    type: joint_state_controller/JointStateController
    publish_rate: 30
  # Effort Controllers ---------------------------------------
  inertia_wheel_roll_joint_velocity_controller:
    type: effort_controllers/JointVelocityController
    joint: inertia_wheel_roll_joint
    pid: {p: 1.0, i: 0.0, d: 0.0}
Finally, create a new launch file called moving_cube_control.launch under the launch folder to load this yaml and start the controllers.
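A typical moving_cube_control.launch loads the yaml above onto the parameter server with a rosparam tag and then starts the controllers with the controller_manager spawner; once the file exists, you can bring everything up with roslaunch my_moving_cube_description moving_cube_control.launch (the package and file names being the ones created above).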
This is a series of posts. If you haven't been following along, you can find the previous post here. In this 5th video of the robotic manipulator series, we will extend the ROS controllers to all the joints of our robot using XACRO. By the end of the video we'll have a robot fully controlled through ROS topics. We are also going to use RQT Publisher and RQT Reconfigure to do some experiments with the robot.
Step 0. Create a project in ROS Development Studio(ROSDS)
ROSDS helps you follow our tutorials at a fast pace without having to set up an environment locally. If you don't have an account yet, you can create a free one here. We'll start from the project of the previous video – manipulator_video_no4.
Step 1. Configure controller
In order to use controllers with our robot, the first step is to add transmission definitions for each joint in links_joints.xacro and joint limits for each joint in the mrm.xacro file. The most important thing is to add the Gazebo plugin in the mrm.xacro file.
Did you like the video? If you did, please give us a thumbs up and remember to subscribe to our channel and press the bell for a new video every day. Whether you liked it or not, please share your thoughts and questions in the comments section. See you!