import math
import os
import subprocess
from datetime import timedelta
from typing import List, Optional, Tuple, Union

import numpy as np
import requests
import torch
from accelerate import Accelerator, DistributedType, InitProcessGroupKwargs
from accelerate.state import AcceleratorState
from decord import VideoReader, cpu
from huggingface_hub import snapshot_download
from loguru import logger as eval_logger
from tqdm import tqdm
from transformers import AutoConfig

from lmms_eval import utils
from lmms_eval.api.instance import Instance
from lmms_eval.api.model import lmms
from lmms_eval.api.registry import register_model
from lmms_eval.models.model_utils.load_video import read_video_pyav
from lmms_eval.utils import stop_sequences_criteria

try:
    from llamavid.constants import (
        DEFAULT_IM_END_TOKEN,
        DEFAULT_IM_START_TOKEN,
        DEFAULT_IMAGE_TOKEN,
        IMAGE_TOKEN_INDEX,
    )
    from llamavid.conversation import SeparatorStyle, conv_templates
    from llamavid.model.builder import load_pretrained_model
    from llava.mm_utils import (
        KeywordsStoppingCriteria,
        get_model_name_from_path,
        tokenizer_image_token,
    )
except ImportError:
    eval_logger.debug("LLaMA-VID is not installed. Please install LLaMA-VID to use this model.")


@register_model("llama_vid")
class LLaMAVid(lmms):
    def __init__(
        self,
        pretrained: str = "YanweiLi/llama-vid-7b-full-224-video-fps-1",
        truncation: Optional[bool] = True,
        device: Optional[str] = "cuda:0",
        dtype: Optional[Union[str, torch.dtype]] = "auto",
        batch_size: Optional[Union[int, str]] = 1,
        trust_remote_code: Optional[bool] = False,
        revision=None,
        # Attention implementation for inference: "sdpa", "eager", or "flash_attention_2".
        # FA2 appears to have no effect during inference: https://discuss.huggingface.co/t/flash-attention-has-no-effect-on-inference/73453/5
        attn_implementation=("sdpa" if torch.__version__ > "2.1.2" else "eager"),
        device_map="cuda:0",
        conv_template="vicuna_v1",
        use_cache=True,
        truncate_context=False,
        num_frames: int = 100,
        **kwargs,
    ) -> None:
        super().__init__()
        accelerator_kwargs = InitProcessGroupKwargs(timeout=timedelta(weeks=52))
        accelerator = Accelerator(kwargs_handlers=[accelerator_kwargs])
        if accelerator.num_processes > 1:
            self._device = torch.device(f"cuda:{accelerator.local_process_index}")
            self.device_map = f"cuda:{accelerator.local_process_index}"
        elif accelerator.num_processes == 1 and device_map == "auto":
            self._device = torch.device(device)
            self.device_map = device_map
        else:
            self._device = torch.device(f"cuda:{accelerator.local_process_index}")
            self.device_map = f"cuda:{accelerator.local_process_index}"

        self.pretrained = pretrained
        self.model_path = snapshot_download(self.pretrained)
        self.model_name = get_model_name_from_path(pretrained)
        self.num_frames = num_frames

        # The EVA ViT-G visual encoder is required by LLaMA-VID; the main process downloads it once.
        if not os.path.exists("./model_zoo/LAVIS/eva_vit_g.pth") and accelerator.is_main_process:
            eval_logger.info("\n\n EVA encoder for LLaMA-VID not found. Downloading it automatically to ./model_zoo/LAVIS")
            cache_path = "model_zoo/LAVIS"
            os.makedirs(cache_path, exist_ok=True)
            subprocess.run(["wget https://storage.googleapis.com/sfr-vision-language-research/LAVIS/models/BLIP2/eva_vit_g.pth -O ./model_zoo/LAVIS/eva_vit_g.pth"], shell=True)
        accelerator.wait_for_everyone()

        self._tokenizer, self._model, self.image_processor, self._max_length = load_pretrained_model(
            self.model_path,
            None,
            self.model_name,
            device_map=self.device_map,
        )
        self._config = self._model.config
        self.model.eval()
        self.model.tie_weights()
        self.truncation = truncation
        self.batch_size_per_gpu = int(batch_size)
        self.conv_template = conv_template
        self.use_cache = use_cache
        self.truncate_context = truncate_context
        # assert self.batch_size_per_gpu == 1, "Llava currently does not support batched generation. See https://github.com/haotian-liu/LLaVA/issues/754. HF Llava also has this issue."
        if accelerator.num_processes > 1:
            assert accelerator.distributed_type in [
                DistributedType.FSDP,
                DistributedType.MULTI_GPU,
                DistributedType.DEEPSPEED,
            ], "Unsupported distributed type provided. Only DDP, FSDP, and DeepSpeed (zero stage 0) are supported."
            # To use DistributedType.DEEPSPEED, run `accelerate config` before using the model and
            # select zero stage 0 (equivalent to DDP); otherwise accelerator.prepare() fails on the model.
            # Setting other kwargs to make the default zero stage 2 work did not succeed.
            if accelerator.distributed_type == DistributedType.DEEPSPEED:
                kwargs = {
                    "train_micro_batch_size_per_gpu": self.batch_size_per_gpu,
                    "train_batch_size": self.batch_size_per_gpu * accelerator.num_processes,
                }
                AcceleratorState().deepspeed_plugin.deepspeed_config_process(must_match=True, **kwargs)
                eval_logger.info("Detected that you are using DistributedType.DEEPSPEED. Make sure you run `accelerate config` and set zero stage to 0")
            if accelerator.distributed_type == DistributedType.FSDP or accelerator.distributed_type == DistributedType.DEEPSPEED:
                self._model = accelerator.prepare(self.model)
            else:
                self._model = accelerator.prepare_model(self.model, evaluation_mode=True)
            self.accelerator = accelerator
            if self.accelerator.is_local_main_process:
                eval_logger.info(f"Using {accelerator.num_processes} devices with data parallelism")
            self._rank = self.accelerator.local_process_index
            self._world_size = self.accelerator.num_processes
        elif accelerator.num_processes == 1 and device_map == "auto":
            eval_logger.info(f"Using {accelerator.num_processes} devices with tensor parallelism")
            self._rank = 0
            self._world_size = 1
        else:
            eval_logger.info(f"Using single device: {self._device}")
            self.model.to(self._device)
            self._rank = 0
            self._world_size = 1

    def download_file(self, url, folder_path):
        # Create the folder if it doesn't exist
        if not os.path.exists(folder_path):
            os.makedirs(folder_path)
        # Extract the filename from the URL
        filename = url.split("/")[-1]
        # Path to save the file under
        file_path = os.path.join(folder_path, filename)

        # Send a GET request to the URL
        response = requests.get(url)

        # Check if the request was successful (status code 200)
        if response.status_code == 200:
            # Save the file to the specified folder
            with open(file_path, "wb") as f:
                f.write(response.content)
            print(f"File downloaded successfully to {file_path}")
        else:
            print(f"Failed to download file. Status code: {response.status_code}")

    @property
    def config(self):
        # return the associated transformers.AutoConfig for the given pretrained model.
        return self._config

    @property
    def tokenizer(self):
        return self._tokenizer

    @property
    def model(self):
        # returns the model, unwrapping it if using Accelerate
        if hasattr(self, "accelerator"):
            return self.accelerator.unwrap_model(self._model)
        else:
            return self._model

    @property
    def eot_token_id(self):
        # we use EOT because end of *text* is more accurate for what we're doing than end of *sentence*
        return self.tokenizer.eos_token_id

    @property
    def max_length(self):
        return self._max_length

    def tok_encode(self, string: str, left_truncate_len=None, add_special_tokens=None) -> List[int]:
        """Tokenize a string, optionally left-truncating the encoding to `left_truncate_len` tokens."""
        add_special_tokens = False if add_special_tokens is None else add_special_tokens
        encoding = self.tokenizer.encode(string, add_special_tokens=add_special_tokens)
        # left-truncate the encoded context to be at most `left_truncate_len` tokens long
        if left_truncate_len:
            encoding = encoding[-left_truncate_len:]
        return encoding

    def tok_decode(self, tokens):
        return self.tokenizer.decode(tokens)

    def load_video(self, video_path):
        # Sample roughly one frame per second of video.
        vr = VideoReader(video_path, ctx=cpu(0))
        total_frame_num = len(vr)
        fps = round(vr.get_avg_fps())
        frame_idx = [i for i in range(0, total_frame_num, fps)]
        spare_frames = vr.get_batch(frame_idx).asnumpy()
        return spare_frames

    def flatten(self, input):
        new_list = []
        for i in input:
            for j in i:
                new_list.append(j)
        return new_list

    def generate_until(self, requests) -> List[str]:
        res = []
        pbar = tqdm(total=len(requests), disable=(self.rank != 0), desc="Model Responding")

        for contexts, gen_kwargs, doc_to_visual, doc_id, task, split in [reg.args for reg in requests]:
            # encode, pad, and truncate contexts for this batch
            visuals = [doc_to_visual(self.task_dict[task][split][doc_id])]
            visuals = self.flatten(visuals)
            videos = []
            for visual in visuals:
                video = read_video_pyav(visual, num_frm=self.num_frames)
                video = self.image_processor.preprocess(video, return_tensors="pt")["pixel_values"].half().cuda()
                videos.append(video)

            qs = contexts
            if self.model.config.mm_use_im_start_end:
                qs = DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN + "\n" + qs
            else:
                qs = DEFAULT_IMAGE_TOKEN + "\n" + qs

            conv = conv_templates[self.conv_template].copy()
            conv.append_message(conv.roles[0], qs)
            conv.append_message(conv.roles[1], None)
            prompt = conv.get_prompt()

            input_ids = tokenizer_image_token(prompt, self.tokenizer, IMAGE_TOKEN_INDEX, return_tensors="pt").unsqueeze(0).cuda()
            stop_str = conv.sep if conv.sep_style != SeparatorStyle.TWO else conv.sep2
            keywords = [stop_str]
            stopping_criteria = KeywordsStoppingCriteria(keywords, self.tokenizer, input_ids)
            cur_prompt = contexts

            # Note: `gen_kwargs` from the task config is currently ignored; fixed sampling parameters are used.
            with torch.inference_mode():
                self.model.update_prompt([[cur_prompt]])
                # all preprocessed videos for this request are passed to generate
                output_ids = self.model.generate(
                    input_ids,
                    images=videos,
                    do_sample=True,
                    temperature=0.2,
                    max_new_tokens=1024,
                    use_cache=True,
                    stopping_criteria=[stopping_criteria],
                )

            input_token_len = input_ids.shape[1]
            n_diff_input_output = (input_ids != output_ids[:, :input_token_len]).sum().item()
            if n_diff_input_output > 0:
                print(f"[Warning] {n_diff_input_output} output_ids are not the same as the input_ids")
            outputs = self.tokenizer.batch_decode(output_ids[:, input_token_len:], skip_special_tokens=True)[0]
            outputs = outputs.strip()
            if outputs.endswith(stop_str):
                outputs = outputs[: -len(stop_str)]
            outputs = outputs.strip()
            pbar.update(1)
            res.append(outputs)
        return res

    def loglikelihood(self, requests: List[Instance]) -> List[Tuple[float, bool]]:
        return super().loglikelihood(requests)

    @property
    def batch_size(self):
        return self.batch_size_per_gpu

    @property
    def device(self):
        return self._device

    @property
    def rank(self):
        return self._rank

    @property
    def world_size(self):
        return self._world_size

    def generate_until_multi_round(self, requests) -> List[str]:
        raise NotImplementedError("TODO: Implement multi-round generation for LLaMAVid")
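
# Example invocation (a sketch, not part of this module): the class is registered above under
# the name "llama_vid", so it can be selected through the lmms_eval CLI. The flag names follow
# the lm-evaluation-harness style interface, and the task name below is a placeholder; both may
# differ across lmms_eval versions.
#
#   python -m lmms_eval \
#       --model llama_vid \
#       --model_args pretrained=YanweiLi/llama-vid-7b-full-224-video-fps-1,num_frames=100 \
#       --tasks <task_name> \
#       --batch_size 1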