__author__ = "mhgou"
# email: gouminghao@gmail.com
# GraspNet Toolbox. version 2.0
# Data, paper, and tutorials available at: https://graspnet.net/
# Code written by Minghao Gou 2022.
# Licensed under the none commercial CC4.0 license [see https://graspnet.net/about]
import os
import math
import numpy as np
import trackeval
import json
from itertools import product
from .moving_graspnet import MovingGraspNet, SCENE_DIR_NAME, CAMERA_PREFIX
from .grasp import Grasp, GraspGroup
from .utils.logger import get_logger
from .utils.eval_utils import model_free_collision_detection
# Name of the per-camera sub-directory that holds the query grasp pose annotations.
QUERY_GRASP_POSE_DIR_NAME = "query_grasp_poses"
# File inside that directory that is loaded as a GraspGroup (the query grasp poses).
QUERY_GRASP_POSE_FILE_NAME = "grasp_group.npy"
# JSON file inside that directory with one info entry per query grasp pose.
QUERY_GRASP_POSE_JSON_NAME = "grasp_query.json"
# Key of a JSON entry giving the frame at which the query grasp pose starts.
START_FRAME_KEY_NAME = "start_frame"
# Key of a JSON entry giving the id of the object the grasp pose is attached to.
OBJECT_ID_KEY_NAME = "object_id"
class GraspDist:
    """Grasp pose distance, split into a translational and a rotational part.

    An instance is used both as a measured distance between two grasp poses
    and as a threshold that measured distances are compared against.

    Args:
        translation(float): translational distance.
        rotation(float): rotational distance in radians.
    """

    def __init__(self, translation=0.1, rotation=30.0 / 180.0 * np.pi):
        self.translation = translation
        self.rotation = rotation

    @classmethod
    def dist_from_grasp_pose(cls, g1: "Grasp", g2: "Grasp"):
        """Calculate grasp distance given two grasp pose.

        Args:
            g1(Grasp): grasp pose 1.
            g2(Grasp): grasp pose 2.

        Returns:
            GraspDist: distance of the two grasp pose.
        """
        translation = np.linalg.norm(g1.translation - g2.translation)
        # trace(R1 * R2^T) = 1 + 2 * cos(angle) of the relative rotation.
        trace = np.matmul(g1.rotation_matrix, g2.rotation_matrix.T).trace()
        # A rotation matrix has trace in [-1, 3]; clamp only to absorb numerical
        # noise so that acos stays in its domain. (Clamping the lower bound to
        # 1.0 would cap every angle above 90 degrees at exactly pi / 2.)
        trace = min(max(trace, -1.0), 3.0)
        rotation = math.acos(0.5 * (trace - 1.0))
        return cls(translation, rotation)

    def __lt__(self, other):
        """less than a thresh: BOTH translation and rotation must be smaller."""
        return self.translation < other.translation and self.rotation < other.rotation

    def __gt__(self, other):
        """greater than a thresh: EITHER translation or rotation is larger."""
        return self.translation > other.translation or self.rotation > other.rotation

    def __repr__(self):
        return "GraspDist:(t: {}, r:{})".format(self.translation, self.rotation)
class MovingGraspNetEval(MovingGraspNet):
    """Moving GraspNet Evaluation class.

    Args:
        root(str): MovingGraspNet root directory.
        pred_dir(str): prediction files directory.
        dist_thresh(GraspDist): threshold grasp distance.
    """

    # NOTE: the default ``GraspDist()`` is created once at definition time and
    # shared between instances; it is only read, never mutated, in this class.
    def __init__(self, root, pred_dir, dist_thresh=GraspDist()):
        super().__init__(root)
        self.logger = get_logger("Moving-GraspNet-Eval")
        self.pred_dir = pred_dir
        self.dist_thresh = dist_thresh

    def _eval_mgta(self, data):
        """Calculate MGTA(Multi Grasppose Tracking Accuracy)

        Args:
            data(dict): the generated data.

        Returns:
            float: MGTA score.
        """
        # MGTA is read from the "MOTA" entry of TrackEval's CLEAR metric result.
        metric = trackeval.metrics.CLEAR()
        metric_result = metric.eval_sequence(data)
        return metric_result["MOTA"]

    def _parse_tracker(self, scene_name, camera_sn):
        """Parse tracker files.

        Predictions are read from
        ``<pred_dir>/<scene_name>/<camera_sn>/<frame>/<id>.npy`` where ``id``
        starts at 1. A missing frame directory is treated as "no prediction
        for this frame"; files whose name is not ``<integer>.npy`` are ignored.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            int, np.array, dict: num_tracker, tracker_present, tracker pose dict.
        """
        frames = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_time_step = len(frames)
        sequence_dir = os.path.join(self.pred_dir, scene_name, camera_sn)

        # First pass: find the largest tracker id used anywhere in the sequence.
        max_pred = 0
        for frame in frames:
            frame_dir = os.path.join(sequence_dir, frame)
            if not os.path.isdir(frame_dir):
                continue
            for pred_file in os.listdir(frame_dir):
                stem, ext = os.path.splitext(pred_file)
                if ext == ".npy" and stem.isdecimal():
                    max_pred = max(max_pred, int(stem))

        # Second pass: load every pose; column ``tracker_id`` maps to file id + 1.
        tracker_present = np.zeros((num_time_step, max_pred), dtype=bool)
        tracker_pose_dict = dict()
        for time_step, frame in enumerate(frames):
            tracker_pose_dict[time_step] = dict()
            for tracker_id in range(max_pred):
                pose_file = os.path.join(
                    sequence_dir, frame, "{}.npy".format(tracker_id + 1)
                )
                if os.path.exists(pose_file):
                    tracker_present[time_step][tracker_id] = True
                    tracker_pose_dict[time_step][tracker_id] = Grasp().from_npy(
                        pose_file
                    )
        return max_pred, tracker_present, tracker_pose_dict

    def _generate_gt(self, scene_name, camera_sn, dump_dir=None):
        """Generate ground truth from query.

        Each query grasp pose is propagated from its start frame to every later
        frame using the pose of the object it is attached to. A propagated pose
        counts as present in a frame only if the object pose exists for that
        frame and the collision check against the scene point cloud passes.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.
            dump_dir(str): the directory to save the gt result.

        Returns:
            int, np.array, dict: num_gt, gt_present, gt pose dict.

        Raises:
            ValueError: if a query's start frame is not in the frame list or
                the object pose at the start frame doesn't exist.
        """
        # Local import: ``copy`` is only needed here.
        import copy

        frame_list = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_time_step = len(frame_list)
        query_grasp_pose_annotation_dir = os.path.join(
            self.root,
            SCENE_DIR_NAME,
            scene_name,
            "{}{}".format(CAMERA_PREFIX, camera_sn),
            QUERY_GRASP_POSE_DIR_NAME,
        )
        query_grasp_poses = GraspGroup().from_npy(
            os.path.join(query_grasp_pose_annotation_dir, QUERY_GRASP_POSE_FILE_NAME)
        )
        json_path = os.path.join(
            query_grasp_pose_annotation_dir, QUERY_GRASP_POSE_JSON_NAME
        )
        with open(json_path, "r") as grasp_info_f:
            grasp_info_dict = json.load(grasp_info_f)

        num_gt_ids = len(query_grasp_poses)
        gt_present = np.zeros((num_time_step, num_gt_ids), dtype=bool)
        gt_pose_dict = dict()
        for gt_id, start_grasp_pose in enumerate(query_grasp_poses):
            start_frame = grasp_info_dict[gt_id][START_FRAME_KEY_NAME]
            attached_object_id = grasp_info_dict[gt_id][OBJECT_ID_KEY_NAME]

            # Index of the first frame equal to the query's start frame.
            start_time_step = next(
                (
                    time_step
                    for time_step, frame in enumerate(frame_list)
                    if frame == start_frame
                ),
                None,
            )
            if start_time_step is None:
                raise ValueError("Start frame not found")

            start_pose = self.load_object_pose(
                scene_name, camera_sn, start_frame, attached_object_id, registered=True
            )
            if start_pose is None:
                raise ValueError("start pose doesn't exist")
            # Loop invariant: maps the start grasp pose into the object's frame.
            start_pose_inv = np.linalg.inv(start_pose)

            for time_step in range(start_time_step, num_time_step):
                gt_pose_dict.setdefault(time_step, dict())
                frame = frame_list[time_step]
                current_pose = self.load_object_pose(
                    scene_name, camera_sn, frame, attached_object_id, registered=True
                )
                if current_pose is None:
                    continue
                scene_pcd = self.load_point_cloud(scene_name, camera_sn, frame)
                # Transform a copy: if ``transform`` mutates the grasp in place
                # (as graspnetAPI's Grasp.transform does -- TODO confirm for the
                # Grasp class used here), reusing ``start_grasp_pose`` would
                # accumulate transforms across frames and make every stored
                # pose alias the same object.
                current_grasp_pose = (
                    copy.deepcopy(start_grasp_pose)
                    .transform(start_pose_inv)
                    .transform(current_pose)
                )
                # collision detection
                is_collision = model_free_collision_detection(
                    current_grasp_pose, np.asarray(scene_pcd.points)
                )
                if not is_collision:
                    gt_present[time_step][gt_id] = True
                    gt_pose_dict[time_step][gt_id] = current_grasp_pose

        if dump_dir is not None:
            # Same layout as the prediction directory: one file per (frame, id),
            # with 1-based ids in the file name.
            camera_dir = os.path.join(dump_dir, scene_name, camera_sn)
            os.makedirs(camera_dir, exist_ok=True)
            for time_step, frame in enumerate(frame_list):
                frame_dir = os.path.join(camera_dir, frame)
                os.makedirs(frame_dir, exist_ok=True)
                for gt_id in range(num_gt_ids):
                    if gt_present[time_step][gt_id]:
                        gt_pose_dict[time_step][gt_id].save_npy(
                            os.path.join(frame_dir, "{}.npy".format(gt_id + 1))
                        )
        return num_gt_ids, gt_present, gt_pose_dict

    def _calculate_similarity(
        self, tracker_present, tracker_pose_dict, gt_present, gt_pose_dict
    ):
        """Calculate similarity matrix from tracker and gt.

        An entry is 1 when both the gt and the tracker pose are present at that
        time step and their distance is below ``self.dist_thresh``, else 0.

        Args:
            tracker_present(np.array): tracker presence array.
            tracker_pose_dict(dict): dict of tracker poses.
            gt_present(np.array): ground truth presence array.
            gt_pose_dict(dict): dict of ground truth poses.

        Returns:
            np.array: similarity matrix: [num_timestep, num_gt, num_tracker].
        """
        num_time_step_tracker, num_tracker_ids = tracker_present.shape
        num_time_step_gt, num_gt_ids = gt_present.shape
        assert (
            num_time_step_tracker == num_time_step_gt
        ), "time steps in tracker and gt should be the same"
        num_time_step = num_time_step_gt
        similarity = np.zeros(
            shape=(num_time_step, num_gt_ids, num_tracker_ids), dtype=np.uint8
        )
        for time_step in range(num_time_step):
            # Log once per time step, not once per (gt, tracker) pair.
            self.logger.debug(
                "Calculating similarity for time step {}".format(time_step)
            )
            # Only pairs where both poses are present can be similar.
            present_gt_ids = np.flatnonzero(gt_present[time_step]).tolist()
            present_tracker_ids = np.flatnonzero(tracker_present[time_step]).tolist()
            for gt_id, tracker_id in product(present_gt_ids, present_tracker_ids):
                gt_pose = gt_pose_dict[time_step][gt_id]
                tracker_pose = tracker_pose_dict[time_step][tracker_id]
                if (
                    GraspDist.dist_from_grasp_pose(gt_pose, tracker_pose)
                    < self.dist_thresh
                ):
                    similarity[time_step, gt_id, tracker_id] = 1
        return similarity

    def _parse_files(self, scene_name, camera_sn):
        """Generate needed matrix from dumped files.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            tuple: (num_timesteps, num_gt_ids, num_tracker_ids, gt_present, tracker_present, similarity)
        """
        frame_list = self.get_frame_list(scene_name=scene_name, camera_sn=camera_sn)
        num_timestep = len(frame_list)
        num_gt_ids, gt_present, gt_pose_dict = self._generate_gt(scene_name, camera_sn)
        num_tracker_ids, tracker_present, tracker_pose_dict = self._parse_tracker(
            scene_name, camera_sn
        )
        similarity = self._calculate_similarity(
            tracker_present, tracker_pose_dict, gt_present, gt_pose_dict
        )
        return (
            num_timestep,
            num_gt_ids,
            num_tracker_ids,
            gt_present,
            tracker_present,
            similarity,
        )

    def _from_dense(
        self,
        num_timesteps,
        num_gt_ids,
        num_tracker_ids,
        gt_present,
        tracker_present,
        similarity,
    ):
        """Generate data for TrackEval from dense matrices.(from TrackEval)

        Args:
            num_timesteps(int): number of time steps of the sequence.
            num_gt_ids(int): number of the ground truth grasp poses.
            num_tracker_ids(int): number of the tracker grasp poses.
            gt_present(np.array): gt presence matrix.
            tracker_present(np.array): tracker present matrix.
            similarity(np.array): similarity matrix between tracker and gt.

        Returns:
            dict: the data for TrackEval.
        """
        # Per time step: indices of the gt / tracker ids that are present.
        gt_subset = [np.flatnonzero(gt_present[t, :]) for t in range(num_timesteps)]
        tracker_subset = [
            np.flatnonzero(tracker_present[t, :]) for t in range(num_timesteps)
        ]
        # Per time step: similarity restricted to the present gt rows and
        # present tracker columns.
        similarity_subset = [
            similarity[t][gt_subset[t], :][:, tracker_subset[t]]
            for t in range(num_timesteps)
        ]
        data = {
            "num_timesteps": num_timesteps,
            "num_gt_ids": num_gt_ids,
            "num_tracker_ids": num_tracker_ids,
            "num_gt_dets": np.sum(gt_present),
            "num_tracker_dets": np.sum(tracker_present),
            "gt_ids": gt_subset,
            "tracker_ids": tracker_subset,
            "similarity_scores": similarity_subset,
        }
        return data

    def get_seq_mgta(self, scene_name, camera_sn):
        """Calculate MGTA score from dumped files.

        Args:
            scene_name(str): the scene name.
            camera_sn(str): the camera serial number.

        Returns:
            float: MGTA score.
        """
        data = self._from_dense(*(self._parse_files(scene_name, camera_sn)))
        return self._eval_mgta(data)

    def eval_mgta_all(self):
        """Evaluate MGTA for all sequences.

        The sequences are the (scene, camera) pairs listed in
        ``self.collision_path_dict_initial_frame``.

        Returns:
            float: average MGTA for all sequences.

        Raises:
            ValueError: if there is no sequence to evaluate.
        """
        mgta_list = []
        mgta_dict = dict()
        frame_dict = self.collision_path_dict_initial_frame
        for scene_name in frame_dict:
            mgta_dict[scene_name] = dict()
            for camera_sn in frame_dict[scene_name]:
                # Plain float so the score can be dumped as JSON below.
                mgta = float(self.get_seq_mgta(scene_name, camera_sn))
                mgta_list.append(mgta)
                mgta_dict[scene_name][camera_sn] = mgta
        if not mgta_list:
            raise ValueError("No sequence to evaluate")
        avg_mgta = sum(mgta_list) / len(mgta_list)
        self.logger.info(
            "Detailed MGTA:{}\nAverage MGTA:{}".format(
                json.dumps(mgta_dict, indent=2), avg_mgta
            )
        )
        return avg_mgta

    def load_query_pose(self, scene_name, camera_sn):
        """Load the grasp poses queries.

        Args:
            scene_name(str): scene name.
            camera_sn(str): camera serial number.

        Returns:
            list: dict{"grasp_pose": gg, "start_frame": frame, "object_id": id}

        Raises:
            ValueError: if the scene / camera has no valid query, or the number
                of entries in the json doesn't match the number of grasp poses.
        """
        valid_queries = self.collision_path_dict_initial_frame
        if scene_name not in valid_queries:
            raise ValueError("No valid query in scene:{}".format(scene_name))
        if camera_sn not in valid_queries[scene_name]:
            raise ValueError(
                "No valid query in scene:{}, camera:{}".format(scene_name, camera_sn)
            )
        # NOTE(review): "scenes" is hard-coded here while _generate_gt uses
        # SCENE_DIR_NAME for the same path component -- confirm they agree.
        query_dir = os.path.join(
            self.root,
            "scenes",
            scene_name,
            "{}{}".format(CAMERA_PREFIX, camera_sn),
            QUERY_GRASP_POSE_DIR_NAME,
        )
        query_json_path = os.path.join(query_dir, QUERY_GRASP_POSE_JSON_NAME)
        query_pose_path = os.path.join(query_dir, QUERY_GRASP_POSE_FILE_NAME)
        with open(query_json_path) as query_f:
            query_info = json.load(query_f)
        query_pose = GraspGroup(query_pose_path)
        num_grasp = len(query_info)
        if not len(query_pose) == num_grasp:
            raise ValueError("grasp number in json and GraspGroup doesn't match")
        return [
            {
                "start_frame": query_info[i][START_FRAME_KEY_NAME],
                "object_id": query_info[i][OBJECT_ID_KEY_NAME],
                "grasp_pose": query_pose[i],
            }
            for i in range(num_grasp)
        ]