"""Estimate the per-GPU VRAM needed to train a transformer language model.

The estimate is the sum of model weights, gradients, optimizer states and
activations, optionally partitioned across GPUs with ZeRO.  It can be used
either through a gradio web app (default) or from the command line
(``--no-app``).  Model sizes are expressed in billions of parameters, so
"parameters * bytes per parameter" directly yields gigabytes.
"""
import argparse
from dataclasses import dataclass
from functools import partial

# gradio is only needed for the web app and transformers only for cached
# config downloads; a missing package must not break the plain CLI path.
try:
    import gradio as gr
except ImportError:
    gr = None

try:
    from transformers import AutoConfig
except ImportError:
    AutoConfig = None

PRECISION_TO_BYTES = {
    "float32": 4,
    "fp32": 4,
    "float16": 2,
    "fp16": 2,
    "bfloat16": 2,
    "bf16": 2,
    "int8": 1,
}

ZERO_STAGES = [0, 1, 2, 3]
BATCH_SIZES = [1, 2, 4, 8, 16, 32, 64]
OPTIMIZERS = ["adam", "adamw", "sgd"]
HUGGINGFACE_URL_CONFIG = "https://huggingface.co/{}/resolve/main/config.json"

# Upper bound (seconds) for fetching config.json so a dead connection cannot
# hang the app forever.
_REQUEST_TIMEOUT_SECONDS = 30


@dataclass
class ModelConfig:
    """Architecture settings of the model whose training memory is estimated."""

    model_size: float  # in billions of parameters
    hidden_size: int
    sequence_length: int
    num_layers: int
    num_heads: int

    def overwrite_with_hf_config(self, config: dict):
        """Replace every field with the values found in a HuggingFace config dict.

        Raises:
            KeyError: if ``config`` lacks one of the keys read here or by
                ``get_model_size_from_config``.
        """
        self.model_size = round(get_model_size_from_config(config) / 10**9, 2)
        self.hidden_size = config["hidden_size"]
        self.sequence_length = config["max_position_embeddings"]
        self.num_layers = config["num_hidden_layers"]
        self.num_heads = config["num_attention_heads"]


@dataclass
class TrainingConfig:
    """Training settings that influence the memory footprint."""

    micro_batch_size: int
    num_gpus: int
    optimizer: str
    zero_stage: int
    gradient_checkpointing: bool = False
    mixed_precision: bool = False


def parse_args():
    """Build the command line parser.

    NOTE: the order of the arguments matters.  The gradio app maps its input
    widgets onto these argument names positionally (see ``estimate_vram``),
    so new arguments must be appended after ``--cache_dir``.

    Returns:
        The configured ``argparse.ArgumentParser`` (not the parsed namespace).
    """
    parser = argparse.ArgumentParser(description="Parser for VRAM estimator")

    parser.add_argument("--repo_id", type=str, default=None, help="HuggingFace repo id to automatically determine model settings")
    parser.add_argument("--model_size", type=float, default=7, help="Model size (in billion parameters)")
    parser.add_argument("--hidden_size", type=int, default=4096, help="Hidden size")
    parser.add_argument("--sequence_length", type=int, default=8192, help="Sequence length")
    parser.add_argument("--num_layers", type=int, default=32, help="Number of layers")
    parser.add_argument("--num_heads", type=int, default=32, help="Number of heads")

    parser.add_argument("--micro_batch_size", type=int, default=4, help="Micro batch size (batch size per device/GPU)")
    parser.add_argument("--zero_stage", type=int, default=0, choices=ZERO_STAGES, help="ZeRO optimization stage")
    # Both switches *enable* a feature, so passing the flag must store True.
    parser.add_argument("--gradient_checkpointing", action="store_true", help="Enable gradient checkpointing")
    parser.add_argument("--mixed_precision", action="store_true", help="Enable mixed precision for model training")
    parser.add_argument("--optimizer", type=str, default="adamw", choices=OPTIMIZERS, help="Type of optimizer")
    parser.add_argument("--num_gpus", type=int, default=4, help="Number of GPUs. Necessary for estimating ZeRO stages")
    parser.add_argument("--cache_dir", type=str, default=None, help="HuggingFace cache directory to download config from")

    parser.add_argument("--no-app", action="store_true", help="Skip the gradio app and print the estimate on the command line")

    return parser


def get_model_size_from_config(config: dict):
    """Count the parameters of a Llama-style decoder described by ``config``.

    Counted are the input embedding, the output projection and, per layer,
    two layer norms, the gated MLP (gate/up/down) and the q/k/v/o attention
    projections.  Biases are not counted.

    Args:
        config: HuggingFace config as a dict.  Must contain ``vocab_size``,
            ``hidden_size``, ``intermediate_size``, ``num_hidden_layers`` and
            ``num_attention_heads``; ``num_key_value_heads`` is optional.

    Returns:
        Total number of parameters (absolute count, not billions).
    """
    hidden_size = config["hidden_size"]
    intermediate_size = config["intermediate_size"]
    num_attention_heads = config["num_attention_heads"]
    # Without grouped-query attention there are as many K/V heads as Q heads.
    num_key_value_heads = config.get("num_key_value_heads") or num_attention_heads

    # K and V project onto one head_dim-sized slice per key/value head.
    head_dim = hidden_size // num_attention_heads
    key_value_size = num_key_value_heads * head_dim

    layernorm_params = 2 * hidden_size  # input + post-attention norm
    mlp_params = 3 * hidden_size * intermediate_size  # gate, up and down proj
    query_output_params = 2 * hidden_size * hidden_size  # q_proj and o_proj
    key_value_params = 2 * hidden_size * key_value_size  # k_proj and v_proj
    single_layer_params = (
        layernorm_params + mlp_params + query_output_params + key_value_params
    )

    embedding_params = config["vocab_size"] * hidden_size
    output_params = config["vocab_size"] * hidden_size
    total_transformer_params = config["num_hidden_layers"] * single_layer_params

    return embedding_params + total_transformer_params + output_params


def download_config_from_hub(repo_id: str, cache_dir: str):
    """Load the model config through ``transformers.AutoConfig``.

    Args:
        repo_id: HuggingFace repo id.
        cache_dir: Directory passed to ``from_pretrained`` as ``cache_dir``.

    Returns:
        The config object returned by ``AutoConfig.from_pretrained``.

    Raises:
        ImportError: if ``transformers`` is not installed.
    """
    if AutoConfig is None:
        raise ImportError("transformers is required to download a config into a cache directory")
    return AutoConfig.from_pretrained(pretrained_model_name_or_path=repo_id, cache_dir=cache_dir)


def scrape_config_from_hub(repo_id):
    """Fetch ``config.json`` of ``repo_id`` directly over HTTP.

    Returns:
        The parsed JSON config as a dict.

    Raises:
        RuntimeError: if the request fails or the response is not valid JSON;
            the underlying exception is attached as ``__cause__``.
    """
    import requests

    url = HUGGINGFACE_URL_CONFIG.format(repo_id)
    failure = None
    try:
        print(f"Fetching config.json from the following URL: {url}...")
        response = requests.get(url, timeout=_REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()  # Raises a HTTPError if the status is 4xx, 5xx
        config = response.json()
    except requests.exceptions.HTTPError as err:
        failure = ("HTTP Error", err)
    except requests.exceptions.ConnectionError as err:
        failure = ("Error Connecting", err)
    except requests.exceptions.Timeout as err:
        failure = ("Timeout Error", err)
    except requests.exceptions.RequestException as err:
        failure = ("Something went wrong", err)
    except ValueError as err:
        failure = ("Error decoding JSON", err)

    if failure is not None:
        # There is no config to return, so fail loudly instead of hitting an
        # unbound local variable further down.
        label, cause = failure
        print(f"{label}: {cause}")
        raise RuntimeError(f"Could not fetch config.json for model {repo_id}") from cause

    print(f"Fetched the config for model {repo_id} succesfully!")
    return config


def _load_hf_config(repo_id, cache_dir):
    """Return the HuggingFace config of ``repo_id`` as a plain dict.

    With a cache directory the config is downloaded through transformers;
    otherwise config.json is scraped from the hub.
    """
    if cache_dir:
        return download_config_from_hub(repo_id, cache_dir).to_dict()
    return scrape_config_from_hub(repo_id)


def model_memory(parameters, precision="bf16", mixed_precision=False):
    """Memory of the model weights.

    With mixed precision both an fp32 and an fp16 copy of the weights are
    counted; otherwise a single copy in ``precision``.
    """
    if mixed_precision:
        return parameters * (PRECISION_TO_BYTES["fp32"] + PRECISION_TO_BYTES["fp16"])
    return parameters * PRECISION_TO_BYTES[precision]


def gradients_memory(parameters, precision="fp32"):
    """Memory of the gradients: one value per parameter in ``precision``."""
    return parameters * PRECISION_TO_BYTES[precision]


def optimizer_memory(parameters, optimizer="adamw", precision="fp32"):
    """Memory of the optimizer: a per-optimizer multiple of the parameter count.

    Raises:
        KeyError: if ``optimizer`` is not one of ``OPTIMIZERS``.
    """
    optimizer_choices = {"adam": 3, "adamw": 2, "sgd": 1}
    return optimizer_choices[optimizer] * parameters * PRECISION_TO_BYTES[precision]


def activations_memory(num_layers, sequence_length, micro_batch_size, hidden_size, num_heads):
    """Activation memory in GB, rounded to two decimals.

    Reference: https://arxiv.org/pdf/2205.05198
    Activations assumed to be in 16-bit floating precision.
    """
    bytes_per_layer = sequence_length * micro_batch_size * hidden_size * (34 + 5 * (num_heads * sequence_length / hidden_size))
    bytes_model = bytes_per_layer * num_layers
    return round(bytes_model / 10**9, 2)


def vram_required(model_size, hidden_size, sequence_length, num_layers, num_heads, micro_batch_size, num_gpus, optimizer, zero_stage, gradient_checkpointing, mixed_precision):
    """Estimate the VRAM needed per GPU, in GB.

    Reference: https://www.microsoft.com/en-us/research/blog/zero-deepspeed-new-system-optimizations-enable-training-models-with-over-100-billion-parameters/

    Args:
        model_size: Model size in billions of parameters.
        hidden_size, sequence_length, num_layers, num_heads, micro_batch_size:
            Forwarded to ``activations_memory``.
        num_gpus: Number of GPUs the ZeRO-partitioned states are split over.
        optimizer: One of ``OPTIMIZERS``.
        zero_stage: 0 (no partitioning), 1 (optimizer states), 2 (+ gradients)
            or 3 (+ parameters).
        gradient_checkpointing: Whether gradient checkpointing is enabled.
        mixed_precision: Whether mixed precision training is enabled.

    Returns:
        Dict with the keys ``total``, ``model``, ``gradients``, ``optimizer``
        and ``activations``; every value is rounded to two decimals.

    Raises:
        ValueError: if a ZeRO stage above 0 is combined with ``num_gpus < 1``.
    """
    if zero_stage >= 1 and num_gpus < 1:
        raise ValueError("num_gpus must be at least 1 to partition states with ZeRO")

    model_vram = model_memory(model_size, mixed_precision=mixed_precision)
    gradients_vram = gradients_memory(model_size)
    optimizer_vram = optimizer_memory(model_size, optimizer=optimizer)

    # The stages are cumulative: each one partitions one more kind of state.
    if zero_stage >= 1:  # optimizer state partitioning
        optimizer_vram = optimizer_vram / num_gpus
    if zero_stage >= 2:  # + gradient partitioning
        gradients_vram = gradients_vram / num_gpus
    if zero_stage >= 3:  # + parameter partitioning
        model_vram = model_vram / num_gpus

    activations_vram = activations_memory(num_layers, sequence_length, micro_batch_size, hidden_size, num_heads)
    if gradient_checkpointing:
        # Square-root heuristic for the savings of checkpointing.
        activations_vram = round(activations_vram ** 0.5, 2)

    model_vram = round(model_vram, 2)
    gradients_vram = round(gradients_vram, 2)
    optimizer_vram = round(optimizer_vram, 2)
    total_vram = round(model_vram + gradients_vram + optimizer_vram + activations_vram, 2)

    return {
        "total": total_vram,
        "model": model_vram,
        "gradients": gradients_vram,
        "optimizer": optimizer_vram,
        "activations": activations_vram,
    }


def build_interface(estimate_vram_fn):
    """Assemble the gradio app.

    Args:
        estimate_vram_fn: Callback invoked on submit.  It receives the widget
            values positionally in the order repo id, model parameters,
            training parameters, and returns the text to display.

    Returns:
        The ``gr.Blocks`` app (not yet launched).

    Raises:
        ImportError: if gradio is not installed.
    """
    if gr is None:
        raise ImportError("gradio is required to launch the app; use --no-app for command line output")

    with gr.Blocks() as app:
        option = gr.Radio(["Repo ID", "Model Parameters"], label="Select Input Type")
        repo_id = gr.Textbox(label="Repo ID", visible=False)

        with gr.Row(visible=False) as model_params_row:
            model_params = [
                gr.Slider(label="Model Size", minimum=0.1, maximum=400, step=0.1, value=7, info="Model size (in billion parameters)"),
                gr.Slider(label="Hidden size", minimum=256, maximum=8192, step=128, value=4096, info="Hidden size"),
                gr.Slider(label="Sequence length", minimum=256, maximum=128_000, step=256, value=8192, info="Sequence length"),
                gr.Slider(label="Num layers", minimum=8, maximum=64, step=1, value=32, info="Number of layers"),
                gr.Slider(label="Num heads", minimum=8, maximum=64, step=1, value=32, info="Number of attention heads"),
            ]

        def update_visibility(selected_option):
            """Show only the inputs that belong to the selected input type."""
            show_repo_id = selected_option == "Repo ID"
            show_model_params = selected_option == "Model Parameters"
            return gr.update(visible=show_repo_id), gr.update(visible=show_model_params)

        option.change(
            fn=update_visibility,
            inputs=[option],
            outputs=[repo_id, model_params_row],
        )

        with gr.Row(equal_height=True):
            # Keep this order in sync with the argument order in parse_args().
            training_params = [
                gr.Dropdown(label="Micro batch size", choices=BATCH_SIZES, value=4, info="Micro batch size (batch size per device/GPU)"),
                gr.Dropdown(label="ZeRO stage", choices=ZERO_STAGES, value=0, info="ZeRO optimization stage"),
                gr.Dropdown(label="Gradient checkpointing", choices=[True, False], value=True, info="Enable gradient checkpointing"),
                gr.Dropdown(label="Mixed precision", choices=[False, True], value=False, info="Enable mixed precision for model training"),
                gr.Dropdown(label="Optimizer", choices=OPTIMIZERS, value="adamw", info="Type of optimizer"),
                gr.Slider(label="Num GPUs", minimum=1, maximum=64, step=1, value=4, info="Number of GPUs. Necessary for estimating ZeRO stages"),
                gr.Textbox(label="Cache dir", value=None, placeholder=".huggingface_configs", info="HuggingFace cache directory to download config from"),
            ]

        submit_btn = gr.Button("Estimate!")
        output = gr.Textbox(label="Total estimated VRAM per device/GPU (in GB)")

        submit_btn.click(
            fn=estimate_vram_fn,
            inputs=[repo_id, *model_params, *training_params],
            outputs=[output],
        )

    return app


def _format_estimate(total_vram_dict):
    """Render the dict returned by ``vram_required`` as one summary line."""
    return f"Total {total_vram_dict['total']}GB = {total_vram_dict['model']}GB (model) + {total_vram_dict['gradients']}GB (gradients) + {total_vram_dict['optimizer']}GB (optimizer) + {total_vram_dict['activations']}GB activations"


def estimate_vram(arg_keys, *args):
    """Gradio callback: turn the widget values into a VRAM summary string.

    Args:
        arg_keys: Argument names, in the same order as the widget values.
            Surplus names are ignored.
        *args: Widget values (repo id, model parameters, training parameters).

    Returns:
        The summary line, or an error message if a requested HuggingFace
        config could not be loaded.
    """
    params = dict(zip(arg_keys, args))
    print("Parameters: ", params)

    model_config = ModelConfig(params["model_size"], params["hidden_size"], params["sequence_length"], params["num_layers"], params["num_heads"])
    training_config = TrainingConfig(params["micro_batch_size"], params["num_gpus"], params["optimizer"], params["zero_stage"], params["gradient_checkpointing"], params["mixed_precision"])

    # Without a repo id the manually entered model parameters are used as-is.
    # NOTE(review): a repo id left in the hidden textbox still wins over the
    # sliders, because the selected input type is not part of the inputs.
    if params["repo_id"]:
        try:
            config = _load_hf_config(params["repo_id"], params["cache_dir"])
            model_config.overwrite_with_hf_config(config)
        except (RuntimeError, OSError, ImportError, KeyError) as err:
            return f"Could not load the config for {params['repo_id']}: {err!r}"

    total_vram_dict = vram_required(**vars(model_config), **vars(training_config))
    return _format_estimate(total_vram_dict)


def main():
    """Entry point: launch the gradio app or print a command line estimate."""
    parser = parse_args()
    args = parser.parse_args()

    # Launch gradio interface
    if not args.no_app:
        arg_keys = list(vars(args).keys())
        estimate_vram_fn = partial(estimate_vram, arg_keys)
        interface = build_interface(estimate_vram_fn)
        interface.launch()
        return

    # Command line interface
    model_config = ModelConfig(args.model_size, args.hidden_size, args.sequence_length, args.num_layers, args.num_heads)
    training_config = TrainingConfig(args.micro_batch_size, args.num_gpus, args.optimizer, args.zero_stage, args.gradient_checkpointing, args.mixed_precision)
    if args.repo_id:
        model_config.overwrite_with_hf_config(_load_hf_config(args.repo_id, args.cache_dir))

    total_vram_dict = vram_required(**vars(model_config), **vars(training_config))
    print(_format_estimate(total_vram_dict))


if __name__ == "__main__":
    main()