Source code for graspnetAPI.moving_graspnet_eval

__author__ = "mhgou"
# email: gouminghao@gmail.com

# GraspNet Toolbox.      version 2.0
# Data, paper, and tutorials available at:  https://graspnet.net/
# Code written by Minghao Gou 2022.
# Licensed under the non-commercial CC4.0 license [see https://graspnet.net/about]

import copy
import json
import math
import os
from itertools import product

import numpy as np
import trackeval

from .moving_graspnet import MovingGraspNet, SCENE_DIR_NAME, CAMERA_PREFIX
from .grasp import Grasp, GraspGroup
from .utils.logger import get_logger
from .utils.eval_utils import model_free_collision_detection

# Name of the per-camera sub-directory that holds the query grasp annotations.
QUERY_GRASP_POSE_DIR_NAME = "query_grasp_poses"
# File inside the query directory that is loaded as a GraspGroup (one entry per query).
QUERY_GRASP_POSE_FILE_NAME = "grasp_group.npy"
# JSON file inside the query directory; it is indexed by query position and each
# entry is read with the two key names below.
QUERY_GRASP_POSE_JSON_NAME = "grasp_query.json"
# Key of the frame at which a query grasp starts.
START_FRAME_KEY_NAME = "start_frame"
# Key of the id of the object the query grasp is attached to.
OBJECT_ID_KEY_NAME = "object_id"


class GraspDist:
    """Grasp pose distance.

    Holds a translational and a rotational component. The same class is used
    both for a measured distance between two grasps and for a threshold that
    such a distance is compared against.

    Args:
        translation(float): translational distance.
        rotation(float): rotational distance in radians. The default is
            30 degrees expressed in radians.
    """

    def __init__(self, translation=0.1, rotation=30.0 / 180.0 * np.pi):
        self.translation = translation
        self.rotation = rotation

    @classmethod
    def dist_from_grasp_pose(cls, g1: "Grasp", g2: "Grasp"):
        """Calculate grasp distance given two grasp pose.

        Args:
            g1(Grasp): grasp pose 1.
            g2(Grasp): grasp pose 2.

        Returns:
            GraspDist: distance of the two grasp pose. ``translation`` is the
            Euclidean norm of the translation difference and ``rotation`` is
            the angle (radians, in ``[0, pi]``) of the relative rotation.
        """
        translation = np.linalg.norm(g1.translation - g2.translation)
        trace = np.matmul(g1.rotation_matrix, g2.rotation_matrix.T).trace()
        # The trace of a rotation matrix lies in [-1, 3]; clamp only to absorb
        # floating point drift so that acos stays in its domain.  Clamping the
        # lower bound at 1.0 would cap every angle at 90 degrees.
        trace = min(max(trace, -1.0), 3.0)
        rotation = math.acos(0.5 * (trace - 1.0))
        return cls(translation, rotation)

    def __lt__(self, other):
        """less than a thresh.

        True only when BOTH components are strictly smaller than ``other``'s.
        """
        return self.translation < other.translation and self.rotation < other.rotation

    def __gt__(self, other):
        """greater than a thresh.

        True when EITHER component is strictly larger than ``other``'s, so
        ``<`` and ``>`` do not form a total order.
        """
        return self.translation > other.translation or self.rotation > other.rotation

    def __repr__(self):
        return "GraspDist:(t: {}, r:{})".format(self.translation, self.rotation)
class MovingGraspNetEval(MovingGraspNet):
    """Moving GraspNet Evaluation class.

    Args:
        root(str): MovingGraspNet root directory.
        pred_dir(str): prediction files directory. Predictions are read from
            ``<pred_dir>/<scene_name>/<camera_sn>/<frame>/<tracker_id>.npy``
            with 1-based tracker ids.
        dist_thresh(GraspDist): threshold grasp distance. ``None`` (default)
            means ``GraspDist()`` with its default values.
    """

    def __init__(self, root, pred_dir, dist_thresh=None):
        super(MovingGraspNetEval, self).__init__(root)
        self.logger = get_logger("Moving-GraspNet-Eval")
        self.pred_dir = pred_dir
        # Build the default per instance instead of sharing one object that
        # was created at function definition time.
        self.dist_thresh = GraspDist() if dist_thresh is None else dist_thresh

    def _query_dir(self, scene_name, camera_sn):
        """Return the directory holding the query grasp annotations of a sequence."""
        return os.path.join(
            self.root,
            SCENE_DIR_NAME,
            scene_name,
            "{}{}".format(CAMERA_PREFIX, camera_sn),
            QUERY_GRASP_POSE_DIR_NAME,
        )

    def _eval_mgta(self, data):
        """Calculate MGTA(Multi Grasppose Tracking Accuracy)

        Args:
            data(dict): the generated data.

        Returns:
            float: MGTA score, i.e. the "MOTA" entry of the TrackEval CLEAR
            metric evaluated on ``data``.
        """
        metric = trackeval.metrics.CLEAR()
        metric_result = metric.eval_sequence(data)
        return metric_result["MOTA"]

    def _parse_tracker(self, scene_name, camera_sn):
        """Parse tracker files.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            int, np.array, dict: num_tracker, tracker_present, tracker pose dict.
            ``tracker_present`` is a bool array of shape
            ``(num_time_step, num_tracker)``; the pose dict maps
            ``time_step -> {0-based tracker id -> Grasp}``.
        """
        frames = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_time_step = len(frames)
        seq_dir = os.path.join(self.pred_dir, scene_name, camera_sn)

        # First pass: the largest tracker id found in any frame fixes the
        # number of tracker columns.
        max_pred = 0
        for frame in frames:
            frame_dir = os.path.join(seq_dir, frame)
            if not os.path.isdir(frame_dir):
                # A frame without a prediction directory has no predictions.
                continue
            for pred_file in os.listdir(frame_dir):
                stem, ext = os.path.splitext(pred_file)
                # Ignore anything that is not "<integer>.npy" (e.g. stray
                # hidden files) instead of failing on int().
                if ext == ".npy" and stem.isdigit():
                    max_pred = max(max_pred, int(stem))

        # Second pass: load every pose file that exists.
        tracker_present = np.zeros((num_time_step, max_pred), dtype=bool)
        tracker_pose_dict = dict()
        for time_step, frame in enumerate(frames):
            tracker_pose_dict[time_step] = dict()
            for tracker_id in range(max_pred):
                # File names are 1-based, array columns are 0-based.
                pose_file = os.path.join(
                    seq_dir, frame, "{}.npy".format(tracker_id + 1)
                )
                if os.path.exists(pose_file):
                    tracker_present[time_step][tracker_id] = True
                    tracker_pose_dict[time_step][tracker_id] = Grasp().from_npy(
                        pose_file
                    )
        return max_pred, tracker_present, tracker_pose_dict

    def _generate_gt(self, scene_name, camera_sn, dump_dir=None):
        """Generate ground truth from query.

        Every query grasp is attached to an object. From its start frame on,
        the grasp is moved along with the object pose and kept as ground truth
        for each frame in which the object pose is available and the grasp is
        not reported as colliding with the scene point cloud.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.
            dump_dir(str): the directory to save the gt result. Nothing is
                written when ``None``.

        Returns:
            int, np.array, dict: num_gt, gt_present, gt pose dict.
            ``gt_present`` is a bool array of shape ``(num_time_step, num_gt)``;
            the pose dict maps ``time_step -> {gt id -> grasp pose}``.

        Raises:
            ValueError: if the start frame of a query is not in the frame list
                or the object pose at the start frame doesn't exist.
        """
        frame_list = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_time_step = len(frame_list)
        query_grasp_pose_annotation_dir = self._query_dir(scene_name, camera_sn)
        query_grasp_poses = GraspGroup().from_npy(
            os.path.join(query_grasp_pose_annotation_dir, QUERY_GRASP_POSE_FILE_NAME)
        )
        json_path = os.path.join(
            query_grasp_pose_annotation_dir, QUERY_GRASP_POSE_JSON_NAME
        )
        with open(json_path, "r") as grasp_info_f:
            grasp_info_dict = json.load(grasp_info_f)

        num_gt_ids = len(query_grasp_poses)
        gt_present = np.zeros((num_time_step, num_gt_ids), dtype=bool)
        gt_pose_dict = dict()
        for gt_id, start_grasp_pose in enumerate(query_grasp_poses):
            start_frame = grasp_info_dict[gt_id][START_FRAME_KEY_NAME]
            attached_object_id = grasp_info_dict[gt_id][OBJECT_ID_KEY_NAME]

            # Index of the first frame equal to the query's start frame.
            start_time_step = next(
                (
                    time_step
                    for time_step in range(num_time_step)
                    if frame_list[time_step] == start_frame
                ),
                None,
            )
            if start_time_step is None:
                raise ValueError("Start frame not found")

            start_pose = self.load_object_pose(
                scene_name, camera_sn, start_frame, attached_object_id, registered=True
            )
            if start_pose is None:
                raise ValueError("start pose doesn't exist")
            # Loop invariant: maps the grasp into the object's own frame.
            # NOTE(review): assumes object poses are square homogeneous
            # transforms (np.linalg.inv requires a square matrix) - confirm.
            start_pose_inv = np.linalg.inv(start_pose)

            for time_step in range(start_time_step, num_time_step):
                gt_pose_dict.setdefault(time_step, dict())
                frame = frame_list[time_step]
                current_pose = self.load_object_pose(
                    scene_name, camera_sn, frame, attached_object_id, registered=True
                )
                if current_pose is None:
                    # Object pose missing in this frame: no ground truth here.
                    continue
                scene_pcd = self.load_point_cloud(scene_name, camera_sn, frame)
                # Move the query grasp with the object:
                # current_pose * inv(start_pose) * grasp.
                # A fresh copy is transformed for every frame because upstream
                # graspnetAPI documents ``Grasp.transform`` as modifying the
                # grasp in place (NOTE(review): confirm for this code base).
                # Transforming ``start_grasp_pose`` itself would then accumulate
                # the transforms over frames and make every stored pose refer
                # to the same object.
                current_grasp_pose = (
                    copy.deepcopy(start_grasp_pose)
                    .transform(start_pose_inv)
                    .transform(current_pose)
                )
                # collision detection
                is_collision = model_free_collision_detection(
                    current_grasp_pose, np.asarray(scene_pcd.points)
                )
                if not is_collision:
                    gt_present[time_step][gt_id] = True
                    gt_pose_dict[time_step][gt_id] = current_grasp_pose

        if dump_dir is not None:
            scene_dir = os.path.join(dump_dir, scene_name)
            camera_dir = os.path.join(scene_dir, camera_sn)
            os.makedirs(camera_dir, exist_ok=True)
            for time_step in range(num_time_step):
                frame = frame_list[time_step]
                frame_dir = os.path.join(camera_dir, frame)
                os.makedirs(frame_dir, exist_ok=True)
                for gt_id in range(num_gt_ids):
                    if gt_present[time_step][gt_id]:
                        # Same 1-based file naming as the prediction files.
                        gt_pose_dict[time_step][gt_id].save_npy(
                            os.path.join(frame_dir, "{}.npy".format(gt_id + 1))
                        )
        return num_gt_ids, gt_present, gt_pose_dict

    def _calculate_similarity(
        self, tracker_present, tracker_pose_dict, gt_present, gt_pose_dict
    ):
        """Calculate similarity matrix from tracker and gt.

        Args:
            tracker_present(np.array): tracker presense array.
            tracker_pose_dict(dict): dict of tracker poses.
            gt_present(np.array): ground truth presense array.
            gt_pose_dict(np.array): ground truth poses.

        Returns:
            np.array: similarity matrix: [num_timestep, num_gt, num_tracker].
            An entry is 1 when both poses are present at that time step and
            their distance is below ``self.dist_thresh``, otherwise 0.
        """
        num_time_step_tracker, num_tracker_ids = tracker_present.shape
        num_time_step_gt, num_gt_ids = gt_present.shape
        assert (
            num_time_step_tracker == num_time_step_gt
        ), "time steps in tracker and gt should be the same"
        num_time_step = num_time_step_gt
        similarity = np.zeros(
            shape=(num_time_step, num_gt_ids, num_tracker_ids), dtype=np.uint8
        )
        for time_step in range(num_time_step):
            # Log once per time step rather than once per (gt, tracker) pair.
            self.logger.debug(
                "Calculating similarity for time step {}".format(time_step)
            )
            # Only pairs that are both present can be similar.
            present_gt_ids = np.flatnonzero(gt_present[time_step]).tolist()
            present_tracker_ids = np.flatnonzero(tracker_present[time_step]).tolist()
            for gt_id, tracker_id in product(present_gt_ids, present_tracker_ids):
                gt_pose = gt_pose_dict[time_step][gt_id]
                tracker_pose = tracker_pose_dict[time_step][tracker_id]
                if (
                    GraspDist.dist_from_grasp_pose(gt_pose, tracker_pose)
                    < self.dist_thresh
                ):
                    similarity[time_step, gt_id, tracker_id] = 1
        return similarity

    def _parse_files(self, scene_name, camera_sn):
        """Generate needed matrix from dumped files.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            tuple: (num_timesteps, num_gt_ids, num_tracker_ids, gt_present,
            tracker_present, similarity)
        """
        frame_list = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_timestep = len(frame_list)
        num_gt_ids, gt_present, gt_pose_dict = self._generate_gt(scene_name, camera_sn)
        num_tracker_ids, tracker_present, tracker_pose_dict = self._parse_tracker(
            scene_name, camera_sn
        )
        similarity = self._calculate_similarity(
            tracker_present, tracker_pose_dict, gt_present, gt_pose_dict
        )
        return (
            num_timestep,
            num_gt_ids,
            num_tracker_ids,
            gt_present,
            tracker_present,
            similarity,
        )

    def _from_dense(
        self,
        num_timesteps,
        num_gt_ids,
        num_tracker_ids,
        gt_present,
        tracker_present,
        similarity,
    ):
        """Generate data for TrackEval from dense matrices.(from TrackEval)

        Args:
            num_timesteps(int): number of time steps of the sequence.
            num_gt_ids(int): number of the ground truth grasp poses.
            num_tracker_ids(int): number of the tracker grasp poses.
            gt_present(np.array): gt presence matrix.
            tracker_present(np.array): tracker present matrix.
            similarity(np.array): similarity matrix between tracker and gt.

        Returns:
            dict: the data for TrackEval.
        """
        # Per time step: the ids that are present ...
        gt_subset = [np.flatnonzero(gt_present[t, :]) for t in range(num_timesteps)]
        tracker_subset = [
            np.flatnonzero(tracker_present[t, :]) for t in range(num_timesteps)
        ]
        # ... and the similarity sub-matrix restricted to those ids.
        similarity_subset = [
            similarity[t][gt_subset[t], :][:, tracker_subset[t]]
            for t in range(num_timesteps)
        ]
        data = {
            "num_timesteps": num_timesteps,
            "num_gt_ids": num_gt_ids,
            "num_tracker_ids": num_tracker_ids,
            "num_gt_dets": np.sum(gt_present),
            "num_tracker_dets": np.sum(tracker_present),
            "gt_ids": gt_subset,
            "tracker_ids": tracker_subset,
            "similarity_scores": similarity_subset,
        }
        return data

    def get_seq_mgta(self, scene_name, camera_sn):
        """Calculate MGTA score from dumped files.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            float: MGTA score.
        """
        data = self._from_dense(*(self._parse_files(scene_name, camera_sn)))
        return self._eval_mgta(data)

    def eval_mgta_all(self):
        """Evaluate MGTA for all sequences.

        The sequences are the (scene, camera) pairs listed in
        ``self.collision_path_dict_initial_frame``. The per-sequence scores
        and their average are logged at info level.

        Returns:
            float: average MGTA for all sequences.

        Raises:
            ValueError: if there is no sequence to evaluate.
        """
        mgta_list = []
        mgta_dict = dict()
        frame_dict = self.collision_path_dict_initial_frame
        for scene_name in frame_dict.keys():
            mgta_dict[scene_name] = dict()
            for camera_sn in frame_dict[scene_name].keys():
                # Plain float keeps the per-sequence table JSON serialisable.
                mgta = float(self.get_seq_mgta(scene_name, camera_sn))
                mgta_list.append(mgta)
                mgta_dict[scene_name][camera_sn] = mgta
        if not mgta_list:
            raise ValueError("No sequence available for MGTA evaluation")
        avg_mgta = sum(mgta_list) / len(mgta_list)
        self.logger.info(
            "Detailed MGTA:{}\nAverage MGTA:{}".format(
                json.dumps(mgta_dict, indent=2), avg_mgta
            )
        )
        return avg_mgta

    def load_query_pose(self, scene_name, camera_sn):
        """Load the grasp poses queries.

        Args:
            scene_name(str): scene name.
            camera_sn(str): camera serial number.

        Return:
            list: dict{"grasp_pose": gg, "start_frame": frame, "object_id": id}

        Raises:
            ValueError: if the scene or camera has no valid query, or if the
                number of entries in the json file and in the grasp group
                file differ.
        """
        valid_queries = self.collision_path_dict_initial_frame
        if scene_name not in valid_queries.keys():
            raise ValueError("No valid query in scene:{}".format(scene_name))
        if camera_sn not in valid_queries[scene_name].keys():
            raise ValueError(
                "No valid query in scene:{}, camere:{}".format(scene_name, camera_sn)
            )
        # Same directory that the ground truth generation reads from.
        query_dir = self._query_dir(scene_name, camera_sn)
        query_json_path = os.path.join(query_dir, QUERY_GRASP_POSE_JSON_NAME)
        query_pose_path = os.path.join(query_dir, QUERY_GRASP_POSE_FILE_NAME)
        with open(query_json_path) as query_f:
            query_info = json.load(query_f)
        query_pose = GraspGroup(query_pose_path)
        num_grasp = len(query_info)
        if not len(query_pose) == num_grasp:
            raise ValueError("grasp number in json and GraspGroup doesn't match")
        return [
            {
                "start_frame": query_info[i][START_FRAME_KEY_NAME],
                "object_id": query_info[i][OBJECT_ID_KEY_NAME],
                "grasp_pose": query_pose[i],
            }
            for i in range(num_grasp)
        ]