import dataclasses
import os
import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.state_dict import StateDictOptions, get_state_dict, set_state_dict
from torch.distributed.pipelining import PipelineStage, ScheduleGPipe

# Construct the model
@dataclasses.dataclass
class LlamaConfig:
    """Define Llama model hyperparameters."""
    vocab_size: int = 50000               # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048   # Maximum sequence length
    hidden_size: int = 768                # Size of the hidden layers
    intermediate_size: int = 4 * 768      # Size of the MLP's hidden layer
    num_hidden_layers: int = 12           # Number of transformer layers
    num_attention_heads: int = 12         # Number of attention heads
    num_key_value_heads: int = 3          # Number of key-value heads for GQA

class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding."""

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n*theta_i
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        position = torch.arange(max_position_embeddings)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        batch_size, seq_len, num_heads, head_dim = x.shape
        dtype = x.dtype
        # reshape the cosine and sine matrices to 4D tensors with the same dtype as x
        cos = self.cos.to(dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        output = (x * cos) + (rotated * sin)
        return output

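# Shape sketch (illustrative values only, not used below): with head_dim=64 and
# max_position_embeddings=2048, the "cos" and "sin" buffers are (2048, 64)
# matrices, and forward() maps a (batch, seq, heads, 64) tensor to a tensor of
# the same shape, with positions encoded by rotating pairs of channels.
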
class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q
        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size
        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:
        bs, seq_len, dim = hidden_states.size()
        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)
        # Transpose tensors from BSHD to BHSD layout for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)
        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with setting an explicit attention mask
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            is_causal=True,
            dropout_p=0.0,
            enable_gqa=True,
        )
        # Transpose the output tensor from BHSD back to BSHD, reshape to 3D, then project the output
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output

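# Note: the enable_gqa flag of F.scaled_dot_product_attention is only available
# in recent PyTorch releases. If it is missing, one workaround (a sketch, not
# part of the original script) is to repeat the K/V heads before the call:
#   n_rep = self.num_heads // self.num_kv_heads
#   key_states = key_states.repeat_interleave(n_rep, dim=1)
#   value_states = value_states.repeat_interleave(n_rep, dim=1)
# and drop enable_gqa from the argument list.
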
class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to the hidden dimension
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: Tensor) -> Tensor:
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)

class LlamaDecoderLayer(nn.Module):
    """Single transformer layer for a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding) -> Tensor:
        # First residual block: self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope)
        hidden_states = attn_outputs + residual
        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states

class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.rope = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )
        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleDict({
            str(i): LlamaDecoderLayer(config) for i in range(config.num_hidden_layers)
        })
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def forward(self, input_ids: Tensor) -> Tensor:
        # Convert input token IDs to embeddings
        if self.embed_tokens is not None:
            hidden_states = self.embed_tokens(input_ids)
        else:
            hidden_states = input_ids
        # Process through all transformer layers, then the final norm layer
        for n in range(len(self.layers)):
            if self.layers[str(n)] is not None:
                hidden_states = self.layers[str(n)](hidden_states, self.rope)
        if self.norm is not None:
            hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states

class LlamaForPretraining(nn.Module):
    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids: Tensor) -> Tensor:
        hidden_states = self.base_model(input_ids)
        if self.lm_head is not None:
            hidden_states = self.lm_head(hidden_states)
        return hidden_states

# Dataset that produces padded token sequences of a fixed length
class PretrainingDataset(torch.utils.data.Dataset):
    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int, device: torch.device = None):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.device = device
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index):
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to the target sequence length
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return the input and shifted target sequences
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64, device=self.device)
        y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64, device=self.device)
        return x, y

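# Note: each document is truncated or padded to exactly seq_length (+1 for the
# shifted target) tokens; the padded positions are ignored by the loss function
# via ignore_index=PAD_TOKEN_ID further below.
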
def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True),
    )
    load(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    set_state_dict(
        model, optimizer,
        model_state_dict=model_state, optim_state_dict=optimizer_state,
        options=StateDictOptions(broadcast_from_rank0=True, full_state_dict=True),
    )
    dist.barrier()

def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer) -> None:
    dist.barrier()
    model_state, optimizer_state = get_state_dict(
        model, optimizer, options=StateDictOptions(full_state_dict=True),
    )
    save(
        {"model": model_state, "optimizer": optimizer_state},
        checkpoint_id="checkpoint-dist",
    )
    dist.barrier()

# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl")
rank = dist.get_rank()
local_rank = int(os.environ["LOCAL_RANK"])
world_size = dist.get_world_size()
device = torch.device(f"cuda:{local_rank}")
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")
assert world_size == 3, f"This script is designed for 3 GPUs, got {world_size}"
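
# Launch note: this script expects exactly three processes, one per pipeline
# stage. Assuming the file is saved as pretrain_pp.py (hypothetical name), a
# typical launch would be:
#   torchrun --nproc_per_node=3 pretrain_pp.py
# torchrun sets the RANK, LOCAL_RANK, and WORLD_SIZE environment variables that
# init_process_group and the code above rely on.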

# Create the pretraining model with the default config on the meta device to prevent OOM
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)

# Partition the model by removing some layers
num_layers = model_config.num_hidden_layers
partition = [num_layers // 3, 2 * num_layers // 3, num_layers]
if rank == 0:
    # from the embedding to 1/3 of the decoder layers
    for n in range(partition[0], partition[2]):
        model.base_model.layers[str(n)] = None
    model.base_model.norm = None
    model.lm_head = None
elif rank == 1:
    # from 1/3 to 2/3 of the decoder layers
    model.base_model.embed_tokens = None
    for n in range(0, partition[0]):
        model.base_model.layers[str(n)] = None
    for n in range(partition[1], partition[2]):
        model.base_model.layers[str(n)] = None
    model.base_model.norm = None
    model.lm_head = None
elif rank == 2:
    # from 2/3 to the end of the decoder layers, plus the final norm layer and LM head
    model.base_model.embed_tokens = None
    for n in range(partition[1]):
        model.base_model.layers[str(n)] = None
else:
    raise ValueError(f"Invalid rank: {rank}")

# Move the model from the meta device to the CUDA device, then initialize the weights
def reset_all_weights(model: nn.Module) -> None:
    @torch.no_grad()
    def weight_reset(m: nn.Module):
        reset_parameters = getattr(m, "reset_parameters", None)
        if callable(reset_parameters):
            m.reset_parameters()
    # Applies fn recursively to the model itself and all of model.children()
    model.apply(fn=weight_reset)

model.to_empty(device=device)
reset_all_weights(model)
model.train()
stage = PipelineStage(model, stage_index=rank, num_stages=world_size, device=device)

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")

# DataLoader, optimizer, scheduler, and loss function
dataset = PretrainingDataset(dataset, tokenizer, seq_length, device)
dataloader = torch.utils.data.DataLoader(
    dataset,
    batch_size=batch_size,
)
num_training_steps = len(dataloader) * epochs
print(f"Number of training steps: {num_training_steps} = {len(dataloader)} * {epochs}")
optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)

# If the checkpoint-dist directory exists, load the checkpoint into the model and optimizer
# Note: you need to implement how to restore the epoch and step to resume correctly
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer)

# Create the pipeline schedule
def loss_fn(logits: Tensor, target_ids: Tensor) -> Tensor:
    logits = logits.view(-1, logits.size(-1))
    target_ids = target_ids.view(-1)
    return F.cross_entropy(logits, target_ids, ignore_index=PAD_TOKEN_ID)

n_microbatches = 4  # number of splits per batch
schedule = ScheduleGPipe(stage, n_microbatches=n_microbatches, loss_fn=loss_fn)
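# Note: ScheduleGPipe splits each batch along the batch dimension into
# n_microbatches chunks, so batch_size (64) should be divisible by
# n_microbatches (4), giving microbatches of 16 sequences each.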

# Start training
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"Epoch {epoch+1}/{epochs}", disable=(rank != world_size - 1))
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer)
        # zero grad before the forward pass, since no explicit backward pass is called
        optimizer.zero_grad(set_to_none=True)
        # get the batched data
        input_ids, target_ids = batch
        if rank == 0:
            schedule.step(input_ids)
        elif rank == world_size - 1:
            losses = []  # expects one loss per microbatch
            logits = schedule.step(target=target_ids, losses=losses)
            with torch.no_grad():
                pbar.set_postfix(loss=sum(losses).item() / len(losses))
        else:
            schedule.step()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        pbar.update(1)
    pbar.close()

# Save the model
save_checkpoint(model, optimizer)

# Clean up the distributed environment
dist.destroy_process_group()


