Train Your Large Model on Multiple GPUs with Tensor Parallelism

January 10, 2026


import dataclasses
import datetime
import os

import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
from torch.distributed.fsdp import FSDPModule, fully_shard
from torch.distributed.tensor import Replicate, Shard
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    PrepareModuleInput,
    RowwiseParallel,
    SequenceParallel,
    loss_parallel,
    parallelize_module,
)
from torch.utils.data.distributed import DistributedSampler

# Set default dtype to bfloat16
torch.set_default_dtype(torch.bfloat16)
print("NCCL version:", torch.cuda.nccl.version())

# Build the model
@dataclasses.dataclass
class LlamaConfig:
    """Define Llama model hyperparameters."""
    vocab_size: int = 50000  # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048  # Maximum sequence length
    hidden_size: int = 768  # Dimension of the hidden layers
    intermediate_size: int = 4 * 768  # Dimension of the MLP's hidden layer
    num_hidden_layers: int = 12  # Number of transformer layers
    num_attention_heads: int = 12  # Number of attention heads
    num_key_value_heads: int = 3  # Number of key-value heads for GQA


class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding."""

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n * theta_i
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        position = torch.arange(max_position_embeddings)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        batch_size, seq_len, num_heads, head_dim = x.shape
        device = x.device
        dtype = x.dtype
        # reshape the cosine and sine matrices to 4D tensors with the same dtype as x
        cos = self.cos.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        output = (x * cos) + (rotated * sin)
        return output


class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size

        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        bs, seq_len, dim = hidden_states.size()

        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)

        # Transpose tensors from BSHD to BHSD layout for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)

        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with passing an explicit attention mask
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            attn_mask=attn_mask,
            dropout_p=0.0,
            enable_gqa=True,
        )

        # Transpose the output from BHSD back to BSHD layout, reshape to 3D, then apply the output projection
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output


class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to the hidden dimension
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: Tensor) -> Tensor:
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)


class LlamaDecoderLayer(nn.Module):
    """Single transformer layer for a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        # First residual block: self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)
        hidden_states = attn_outputs + residual

        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states


class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.rotary_emb = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)
        ])
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        # Convert input token IDs to embeddings
        hidden_states = self.embed_tokens(input_ids)
        # Process through all transformer layers, then the final norm layer
        for layer in self.layers:
            hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)
        hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states


class LlamaForPretraining(nn.Module):
    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        hidden_states = self.base_model(input_ids, attn_mask)
        return self.lm_head(hidden_states)


def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a causal mask for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        dtype: Data type of the mask

    Returns:
        Causal mask of shape (seq_len, seq_len)
    """
    batch_size, seq_len = batch.shape
    mask = torch.full(
        (seq_len, seq_len), float("-inf"), device=batch.device, dtype=dtype
    ).triu(diagonal=1)
    return mask


def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a padding mask for a batch of sequences for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        padding_token_id: ID of the padding token
        dtype: Data type of the mask

    Returns:
        Padding mask of shape (batch_size, 1, seq_len, seq_len)
    """
    padded = torch.zeros_like(batch, device=batch.device, dtype=dtype).masked_fill(
        batch == padding_token_id, float("-inf")
    )
    mask = padded[:, :, None] + padded[:, None, :]
    return mask[:, None, :, :]


# Dataset that yields padded sequences of fixed length
class PretrainingDataset(torch.utils.data.Dataset):
    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to the target sequence length
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return the sequence
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)
        y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64)
        return x, y


def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    load(
        {"model": model, "optimizer": optimizer},
        checkpoint_id="checkpoint-dist",
        planner=DefaultLoadPlanner(allow_partial_load=True),  # ignore keys for the RoPE buffers
    )
    scheduler.load_state_dict(
        torch.load("checkpoint-dist/lrscheduler.pt", map_location=device),
    )
    dist.barrier()


def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    save(
        {"model": model, "optimizer": optimizer},
        checkpoint_id="checkpoint-dist",
    )
    if dist.get_rank() == 0:
        torch.save(scheduler.state_dict(), "checkpoint-dist/lrscheduler.pt")
    dist.barrier()


# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl", timeout=datetime.timedelta(seconds=60))
local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device(f"cuda:{local_rank}")
rank = dist.get_rank()
world_size = dist.get_world_size()
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")

# Initialize the mesh for tensor parallelism
n_tensor_parallel = 2
assert world_size % n_tensor_parallel == 0, "Expected world size to be divisible by the number of tensor parallel GPUs"
mesh = dist.device_mesh.init_device_mesh(
    "cuda",
    (world_size // n_tensor_parallel, n_tensor_parallel),
    mesh_dim_names=("dp", "tp"),
)
print(f"({rank}) Mesh: {mesh}, DP size: {mesh['dp'].size()}, TP size: {mesh['tp'].size()}, "
      f"DP local rank: {mesh['dp'].get_local_rank()}, TP local rank: {mesh['tp'].get_local_rank()}")

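# For example, assuming world_size=4 and n_tensor_parallel=2: ranks are laid out
# row-major over the (dp, tp) grid as [[0, 1], [2, 3]], so ranks {0, 1} and {2, 3}
# form the two "tp" groups, while ranks {0, 2} and {1, 3} form the two "dp" groups.
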
# Create the pretraining model on the meta device, on all ranks
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)

# Set up tensor parallelism on each transformer block in the base model
tp_plan = {
    "input_layernorm": SequenceParallel(),
    "self_attn": PrepareModuleInput(
        input_layouts=Shard(dim=1),  # only one positional arg will be used
        desired_input_layouts=Replicate(),
    ),
    # Q/K projection outputs will be used with RoPE, so they must be replicated
    # Q/K/V outputs will be used with GQA, so they also must be replicated
    "self_attn.q_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.k_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.v_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.o_proj": RowwiseParallel(input_layouts=Replicate(), output_layouts=Shard(1)),
    "post_attention_layernorm": SequenceParallel(),
    "mlp": PrepareModuleInput(
        input_layouts=Shard(dim=1),
        desired_input_layouts=Replicate(),
    ),
    "mlp.gate_proj": ColwiseParallel(),
    "mlp.up_proj": ColwiseParallel(),
    "mlp.down_proj": RowwiseParallel(output_layouts=Shard(1)),
}
for layer in model.base_model.layers:
    parallelize_module(layer, mesh["tp"], tp_plan)

# Set up tensor parallelism on the embedding and output norm layers in the base model
# and the prediction head in the top-level model
tp_plan = {
    "base_model.embed_tokens": RowwiseParallel(
        input_layouts=Replicate(),
        output_layouts=Shard(1),
    ),
    "base_model.norm": SequenceParallel(),
    "lm_head": ColwiseParallel(
        input_layouts=Shard(1),
        # output_layouts=Replicate(),  # only if not using loss parallel
        use_local_output=False,  # Keep DTensor output for loss parallel
    ),
}
parallelize_module(model, mesh["tp"], tp_plan)

# Convert the tensor-parallelized model to FSDP2; every component must be sharded
# across the "dp" dimension of the mesh
for layer in model.base_model.layers:
    fully_shard(layer, mesh=mesh["dp"])
fully_shard(model.base_model, mesh=mesh["dp"])
fully_shard(model, mesh=mesh["dp"])

def reset_all_weights(model: nn.Module) -> None:
    """Initialize all weights of the model after moving it off the meta device."""
    @torch.no_grad()
    def weight_reset(m: nn.Module):
        reset_parameters = getattr(m, "reset_parameters", None)
        if callable(reset_parameters):
            m.reset_parameters()

    # Applies fn recursively to the model itself and all of model.children()
    model.apply(fn=weight_reset)

torch.manual_seed(42)
model.to_empty(device=device)
reset_all_weights(model)
assert isinstance(model, FSDPModule), f"Expected FSDPModule, got {type(model)}"

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64 // mesh["dp"].size()
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")
model.train()

# DataLoader, optimizer, scheduler, and loss function
# A sampler is needed to shard the dataset across the data-parallel ranks
dataset = PretrainingDataset(dataset, tokenizer, seq_length)
sampler = DistributedSampler(
    dataset, shuffle=False, drop_last=True,
    num_replicas=mesh["dp"].size(),
    rank=mesh["dp"].get_local_rank(),
)
dataloader = torch.utils.data.DataLoader(
    dataset,
    sampler=sampler,
    batch_size=batch_size,
    pin_memory=True,  # optional
    shuffle=False,
    num_workers=2,
    prefetch_factor=2,
)
num_training_steps = len(dataloader) * epochs

optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

# If the checkpoint-dist directory exists, load the checkpoint into the model and optimizer
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer, scheduler)

# Start training
print(f"({rank}) Starting training")
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"({rank}) Epoch {epoch+1}/{epochs}")
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer, scheduler)
        # Explicit prefetching before sending any data to the model
        model.unshard()
        # Get the batched data, move it from CPU to GPU
        input_ids, target_ids = batch
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        # Create the attention mask: causal mask + padding mask
        attn_mask = create_causal_mask(input_ids) + create_padding_mask(input_ids, PAD_TOKEN_ID)
        # Extract the output from the model
        logits = model(input_ids, attn_mask)
        optimizer.zero_grad()
        with loss_parallel():
            # Compute loss: cross-entropy between logits and targets, ignoring padding tokens
            loss = loss_fn(logits.view(-1, logits.size(-1)), target_ids.view(-1))
            # Backward with the loss on a DTensor
            loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        pbar.set_postfix(loss=loss.item())
        pbar.update(1)
    pbar.close()

# Save the model
save_checkpoint(model, optimizer, scheduler)

# Clean up the distributed environment
dist.destroy_process_group()
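Because the script reads LOCAL_RANK and initializes an NCCL process group, it must be started with a distributed launcher such as torchrun. A minimal sketch of the launch command, assuming the listing above is saved as train_tp.py (a hypothetical filename) and run on a single node with 4 GPUs, giving two data-parallel groups of two tensor-parallel ranks each:

torchrun --standalone --nproc_per_node=4 train_tp.py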
