Python SDK

Python package providing access to RIME’s backend services.

The main entry point should be through the Client. The other classes provide more modular functionality.

class rime_sdk.Client(domain: str, api_key: str = '', channel_timeout: float = 5.0, disable_tls: bool = False)

The Client provides an interface to RIME’s backend services for creating projects, starting stress test jobs, and querying the backend for current stress test jobs.

To initialize the Client, provide the address of your RIME instance.

Parameters:
  • domain – str The base domain/address of the RIME service.

  • api_key – str The api key providing authentication to RIME services.

  • channel_timeout – float The amount of time in seconds to wait for channels to become ready when opening connections to gRPC servers.

Raises:

ValueError – If a connection cannot be made to a backend service within timeout.

Example:

rime_client = Client("my_vpc.rime.com", "api-key")
create_project(name: str, description: str) Project

Create a new RIME project in RIME’s backend.

Projects allow you to organize stress test runs as you see fit. A natural way to organize stress test runs is to create a project for each specific ML task, such as predicting whether a transaction is fraudulent.

Parameters:
  • name – str Name of the new project.

  • description – str Description of the new project.

Returns:

A Project that allows users to interact with it. Its project_id attribute can be used in start_stress_test() and list_stress_test_jobs().

Raises:

ValueError – If the request to the Upload service failed.

Example:

project = rime_client.create_project(name='foo', description='bar')
get_project(project_id: str) Project

Get project by project ID.

delete_project(project_id: str) None

Delete a project in RIME’s backend.

create_managed_image(name: str, requirements: List[PipRequirement]) RIMEImageBuilder

Create a new managed Docker image with the desired PIP requirements to run RIME on.

These managed Docker images are managed by the RIME backend and will automatically be upgraded when you update your version of RIME. Note: Images take a few minutes to be built.

This method returns an object that can be used to track the progress of the image building job. The new custom image is only available for use in a stress test once it has status READY.

Parameters:
  • name – str The (unique) name of the new managed image. This acts as the unique identifier of the managed image. The call will fail if an image with the specified name already exists.

  • requirements – List[ManagedImage.PipRequirement] List of additional pip requirements to be installed on the managed image. A ManagedImage.PipRequirement can be created with the helper method Client.pip_requirement. The first argument is the name of the library (e.g. tensorflow or xgboost) and the second argument is a valid pip version specifier (e.g. >=0.1.2 or ==1.0.2).

Returns:

A RIMEImageBuilder object that provides an interface for monitoring the job in the backend.

Raises:

ValueError – If the request to the ImageRegistry service failed.

Example:

requirements = [
     # Fix the version of `xgboost` to `1.0.2`.
     rime_client.pip_requirement("xgboost", "==1.0.2"),
     # We do not care about the installed version of `tensorflow`.
     rime_client.pip_requirement("tensorflow")
 ]

# Start a new image building job
builder_job = rime_client.create_managed_image("xgboost102_tensorflow",
requirements)

# Wait until the job has finished and print out status information.
# Once this prints out the `READY` status, your image is available for
# use in stress tests.
builder_job.get_status(verbose=True, wait_until_finish=True)
has_managed_image(name: str) bool

Check whether managed image with name exists.

Parameters:

name – str The (unique) name of the new managed image. This acts as the unique identifier of the managed image. The call will return False if no image exists with this name, True if one does.

Returns:

Boolean for whether managed image with this name exists.

Example:

if rime_client.has_managed_image("xgboost102_tensorflow"):
     ....
get_managed_image(name: str) Dict

Get managed image by name.

Parameters:

name – str The (unique) name of the new managed image. This acts as the unique identifier of the managed image. The call will return None if no image exists with this name.

Returns:

A dictionary with information about the managed image.

Example:

image = rime_client.get_managed_image("xgboost102_tensorflow")
delete_managed_image(name: str) None

Delete a managed Docker image.

Parameters:

name – str The (unique) name of the managed image. This acts as the unique identifier of the managed image.

static pip_requirement(name: str, version_specifier: Optional[str] = None) PipRequirement

Construct a PipRequirement object for use in create_managed_image().

static pip_library_filter(name: str, fixed_version: Optional[str] = None) PipLibraryFilter

Construct a PipLibraryFilter object for use in list_managed_images().

list_managed_images(pip_library_filters: Optional[List[PipLibraryFilter]] = None) Iterator[Dict]

List all the managed Docker images.

This is where the true power of the managed images feature lies. You can search for images with specific pip libraries installed so that you do not have to create a new managed image every time you need to run a stress test.

Parameters:

pip_library_filters – Optional[List[ListImagesRequest.PipLibraryFilter]] Optional list of pip libraries to filter by. Construct each ListImagesRequest.PipLibraryFilter object with the pip_library_filter convenience method.

Returns:

An iterator of all the managed images

Raises:

ValueError – If the request to the ImageRegistry service failed or the list of pip library filters is improperly specified.

Example:

# Filter for an image with catboost1.0.3 and tensorflow installed.
filters = [
    rime_client.pip_library_filter("catboost", "1.0.3"),
    rime_client.pip_library_filter("tensorflow"),
]

# Query for the images.
images = rime_client.list_managed_images(
    pip_library_filters=filters)

# To get the names of an image.
[image["name"] for image in images]
list_projects() Iterator[Project]

List projects in a paginated form.

Returns:

An iterator of all the projects.

Raises:

ValueError – If the request to the ProjectManager service fails.

Example:

# Query for projects.
projects = rime_client.list_projects()
start_stress_test(test_run_config: dict, project_id: Optional[str] = None, custom_image: Optional[CustomImage] = None, rime_managed_image: Optional[str] = None, ram_request_megabytes: Optional[int] = None, cpu_request_millicores: Optional[int] = None, data_type: str = 'tabular') Job

Start a RIME model stress test on the backend’s ModelTesting service.

Parameters:
  • test_run_config – dict Configuration for the test to be run, which specifies paths to the model and datasets to used for the test.

  • project_id – Optional[str] Identifier for the project where the resulting test run will be stored. If not specified, the results will be stored in the default project.

  • custom_image – Optional[CustomImage] Specification of a customized container image to use running the model test. The image must have all dependencies required by your model. The image must specify a name for the image and optional a pull secret (of type CustomImage.PullSecret) with the name of the kubernetes pull secret used to access the given image.

  • rime_managed_image – Optional[str] Name of a managed image to use when running the model test. The image must have all dependencies required by your model. To create new managed images with your desired dependencies, use the client’s create_managed_image() method.

  • ram_request_megabytes – int Megabytes of RAM requested for the stress test job. The limit is 2x the megabytes requested.

  • cpu_request_millicores – int Millicores of CPU requested for the stress test job. The limit is 2x the millicores requested.

  • data_type – str Type of data this firewall test is to be run on. Should be one of tabular, nlp, images. Defaults to tabular.

Returns:

A Job providing information about the model stress test job.

Raises:

ValueError – If the request to the ModelTest service failed.

Example

This example will likely not work for you because it requires permissions to a specific S3 bucket. This demonstrates how you might specify such a configuration.

config = {
    "run_name": "Titanic",
    "data_info": {
        "label_col": "Survived",
        "ref_path": "s3://rime-datasets/titanic/titanic_example.csv",
        "eval_path": "s3://rime-datasets/titanic/titanic_example.csv"
    },
    "model_info": {
        "path": "s3://rime-models/titanic_s3_test/titanic_example_model.py"
    }
}

Run the job using the specified config and the default Docker image in the RIME backend. Store the results under project ID foo. Use the RIME Managed Image tensorflow115. This assumes you have already created the Managed Image and waited for it to be ready.

job = rime_client.start_stress_test_job(
 test_run_config=config, project_id="foo",
 rime_managed_image="tensorflow115")
get_test_run(test_run_id: str) TestRun

Get a TestRun object for interacting with the given test_run_id.

Checks to see if the test_run_id exists, then returns TestRun object.

Parameters:

test_run_id – str ID of the test run to query for

Returns:

A TestRun object corresponding to the test_run_id

list_stress_test_jobs(status_filters: Optional[List[str]] = None) List[Job]

Query the backend for a list of jobs filtered by status.

This is a good way to recover Job objects. Note that this only returns jobs from the last two days, because the time-to-live of job objects in the backend is set at two days.

Parameters:

status_filters – Optional[List[str]] = None Filter for selecting jobs by a union of statuses. The following list enumerates all acceptable values. [‘pending’, ‘running’, ‘failed’, ‘succeeded’] If omitted, jobs will not be filtered by status.

Returns:

A list of Job objects. These are not guaranteed to be in any sorted order.

Raises:

ValueError – If the provided status_filters array has invalid values. If the request to the ModelTest service failed.

Example:

# Get all running and succeeded jobs for project 'foo'
jobs = rime_client.list_stress_test_jobs(
    status_filters=['pending', 'succeeded'],
)
get_firewall_for_project(project_id: str) Firewall

Get the active fw for a project if it exists.

Query the backend for an active Firewall in a specified project which can be used to perform Firewall operations. If there is no active Firewall for the project, this call will error.

Parameters:

project_id – ID of the project which contains a Firewall.

Returns:

A Firewall object.

Raises:

ValueError – If the Firewall does not exist.

Example:

# Get FW in foo-project if it exists.
firewall = rime_client.get_firewall_for_project("foo-project")
upload_dataset_file(file_path: Union[Path, str], upload_path: Optional[str] = None) str

Upload a dataset file to make it accessible to RIME’s backend.

The uploaded file is stored with RIME’s backend in a blob store using its file name.

Parameters:
  • file_path – Union[Path, str] Path to the file to be uploaded to RIME’s blob store.

  • upload_path – Optional[str] = None Name of the directory in the blob store file system. If omitted, a unique random string will be the direcory.

Returns:

A reference to the uploaded file’s location in the blob store. This reference can be used to refer to that object when writing RIME configs. Please store this reference for future access to the file.

Raises:
  • FileNotFoundError – If the path file_path does not exist.

  • IOError – If file_path is not a file.

  • ValueError – If the specified upload_path is an empty string. If there was an error in obtaining a blobstore location from the RIME backend or in uploading file_path to RIME’s blob store. In the scenario the file fails to upload, the incomplete file will NOT automatically be deleted.

upload_local_image_dataset_file(file_path: Union[Path, str], image_path_key: str = 'image_path', upload_path: Optional[str] = None) Tuple[Dict, str]

Upload an image dataset file where image files are stored locally.

The image dataset file is expected to be a list of JSON dictionaries, with an image_path_key that references an image (either an absolute path or a relative path to an image file stored locally). Every image within the file is also uploaded to blob store, and the final file is also uploaded. If your image paths already reference an external blob storage, then use upload_dataset_file instead to upload the dataset file.

Parameters:

file_path – Union[Path, str] Path to the file to be uploaded to RIME’s blob store.

Returns:

A tuple containing of a (dict, string). The dict contains the updated dataset file with image paths replaced by s3 paths. The string contains a reference to the uploaded file’s location in the blob store. This reference can be used to refer to that object when writing RIME configs. Please store this reference for future access to the file.

Raises:
  • FileNotFoundError – If the path file_path does not exist.

  • IOError – If file_path is not a file.

  • ValueError – If there was an error in obtaining a blobstore location from the RIME backend or in uploading file_path to RIME’s blob store. In the scenario the file fails to upload, the incomplete file will NOT automatically be deleted.

upload_model_directory(dir_path: Union[Path, str], upload_hidden: bool = False, upload_path: Optional[str] = None) str

Upload a model directory to make it accessible to RIME’s backend.

The uploaded directory is stored within RIME’s backend in a blob store. All files contained within dir_path and its subdirectories are uploaded according to their relative paths within dir_path. However, if upload_hidden is False, all hidden files and subdirectories beginning with a ‘.’ are not uploaded.

Parameters:
  • dir_path – Union[Path, str] Path to the directory to be uploaded to RIME’s blob store.

  • upload_hidden – bool = False Whether or not to upload hidden files or subdirectories (ie. those beginning with a ‘.’).

  • upload_path – Optional[str] = None Name of the directory in the blob store file system. If omitted, a unique random string will be the direcory.

Returns:

A reference to the uploaded directory’s location in the blob store. This reference can be used to refer to that object when writing RIME configs. Please store this reference for future access to the directory.

Raises:
  • FileNotFoundError – If the directory dir_path does not exist.

  • IOError – If dir_path is not a directory or contains no files.

  • ValueError – If the specified upload_path is an empty string. If there was an error in obtaining a blobstore location from the RIME backend or in uploading dir_path to RIME’s blob store. In the scenario the directory fails to upload, files will NOT automatically be deleted.

list_uploaded_file_urls() Iterator[str]

Return an iterator of file paths that have been uploaded.

get_job(job_id: str) Job

Get job by id.

class rime_sdk.Project(backend: RIMEBackend, project_id: str)

An interface to a RIME project.

This object provides an interface for editing, updating, and deleting projects.

backend

RIMEBackend The RIME backend used to query about the status of the job.

project_id

str The identifier for the RIME project that this object monitors.

property info: ProjectInfo

Return information about this project.

property name: str

Return the name of this project.

property description: str

Return the description of this project.

list_test_runs() Iterator[TestRun]

List all the test runs associated with the project.

create_firewall(name: str, bin_size: str, test_run_id: str) Firewall

Create a Firewall for a given project.

Parameters:
  • name – str FW name.

  • bin_size – str Bin size. Can be year, month, week, day, hour.

  • test_run_id – str ID of the stress test run that firewall will be based on.

Returns:

A Firewall object.

Raises:

ValueError – If the provided status_filters array has invalid values. If the request to the ModelTest service failed.

Example:

# Create FW based on foo stress test in project.
firewall = project.create_firewall(
    "firewall name", "day", "foo")
get_firewall() Firewall

Get the active Firewall for a project if it exists.

Query the backend for an active Firewall in this project which can be used to perform Firewall operations. If there is no active Firewall for the project, this call will error.

Returns:

A Firewall object.

Raises:

ValueError – If the Firewall does not exist.

Example:

# Get FW if it exists.
firewall = project.get_firewall()
has_firewall() bool

Check whether a project has a firewall or not.

delete_firewall() None

Delete firewall for this project if exists.

class rime_sdk.Job(backend: RIMEBackend, job_id: str, job_type: JobType.V)

An interface to a RIME job.

This object provides an interface for monitoring the status of a job in the RIME backend.

property job_id: str

Return the id of the job.

property job_type: JobType.V

Return the type of the job.

get_test_run_id() str

Get the test run ID for a successful job.

Raises:

ValueError if the job does not have state 'SUCCEEDED.'

get_test_run() TestRun

Get the test run object.

Raises:

ValueError if the job does not have state 'SUCCEEDED.'

get_status(verbose: bool = False, wait_until_finish: bool = False, poll_rate_sec: float = 5.0) Dict

Query the ModelTest service for the job’s status.

This includes flags for blocking until the job is complete and printing information to stdout. This method can help with monitoring the progress of stress test jobs, because it prints out helpful information such as running time and the progress of the test run.

If the job has failed, the logs of the testing engine will be dumped to stdout to help with debuggability.

Parameters:
  • verbose – bool whether or not to print diagnostic information such as progress. Note that these logs have no strict form and will be subject to significant change in future versions.

  • wait_until_finish – bool whether or not to block until the job has succeeded or failed. If verbose is enabled too, information about the job including running time and progress will be printed to stdout every poll_rate_sec.

  • poll_rate_sec – float the frequency with which to poll the job’s status. Units are in seconds.

Returns:

A dictionary representing the job’s state.

{
"id": str
"type": str
"status": str
"start_time_secs": int64
"running_time_secs": double
}

Example:

# Block until this job is finished and dump monitoring info to stdout.
job_status = job.get_status(verbose=True, wait_until_finish=True)
class rime_sdk.TestRun(backend: RIMEBackend, test_run_id: str)

An interface for a RIME test run.

backend

RIMEBackend The RIME backend used to query about the test run.

test_run_id

str The string identifier for the successfully completed test run.

Get the web app URL for a successful stress test run.

This link directs to your organization’s deployment of RIME. You can view more detailed information about the results of your stress test in the web app, including helpful visualiziations, key insights, and explanations of test results.

Note: this is a string that should be copy-pasted into a browser.

get_result_df() DataFrame

Retrieve high level summary information for a complete stress test run in a single-row dataframe.

This dataframe includes information such as model metrics on the reference and evaluation datasets, overall RIME results such as severity across tests, and high level metadata such as the project ID and model task.

By concatenating these rows together, this allows you to build a table of test run results for sake of comparison. This only works on stress test jobs that have succeeded.

Note: this does not work on <0.14.0 RIME test runs.

Returns:

A pandas.DataFrame object containing the test run result. There are a lot of columns, so it is worth viewing them with the .columns method to see what they are. Generally, these columns have information about the model and datasets as well as summary statistics like the number of failing test cases or number of high severity test cases.

Example:

test_run = client.get_test_run(some_test_run_id)
test_run_result_df = test_run.get_result_df()
get_test_cases_df(show_test_case_metrics: bool = False) DataFrame

Retrieve all the test cases for a completed stress test run in a dataframe.

This gives you the ability to perform granular queries on test cases. For example, if you only care about subset performance tests and want to see the results on each feature, you can fetch all the test cases in a dataframe, then query on that dataframe by test type. This only works on stress test jobs that have succeeded.

Note: this will not work for test runs run on RIME versions <0.14.0.

Parameters:

show_test_case_metrics – bool = False Whether to show test case specific metrics. This could result in a sparse dataframe that is returned, since test cases return different metrics. Defaults to False.

Returns:

A pandas.DataFrame object containing the test case results. Here is a selected list of columns in the output: 1. test_run_id: ID of the parent test run. 2. features: List of features that the test case ran on. 3. test_batch_type: Type of test that was run (e.g. Subset AUC, Must be Int, etc.). 4. status: Status of the test case (e.g. Pass, Fail, Skip, etc.). 5. severity: Metric that denotes the severity of the failure of the test.

Example:

# Wait until the job has finished, since this method only works on
# SUCCEEDED jobs.
job.get_status(verbose=True, wait_until_finish=True)
# Get the test run result.
test_run = job.get_test_run()
# Dump the test cases in dataframe ``df``.
df = test_run.get_test_cases_df()
get_test_batch(test_type: str) TestBatch

Obtain the corresponding test batch.

A TestBatch object allows a user to query the results for the corresponding test. For example, the TestBatch object representing unseen_categorical allows a user to understand the results of the unseen_categorical test to varying levels of granularity.

Parameters:

test_type – str Name of the test.

Returns:

A TestBatch representing test_type.

Example:

batch = test_run.get_test_batch("unseen_categorical")
get_test_batches() Iterator[TestBatch]

Get all test batches for a given project.

Returns:

An iterator across TestBatch objects.

class rime_sdk.TestBatch(backend: RIMEBackend, test_run_id: str, test_type: str)

An interface for a test batch in a RIME test run.

backend

RIMEBackend The RIME backend used to query about the test run.

test_run_id

str The string identifier for the successfully completed test run.

test_type

str The unique identifer for the test type e.g. unseen_categorical.

summary() Series

Obtain the test batch summary as a Pandas Series.

The summary contains high level information about a test batch. For example, the name of the test batch, the category, and the severity of the test batch as a whole.

Returns:

  1. test_run_id

  2. test_type

  3. test_name

  4. category

  5. duration_in_millis

  6. severity

  7. failing_features

  8. description

  9. summary_counts.total

  10. summary_counts.pass

  11. summary_counts.fail

  12. summary_counts.warning

  13. summary_counts.skip

Return type:

A Pandas Series with the following columns

get_test_cases_df() DataFrame

Obtain a dataframe which delinates all test cases.

Different tests will have different columns/information. For example, some tests may have a column representing the number of failing rows.

Returns:

A Pandas Dataframe where each row represents a test case.

class rime_sdk.Firewall(backend: RIMEBackend, firewall_id: str)

Firewall object wrapper with helpful methods for working with RIME Firewall.

backend

RIMEBackend The RIME backend used to query about the status of the job.

firewall_id

str How to refer to the FW in the backend. Use this attribute to specify the Firewall for tasks in the backend.

delete_firewall() None

Delete firewall.

update_firewall_stress_test_run(stress_test_run_id: str) UpdateFirewallResponse

Update firewall with stress test run id.

Parameters:

stress_test_run_id – Stress Test Run Id to configure new firewall

Returns:

None

Raises:

ValueError – If the provided status_filters array has invalid values. If the request to the ModelTest service failed.

Get the web app URL to the firewall.

This link directs to your organization’s deployment of RIME. You can view more detailed information about the firewall in the web app, including helpful visualizations, key insights on your model’s performance, and explanations of test results for each batch.

Note: this is a string that should be copy-pasted into a browser.

run_firewall_incremental_data(test_run_config: dict, disable_firewall_events: bool = True, custom_image: Optional[CustomImage] = None, rime_managed_image: Optional[str] = None, ram_request_megabytes: Optional[int] = None, cpu_request_millicores: Optional[int] = None) Job

Start a RIME model firewall test on the backend’s ModelTesting service.

This allows you to run Firewall Test job on the RIME backend. This will run firewall on a batch of tabular data.

Parameters:
  • test_run_config – dict Configuration for the test to be run, which specifies paths to the model and datasets to used for the test.

  • custom_image – Optional[CustomImage] Specification of a customized container image to use running the model test. The image must have all dependencies required by your model. The image must specify a name for the image and optional a pull secret (of type CustomImage.PullSecret) with the name of the kubernetes pull secret used to access the given image.

  • rime_managed_image – Optional[str] Name of a managed image to use when running the model test. The image must have all dependencies required by your model. To create new managed images with your desired dependencies, use the client’s create_managed_image() method.

  • ram_request_megabytes – int Megabytes of RAM requested for the stress test job. If none specified, will default to 4000MB. The limit is 2x the megabytes requested.

  • cpu_request_millicores – int Millicores of CPU requested for the stress test job. If none specified, will default to 1500mi. The limit is 2x the millicores requested.

Returns:

A Job providing information about the model stress test job.

Raises:

ValueError – If the request to the ModelTest service failed.

Example:

# This example will likely not work for you because it requires permissions
# to a specific S3 bucket. This demonstrates how you might specify such a
# configuration.
incremental_config = {
    "eval_path": "s3://rime-datasets/
       fraud_continuous_testing/eval_2021_04_30_to_2021_05_01.csv",
    "timestamp_col": "timestamp"
}
# Run the job using the specified config and the default Docker image in
# the RIME backend. Use the RIME Managed Image "tensorflow115".
# This assumes you have already created the Managed Image and waited for it
# to be ready.
firewall = rime_client.get_firewall("foo")
job =
    firewall.run_firewall_incremental_data(
        test_run_config=incremental_config,
        rime_managed_image="tensorflow115",
        ram_request_megabytes=8000,
        cpu_request_millicores=2000)
class rime_sdk.RIMEImageBuilder(backend: RIMEBackend, name: str, requirements: Optional[List[PipRequirement]] = None)

An interface to a RIME image builder.

get_status(verbose: bool = False, wait_until_finish: bool = False, poll_rate_sec: float = 5.0) Dict

Query the ImageRegistry service for the image’s build status.

This query includes an option to wait until the image build is finished. It will either have succeeded or failed.

Parameters:
  • verbose – bool whether or not to print diagnostic information such as logs.

  • wait_until_finish – bool whether or not to block until the image is READY or FAILED.

  • poll_rate_sec – float the frequency with which to poll the image’s build status.

Returns:

A dictionary representing the image’s state.