How to configure a RuntimeDataConnector (V3 API only)

This guide demonstrates how to configure a RuntimeDataConnector and applies only to the V3 (Batch Request) API. A RuntimeDataConnector allows you to specify a Batch using a RuntimeBatchRequest, which is used to create a Validator. A Validator is the key object used to create Expectations and validate datasets.

A RuntimeDataConnector is a special kind of Data Connector that enables you to use a RuntimeBatchRequest to provide a Batch's data directly at runtime. The RuntimeBatchRequest can wrap an in-memory DataFrame, a filepath, or a SQL query, and must include batch identifiers that uniquely identify the data (e.g. a run_id from an Airflow DAG run). The batch identifiers that must be passed in at runtime are specified in the RuntimeDataConnector's configuration.
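For illustration, runtime_parameters takes exactly one of three keys, depending on what the Batch wraps. A minimal sketch follows; the batch_data and path variants assume a Datasource backed by a Pandas or Spark Execution Engine rather than the SQL Datasource configured below, and the file path is a placeholder:

import pandas as pd

# 1. Wrap an in-memory DataFrame (Pandas- or Spark-backed Datasource)
runtime_parameters = {"batch_data": pd.DataFrame({"a": [1, 2, 3]})}

# 2. Wrap a file on disk (Pandas- or Spark-backed Datasource)
runtime_parameters = {"path": "/path/to/my_file.csv"}

# 3. Wrap a SQL query (SqlAlchemy-backed Datasource, as in this guide)
runtime_parameters = {"query": "SELECT * FROM my_table"}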

Add a RuntimeDataConnector to a Datasource configuration

The following example uses test_yaml_config and sanitize_yaml_and_save_datasource to add a new SQL Datasource to a project's great_expectations.yml. If you have already configured Datasources, you can add an additional RuntimeDataConnector configuration directly to your great_expectations.yml, as sketched after the example below.

Note

Currently, RuntimeDataConnector cannot be used with Datasources of type SimpleSqlalchemyDatasource.

import great_expectations as gx
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource

context = gx.get_context()
config = f"""
  name: my_sqlite_datasource
  class_name: Datasource
  execution_engine:
    class_name: SqlAlchemyExecutionEngine
    connection_string: sqlite:///my_db_file
  data_connectors:
    my_runtime_data_connector:
      class_name: RuntimeDataConnector
      batch_identifiers:
        - pipeline_stage_name
        - airflow_run_id
  """
context.test_yaml_config(
    yaml_config=config
)
sanitize_yaml_and_save_datasource(context, config, overwrite_existing=False)
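After sanitize_yaml_and_save_datasource runs, great_expectations.yml contains an equivalent entry. If you prefer to add the RuntimeDataConnector to an existing Datasource by hand, the resulting section looks roughly like this (a sketch, assuming the Datasource configured above):

datasources:
  my_sqlite_datasource:
    class_name: Datasource
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: sqlite:///my_db_file
    data_connectors:
      my_runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - pipeline_stage_name
          - airflow_run_id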

At runtime, you would get a Validator from the Data Context as follows:

from great_expectations.core.batch import RuntimeBatchRequest

validator = context.get_validator(
    batch_request=RuntimeBatchRequest(
        datasource_name="my_sqlite_datasource",
        data_connector_name="my_runtime_data_connector",
        data_asset_name="my_data_asset_name",
        # The query's result set becomes the Batch to validate.
        runtime_parameters={
            "query": "SELECT * FROM table_partitioned_by_date_column__A"
        },
        # Values for the identifiers declared in the Data Connector config.
        batch_identifiers={
            "pipeline_stage_name": "core_processing",
            "airflow_run_id": 1234567890,
        },
    ),
    expectation_suite=my_expectation_suite,  # an existing ExpectationSuite
)

# Simplified call to get_validator - RuntimeBatchRequest is inferred under the hood
validator = context.get_validator(
    datasource_name="my_sqlite_datasource",
    data_connector_name="my_runtime_data_connector",
    data_asset_name="my_data_asset_name",
    runtime_parameters={
        "query": "SELECT * FROM table_partitioned_by_date_column__A"
    },
    batch_identifiers={
        "pipeline_stage_name": "core_processing",
        "airflow_run_id": 1234567890,
    },
    expectation_suite=my_expectation_suite,
)
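
The returned Validator behaves like any other. For example, a minimal sketch (the column name is a placeholder for a column in your query's result set):

# Define an Expectation against the runtime Batch; "my_column" is hypothetical.
validator.expect_column_values_to_not_be_null(column="my_column")

# Persist the updated suite back to the Expectation Store.
validator.save_expectation_suite(discard_failed_expectations=False)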