Automationscribe.com
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us
No Result
View All Result
Automation Scribe
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us
No Result
View All Result
Automationscribe.com
No Result
View All Result

Practice Your Giant Mannequin on A number of GPUs with Totally Sharded Knowledge Parallelism

admin by admin
January 5, 2026
in Artificial Intelligence
0
Practice Your Giant Mannequin on A number of GPUs with Totally Sharded Knowledge Parallelism
399
SHARES
2.3k
VIEWS
Share on FacebookShare on Twitter


import dataclasses

import functools

import os

 

import datasets

import tokenizers

import torch

import torch.distributed as dist

import torch.nn as nn

import torch.nn.useful as F

import torch.optim.lr_scheduler as lr_scheduler

import tqdm

from torch import Tensor

from torch.distributed.algorithms._checkpoint.checkpoint_wrapper import (

    apply_activation_checkpointing,

    checkpoint_wrapper,

)

from torch.distributed.checkpoint import load, save

from torch.distributed.checkpoint.state_dict import (

    StateDictOptions,

    get_state_dict,

    set_state_dict,

)

from torch.distributed.fsdp import (

    CPUOffloadPolicy,

    FSDPModule,

    MixedPrecisionPolicy,

    fully_shard,

)

from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy

from torch.utils.information.distributed import DistributedSampler

 

 

# Construct the mannequin

@dataclasses.dataclass

class LlamaConfig:

    “”“Outline Llama mannequin hyperparameters.”“”

    vocab_size: int = 50000  # Measurement of the tokenizer vocabulary

    max_position_embeddings: int = 2048  # Most sequence size

    hidden_size: int = 768  # Dimension of hidden layers

    intermediate_size: int = 4*768  # Dimension of MLP’s hidden layer

    num_hidden_layers: int = 12  # Variety of transformer layers

    num_attention_heads: int = 12  # Variety of consideration heads

    num_key_value_heads: int = 3  # Variety of key-value heads for GQA

 

 

class RotaryPositionEncoding(nn.Module):

    “”“Rotary place encoding.”“”

 

    def __init__(self, dim: int, max_position_embeddings: int) -> None:

        “”“Initialize the RotaryPositionEncoding module.

 

        Args:

            dim: The hidden dimension of the enter tensor to which RoPE is utilized

            max_position_embeddings: The utmost sequence size of the enter tensor

        ““”

        tremendous().__init__()

        self.dim = dim

        self.max_position_embeddings = max_position_embeddings

        # compute a matrix of ntheta_i

        N = 10_000.0

        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))

        inv_freq = torch.cat((inv_freq, inv_freq), dim=–1)

        place = torch.arange(max_position_embeddings)

        sinusoid_inp = torch.outer(place, inv_freq)

        # save cosine and sine matrices as buffers, not parameters

        self.register_buffer(“cos”, sinusoid_inp.cos())

        self.register_buffer(“sin”, sinusoid_inp.sin())

 

    def ahead(self, x: Tensor) -> Tensor:

        “”“Apply RoPE to tensor x.

 

        Args:

            x: Enter tensor of form (batch_size, seq_length, num_heads, head_dim)

 

        Returns:

            Output tensor of form (batch_size, seq_length, num_heads, head_dim)

        ““”

        batch_size, seq_len, num_heads, head_dim = x.form

        machine = x.machine

        dtype = x.dtype

        # rework the cosine and sine matrices to 4D tensor and the identical dtype as x

        cos = self.cos.to(machine, dtype)[:seq_len].view(1, seq_len, 1, –1)

        sin = self.sin.to(machine, dtype)[:seq_len].view(1, seq_len, 1, –1)

        # apply RoPE to x

        x1, x2 = x.chunk(2, dim=–1)

        rotated = torch.cat((–x2, x1), dim=–1)

        output = (x * cos) + (rotated * sin)

        return output

 

 

class LlamaAttention(nn.Module):

    “”“Grouped-query consideration with rotary embeddings.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.hidden_size = config.hidden_size

        self.num_heads = config.num_attention_heads

        self.head_dim = self.hidden_size // self.num_heads

        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

 

        # hidden_size should be divisible by num_heads

        assert (self.head_dim * self.num_heads) == self.hidden_dimension

 

        # Linear layers for Q, Okay, V projections

        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)

        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)

        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)

        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

 

    def reset_parameters(self):

        self.q_proj.reset_parameters()

        self.k_proj.reset_parameters()

        self.v_proj.reset_parameters()

        self.o_proj.reset_parameters()

 

    def ahead(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:

        bs, seq_len, dim = hidden_states.dimension()

 

        # Undertaking inputs to Q, Okay, V

        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)

        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

 

        # Apply rotary place embeddings

        query_states = rope(query_states)

        key_states = rope(key_states)

 

        # Transpose tensors from BSHD to BHSD dimension for scaled_dot_product_attention

        query_states = query_states.transpose(1, 2)

        key_states = key_states.transpose(1, 2)

        value_states = value_states.transpose(1, 2)

 

        # Use PyTorch’s optimized consideration implementation

        # setting is_causal=True is incompatible with setting express consideration masks

        attn_output = F.scaled_dot_product_attention(

            query_states,

            key_states,

            value_states,

            attn_mask=attn_mask,

            dropout_p=0.0,

            enable_gqa=True,

        )

 

        # Transpose output tensor from BHSD to BSHD dimension, reshape to 3D, after which undertaking output

        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)

        attn_output = self.o_proj(attn_output)

        return attn_output

 

 

class LlamaMLP(nn.Module):

    “”“Feed-forward community with SwiGLU activation.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        # Two parallel projections for SwiGLU

        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)

        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)

        self.act_fn = F.silu  # SwiGLU activation operate

        # Undertaking again to hidden dimension

        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

 

    def reset_parameters(self):

        self.gate_proj.reset_parameters()

        self.up_proj.reset_parameters()

        self.down_proj.reset_parameters()

 

    def ahead(self, x: Tensor) -> Tensor:

        # SwiGLU activation: multiply gate and up-projected inputs

        gate = self.act_fn(self.gate_proj(x))

        up = self.up_proj(x)

        return self.down_proj(gate * up)

 

 

class LlamaDecoderLayer(nn.Module):

    “”“Single transformer layer for a Llama mannequin.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e–5)

        self.self_attn = LlamaAttention(config)

        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e–5)

        self.mlp = LlamaMLP(config)

 

    def reset_parameters(self):

        self.input_layernorm.reset_parameters()

        self.self_attn.reset_parameters()

        self.post_attention_layernorm.reset_parameters()

        self.mlp.reset_parameters()

 

    def ahead(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:

        # First residual block: Self-attention

        residual = hidden_states

        hidden_states = self.input_layernorm(hidden_states)

        attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)

        hidden_states = attn_outputs + residual

 

        # Second residual block: MLP

        residual = hidden_states

        hidden_states = self.post_attention_layernorm(hidden_states)

        hidden_states = self.mlp(hidden_states) + residual

        return hidden_states

 

 

class LlamaModel(nn.Module):

    “”“The complete Llama mannequin with none pretraining heads.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.rotary_emb = RotaryPositionEncoding(

            config.hidden_size // config.num_attention_heads,

            config.max_position_embeddings,

        )

 

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)

        self.layers = nn.ModuleList([

            LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)

        ])

        self.norm = nn.RMSNorm(config.hidden_size, eps=1e–5)

 

    def reset_parameters(self):

        self.embed_tokens.reset_parameters()

        for layer in self.layers:

            layer.reset_parameters()

        self.norm.reset_parameters()

 

    def ahead(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:

        # Convert enter token IDs to embeddings

        hidden_states = self.embed_tokens(input_ids)

        # Course of via all transformer layers, then the ultimate norm layer

        for layer in self.layers:

            hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)

        hidden_states = self.norm(hidden_states)

        # Return the ultimate hidden states

        return hidden_states

 

 

class LlamaForPretraining(nn.Module):

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.base_model = LlamaModel(config)

        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

 

    def reset_parameters(self):

        self.base_model.reset_parameters()

        self.lm_head.reset_parameters()

 

    def ahead(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:

        hidden_states = self.base_model(input_ids, attn_mask)

        return self.lm_head(hidden_states)

 

 

def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:

    “”“Create a causal masks for self-attention.

 

    Args:

        batch: Batch of sequences, form (batch_size, seq_len)

        dtype: Knowledge sort of the masks

 

    Returns:

        Causal masks of form (seq_len, seq_len)

    ““”

    batch_size, seq_len = batch.form

    masks = torch.full((seq_len, seq_len), float(“-inf”), machine=batch.machine, dtype=dtype)

                .triu(diagonal=1)

    return masks

 

 

def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:

    “”“Create a padding masks for a batch of sequences for self-attention.

 

    Args:

        batch: Batch of sequences, form (batch_size, seq_len)

        padding_token_id: ID of the padding token

        dtype: Knowledge sort of the masks

 

    Returns:

        Padding masks of form (batch_size, 1, seq_len, seq_len)

    ““”

    padded = torch.zeros_like(batch, machine=batch.machine, dtype=dtype)

                  .masked_fill(batch == padding_token_id, float(“-inf”))

    masks = padded[:,:,None] + padded[:,None,:]

    return masks[:, None, :, :]

 

 

# Generator operate to create padded sequences of mounted size

class PretrainingDataset(torch.utils.information.Dataset):

    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,

                 seq_length: int):

        self.dataset = dataset

        self.tokenizer = tokenizer

        self.seq_length = seq_length

        self.bot = tokenizer.token_to_id(“[BOT]”)

        self.eot = tokenizer.token_to_id(“[EOT]”)

        self.pad = tokenizer.token_to_id(“[PAD]”)

 

    def __len__(self):

        return len(self.dataset)

 

    def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:

        “”“Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens

        are added. Clipped and padded to the sequence size.

        ““”

        seq = self.dataset[index][“text”]

        tokens: listing[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]

        # pad to focus on sequence size

        toklen = len(tokens)

        if toklen < self.seq_length+1:

            pad_length = self.seq_length+1 – toklen

            tokens += [self.pad] * pad_size

        # return the sequence

        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)

        y = torch.tensor(tokens[1:self.seq_length+1], dtype=torch.int64)

        return x, y

 

 

def load_checkpoint(mannequin: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:

    dist.barrier()

    model_state, optimizer_state = get_state_dict(

        mannequin, optimizer, choices=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),

    )

    load(

        {“mannequin”: model_state, “optimizer”: optimizer_state},

        checkpoint_id=“checkpoint-dist”,

    )

    set_state_dict(

        mannequin, optimizer,

        model_state_dict=model_state, optim_state_dict=optimizer_state,

        choices=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True, cpu_offload=cpu_offload),

    )

    scheduler.load_state_dict(

        torch.load(“checkpoint-dist/lrscheduler.pt”, map_location=machine),

    )

    dist.barrier()

 

 

def save_checkpoint(mannequin: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:

    dist.barrier()

    model_state, optimizer_state = get_state_dict(

        mannequin, optimizer, choices=StateDictOptions(full_state_dict=True, cpu_offload=cpu_offload),

    )

    save(

        {“mannequin”: model_state, “optimizer”: optimizer_state},

        checkpoint_id=“checkpoint-dist”,

    )

    if dist.get_rank() == 0:

        torch.save(scheduler.state_dict(), “checkpoint-dist/lrscheduler.pt”)

    dist.barrier()

 

 

# Load the tokenizer and dataset

tokenizer = tokenizers.Tokenizer.from_file(“bpe_50K.json”)

dataset = datasets.load_dataset(“HuggingFaceFW/fineweb”, “sample-10BT”, break up=“prepare”)

 

# Initialize the distributed surroundings

dist.init_process_group(backend=“nccl”)

local_rank = int(os.environ[“LOCAL_RANK”])

machine = torch.machine(f“cuda:{local_rank}”)

rank = dist.get_rank()

world_size = dist.get_world_size()

print(f“World dimension {world_size}, rank {rank}, native rank {local_rank}. Utilizing {machine}”)

 

# Create pretraining mannequin on meta machine, on all ranks

with torch.machine(“meta”):

    model_config = LlamaConfig()

    mannequin = LlamaForPretraining(model_config)

 

# Convert mannequin from meta machine to FSDP2, should shard each element

cpu_offload = False

fsdp_kwargs = {

    # elective: use combined precision coaching

    “mp_policy”: MixedPrecisionPolicy(

        param_dtype=torch.bfloat16,

        reduce_dtype=torch.float32,

    ),

    # elective: CPU offloading

    “offload_policy”: CPUOffloadPolicy() if cpu_offload else None,

    # elective: discard all-gathered parameters after ahead move even on root modules

    # “reshard_after_forward”: True,

}

for layer in mannequin.base_model.layers:

    fully_shard(layer, **fsdp_kwargs)

fully_shard(mannequin.base_model, **fsdp_kwargs)

fully_shard(mannequin, **fsdp_kwargs)

mannequin.to_empty(machine=“cpu” if cpu_offload else machine)

mannequin.reset_parameters()

assert isinstance(mannequin, FSDPModule), f“Anticipated FSDPModule, received {sort(mannequin)}”

 

# Set express prefetching on fashions

# extra prefetching makes use of extra reminiscence, however enable extra overlap of computation and communication

num_prefetch = 1

if num_prefetch > 1:

    modules = listing(mannequin.base_model.layers)

    for i, module in enumerate(modules):

        if i == len(modules) – 1:

            break

        module.set_modules_to_forward_prefetch(modules[i+1:i+num_prefetch+1])

    for i, module in enumerate(modules):

        if i == 0:

            proceed

        module.set_modules_to_backward_prefetch(modules[max(0, i–num_prefetch):i])

 

# Non-compulsory: Apply gradient checkpointing on a distributed mannequin (all ranks)

#wrap_policy = functools.partial(

#    transformer_auto_wrap_policy,

#    transformer_layer_cls={LlamaDecoderLayer, nn.Embedding},

#)

#apply_activation_checkpointing(

#    mannequin,

#    checkpoint_wrapper_fn=checkpoint_wrapper,

#    auto_wrap_policy=wrap_policy,

#)

 

# Coaching parameters

epochs = 3

learning_rate = 1e–3

batch_size = 64 // world_size

seq_length = 512

num_warmup_steps = 1000

PAD_TOKEN_ID = tokenizer.token_to_id(“[PAD]”)

mannequin.prepare()

 

# DataLoader, optimizer, scheduler, and loss operate

# Sampler is required to shard the dataset throughout world dimension

dataset = PretrainingDataset(dataset, tokenizer, seq_length)

sampler = DistributedSampler(dataset, shuffle=False, drop_last=True)

dataloader = torch.utils.information.DataLoader(

    dataset,

    sampler=sampler,

    batch_size=batch_size,

    pin_memory=True,  # elective

    shuffle=False,

    num_workers=2,

    prefetch_factor=2,

)

num_training_steps = len(dataloader) * epochs

 

optimizer = torch.optim.AdamW(

    mannequin.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e–8, weight_decay=0.1,

)

warmup_scheduler = lr_scheduler.LinearLR(

    optimizer,

    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,

)

cosine_scheduler = lr_scheduler.CosineAnnealingLR(

    optimizer,

    T_max=num_training_steps – num_warmup_steps,

    eta_min=0,

)

scheduler = lr_scheduler.SequentialLR(

    optimizer,

    schedulers=[warmup_scheduler, cosine_scheduler],

    milestones=[num_warmup_steps],

)

loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

 

# Non-compulsory: Compile the mannequin and loss operate

#mannequin = torch.compile(mannequin)

#loss_fn = torch.compile(loss_fn)

 

# if checkpoint-dist dir exists, load the checkpoint to mannequin and optimizer

if os.path.exists(“checkpoint-dist”):

    load_checkpoint(mannequin, optimizer, scheduler)

 

# begin coaching

for epoch in vary(epochs):

    pbar = tqdm.tqdm(dataloader, desc=f“Epoch {epoch+1}/{epochs}”)

    for batch_id, batch in enumerate(pbar):

        if batch_id % 1000 == 0:

            save_checkpoint(mannequin, optimizer, scheduler)

        # Specific prefetching earlier than sending any information to mannequin

        mannequin.unshard()

        # Get batched information, transfer from CPU to GPU

        input_ids, target_ids = batch

        input_ids = input_ids.to(machine)

        target_ids = target_ids.to(machine)

        # create consideration masks: causal masks + padding masks

        attn_mask = create_causal_mask(input_ids) +

                    create_padding_mask(input_ids, PAD_TOKEN_ID)

        # Extract output from mannequin

        logits = mannequin(input_ids, attn_mask)

        # Compute loss: cross-entropy between logits and goal, ignoring padding tokens

        loss = loss_fn(logits.view(–1, logits.dimension(–1)), target_ids.view(–1))

        # Backward with loss and gradient clipping by L2 norm to 1.0

        # Optimizer and gradient clipping works on DTensor

        optimizer.zero_grad(set_to_none=False if cpu_offload else True)

        loss.backward()

        # All-reduce fail if utilizing CPU offloading

        if not cpu_offload:

            torch.nn.utils.clip_grad_norm_(mannequin.parameters(), 1.0)

        optimizer.step()

        scheduler.step()

        pbar.set_postfix(loss=loss.merchandise())

        pbar.replace(1)

    pbar.shut()

 

# Save the mannequin

save_checkpoint(mannequin, optimizer, scheduler)

 

# Clear up the distributed surroundings

dist.destroy_process_group()

Tags: DataFullyGPUsLargeModelMultipleParallelismShardedTrain
Previous Post

Immediate Engineering vs RAG for Modifying Resumes

Next Post

Improve doc analytics with Strands AI Brokers for the GenAI IDP Accelerator

Next Post
Improve doc analytics with Strands AI Brokers for the GenAI IDP Accelerator

Improve doc analytics with Strands AI Brokers for the GenAI IDP Accelerator

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Popular News

  • Greatest practices for Amazon SageMaker HyperPod activity governance

    Greatest practices for Amazon SageMaker HyperPod activity governance

    405 shares
    Share 162 Tweet 101
  • Speed up edge AI improvement with SiMa.ai Edgematic with a seamless AWS integration

    403 shares
    Share 161 Tweet 101
  • Unlocking Japanese LLMs with AWS Trainium: Innovators Showcase from the AWS LLM Growth Assist Program

    403 shares
    Share 161 Tweet 101
  • Optimizing Mixtral 8x7B on Amazon SageMaker with AWS Inferentia2

    403 shares
    Share 161 Tweet 101
  • The Good-Sufficient Fact | In direction of Knowledge Science

    403 shares
    Share 161 Tweet 101

About Us

Automation Scribe is your go-to site for easy-to-understand Artificial Intelligence (AI) articles. Discover insights on AI tools, AI Scribe, and more. Stay updated with the latest advancements in AI technology. Dive into the world of automation with simplified explanations and informative content. Visit us today!

Category

  • AI Scribe
  • AI Tools
  • Artificial Intelligence

Recent Posts

  • Construct a multimodal generative AI assistant for root trigger prognosis in predictive upkeep utilizing Amazon Bedrock
  • Function Detection, Half 3: Harris Nook Detection
  • Improve doc analytics with Strands AI Brokers for the GenAI IDP Accelerator
  • Home
  • Contact Us
  • Disclaimer
  • Privacy Policy
  • Terms & Conditions

© 2024 automationscribe.com. All rights reserved.

No Result
View All Result
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us

© 2024 automationscribe.com. All rights reserved.