[Clearpath Jackal] Robot, problem with syncing rostopics readings

I have had access to their ClearPath jackal robot that contains a Stereo Camera setup. I have an Artificial Intelligence model for Odometry problem, that expects two consecutive images from each stereo camera and the corresponding IMU measurements between them, and the pose odometry of the robot. I am writing to you since I have encountered the following challenges:

  1. I have been working with ApproximateTimeSynchronizer of message_filters to get the sync measurements, but unfortunately the right camera can only be synced with the left camera, to solve this issue I used a cache for other measurements wwhich later in tthe callback I will try to get tthe measurements by calling getInterval function. Isn’t there any other way to get all the topics in ApproximateTimeSynchronizer?

  2. My model outputs a 6D pose estimation, is there any way to transform the (from nav_msgs.msg import Odometry) readings to 6D back and forth?

  3. The IMU readings between the two consecutive images that I get from have various length, from 3 to 258 readings, is there any way to have a more stable readings?

the following is my code:

    class SaveData(object):
        def __init__(self):
            self.i = 0
            self.last_stamp = rospy.get_rostime()

            self.trajectory_folder = 'fifth-floor3'
            self.save_dir = f'{dir}/{self.trajectory_folder}'  # Specify the directory to save the poses and images

            
        # Define the subscribers for the two odometry topics
            left_image_subscriber = message_filters.Subscriber('/zed_node/left/image_rect_color', Image)
            right_image_subscriber = message_filters.Subscriber('/zed_node/right/image_rect_color', Image)
            pose_subscriber = message_filters.Subscriber('/zed_node/odom', Odometry)
            self.pose_cache = message_filters.Cache(pose_subscriber, cache_size=20000)

            imu_subscriber = message_filters.Subscriber('/imu/data', Imu)
            self.imu_cache = message_filters.Cache(imu_subscriber, cache_size=20000)

            time_synchronizer = message_filters.ApproximateTimeSynchronizer([ left_image_subscriber
                                                                             , right_image_subscriber
                                                                             ]
                                                                             #, self.cache]
                                                                             , 10, 1
                                                                             )#, allow_headerless=True)

            time_synchronizer.registerCallback(self.callback)

            

        def callback(self
                     , left_image : Image
                     , right_image: Image
                     # , imu
                     # , pose
                     ):

        # Process the synchronized odometry data
            # print(rospy.get_rostime())

            # rospy.loginfo("Received synchronized image data:")

            if (self.last_stamp <= left_image.header.stamp):
                imu_len = len(self.imu_cache.getInterval(self.last_stamp
                                                , left_image.header.stamp))

                if(imu_len >= 11):

                    self.save_image_data(left_image, f'{self.save_dir}/left_images')
                    self.save_image_data(right_image, f'{self.save_dir}/right_images')
                # self.save_pose(pose)
                    if (self.i > 0):
                        interval = [self.last_stamp, left_image.header.stamp]
                        self.save_imu(interval)
                        self.save_pose(interval)


                    print(self.i)

                    self.last_stamp = left_image.header.stamp
                    self.i+=1
            elif(self.i == 0):
                self.save_image_data(left_image, f'{self.save_dir}/left_images')
                self.save_image_data(right_image, f'{self.save_dir}/right_images')

                self.last_stamp = left_image.header.stamp


        def save_image_data(self, msg, dir):
            image_np = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, -1))
            image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)


            # print(rospy.get_rostime())

            image_filename = os.path.join(dir, f"{self.i}.jpg")

            if not cv2.imwrite(image_filename, image_np):
                raise Exception("Could not write image")
            
        def save_imu(self, interval):

            imu_list = self.imu_cache.getInterval(interval[0], interval[1])

            imu_df = [self.get_df_imu(json) for json in imu_list]
            imu_df = pd.concat(imu_df)
            imu_df.to_csv(f'{self.save_dir}/imu/{self.i}.csv')
            
        def get_df_imu(self, imu_msg: Imu):
            
            return pd.DataFrame([[
                            imu_msg.header.seq,
                            imu_msg.header.stamp.to_sec(),
                            imu_msg.header.frame_id,
                            imu_msg.orientation.x,
                            imu_msg.orientation.y,
                            imu_msg.orientation.z,
                            imu_msg.orientation.w,
                            imu_msg.orientation_covariance,
                            imu_msg.angular_velocity.x,
                            imu_msg.angular_velocity.y,
                            imu_msg.angular_velocity.z,
                            imu_msg.angular_velocity_covariance,
                            imu_msg.linear_acceleration.x,
                            imu_msg.linear_acceleration.y,
                            imu_msg.linear_acceleration.z,
                            imu_msg.linear_acceleration_covariance
                        ]], columns= ['seq', 'stamp', 'frame_id',
                              'orientation_x', 'orientation_y', 'orientation_z', 'orientation_w',
                              'orientation_covariance',
                              'angular_velocity_x', 'angular_velocity_y', 'angular_velocity_z',
                              'angular_velocity_covariance',
                              'linear_acceleration_x', 'linear_acceleration_y', 'linear_acceleration_z',
                              'linear_acceleration_covariance'])
            
        def save_pose(self, interval):
            interval = self.pose_cache.getInterval(interval[0], interval[1])
            
            pose_df = [self.get_df_pose(json) for json in interval]
            pose_df = pd.concat(pose_df)
            pose_df.to_csv(f'{self.save_dir}/gt/{self.i}.csv')


        def get_df_pose(self, pose_msg: Odometry) :

            
            return pd.DataFrame([[pose_msg.header.stamp.secs,
                            pose_msg.header.stamp.nsecs,
                            pose_msg.pose.pose,
                            pose_msg.pose.covariance,
                            pose_msg.twist.twist,
                            pose_msg.twist.covariance]],
                                columns=['secs', 'nsecs',
                                         'pose', 'pose_cov',
                                         'twist', 'twist_cov'])
            
            

        def print_pose(self):
            print(self.poses1)
            print(self.poses2)
       

    if __name__ == '__main__':
        rospy.init_node("save_poses_node", log_level=rospy.INFO)
        node = SaveData()
        
        # Register shutdown callback to save pose data and visualize poses on node exit
        rospy.spin()
        
        rospy.on_shutdown(node.save_image_data)

Dear @r.doorashi ,

I hope that you are doing well.

Regarding the issues that you are facing, I would suggest the following solution:

The ApproximateTimeSynchronizer in ROS is primarily designed to synchronize a fixed set of topics. To synchronize more topics, a common approach is to use a combination of synchronizers or to implement a custom callback. Here’s an example using message_filters to sync multiple topics:

from message_filters import Subscriber, ApproximateTimeSynchronizer

class SaveData:
    def __init__(self):
        self.i = 0
        self.last_stamp = rospy.get_rostime()

        self.trajectory_folder = 'fifth-floor3'
        self.save_dir = f'{dir}/{self.trajectory_folder}'

        left_image_subscriber = Subscriber('/zed_node/left/image_rect_color', Image)
        right_image_subscriber = Subscriber('/zed_node/right/image_rect_color', Image)
        pose_subscriber = Subscriber('/zed_node/odom', Odometry)
        imu_subscriber = Subscriber('/imu/data', Imu)

        ats = ApproximateTimeSynchronizer([left_image_subscriber, right_image_subscriber, pose_subscriber, imu_subscriber], queue_size=10, slop=0.1)
        ats.registerCallback(self.callback)

    def callback(self, left_image, right_image, pose, imu):
        # Your callback code here

Using this method, all your topics should be synchronized in a single callback. This eliminates the need for manual cache and interval fetching.

To convert nav_msgs/Odometry to a 6D pose (position and orientation), you can extract the position and quaternion from the odometry message:

from tf.transformations import euler_from_quaternion

def odometry_to_6d(odom_msg):
    position = odom_msg.pose.pose.position
    orientation_q = odom_msg.pose.pose.orientation
    orientation_list = [orientation_q.x, orientation_q.y, orientation_q.z, orientation_q.w]
    roll, pitch, yaw = euler_from_quaternion(orientation_list)
    return [position.x, position.y, position.z, roll, pitch, yaw]

IMU data can indeed vary in frequency. One way to handle this is to downsample or interpolate the IMU readings to match the frequency of your images. Here’s an example using linear interpolation:

import numpy as np
from scipy.interpolate import interp1d

def interpolate_imu(imu_data, start_time, end_time, target_length):
    times = [imu.header.stamp.to_sec() for imu in imu_data]
    data = np.array([[imu.linear_acceleration.x, imu.linear_acceleration.y, imu.linear_acceleration.z] for imu in imu_data])

    interpolated_times = np.linspace(start_time, end_time, target_length)
    interp_func = interp1d(times, data, axis=0, fill_value="extrapolate")
    interpolated_data = interp_func(interpolated_times)
    
    return interpolated_data

You can call this function to interpolate your IMU data to a fixed length before saving it.

Here is your updated code incorporating the above suggestions:

import rospy
from sensor_msgs.msg import Image, Imu
from nav_msgs.msg import Odometry
import message_filters
import numpy as np
import pandas as pd
import os
import cv2
from tf.transformations import euler_from_quaternion

class SaveData(object):
    def __init__(self):
        self.i = 0
        self.last_stamp = rospy.get_rostime()

        self.trajectory_folder = 'fifth-floor3'
        self.save_dir = f'{dir}/{self.trajectory_folder}' 

        left_image_subscriber = message_filters.Subscriber('/zed_node/left/image_rect_color', Image)
        right_image_subscriber = message_filters.Subscriber('/zed_node/right/image_rect_color', Image)
        pose_subscriber = message_filters.Subscriber('/zed_node/odom', Odometry)
        imu_subscriber = message_filters.Subscriber('/imu/data', Imu)

        time_synchronizer = message_filters.ApproximateTimeSynchronizer([left_image_subscriber, right_image_subscriber, pose_subscriber, imu_subscriber], 10, 0.1)
        time_synchronizer.registerCallback(self.callback)

    def callback(self, left_image, right_image, pose, imu):
        if self.last_stamp <= left_image.header.stamp:
            imu_data = self.get_imu_data(self.last_stamp, left_image.header.stamp)
            if len(imu_data) >= 11:
                self.save_image_data(left_image, f'{self.save_dir}/left_images')
                self.save_image_data(right_image, f'{self.save_dir}/right_images')
                if self.i > 0:
                    interval = [self.last_stamp, left_image.header.stamp]
                    self.save_imu(interval)
                    self.save_pose(interval)
                self.last_stamp = left_image.header.stamp
                self.i += 1
            elif self.i == 0:
                self.save_image_data(left_image, f'{self.save_dir}/left_images')
                self.save_image_data(right_image, f'{self.save_dir}/right_images')
                self.last_stamp = left_image.header.stamp

    def get_imu_data(self, start_time, end_time):
        imu_list = self.imu_cache.getInterval(start_time, end_time)
        return interpolate_imu(imu_list, start_time.to_sec(), end_time.to_sec(), len(imu_list))

    def save_image_data(self, msg, dir):
        image_np = np.frombuffer(msg.data, dtype=np.uint8).reshape((msg.height, msg.width, -1))
        image_np = cv2.cvtColor(image_np, cv2.COLOR_RGB2BGR)
        image_filename = os.path.join(dir, f"{self.i}.jpg")
        if not cv2.imwrite(image_filename, image_np):
            raise Exception("Could not write image")

    def save_imu(self, interval):
        imu_list = self.imu_cache.getInterval(interval[0], interval[1])
        imu_df = [self.get_df_imu(imu) for imu in imu_list]
        imu_df = pd.concat(imu_df)
        imu_df.to_csv(f'{self.save_dir}/imu/{self.i}.csv')

    def get_df_imu(self, imu_msg: Imu):
        return pd.DataFrame([[
            imu_msg.header.seq,
            imu_msg.header.stamp.to_sec(),
            imu_msg.header.frame_id,
            imu_msg.orientation.x,
            imu_msg.orientation.y,
            imu_msg.orientation.z,
            imu_msg.orientation.w,
            imu_msg.orientation_covariance,
            imu_msg.angular_velocity.x,
            imu_msg.angular_velocity.y,
            imu_msg.angular_velocity.z,
            imu_msg.angular_velocity_covariance,
            imu_msg.linear_acceleration.x,
            imu_msg.linear_acceleration.y,
            imu_msg.linear_acceleration.z,
            imu_msg.linear_acceleration_covariance
        ]], columns=[
            'seq', 'stamp', 'frame_id',
            'orientation_x', 'orientation_y', 'orientation_z', 'orientation_w',
            'orientation_covariance',
            'angular_velocity_x', 'angular_velocity_y', 'angular_velocity_z',
            'angular_velocity_covariance',
            'linear_acceleration_x', 'linear_acceleration_y', 'linear_acceleration_z',
            'linear_acceleration_covariance'
        ])

    def save_pose(self, interval):
        pose_list = self.pose_cache.getInterval(interval[0], interval[1])
        pose_df = [self.get_df_pose(pose) for pose in pose_list]
        pose_df = pd.concat(pose_df)
        pose_df.to_csv(f'{self.save_dir}/gt/{self.i}.csv')

    def get_df_pose(self, pose_msg: Odometry):
        pose_6d = odometry_to_6d(pose_msg)
        return pd.DataFrame([pose_6d], columns=['x', 'y', 'z', 'roll', 'pitch', 'yaw'])

    def print_pose(self):
        print(self.poses1)
        print(self.poses2)

if __name__ == '__main__':
    rospy.init_node("save_poses_node", log_level=rospy.INFO)
    node = SaveData()
    rospy.spin()
    rospy.on_shutdown(node.save_image_data)

I hope that this resolves the problem that you are facing.