import datetime
import json
import os
import random
import sys
from pathlib import Path

import numpy as np
import yaml
from decord import VideoReader, cpu
from loguru import logger as eval_logger

import lmms_eval.tasks._task_utils.file_utils as file_utils

with open(Path(__file__).parent / "_default_template_yaml", "r") as f:
    raw_data = f.readlines()

# yaml.safe_load cannot parse the custom `!function` tags used in lmms_eval
# task configs, so those lines are filtered out before loading the rest.
safe_data = [line for line in raw_data if "!function" not in line]
config = yaml.safe_load("".join(safe_data))
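
# For illustration (hypothetical key): a config entry such as
#   process_results: !function utils.egoschema_process_results
# would be dropped by the filter above, since yaml.safe_load has no
# constructor registered for the custom `!function` tag.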

# Resolve the video cache directory under the HuggingFace cache root.
HF_HOME = os.getenv("HF_HOME", os.path.expanduser("~/.cache/huggingface/hub"))
cache_dir = os.path.join(HF_HOME, config["dataset_kwargs"]["cache_dir"], "videos")
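
# Sketch of the resolution, assuming HF_HOME is unset and the configured
# cache_dir is "egoschema" (an illustrative value):
#   cache_dir -> ~/.cache/huggingface/hub/egoschema/videos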


def egoschema_doc_to_visual(doc):
    # Locate the cached video for this document, accepting either a lowercase
    # or uppercase file extension.
    video_path = os.path.join(cache_dir, doc["video_idx"] + ".mp4")
    if not os.path.exists(video_path):
        # Swap only the suffix, so directory names containing "mp4" are untouched.
        upper_path = video_path[: -len("mp4")] + "MP4"
        if os.path.exists(upper_path):
            video_path = upper_path
        else:
            sys.exit(f"video path:{video_path} does not exist, please check")
    return [video_path]
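
# Sketch with a made-up video id:
#   egoschema_doc_to_visual({"video_idx": "abc123"})
#   -> [<cache_dir>/abc123.mp4]   (or the .MP4 variant if only that file exists)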


def egoschema_doc_to_text(doc, lmms_eval_specific_kwargs=None):
    if lmms_eval_specific_kwargs is None:
        lmms_eval_specific_kwargs = {}
    pre_prompt = lmms_eval_specific_kwargs.get("pre_prompt", "")
    post_prompt = lmms_eval_specific_kwargs.get("post_prompt", "")

    question = doc["question"]
    if "option" in doc:
        for op in doc["option"]:
            question += "\n" + op
        # Multiple-choice docs override any configured post prompt.
        post_prompt = "\nAnswer with the option's letter from the given choices directly."

    return f"{pre_prompt}{question}{post_prompt}"


def egoschema_doc_to_answer(doc):
    return doc["answer"]


def egoschema_process_results(doc, result):
    # For PPL-style evaluation, `result` holds one (value, flag) pair per answer
    # option; the option with the lowest value (e.g., lowest loss) is predicted.
    min_value = float("inf")
    min_index = -1
    for i, (value, _) in enumerate(result):
        if value < min_value:
            min_value = value
            min_index = i

    return {"submission": {doc["video_idx"]: min_index}, "score": {"pred": min_index, "ground_truth": doc["answer"]}}


def get_multi_choice_info(doc):
    all_choices = []
    index2ans = {}
    OPTIONS = ["A", "B", "C", "D", "E"]
    # Iterate over the actual options rather than assuming exactly five exist.
    for i, option in enumerate(doc["option"]):
        index2ans[OPTIONS[i]] = option.strip()
        all_choices.append(OPTIONS[i])

    return index2ans, all_choices
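
# Sketch with a hypothetical doc:
#   get_multi_choice_info({"option": ["A. cooking", "B. reading"]})
#   -> ({"A": "A. cooking", "B": "B. reading"}, ["A", "B"])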


def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Parse the prediction from the generated response.
    Return the predicted index e.g., A, B, C, D.
    https://github.com/MMMU-Benchmark/MMMU/blob/51ce7f3e829c16bb44bc5445782686b4c3508794/eval/eval_utils.py#L10
    """
    # Strip leading/trailing punctuation, then pad with spaces to avoid
    # partial matches at the string boundaries.
    for char in [",", ".", "!", "?", ";", ":", "'"]:
        response = response.strip(char)
    response = " " + response + " "

    index_ans = True
    candidates = []
    for choice in all_choices:  # e.g., (A) (B) (C) (D)
        if f"({choice})" in response:
            candidates.append(f"({choice})")

    if len(candidates) == 0:
        for choice in all_choices:  # e.g., A B C D
            if f"{choice} " in response:
                candidates.append(f"{choice} ")

    if len(candidates) == 0:
        for choice in all_choices:  # e.g., A. B. C. D.
            if f"{choice}." in response:
                candidates.append(f"{choice}.")

    # If no letter-style candidate was found and the response is long enough,
    # try to match the answer content instead.
    if len(candidates) == 0 and len(response.split()) > 5:
        for index, ans in index2ans.items():
            if ans.lower() in response.lower():
                candidates.append(index)
                index_ans = False

    if len(candidates) == 0:
        # Still nothing: fall back to a random choice.
        pred_index = random.choice(all_choices)
    elif len(candidates) > 1:
        start_indexes = []
        if index_ans:
            for can in candidates:
                index = response.rfind(can)
                start_indexes.append(index)
        else:
            for can in candidates:
                index = response.lower().rfind(index2ans[can].lower())
                start_indexes.append(index)
        # Take the candidate mentioned last in the response.
        pred_index = candidates[np.argmax(start_indexes)]
        pred_index = pred_index.replace("(", "").replace(")", "").replace(".", "").strip()
    else:
        pred_index = candidates[0]
        pred_index = pred_index.replace("(", "").replace(")", "").replace(".", "").strip()

    return pred_index, len(candidates) > 0
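
# Sketches of the parsing behavior (hypothetical responses):
#   parse_multi_choice_response("The answer is (B).", ["A", "B", "C", "D", "E"], index2ans)
#   -> ("B", True)   # matched via the "(B)" pattern
#   parse_multi_choice_response("B. Because the person keeps stirring the pot", ...)
#   -> ("B", True)   # matched via the "B." pattern
# An unmatchable response falls back to a random choice and returns False.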


def egoschema_process_results_generation(doc, result):
    pred = result[0]

    index2ans, all_choices = get_multi_choice_info(doc)
    parsed_pred, matched_tag = parse_multi_choice_response(pred, all_choices, index2ans)

    # Map the predicted letter to its option index; -1 marks an unparseable prediction.
    pred_to_index = {"A": 0, "B": 1, "C": 2, "D": 3, "E": 4}
    index = pred_to_index.get(parsed_pred, -1)

    return {"submission": {doc["video_idx"]: index}, "score": {"pred": index, "ground_truth": doc["answer"]}}


def egoschema_aggregate_submissions(results, args, task):
    now_date_time = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    submission_file_name = f"inference_results_egoschema_{task}_{now_date_time}.json"
    path = file_utils.generate_submission_file(submission_file_name, args)

    # Each result is a {video_idx: prediction} dict; merge them into one submission.
    combined_submission = {}
    for submission_dict in results:
        combined_submission.update(submission_dict)

    with open(path, "w") as f:
        json.dump(combined_submission, f, indent=4)

    eval_logger.info(f"Submission file saved to {path}")
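
# The written file maps video ids to predicted option indices, e.g. (made-up ids):
#   {"abc123": 1, "def456": 0}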


def egoschema_aggregate_mc(results, args):
    egoschema_aggregate_submissions(results, args, "MC")


def egoschema_aggregate_mc_ppl(results, args):
    egoschema_aggregate_submissions(results, args, "MC_PPL")


def egoschema_aggregate_score(results, args):
    # Accuracy: fraction of docs whose predicted index matches the ground truth.
    yes_count = 0
    for answer_dict in results:
        if str(answer_dict["ground_truth"]) == str(answer_dict["pred"]):
            yes_count += 1

    accuracy = yes_count / len(results)
    return accuracy
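
# Sketch: results = [{"pred": 1, "ground_truth": 1}, {"pred": 0, "ground_truth": 2}]
# -> accuracy 0.5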


def egoschema_doc_to_choice(doc):
    # Options look like "A. <answer text>"; split on the first "." only, so
    # answer texts that themselves contain periods are not truncated.
    return [op.split(".", 1)[1].strip() for op in doc["option"]]
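
# Sketch: {"option": ["A. cooking dinner", "B. reading a book"]}
# -> ["cooking dinner", "reading a book"]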