
How to Train a BERT Model with SageMaker


In this tutorial, we will provide an example of how to train an NLP classification model with BERT and SageMaker. You will train a text classifier using a variant of BERT called RoBERTa, within a PyTorch model run as a SageMaker Training Job. The steps of our analysis are:

  • Configure dataset
  • Configure model hyper-parameters
  • Setup evaluation metrics, debugger and profiler
  • Train model
  • Analyze debugger results
  • Deploy and test the model

We will use the “Bring Your Own Script” approach: we write our own training script (train.py below), pass it to the SageMaker PyTorch estimator via the entry_point and source_dir arguments, and SageMaker runs it inside a managed PyTorch container.

Prepare the SageMaker Environment

import boto3
import sagemaker
import pandas as pd
import numpy as np
import botocore

config = botocore.config.Config(user_agent_extra='dlai-pds/c2/w2')

# low-level service client of the boto3 session
sm = boto3.client(service_name='sagemaker', 
                  config=config)

sm_runtime = boto3.client('sagemaker-runtime',
                          config=config)

sess = sagemaker.Session(sagemaker_client=sm,
                         sagemaker_runtime_client=sm_runtime)

bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = sess.boto_region_name

import matplotlib.pyplot as plt
%matplotlib inline
%config InlineBackend.figure_format='retina'
 

Configure the Dataset

Assume that you have already split your data into train, validation and test datasets. Then we can upload this data to S3.
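
If you still need to create such a split, a minimal sketch with scikit-learn could look like the following (the input file name is a placeholder; the 'label_id' column matches what train.py expects later):

import os
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical processed file containing 'input_ids' and 'label_id' columns
df = pd.read_csv('./data/sentiment-processed.tsv', sep='\t')

# 80% train, 10% validation, 10% test, stratified by label
df_train, df_holdout = train_test_split(df, test_size=0.2, random_state=42, stratify=df['label_id'])
df_validation, df_test = train_test_split(df_holdout, test_size=0.5, random_state=42, stratify=df_holdout['label_id'])

for name, split in [('train', df_train), ('validation', df_validation), ('test', df_test)]:
    os.makedirs('./data/sentiment-{}'.format(name), exist_ok=True)
    split.to_csv('./data/sentiment-{}/part-0.tsv'.format(name), sep='\t', index=False)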

processed_train_data_s3_uri = 's3://{}/data/sentiment-train/'.format(bucket)
processed_validation_data_s3_uri = 's3://{}/data/sentiment-validation/'.format(bucket)

!aws s3 cp --recursive ./data/sentiment-train $processed_train_data_s3_uri
!aws s3 cp --recursive ./data/sentiment-validation $processed_validation_data_s3_uri

Then, we need to set up the input channels.

s3_input_train_data = sagemaker.inputs.TrainingInput(
    s3_data=processed_train_data_s3_uri 
)


s3_input_validation_data = sagemaker.inputs.TrainingInput(
    s3_data=processed_validation_data_s3_uri
)


data_channels = {
    'train': s3_input_train_data, 
    'validation': s3_input_validation_data 
}

The next step is to configure the model hyper-parameters.

max_seq_length=128 # maximum number of input tokens passed to BERT model
freeze_bert_layer=False # specifies the depth of training within the network
epochs=3
learning_rate=2e-5
train_batch_size=256
train_steps_per_epoch=50
validation_batch_size=256
validation_steps_per_epoch=50
seed=42
run_validation=True
train_instance_count=1
train_instance_type='ml.c5.9xlarge'
train_volume_size=256
input_mode='File' # the training data is copied from S3 to the training instance before training starts

Some of them will be passed to the PyTorch estimator through the hyperparameters argument. Let’s set up the dictionary for that:

hyperparameters={
    'max_seq_length': max_seq_length,
    'freeze_bert_layer': freeze_bert_layer,
    'epochs': epochs,
    'learning_rate': learning_rate,
    'train_batch_size': train_batch_size,
    'train_steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps_per_epoch': validation_steps_per_epoch,    
    'seed': seed,
    'run_validation': run_validation
}

The next step is to set up the evaluation metrics, which will be the validation loss and accuracy. The regular expressions below capture the metric values that the training script emits to the logs.

metric_definitions = [
     {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
     {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]
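
These regular expressions match the validation log lines printed by the training script (see train_model() in train.py below). As a quick sanity check (the numbers here are just illustrative):

import re

line = '[epoch/step: 0/50] val_loss: 0.35 - val_acc: 82.00%'  # format emitted by train.py

print(re.search('val_loss: ([0-9.]+)', line).group(1))  # 0.35
print(re.search('val_acc: ([0-9.]+)', line).group(1))   # 82.00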

Then, we have to set up the debugger and profiler.

from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.debugger import DebuggerHookConfig
from sagemaker.debugger import ProfilerConfig, FrameworkProfile

DebuggerHookConfig provides options to customize how debugging information is emitted and saved. The s3_output_path argument defines the location in Amazon S3 where the output is stored.

debugger_hook_config = DebuggerHookConfig(
    s3_output_path='s3://{}'.format(bucket),
)

ProfilerConfig sets the configuration for collecting system and framework metrics of SageMaker Training Jobs. The parameter system_monitor_interval_millis sets the time interval for collecting system metrics (in milliseconds). The parameter framework_profile_params is the object for framework metrics profiling. Here you will set its local path, the step at which to start profiling (start_step), and the number of steps to profile (num_steps).

profiler_config = ProfilerConfig(
    system_monitor_interval_millis=500,
    framework_profile_params=FrameworkProfile(local_path="/opt/ml/output/profiler/", start_step=5, num_steps=10)
)

For monitoring and profiling, you can use the built-in ProfilerReport rule. It creates a profiling report and updates it as the individual rules are triggered. If you trigger the ProfilerReport rule without any customized parameters, as in the cell below, it triggers all of the built-in monitoring and profiling rules with their default parameter values.

The profiling report can be downloaded while the Training Job is running or after the job has finished.

rules=[ProfilerRule.sagemaker(rule_configs.ProfilerReport())]

Train the Model

We will use a PyTorch model, running it as a SageMaker Training Job from a separate Python file that is called during training, and fine-tune the pre-trained roberta-base model.

The train.py script is the following:

################################################################################################################################################
######################################################## Import required modules ###############################################################
################################################################################################################################################

import argparse
import pprint
import json
import logging
import os
import sys
import pandas as pd
import random
import time
import glob
import numpy as np
from collections import defaultdict

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data
import torch.utils.data.distributed
from torch.utils.data import Dataset, DataLoader

from transformers import RobertaModel, RobertaConfig
from transformers import RobertaForSequenceClassification
from transformers import AdamW, get_linear_schedule_with_warmup

################################################################################################################################################
###################################################### Parse input arguments ###################################################################
################################################################################################################################################

def parse_args():

    parser = argparse.ArgumentParser()
    
    # CLI args
    
    parser.add_argument('--train_batch_size', 
                        type=int, 
                        default=64)
    
    parser.add_argument('--train_steps_per_epoch',
                        type=int,
                        default=64)

    parser.add_argument('--validation_batch_size', 
                        type=int, 
                        default=64)
    
    parser.add_argument('--validation_steps_per_epoch',
                        type=int,
                        default=64)

    parser.add_argument('--epochs', 
                        type=int, 
                        default=1)
    
    parser.add_argument('--freeze_bert_layer', 
                        type=eval, 
                        default=False)
    
    parser.add_argument('--learning_rate', 
                        type=float, 
                        default=0.01)
    
    parser.add_argument('--momentum', 
                        type=float, 
                        default=0.5)
    
    parser.add_argument('--seed', 
                        type=int, 
                        default=42)
    
    parser.add_argument('--log_interval', 
                        type=int, 
                        default=100)
    
    parser.add_argument('--backend', 
                        type=str, 
                        default=None)
    
    parser.add_argument('--max_seq_length', 
                        type=int, 
                        default=128)
    
    parser.add_argument('--run_validation', 
                        type=eval,
                        default=False)
        
    # Container environment  
    
    parser.add_argument('--hosts', 
                        type=list, 
                        default=json.loads(os.environ['SM_HOSTS']))
    
    parser.add_argument('--current_host', 
                        type=str, 
                        default=os.environ['SM_CURRENT_HOST'])
    
    parser.add_argument('--model_dir', 
                        type=str, 
                        default=os.environ['SM_MODEL_DIR'])

    parser.add_argument('--train_data', 
                        type=str, 
                        default=os.environ['SM_CHANNEL_TRAIN'])
    
    parser.add_argument('--validation_data', 
                        type=str, 
                        default=os.environ['SM_CHANNEL_VALIDATION'])
        
    parser.add_argument('--output_dir', 
                        type=str, 
                        default=os.environ['SM_OUTPUT_DIR'])
    
    parser.add_argument('--num_gpus', 
                        type=int, 
                        default=os.environ['SM_NUM_GPUS'])

    # Debugger args
    
    parser.add_argument("--save-frequency", 
                        type=int, 
                        default=10, 
                        help="frequency with which to save steps")
    
    parser.add_argument("--smdebug_path",
                        type=str,
                        help="output directory to save data in",
                        default="/opt/ml/output/tensors",)
    
    parser.add_argument("--hook-type",
                        type=str,
                        choices=["saveall", "module-input-output", "weights-bias-gradients"],
                        default="saveall",)

    return parser.parse_args()

################################################################################################################################################
########################################################### Tools and variables ################################################################
################################################################################################################################################

# Model name according to the PyTorch documentation: 
# https://github.com/aws/sagemaker-pytorch-inference-toolkit/blob/6936c08581e26ff3bac26824b1e4946ec68ffc85/src/sagemaker_pytorch_serving_container/torchserve.py#L45
MODEL_NAME = 'model.pth'
# Hugging face list of models: https://huggingface.co/models
PRE_TRAINED_MODEL_NAME = 'roberta-base'

def create_list_input_files(path):
    input_files = glob.glob('{}/*.tsv'.format(path))
    print(input_files)
    return input_files

def save_transformer_model(model, model_dir):
    path = '{}/transformer'.format(model_dir)
    os.makedirs(path, exist_ok=True)                              
    print('Saving Transformer model to {}'.format(path))
    model.save_pretrained(path)

def save_pytorch_model(model, model_dir):
    os.makedirs(model_dir, exist_ok=True) 
    print('Saving PyTorch model to {}'.format(model_dir))
    save_path = os.path.join(model_dir, MODEL_NAME)
    torch.save(model.state_dict(), save_path)

################################################################################################################################################
########################################################### Configure the model ################################################################
################################################################################################################################################

def configure_model():
    classes = [-1, 0, 1]

    config = RobertaConfig.from_pretrained(
        PRE_TRAINED_MODEL_NAME, 
        num_labels=len(classes),
        id2label={
            0: -1,
            1: 0,
            2: 1,
        },
        label2id={
            -1: 0,
            0: 1,
            1: 2,
        }
    )
    
    config.output_attentions=True

    return config

################################################################################################################################################
####################################################### PyTorch Dataset and DataLoader #########################################################
################################################################################################################################################

# PyTorch dataset retrieves the dataset’s features and labels one sample at a time
# Create a custom Dataset class for the reviews
class ReviewDataset(Dataset):
    
    def __init__(self, input_ids_list, label_id_list):
        self.input_ids_list = input_ids_list
        self.label_id_list = label_id_list

    def __len__(self):
        return len(self.input_ids_list)

    def __getitem__(self, item):
        # convert list of token_ids into an array of PyTorch LongTensors
        input_ids = json.loads(self.input_ids_list[item]) 
        label_id = self.label_id_list[item]

        input_ids_tensor = torch.LongTensor(input_ids)
        label_id_tensor = torch.tensor(label_id, dtype=torch.long)

        return input_ids_tensor, label_id_tensor

# PyTorch DataLoader helps to organise the input training data in “minibatches” and reshuffle the data at every epoch
# It takes Dataset as an input
def create_data_loader(path, batch_size): 
    print("Get data loader")

    df = pd.DataFrame(columns=['input_ids', 'label_id'])
    
    input_files = create_list_input_files(path)

    for file in input_files:
        df_temp = pd.read_csv(file, 
                              sep='\t', 
                              usecols=['input_ids', 'label_id'])
        df = pd.concat([df, df_temp], ignore_index=True)  # pandas.DataFrame.append is deprecated
        
    ds = ReviewDataset(
        input_ids_list=df.input_ids.to_numpy(),
        label_id_list=df.label_id.to_numpy(),
    )
    
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    ), df

################################################################################################################################################
################################################################ Train model ###################################################################
################################################################################################################################################

def train_model(model,
                train_data_loader,
                df_train,
                val_data_loader, 
                df_val,
                args):
    
    loss_function = nn.CrossEntropyLoss()    
    optimizer = optim.Adam(params=model.parameters(), lr=args.learning_rate)
    
    if args.freeze_bert_layer:
        print('Freezing BERT base layers...')
        for name, param in model.named_parameters():
            if 'classifier' not in name:  # freeze everything except the classifier layer
                param.requires_grad = False
        print('Set non-classifier layers to `param.requires_grad=False`.')
    
    train_correct = 0
    train_total = 0

    for epoch in range(args.epochs):
        print('EPOCH -- {}'.format(epoch))

        for i, (sent, label) in enumerate(train_data_loader):
            if i < args.train_steps_per_epoch:
                model.train()
                optimizer.zero_grad()
                sent = sent.squeeze(0)
                if torch.cuda.is_available():
                    sent = sent.cuda()
                    label = label.cuda()
                output = model(sent)[0]
                _, predicted = torch.max(output, 1)

                loss = loss_function(output, label)
                loss.backward()
                optimizer.step()
            
                if args.run_validation and i % args.validation_steps_per_epoch == 0:
                    print('RUNNING VALIDATION:')
                    correct = 0
                    total = 0
                    model.eval()

                    for sent, label in val_data_loader:
                        sent = sent.squeeze(0)
                        if torch.cuda.is_available():
                            sent = sent.cuda()
                            label = label.cuda()
                        output = model(sent)[0]
                        _, predicted = torch.max(output.data, 1)

                        total += label.size(0)
                        correct += (predicted.cpu() == label.cpu()).sum()

                    accuracy = 100.00 * correct.numpy() / total
                    print('[epoch/step: {0}/{1}] val_loss: {2:.2f} - val_acc: {3:.2f}%'.format(epoch, i, loss.item(), accuracy))
            else:
                break           

    print('TRAINING COMPLETED.')
    return model

################################################################################################################################################
#################################################################### Main ######################################################################
################################################################################################################################################

if __name__ == '__main__':
    
    # Parse args
    
    args = parse_args()
    print('Loaded arguments:')
    print(args)

    # Get environment variables
    
    env_var = os.environ 
    print('Environment variables:')
    pprint.pprint(dict(env_var), width = 1) 
    
    
    # Check if distributed training
    
    is_distributed = len(args.hosts) > 1 and args.backend is not None
    
    print("Distributed training - {}".format(is_distributed))
    use_cuda = args.num_gpus > 0
    print("Number of gpus available - {}".format(args.num_gpus))
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    device = torch.device('cuda' if use_cuda else 'cpu')

    # Initialize the distributed environment.
    
    if is_distributed:
        world_size = len(args.hosts)
        os.environ['WORLD_SIZE'] = str(world_size)
        host_rank = args.hosts.index(args.current_host)
        os.environ['RANK'] = str(host_rank)
        dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
        print('Initialized the distributed environment: \'{}\' backend on {} nodes. '.format(
            args.backend, dist.get_world_size()) + 'Current host rank is {}. Number of gpus: {}'.format(
            dist.get_rank(), args.num_gpus))
    
    # Set the seed for generating random numbers
    
    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed(args.seed) 
    
    # Instantiate model
    
    config = None
    model = None
    
    successful_download = False
    retries = 0
    
    while (retries < 5 and not successful_download):
        try:
            # Configure model
            config = configure_model()
            model = RobertaForSequenceClassification.from_pretrained(
                'roberta-base',
                config=config
            )

            model.to(device)
            successful_download = True
            print('Successfully downloaded after {} retries.'.format(retries))
        
        except:
            retries = retries + 1
            random_sleep = random.randint(1, 30)
            print('Retry #{}.  Sleeping for {} seconds'.format(retries, random_sleep))
            time.sleep(random_sleep)
 
    if not model:
         print('Not properly initialized...')
    
    # Create data loaders
    
    train_data_loader, df_train = create_data_loader(args.train_data, args.train_batch_size)
    val_data_loader, df_val = create_data_loader(args.validation_data, args.validation_batch_size)
    
    print("Processes {}/{} ({:.0f}%) of train data".format(
        len(train_data_loader.sampler), len(train_data_loader.dataset),
        100. * len(train_data_loader.sampler) / len(train_data_loader.dataset)
    ))

    print("Processes {}/{} ({:.0f}%) of validation data".format(
        len(val_data_loader.sampler), len(val_data_loader.dataset),
        100. * len(val_data_loader.sampler) / len(val_data_loader.dataset)
    )) 
       
    print('model_dir: {}'.format(args.model_dir))    
    print('model summary: {}'.format(model))
    
    callbacks = []
    initial_epoch_number = 0
   
    # Start training

    model = train_model(
        model,
        train_data_loader,
        df_train,
        val_data_loader, 
        df_val,
        args
    )
    
    save_transformer_model(model, args.model_dir)
    save_pytorch_model(model, args.model_dir)
    
    # Prepare for inference which will be used in deployment
    # You will need three files for it: inference.py, requirements.txt, config.json
    
    inference_path = os.path.join(args.model_dir, "code/")
    os.makedirs(inference_path, exist_ok=True)
    os.system("cp inference.py {}".format(inference_path))
    os.system("cp requirements.txt {}".format(inference_path))
    os.system("cp config.json {}".format(inference_path))

Now, we will import the script and verify that the model is configured correctly:

import sys, importlib
sys.path.append('src/')

import train

# reload the module if it has been previously loaded
if 'train' in sys.modules:
    importlib.reload(train)

# Ignore warnings below
config = train.configure_model()

label_0 = config.id2label[0]
label_1 = config.id2label[1]
label_2 = config.id2label[2]

updated_correctly = False

if label_0 != -1 or label_1 != 0 or label_2 != 1:
    print('#######################################################################################')
    print('Please check that the function \'configure_model\' in the file src/train.py is complete.')
    print('########################################################################################')
    raise Exception('Please check that the function \'configure_model\' in the file src/train.py is complete.')
else:
    print('##################')    
    print('Updated correctly!')        
    print('##################')        

    updated_correctly = True

Then we set up the PyTorch estimator to train our model.

from sagemaker.pytorch import PyTorch as PyTorchEstimator

if updated_correctly:
    estimator = PyTorchEstimator(
        entry_point='train.py',
        source_dir='src',
        role=role,
        instance_count=train_instance_count,
        instance_type=train_instance_type,
        volume_size=train_volume_size,
        py_version='py3', # dynamically retrieves the correct training image (Python 3)
        framework_version='1.6.0', # dynamically retrieves the correct training image (PyTorch)
        hyperparameters=hyperparameters,
        metric_definitions=metric_definitions,
        input_mode=input_mode,
        debugger_hook_config=debugger_hook_config,
        profiler_config=profiler_config,
        rules=rules
    )

Then, we launch the SageMaker Training Job, which fits the model to the dataset. Since we pass wait=False, the call returns immediately and the job runs in the background.

estimator.fit(
    inputs=data_channels,
    wait=False
)


training_job_name = estimator.latest_training_job.name

print('Training Job name: {}'.format(training_job_name))


# Alternatively, get the name from the Training Job description
training_job_name = estimator.latest_training_job.describe()['TrainingJobName']

print('Training Job name: {}'.format(training_job_name))
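
Because the job was launched with wait=False, we should wait for it to finish (or poll its status) before querying the metrics. A minimal sketch:

# Block until the Training Job completes (omit logs for a quieter wait)
estimator.latest_training_job.wait(logs=False)

# Or check the status directly via the low-level client
status = sm.describe_training_job(TrainingJobName=training_job_name)['TrainingJobStatus']
print(status)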

Review the Training Metrics

df_metrics = estimator.training_job_analytics.dataframe()
df_metrics
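
We can also download the report generated by the ProfilerReport rule configured earlier. By convention, the rule output lands under the estimator's output path in a rule-output/ prefix (treat the exact path as an assumption and verify it in S3):

rule_output_path = estimator.output_path + training_job_name + '/rule-output'

print(rule_output_path)

!aws s3 cp --recursive $rule_output_path ./profiler-report/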

Deploy the Model

We create a custom SentimentPredictor that encapsulates a JSON Lines serializer and deserializer. To pass it into the PyTorchModel, it needs to be wrapped as a class.

from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(endpoint_name, 
                         sagemaker_session=sagemaker_session, 
                         serializer=JSONLinesSerializer(),
                         deserializer=JSONLinesDeserializer())


import time
from sagemaker.pytorch.model import PyTorchModel

timestamp = int(time.time())

pytorch_model_name = '{}-{}-{}'.format(training_job_name, 'pt', timestamp)

model = PyTorchModel(name=pytorch_model_name,
                     model_data=estimator.model_data,
                     predictor_cls=SentimentPredictor,
                     entry_point='inference.py',
                     source_dir='src',
                     framework_version='1.6.0',
                     py_version='py3',
                     role=role)


pytorch_endpoint_name = '{}-{}-{}'.format(training_job_name, 'pt', timestamp)

print(pytorch_endpoint_name)


%%time

predictor = model.deploy(initial_instance_count=1, 
                         instance_type='ml.m5.large', 
                         endpoint_name=pytorch_endpoint_name)

Test the Model

Here, we will pass sample strings of text to the endpoint in order to see the sentiment (1 = positive, 0 = neutral, -1 = negative, per the classes defined in configure_model). We give you one example of each class; feel free to play around and change the strings yourself!

inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]

predictor = SentimentPredictor(endpoint_name=pytorch_endpoint_name,
                               sagemaker_session=sess)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class {} with probability {}".format(predicted_class['predicted_label'], predicted_class['probability']))

Output:

Predicted class 1 with probability 0.9605445861816406
Predicted class 0 with probability 0.5798221230506897
Predicted class -1 with probability 0.7667604684829712
