Automationscribe.com
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us
No Result
View All Result
Automation Scribe
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us
No Result
View All Result
Automationscribe.com
No Result
View All Result

Practice Your Massive Mannequin on A number of GPUs with Pipeline Parallelism

admin by admin
January 13, 2026
in Artificial Intelligence
0
Practice Your Massive Mannequin on A number of GPUs with Pipeline Parallelism
399
SHARES
2.3k
VIEWS
Share on FacebookShare on Twitter


import dataclasses

import os

 

import datasets

import tokenizers

import torch

import torch.distributed as dist

import torch.nn as nn

import torch.nn.practical as F

import torch.optim.lr_scheduler as lr_scheduler

import tqdm

from torch import Tensor

from torch.distributed.checkpoint import load, save

from torch.distributed.checkpoint.state_dict import StateDictOptions, get_state_dict, set_state_dict

from torch.distributed.pipelining import PipelineStage, ScheduleGPipe

 

 

# Construct the mannequin

@dataclasses.dataclass

class LlamaConfig:

    “”“Outline Llama mannequin hyperparameters.”“”

    vocab_size: int = 50000  # Dimension of the tokenizer vocabulary

    max_position_embeddings: int = 2048  # Most sequence size

    hidden_size: int = 768  # Dimension of hidden layers

    intermediate_size: int = 4*768  # Dimension of MLP’s hidden layer

    num_hidden_layers: int = 12  # Variety of transformer layers

    num_attention_heads: int = 12  # Variety of consideration heads

    num_key_value_heads: int = 3  # Variety of key-value heads for GQA

 

 

class RotaryPositionEncoding(nn.Module):

    “”“Rotary place encoding.”“”

 

    def __init__(self, dim: int, max_position_embeddings: int) -> None:

        “”“Initialize the RotaryPositionEncoding module.

 

        Args:

            dim: The hidden dimension of the enter tensor to which RoPE is utilized

            max_position_embeddings: The utmost sequence size of the enter tensor

        ““”

        tremendous().__init__()

        self.dim = dim

        self.max_position_embeddings = max_position_embeddings

        # compute a matrix of ntheta_i

        N = 10_000.0

        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))

        inv_freq = torch.cat((inv_freq, inv_freq), dim=–1)

        place = torch.arange(max_position_embeddings)

        sinusoid_inp = torch.outer(place, inv_freq)

        # save cosine and sine matrices as buffers, not parameters

        self.register_buffer(“cos”, sinusoid_inp.cos())

        self.register_buffer(“sin”, sinusoid_inp.sin())

 

    def ahead(self, x: Tensor) -> Tensor:

        “”“Apply RoPE to tensor x.

 

        Args:

            x: Enter tensor of form (batch_size, seq_length, num_heads, head_dim)

 

        Returns:

            Output tensor of form (batch_size, seq_length, num_heads, head_dim)

        ““”

        batch_size, seq_len, num_heads, head_dim = x.form

        dtype = x.dtype

        # remodel the cosine and sine matrices to 4D tensor and the identical dtype as x

        cos = self.cos.to(dtype)[:seq_len].view(1, seq_len, 1, –1)

        sin = self.sin.to(dtype)[:seq_len].view(1, seq_len, 1, –1)

        # apply RoPE to x

        x1, x2 = x.chunk(2, dim=–1)

        rotated = torch.cat((–x2, x1), dim=–1)

        output = (x * cos) + (rotated * sin)

        return output

 

 

class LlamaAttention(nn.Module):

    “”“Grouped-query consideration with rotary embeddings.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.hidden_size = config.hidden_size

        self.num_heads = config.num_attention_heads

        self.head_dim = self.hidden_size // self.num_heads

        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

 

        # hidden_size have to be divisible by num_heads

        assert (self.head_dim * self.num_heads) == self.hidden_dimension

 

        # Linear layers for Q, Okay, V projections

        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)

        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)

        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)

        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

 

    def ahead(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:

        bs, seq_len, dim = hidden_states.dimension()

 

        # Venture inputs to Q, Okay, V

        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)

        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

 

        # Apply rotary place embeddings

        query_states = rope(query_states)

        key_states = rope(key_states)

 

        # Transpose tensors from BSHD to BHSD dimension for scaled_dot_product_attention

        query_states = query_states.transpose(1, 2)

        key_states = key_states.transpose(1, 2)

        value_states = value_states.transpose(1, 2)

 

        # Use PyTorch’s optimized consideration implementation

        # setting is_causal=True is incompatible with setting express consideration masks

        attn_output = F.scaled_dot_product_attention(

            query_states,

            key_states,

            value_states,

            is_causal=True,

            dropout_p=0.0,

            enable_gqa=True,

        )

 

        # Transpose output tensor from BHSD to BSHD dimension, reshape to 3D, after which mission output

        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)

        attn_output = self.o_proj(attn_output)

        return attn_output

 

 

class LlamaMLP(nn.Module):

    “”“Feed-forward community with SwiGLU activation.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        # Two parallel projections for SwiGLU

        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)

        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)

        self.act_fn = F.silu  # SwiGLU activation operate

        # Venture again to hidden dimension

        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

 

    def ahead(self, x: Tensor) -> Tensor:

        # SwiGLU activation: multiply gate and up-projected inputs

        gate = self.act_fn(self.gate_proj(x))

        up = self.up_proj(x)

        return self.down_proj(gate * up)

 

 

class LlamaDecoderLayer(nn.Module):

    “”“Single transformer layer for a Llama mannequin.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e–5)

        self.self_attn = LlamaAttention(config)

        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e–5)

        self.mlp = LlamaMLP(config)

 

    def ahead(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:

        # First residual block: Self-attention

        residual = hidden_states

        hidden_states = self.input_layernorm(hidden_states)

        attn_outputs = self.self_attn(hidden_states, rope=rope)

        hidden_states = attn_outputs + residual

 

        # Second residual block: MLP

        residual = hidden_states

        hidden_states = self.post_attention_layernorm(hidden_states)

        hidden_states = self.mlp(hidden_states) + residual

        return hidden_states

 

 

class LlamaModel(nn.Module):

    “”“The complete Llama mannequin with none pretraining heads.”“”

 

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.rope = RotaryPositionEncoding(

            config.hidden_size // config.num_attention_heads,

            config.max_position_embeddings,

        )

 

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)

        self.layers = nn.ModuleDict({

            str(i): LlamaDecoderLayer(config) for i in vary(config.num_hidden_layers)

        })

        self.norm = nn.RMSNorm(config.hidden_size, eps=1e–5)

 

    def ahead(self, input_ids: Tensor) -> Tensor:

        # Convert enter token IDs to embeddings

        if self.embed_tokens is not None:

            hidden_states = self.embed_tokens(input_ids)

        else:

            hidden_states = enter_ids

        # Course of by way of all transformer layers, then the ultimate norm layer

        for n in vary(len(self.layers)):

            if self.layers[str(n)] is not None:

                hidden_states = self.layers[str(n)](hidden_states, self.rope)

        if self.norm is not None:

            hidden_states = self.norm(hidden_states)

        # Return the ultimate hidden states, and duplicate over the eye masks

        return hidden_states

 

 

class LlamaForPretraining(nn.Module):

    def __init__(self, config: LlamaConfig) -> None:

        tremendous().__init__()

        self.base_model = LlamaModel(config)

        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

 

    def ahead(self, input_ids: Tensor) -> Tensor:

        hidden_states = self.base_model(input_ids)

        if self.lm_head is not None:

            hidden_states = self.lm_head(hidden_states)

        return hidden_states

 

 

# Generator operate to create padded sequences of mounted size

class PretrainingDataset(torch.utils.knowledge.Dataset):

    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,

                 seq_length: int, gadget: torch.gadget = None):

        self.dataset = dataset

        self.tokenizer = tokenizer

        self.gadget = gadget

        self.seq_length = seq_length

        self.bot = tokenizer.token_to_id(“[BOT]”)

        self.eot = tokenizer.token_to_id(“[EOT]”)

        self.pad = tokenizer.token_to_id(“[PAD]”)

 

    def __len__(self):

        return len(self.dataset)

 

    def __getitem__(self, index):

        “”“Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens

        are added. Clipped and padded to the sequence size.

        ““”

        seq = self.dataset[index][“text”]

        tokens: checklist[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]

        # pad to focus on sequence size

        toklen = len(tokens)

        if toklen < self.seq_length+1:

            pad_length = self.seq_length+1 – toklen

            tokens += [self.pad] * pad_size

        # return the sequence

        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64, gadget=self.gadget)

        y = torch.tensor(tokens[1:self.seq_length+1], dtype=torch.int64, gadget=self.gadget)

        return x, y

 

 

def load_checkpoint(mannequin: nn.Module, optimizer: torch.optim.Optimizer) -> None:

    dist.barrier()

    model_state, optimizer_state = get_state_dict(

        mannequin, optimizer, choices=StateDictOptions(full_state_dict=True),

    )

    load(

        {“mannequin”: model_state, “optimizer”: optimizer_state},

        checkpoint_id=“checkpoint-dist”,

    )

    set_state_dict(

        mannequin, optimizer,

        model_state_dict=model_state, optim_state_dict=optimizer_state,

        choices=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True),

    )

    dist.barrier()

 

 

def save_checkpoint(mannequin: nn.Module, optimizer: torch.optim.Optimizer) -> None:

    dist.barrier()

    model_state, optimizer_state = get_state_dict(

        mannequin, optimizer, choices=StateDictOptions(full_state_dict=True),

    )

    save(

        {“mannequin”: model_state, “optimizer”: optimizer_state},

        checkpoint_id=“checkpoint-dist”,

    )

    dist.barrier()

 

 

# Load the tokenizer and dataset

tokenizer = tokenizers.Tokenizer.from_file(“bpe_50K.json”)

dataset = datasets.load_dataset(“HuggingFaceFW/fineweb”, “sample-10BT”, break up=“practice”)

 

# Initialize the distributed surroundings

dist.init_process_group(backend=“nccl”)

rank = dist.get_rank()

local_rank = int(os.environ[“LOCAL_RANK”])

world_size = dist.get_world_size()

gadget = torch.gadget(f“cuda:{local_rank}”)

print(f“World dimension {world_size}, rank {rank}, native rank {local_rank}. Utilizing {gadget}”)

assert world_size == 3, f“This script is designed for 3 GPUs, bought {world_size}”

 

# Create pretraining mannequin with default config on meta gadget to forestall OOM

with torch.gadget(“meta”):

    model_config = LlamaConfig()

    mannequin = LlamaForPretraining(model_config)

    # Partition the mannequin by eradicating some layers

    num_layers = model_config.num_hidden_layers

    partition = [num_layers // 3, 2 * num_layers // 3, num_layers]

    if rank == 0:

        # from embedding to 1/3 of the decoder layers

        for n in vary(partition[0], partition[2]):

            mannequin.base_model.layers[str(n)] = None

        mannequin.base_model.norm = None

        mannequin.lm_head = None

    elif rank == 1:

        # from 1/3 to 2/3 of the decoder layers

        mannequin.base_model.embed_tokens = None

        for n in vary(0, partition[0]):

            mannequin.base_model.layers[str(n)] = None

        for n in vary(partition[1], partition[2]):

            mannequin.base_model.layers[str(n)] = None

        mannequin.base_model.norm = None

        mannequin.lm_head = None

    elif rank == 2:

        # from 2/3 to the tip of the decoder layers and the ultimate norm layer, LM head

        mannequin.base_model.embed_tokens = None

        for n in vary(partition[1]):

            mannequin.base_model.layers[str(n)] = None

    else:

        increase ValueError(f“Invalid rank: {rank}”)

 

 

# Transfer mannequin from meta gadget to CUDA gadget, then initialize the weights

def reset_all_weights(mannequin: nn.Module) -> None:

    @torch.no_grad()

    def weight_reset(m: nn.Module):

        reset_parameters = getattr(m, “reset_parameters”, None)

        if callable(reset_parameters):

            m.reset_parameters()

 

    # Applies fn recursively to mannequin itself and all of mannequin.youngsters()

    mannequin.apply(fn=weight_reset)

 

 

mannequin.to_empty(gadget=gadget)

reset_all_weights(mannequin)

mannequin.practice()

stage = PipelineStage(mannequin, stage_index=rank, num_stages=world_size, gadget=gadget)

 

# Coaching parameters

epochs = 3

learning_rate = 1e–3

batch_size = 64

seq_length = 512

num_warmup_steps = 1000

PAD_TOKEN_ID = tokenizer.token_to_id(“[PAD]”)

 

# DataLoader, optimizer, scheduler, and loss operate

dataset = PretrainingDataset(dataset, tokenizer, seq_length, gadget)

dataloader = torch.utils.knowledge.DataLoader(

    dataset,

    batch_size=batch_size,

)

num_training_steps = len(dataloader) * epochs

print(f“Variety of coaching steps: {num_training_steps} = {len(dataloader)} * {epochs}”)

 

optimizer = torch.optim.AdamW(

    mannequin.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e–8, weight_decay=0.1,

)

warmup_scheduler = lr_scheduler.LinearLR(

    optimizer,

    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,

)

cosine_scheduler = lr_scheduler.CosineAnnealingLR(

    optimizer,

    T_max=num_training_steps – num_warmup_steps,

    eta_min=0,

)

scheduler = lr_scheduler.SequentialLR(

    optimizer,

    schedulers=[warmup_scheduler, cosine_scheduler],

    milestones=[num_warmup_steps],

)

 

# if checkpoint-dist dir exists, load the checkpoint to mannequin and optimizer

# Notice: You must implement learn how to reset the epoch and step to permit appropriate resume

if os.path.exists(“checkpoint-dist”):

    load_checkpoint(mannequin, optimizer)

 

# Create pipeline schedule

def loss_fn(logits: Tensor, target_ids: Tensor) -> Tensor:

    logits = logits.view(–1, logits.dimension(–1))

    target_ids = target_ids.view(–1)

    return F.cross_entropy(logits, target_ids, ignore_index=PAD_TOKEN_ID)

 

n_microbatches = 4  # num break up per batch

schedule = ScheduleGPipe(stage, n_microbatches=n_microbatches, loss_fn=loss_fn)

 

# begin coaching

for epoch in vary(epochs):

    pbar = tqdm.tqdm(dataloader, desc=f“Epoch {epoch+1}/{epochs}”, disable=(rank != world_size – 1))

    for batch_id, batch in enumerate(pbar):

        if batch_id % 1000 == 0:

            save_checkpoint(mannequin, optimizer)

        # zero grad earlier than ahead cross, since no express backward cross is named

        optimizer.zero_grad(set_to_none=True)

        # get batched knowledge

        input_ids, target_ids = batch

        if rank == 0:

            schedule.step(input_ids)

        elif rank == world_size – 1:

            losses = []  # expects one misplaced per microbatch

            logits = schedule.step(goal=target_ids, losses=losses)

            with torch.no_grad():

                pbar.set_postfix(loss=sum(losses).merchandise() / len(losses))

        else:

            schedule.step()

 

        torch.nn.utils.clip_grad_norm_(mannequin.parameters(), 1.0)

        optimizer.step()

        scheduler.step()

        pbar.replace(1)

    pbar.shut()

 

# Save the mannequin

save_checkpoint(mannequin, optimizer)

 

# Clear up the distributed surroundings

dist.destroy_process_group()

Tags: GPUsLargeModelMultipleParallelismpipelineTrain
Previous Post

Find out how to Maximize Claude Code Effectiveness

Next Post

Securing Amazon Bedrock cross-Area inference: Geographic and international

Next Post
Securing Amazon Bedrock cross-Area inference: Geographic and international

Securing Amazon Bedrock cross-Area inference: Geographic and international

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

Popular News

  • Greatest practices for Amazon SageMaker HyperPod activity governance

    Greatest practices for Amazon SageMaker HyperPod activity governance

    405 shares
    Share 162 Tweet 101
  • Speed up edge AI improvement with SiMa.ai Edgematic with a seamless AWS integration

    403 shares
    Share 161 Tweet 101
  • Optimizing Mixtral 8x7B on Amazon SageMaker with AWS Inferentia2

    403 shares
    Share 161 Tweet 101
  • Unlocking Japanese LLMs with AWS Trainium: Innovators Showcase from the AWS LLM Growth Assist Program

    403 shares
    Share 161 Tweet 101
  • The Good-Sufficient Fact | In direction of Knowledge Science

    403 shares
    Share 161 Tweet 101

About Us

Automation Scribe is your go-to site for easy-to-understand Artificial Intelligence (AI) articles. Discover insights on AI tools, AI Scribe, and more. Stay updated with the latest advancements in AI technology. Dive into the world of automation with simplified explanations and informative content. Visit us today!

Category

  • AI Scribe
  • AI Tools
  • Artificial Intelligence

Recent Posts

  • Why the Sophistication of Your Immediate Correlates Nearly Completely with the Sophistication of the Response, as Analysis by Anthropic Discovered
  • How PDI constructed an enterprise-grade RAG system for AI functions with AWS
  • The 2026 Time Collection Toolkit: 5 Basis Fashions for Autonomous Forecasting
  • Home
  • Contact Us
  • Disclaimer
  • Privacy Policy
  • Terms & Conditions

© 2024 automationscribe.com. All rights reserved.

No Result
View All Result
  • Home
  • AI Scribe
  • AI Tools
  • Artificial Intelligence
  • Contact Us

© 2024 automationscribe.com. All rights reserved.