RI Text Multiclass Classification Walkthrough

You are a data scientist working to maintain a large research library. The data science team has been tasked with implementing a research paper topic classification model and monitoring how that model performs over time. The performance of this model directly impacts the profits of the company. To ensure the data science team develops the best model and the performance of this model doesn’t degrade over time, the VP of Data Science purchases the RIME platform.

In this notebook walkthrough, we will walk through two of RIME's core products: AI Stress Testing and AI Continuous Testing.

  1. AI Stress Testing is used in the model development stage. Using AI Stress Testing, you can test the model you have developed: RIME goes beyond simply optimizing for basic performance metrics like accuracy and automatically discovers the model’s weaknesses.

  2. AI Continuous Testing is used after the model is deployed in production. Using AI Continuous Testing, you can automate the monitoring, discovery and remediation of issues that occur post-deployment.

Install Dependencies, Import Libraries and Download Data

Run the cells below to install the RIME SDK, import the analysis libraries, and download the example data.

[ ]:
!pip install rime-sdk &> /dev/null

import pandas as pd
from pathlib import Path
from rime_sdk import Client

[ ]:
!pip install https://github.com/RobustIntelligence/ri-public-examples/archive/master.zip

from ri_public_examples.download_files import download_files

download_files('nlp/classification/arxiv-2.0', 'arxiv')

Establish the RIME Client

To get started, provide the API credentials and the base domain/address of the RIME Cluster. You can generate and copy an API token from the API Access Tokens Page under Workspace settings. For the domain/address of the RIME Cluster, contact your admin.

Image of getting an API token
Image of creating an API token

[ ]:
API_TOKEN = '' # PASTE API_KEY
CLUSTER_URL = '' # PASTE DEDICATED DOMAIN OF RIME SERVICE (e.g., https://rime.example.rbst.io)
AGENT_ID = '' # PASTE AGENT_ID IF USING AN AGENT THAT IS NOT THE DEFAULT
client = Client(CLUSTER_URL, API_TOKEN)

Create a New Project

You can create projects in RIME to organize your test runs. Each project represents a workspace for a given machine learning task. It can contain multiple candidate models, but should only contain one promoted production model.

[ ]:
description = (
    "Run Stress Testing and Continuous Testing on a"
    " text classification model and dataset. Demonstration uses "
    " a dataset composed of ArXiv paper titles where the task is"
    " to predict the paper topic."
)
project = client.create_project(
    name='Text Classification Demo',
    description=description,
    model_task='MODEL_TASK_MULTICLASS_CLASSIFICATION'
)

Go back to the UI to see the newly created Text Classification Demo project

Preparing the Model + Datasets + Predictions

For this demo, we are going to use the prediction logs of a text classification model for arXiv, a popular repository of research papers.

The model classifies each research paper into one of a number of topics, such as:

  1. Black Hole

  2. Neutron Star

  3. Dark Matter

We now want to kick off RIME Stress Tests to evaluate the model in greater depth than basic performance metrics like accuracy, precision, and recall allow. To do this, we will upload the pre-trained model, the reference dataset the model was trained on, and the evaluation dataset the model was evaluated on to an S3 bucket that RIME can access. We will then register these artifacts with RIME.

Uploading Artifacts to Blob Storage

For SaaS environments using the default S3 storage location, the Python SDK supports direct file uploads using upload_*().

For other environments and storage technologies, artifacts must be managed through alternate means.

[ ]:
IS_SAAS = False # TOGGLE True/False (Note: SaaS environments use URLs ending in "rbst.io" and have an "Internal Agent")

[ ]:
if not IS_SAAS:
    BLOB_STORE_URI = "" # PROVIDE BLOB STORE URI (e.g., "s3://acmecorp-rime")
    assert BLOB_STORE_URI != ""

UPLOAD_PATH = "ri_public_examples_arxiv"

[ ]:
if IS_SAAS:
    ref_s3_path = client.upload_file(
        Path('arxiv/data/train.json.gz'), upload_path=UPLOAD_PATH
    )
    eval_s3_path = client.upload_file(
        Path('arxiv/data/val_0_with_label.json.gz'), upload_path=UPLOAD_PATH
    )
    ref_preds_s3_path = client.upload_file(
        Path("arxiv/data/preds.train.jsonl.gz"), upload_path=UPLOAD_PATH
    )
    eval_preds_s3_path = client.upload_file(
        Path("arxiv/data/preds.val_0.jsonl.gz"), upload_path=UPLOAD_PATH
    )
else:
    ref_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/train.json.gz"
    eval_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/val_0_with_label.json.gz"

    ref_preds_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/preds.train.jsonl.gz"
    eval_preds_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/preds.val_0.jsonl.gz"

Once the data and model are uploaded to S3, we can register them with RIME. After registration, we can refer to these resources using their RIME-generated IDs.

[ ]:
from datetime import datetime

dt = str(datetime.now())

# Note: models and datasets need to have unique names.
model_id = project.register_model(
    f'model_{dt}',
    model_config={"hugging_face": {"model_uri": "Wi/arxiv-distilbert-base-cased"}},
    agent_id=AGENT_ID
)
data_params = {
    "label_col": "label",
    "text_features": [
        "text"
    ],
    "timestamp_col": "timestamp"
}
ref_dataset_id = project.register_dataset_from_file(
    f"ref_dataset_{dt}", ref_s3_path, data_params=data_params, agent_id=AGENT_ID
)
eval_dataset_id = project.register_dataset_from_file(
    f"eval_dataset_{dt}", eval_s3_path, data_params=data_params, agent_id=AGENT_ID
)
project.register_predictions_from_file(
    ref_dataset_id, model_id, ref_preds_s3_path, agent_id=AGENT_ID
)
project.register_predictions_from_file(
    eval_dataset_id, model_id, eval_preds_s3_path, agent_id=AGENT_ID
)

Running a Stress Test

AI Stress Tests allow you to test your data and model before deployment. They are a comprehensive suite of hundreds of tests that automatically identify implicit assumptions and weaknesses of pre-production models. Each stress test is run on a single model and its associated reference and evaluation datasets.

Below is a sample configuration showing how to set up and run a RIME Stress Test for NLP.

[ ]:
stress_test_config = {
    "run_name": "ArXiv Topic Classification",
    "data_info": {
        "ref_dataset_id": ref_dataset_id,
        "eval_dataset_id": eval_dataset_id,
    },
    "model_id": model_id,
    "run_time_info": {
        "random_seed": "42",
    },
    "categories": [
        "TEST_CATEGORY_TYPE_ADVERSARIAL",
        "TEST_CATEGORY_TYPE_SUBSET_PERFORMANCE",
        "TEST_CATEGORY_TYPE_TRANSFORMATIONS",
        "TEST_CATEGORY_TYPE_BIAS_AND_FAIRNESS", # this category is off by default
        "TEST_CATEGORY_TYPE_DATA_CLEANLINESS",
        "TEST_CATEGORY_TYPE_ABNORMAL_INPUTS"
    ]
}
stress_job = client.start_stress_test(
    test_run_config=stress_test_config,
    project_id=project.project_id,
    agent_id=AGENT_ID
)
stress_job.get_status(verbose=True, wait_until_finish=True)

Stress Test Results

Stress tests are grouped into categories that measure various aspects of model robustness (subset performance, distribution drift, abnormal inputs). Suggestions to improve your model are aggregated at the category level as well. By default, tests are ranked by a shared severity metric. Clicking on an individual test surfaces more detailed information.

You can view the detailed results in the UI by running the below cell and redirecting to the generated link. This page shows granular results for a given AI Stress Test run.

[ ]:
test_run = stress_job.get_test_run()
test_run

Analyzing the Results

Subset Performance Tests

Here are the results of the Subset Performance tests. These tests can be thought of as finer-grained performance tests that identify subsets on which the model underperforms. They help ensure that the model works equally well across different groups.

Image of ST subset results on a test of a text classification model

Below we are exploring the “Subset Macro Precision” test cases for the text metadata feature “text.DetectedLanguage”. We can see that even though the model has a Macro Precision of 0.53, it performs poorly on certain subsets.

Image of ST subset macro precision results on a test of a text classification model
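
The same comparison can be made programmatically through the SDK. Below is a minimal sketch; the batch ID "subset_performance:subset_macro_precision" is an assumption inferred from the macro F1 batch queried later in this notebook, so confirm the exact name in the UI before relying on it.

[ ]:
# Illustrative only: pull the "Subset Macro Precision" test batch as a DataFrame.
# The batch ID is an assumption inferred from the
# "subset_performance:subset_macro_f1" example later in this notebook.
subset_macro_precision = test_run.get_test_batch(
    "subset_performance:subset_macro_precision"
)
subset_macro_precision.get_test_cases_df().head()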

Transformation Tests

Here are the results of the Transformation tests. These tests can be thought of as ways to test your model’s response to augmented text data. They help ensure that your model is invariant to such changes in your data.

Image of ST value-transformation results on a test of a text classification model

Below we are exploring a transformation test that changes the original text to upper-case text. We see that this transformation causes the original class’s predicted score to change by 0.52. As a result, the model predicts an entirely new class for the text and misclassifies it.

Image of ST transformation test warnings for a text classification model

Programmatically Querying the Results

RIME not only provides an intuitive UI to visualize and explore these results, but also allows you to query them programmatically. This lets customers integrate with their MLOps pipelines, log results to experiment management tools like MLflow (see the example after the overview cell below), bring automated decision-making to their ML practices, or store results for future reference.

Run the cells below to query the results programmatically. The results are returned as pandas DataFrames.

Access results at the test run overview level

[ ]:
test_run_result = test_run.get_result_df()
test_run_result.to_csv("Arxiv_Test_Run_Results.csv")
test_run_result
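
For instance, the overview metrics above could be logged to an experiment tracker. The following is a minimal MLflow sketch; it assumes MLflow is installed (it is not one of this walkthrough's dependencies) and that test_run_result is a single-row summary containing numeric columns, so inspect the DataFrame before adapting it.

[ ]:
# Illustrative only: log the numeric overview metrics to an MLflow run.
# Assumes MLflow is installed (`pip install mlflow`) and that
# test_run_result is a single-row summary with numeric columns.
import re

import mlflow

with mlflow.start_run(run_name="arxiv_stress_test"):
    # Log each numeric column of the summary row as an MLflow metric,
    # sanitizing column names into valid metric names.
    for col, value in test_run_result.select_dtypes(include="number").iloc[0].items():
        metric_name = re.sub(r"[^0-9A-Za-z_\-./ ]", "_", str(col))
        mlflow.log_metric(metric_name, float(value))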

Access detailed results at the individual test case level

[ ]:
test_case_result = test_run.get_test_cases_df()
test_case_result.to_csv("Arxiv_Test_Case_Results.csv")
test_case_result
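
The test case DataFrame can also be summarized with ordinary pandas operations, for example to count test cases by severity. The "severity" column name is an assumption about the SDK's output schema, so the sketch below checks for it first.

[ ]:
# Illustrative pandas summary of the test case results.
# The "severity" column is an assumption -- inspect test_case_result.columns
# for the actual schema returned by your SDK version.
if "severity" in test_case_result.columns:
    print(test_case_result["severity"].value_counts())
else:
    print(test_case_result.columns.tolist())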

Access detailed test results for a given test batch

[ ]:
subset_macro_f1 = test_run.get_test_batch("subset_performance:subset_macro_f1")
subset_macro_f1.get_test_cases_df()

Deploy to Production and set up Continuous Testing

Once you have identified the best stress test run, you can deploy the associated model and set up Continuous Testing in order to automatically detect “bad” incoming data and statistically significant distributional drift.

[ ]:
from datetime import timedelta

project.update_ct_categories(["TEST_CATEGORY_TYPE_ABNORMAL_INPUTS",
                              "TEST_CATEGORY_TYPE_DRIFT",
                              "TEST_CATEGORY_TYPE_BIAS_AND_FAIRNESS",
                              "TEST_CATEGORY_TYPE_EVASION_ATTACK_DETECTION"
                            ])

ct_instance = project.create_ct(model_id, ref_dataset_id, timedelta(days=1))

Uploading a Batch of Production Data & Model Predictions to Continuous Testing

The text classification model has now been in production for a week, and the production data and model predictions from that period have been collected and stored. We will use Continuous Testing to track how the model performed over the past week.

Upload the Latest Batch of Production Data

[ ]:
dt = str(datetime.now())
data_params = {
    "text_features": [
        "text"
    ],
    "timestamp_col": "timestamp"
}

if IS_SAAS:
    prod_s3_path = client.upload_file(
        Path('arxiv/data/val_1.json.gz'),
        upload_path=UPLOAD_PATH
    )
    prod_preds_s3_path = client.upload_file(
        Path('arxiv/data/preds.val_1.jsonl.gz'),
        upload_path=UPLOAD_PATH,
    )
else:
    prod_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/val_1.json.gz"
    prod_preds_s3_path = f"{BLOB_STORE_URI}/{UPLOAD_PATH}/data/preds.val_1.jsonl.gz"

prod_dataset_id = project.register_dataset_from_file(
    f"prod_dataset_{dt}",
    prod_s3_path,
    data_params=data_params,
    agent_id=AGENT_ID
)
project.register_predictions_from_file(
    prod_dataset_id, model_id, prod_preds_s3_path, agent_id=AGENT_ID
)

Get Continuous Testing

[ ]:
ct_instance = client.get_ct_for_project(project.project_id)

Run Continuous Testing over Batch of Data

[ ]:
ct_job = ct_instance.start_continuous_test(prod_dataset_id, agent_id=AGENT_ID)
ct_job.get_status(verbose=True, wait_until_finish=True)
ct_instance

Wait a couple of minutes and your results will appear in the UI

Querying Results from Continuous Testing

After continuous testing has been set up and data has been uploaded for processing, you can query the results across the entire uploaded history.

Obtain All Detection Events

[ ]:
events = [d.to_dict() for m in ct_instance.list_monitors() for d in m.list_detected_events()]
events_df = pd.DataFrame(events).drop(["id", "project_id", "firewall_id", "event_object_id", "description_html", "last_update_time"], axis=1)
events_df.head()
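
The events DataFrame can likewise be summarized with pandas. The column names used below are assumptions about the event dictionaries returned by the SDK, so the sketch checks for them before use.

[ ]:
# Illustrative only: summarize detected events by type and severity.
# The column names are assumptions -- print events_df.columns to see the
# actual fields returned by your SDK version.
for col in ("event_type", "severity"):
    if col in events_df.columns:
        print(events_df[col].value_counts(), end="\n\n")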

CT Overview

The Overview page is the mission control for your model’s production deployment health. In it, you can see the status of continuous test runs and how their metrics change over time.

Image of a graphed test result history for a text classification model

CT Results

The Continuous Tests operate at the batch level and provide a mechanism to monitor the health of ML deployments in production. They allow the user to understand when errors begin to occur and surface the underlying drivers of such errors.

You can explore the results in the UI by running the below cell and redirecting to the generated link.

[ ]:
ct_instance

Analyzing CT Results

Failing Rows Rate Increases for a Time Period - In the image below, we can see that the failing rows rate increased in the middle of the week, relative to when the model was first deployed.

Image of a graph showing an increase in the failing rows rate for a text classification model