In this new ROS Project you are going to learn Step-by-Step how to create a moving cube and that it learns to move using OpenAI environment.
In this third video we continue with previous video setting up all the basics needed for moving the cube and getting sensor data for the reward and done functions
ROS Development Studio
OpenAi Course
Moving Cube Git:https://bitbucket.org/theconstructcore/moving_cube/src/master/
Moving Cube Training AI Git: https://bitbucket.org/theconstructcore/moving_cube_training/src/master/
Video On Quaternions to Euler:
https://www.theconstruct.ai/ros-qa-how-to-convert-quaternions-to-euler-angles/
This is the third post of this series. If you didn’t follow up, please check the link below.
https://www.theconstruct.ai/ros-projects-openai-moving-cube-robot-gazebo-step-step-part2/
Step 1. cube_rl_utils.py
In the last post, we’ve briefly walked through the last part of cube_rl_utils.py which tests the robot. Today, let’s dive deeper into the code.
#!/usr/bin/env python
import time
import rospy
import math
import copy
import numpy
from std_msgs.msg import Float64
from sensor_msgs.msg import JointState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point
from tf.transformations import euler_from_quaternion
class CubeRLUtils(object):
    def __init__(self):
        self.check_all_sensors_ready()
        
        rospy.Subscriber("/moving_cube/joint_states", JointState, self.joints_callback)
        rospy.Subscriber("/moving_cube/odom", Odometry, self.odom_callback)
        self._roll_vel_pub = rospy.Publisher('/moving_cube/inertia_wheel_roll_joint_velocity_controller/command', Float64, queue_size=1)
        
        self.check_publishers_connection()
        
    
    def check_all_sensors_ready(self):
        self.disk_joints_data = None
        while self.disk_joints_data is None and not rospy.is_shutdown():
            try:
                self.disk_joints_data = rospy.wait_for_message("/moving_cube/joint_states", JointState, timeout=1.0)
                rospy.loginfo("Current moving_cube/joint_states READY=>"+str(self.disk_joints_data))
            except:
                rospy.logerr("Current moving_cube/joint_states not ready yet, retrying for getting joint_states")
        self.cube_odom_data = None
        while self.disk_joints_data is None and not rospy.is_shutdown():
            try:
                self.cube_odom_data = rospy.wait_for_message("/moving_cube/odom", Odometry, timeout=1.0)
                rospy.loginfo("Current /moving_cube/odom READY=>" + str(self.cube_odom_data))
            except:
                rospy.logerr("Current /moving_cube/odom not ready yet, retrying for getting odom")
        rospy.loginfo("ALL SENSORS READY")
        
    def check_publishers_connection(self):
        """
        Checks that all the publishers are working
        :return:
        """
        rate = rospy.Rate(10)  # 10hz
        while (self._roll_vel_pub.get_num_connections() == 0 and not rospy.is_shutdown()):
            rospy.loginfo("No susbribers to _roll_vel_pub yet so we wait and try again")
            try:
                rate.sleep()
            except rospy.ROSInterruptException:
                # This is to avoid error when world is rested, time when backwards.
                pass
        rospy.loginfo("_base_pub Publisher Connected")
        rospy.loginfo("All Publishers READY")
        
    def joints_callback(self, data):
        self.joints = data
    def odom_callback(self, data):
        self.odom = data
...
The first part of the script is nothing special but creating publisher, subscriber and checking all the sensor.
    # Reinforcement Learning Utility Code
    def move_joints(self, roll_speed):
        
        joint_speed_value = Float64()
        joint_speed_value.data = roll_speed
        rospy.loginfo("Single Disk Roll Velocity>>"+str(joint_speed_value))
        self._roll_vel_pub.publish(joint_speed_value)
        
    def get_cube_state(self):
        
        # We convert from quaternions to euler
        orientation_list = [self.odom.pose.pose.orientation.x,
                            self.odom.pose.pose.orientation.y,
                            self.odom.pose.pose.orientation.z,
                            self.odom.pose.pose.orientation.w]
                            
        roll, pitch, yaw = euler_from_quaternion(orientation_list)
        
        # We get the distance from the origin
        start_position = Point()
        start_position.x = 0.0
        start_position.y = 0.0
        start_position.z = 0.0
        
        distance = self.get_distance_from_point(start_position,
                                                self.odom.pose.pose.position)
        
        cube_state = [
                        round(self.joints.velocity[0],1),
                        round(distance,1),
                        round(roll,1),
                        round(pitch,1),
                        round(yaw,1)
                    ]
        
        return cube_state
    
    def observation_checks(self, cube_state):
        
        
        # MAximum distance to travel permited in meters from origin
        max_distance=2.0
        
        if (cube_state[1] > max_distance): 
            rospy.logerr("Cube Too Far==>"+str(cube_state[1]))
            done = True
        else:
            rospy.loginfo("Cube NOT Too Far==>"+str(cube_state[1]))
            done = False
        
        return done
    def get_distance_from_point(self, pstart, p_end):
        """
        Given a Vector3 Object, get distance from current position
        :param p_end:
        :return:
        """
        a = numpy.array((pstart.x, pstart.y, pstart.z))
        b = numpy.array((p_end.x, p_end.y, p_end.z))
        distance = numpy.linalg.norm(a - b)
        return distance
    def get_reward_for_observations(self, state):
        # We reward it for lower speeds and distance traveled
        
        speed = state[0]
        distance = state[1]
        # Positive Reinforcement
        reward_distance = distance * 10.0
        # Negative Reinforcement for magnitude of speed
        reward_for_efective_movement = -1 * abs(speed) 
        
        reward = reward_distance + reward_for_efective_movement
        rospy.loginfo("Reward_distance="+str(reward_distance))
        rospy.loginfo("Reward_for_efective_movement= "+str(reward_for_efective_movement))
        
        return reward
The second part is much more important. It prepares the elements for the reinforcement learning algorithm.
In the get_cube_state() function. We converted the sensor reading to cube state. We chose the joint velocity, distance, roll, pitch, yaw as the state. To get the roll, pitch, and yaw, we have to convert the odom from the quaternion to Euler angle.
We check if the simulation is done in the observation_checks() function and calculate reward in the get_reward_for_observations() function based on the distance the robot moved.
You can play with different parameters in the script to achieve a better reward. In the future, we will automate the learning process with the reinforcement learning algorithm.
Edit by: Tony Huang
[irp posts=”10079″ name=”ROS Projects OpenAI with Moving Cube Robot in Gazebo Step-by-Step #Part4″]
 
				![[ROS-Projects]-OpenAI-with-Moving-Cube-Robot-in-Gazebo-Step-by-Step-Part3 [ROS-Projects]-OpenAI-with-Moving-Cube-Robot-in-Gazebo-Step-by-Step-Part3](https://www.theconstruct.ai/wp-content/uploads/2018/07/ROS-Projects-OpenAI-with-Moving-Cube-Robot-in-Gazebo-Step-by-Step-Part3.png)




0 Comments
Trackbacks/Pingbacks