great_expectations.execution_engine

Package Contents

Classes

ExecutionEngine(name=None, caching=True, batch_spec_defaults=None, batch_data_dict=None, validator=None)

Helper class that provides a standard way to create an ABC using inheritance.

PandasExecutionEngine(*args, **kwargs)

PandasExecutionEngine instantiates the great_expectations Expectations API as a subclass of a pandas.DataFrame.

SparkDFExecutionEngine(*args, persist=True, spark_config=None, force_reuse_spark_context=False, **kwargs)

This class holds an attribute spark_df which is a spark.sql.DataFrame.

SqlAlchemyExecutionEngine(name: Optional[str] = None, credentials: Optional[dict] = None, data_context: Optional[Any] = None, engine: Optional[SaEngine] = None, connection_string: Optional[str] = None, url: Optional[str] = None, batch_data_dict: Optional[dict] = None, create_temp_table: bool = True, concurrency: Optional[ConcurrencyConfig] = None, **kwargs)

Helper class that provides a standard way to create an ABC using inheritance.

class great_expectations.execution_engine.ExecutionEngine(name=None, caching=True, batch_spec_defaults=None, batch_data_dict=None, validator=None)

Bases: abc.ABC

Helper class that provides a standard way to create an ABC using inheritance.

recognized_batch_spec_defaults :Set[str]
configure_validator(self, validator)

Optionally configure the validator as appropriate for the execution engine.

property config(self)
property dialect(self)
property batch_manager(self)

Getter for batch_manager

_load_batch_data_from_dict(self, batch_data_dict: Dict[str, BatchDataType])

Loads all data in batch_data_dict using cache_batch_data

load_batch_data(self, batch_id: str, batch_data: BatchDataType)
get_batch_data(self, batch_spec: BatchSpec)

Interprets batch_data and returns the appropriate data.

This method is primarily useful for utility cases (e.g. testing) where data is being fetched without a DataConnector and metadata like batch_markers is unwanted

Note: this method is currently a thin wrapper for get_batch_data_and_markers. It simply suppresses the batch_markers.

abstract get_batch_data_and_markers(self, batch_spec)
resolve_metrics(self, metrics_to_resolve: Iterable[MetricConfiguration], metrics: Optional[Dict[Tuple[str, str, str], MetricValue]] = None, runtime_configuration: Optional[dict] = None)

resolve_metrics is the main entrypoint for an execution engine. The execution engine will compute the value of the provided metrics.

Parameters
  • metrics_to_resolve – the metrics to evaluate

  • metrics – already-computed metrics currently available to the engine

  • runtime_configuration – runtime configuration information

Returns

a dictionary with the values for the metrics that have just been resolved.

Return type

resolved_metrics (Dict)
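
A minimal sketch of resolving a metric directly against an engine. In practice metrics are usually resolved through a Validator, which builds the metric dependency graph; the import path of MetricConfiguration and the need to pre-compute dependencies vary by version, so treat this as illustrative only.

    import pandas as pd

    from great_expectations.execution_engine import PandasExecutionEngine
    from great_expectations.validator.metric_configuration import MetricConfiguration  # path may differ by version

    engine = PandasExecutionEngine()
    engine.load_batch_data("my_batch", pd.DataFrame({"a": [1, 2, 3, None]}))

    # table.row_count is assumed here to have no unresolved upstream dependencies in the Pandas engine.
    row_count = MetricConfiguration(
        metric_name="table.row_count",
        metric_domain_kwargs={"batch_id": "my_batch"},
        metric_value_kwargs=None,
    )
    resolved = engine.resolve_metrics(metrics_to_resolve=[row_count])

    # resolve_metrics returns a dict keyed by each metric's id tuple.
    print(resolved[row_count.id])  # 4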

abstract resolve_metric_bundle(self, metric_fn_bundle)

Resolve a bundle of metrics with the same compute domain as part of a single trip to the compute engine.

abstract get_domain_records(self, domain_kwargs: dict)

get_domain_records computes the full-access data (dataframe or selectable) for computing metrics based on the given domain_kwargs and specific engine semantics.

Returns

data corresponding to the compute domain

abstract get_compute_domain(self, domain_kwargs: dict, domain_type: Union[str, MetricDomainTypes])

get_compute_domain computes the optimal domain_kwargs for computing metrics based on the given domain_kwargs and specific engine semantics.

Returns

  1. data corresponding to the compute domain;

  2. a modified copy of domain_kwargs describing the domain of the data returned in (1);

  3. a dictionary describing the access instructions for data elements included in the compute domain

    (e.g. specific column name).

In general, the union of the compute_domain_kwargs and accessor_domain_kwargs will be the same as the domain_kwargs provided to this method.

Return type

A tuple consisting of three elements
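
An illustrative sketch of the three-part return value, using the Pandas engine (the exact contents of the returned kwargs are version-dependent):

    import pandas as pd

    from great_expectations.execution_engine import PandasExecutionEngine

    engine = PandasExecutionEngine()
    engine.load_batch_data("my_batch", pd.DataFrame({"a": [1, 2, None], "b": [4, 5, 6]}))

    data, compute_domain_kwargs, accessor_domain_kwargs = engine.get_compute_domain(
        domain_kwargs={"column": "a"}, domain_type="column"
    )
    # data: the data to compute on (here the whole DataFrame)
    # compute_domain_kwargs: describes how `data` was obtained (the "column" key has been split out)
    # accessor_domain_kwargs: tells the metric which elements to read, e.g. {"column": "a"}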

add_column_row_condition(self, domain_kwargs, column_name=None, filter_null=True, filter_nan=False)

EXPERIMENTAL

Add a row condition for handling null filter.

Parameters
  • domain_kwargs – the domain kwargs to use as the base and to which to add the condition

  • column_name – if provided, use this name to add the condition; otherwise, will use “column” key from table_domain_kwargs

  • filter_null – if true, add a filter for null values

  • filter_nan – if true, add a filter for nan values
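
A hedged usage sketch with the Pandas engine; the exact form of the generated row condition is an implementation detail and may differ across versions:

    import pandas as pd

    from great_expectations.execution_engine import PandasExecutionEngine

    engine = PandasExecutionEngine()
    engine.load_batch_data("my_batch", pd.DataFrame({"a": [1, None, 3]}))

    # Returns a modified copy of the domain kwargs with a null-filtering condition attached.
    filtered_domain_kwargs = engine.add_column_row_condition(
        {"batch_id": "my_batch"}, column_name="a", filter_null=True
    )
    records = engine.get_domain_records(filtered_domain_kwargs)  # rows where "a" is not null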

resolve_data_reference(self, data_connector_name: str, template_arguments: dict)

Resolve file path for a (data_connector_name, execution_engine_name) combination.

_split_domain_kwargs(self, domain_kwargs: Dict[str, Any], domain_type: Union[str, MetricDomainTypes], accessor_keys: Optional[Iterable[str]] = None)

Split domain_kwargs for all domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using, or a corresponding string value representing it. String types include "identity", "column", "column_pair", "table" and "other". Enum types include capitalized versions of these from the class MetricDomainTypes.

  • accessor_keys – keys that are part of the compute domain but should be ignored when describing the domain and simply transferred with their associated values into accessor_domain_kwargs.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.
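
An illustrative call for a column domain (the private method is shown only to make the split concrete; the return value is assumed to unpack as a (compute, accessor) pair):

    from great_expectations.execution_engine import PandasExecutionEngine

    engine = PandasExecutionEngine()

    compute_kwargs, accessor_kwargs = engine._split_domain_kwargs(
        {"batch_id": "my_batch", "column": "a"}, domain_type="column"
    )
    # compute_kwargs  -> {"batch_id": "my_batch"}
    # accessor_kwargs -> {"column": "a"}
    # Their union reproduces the original domain_kwargs.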

static _split_table_metric_domain_kwargs(domain_kwargs: dict, domain_type: MetricDomainTypes, accessor_keys: Optional[Iterable[str]] = None)

Split domain_kwargs for table domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

  • accessor_keys – keys that are part of the compute domain but should be ignored when describing the domain and simply transferred with their associated values into accessor_domain_kwargs.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

static _split_column_metric_domain_kwargs(domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for column domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

static _split_column_pair_metric_domain_kwargs(domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for column pair domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

static _split_multi_column_metric_domain_kwargs(domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for multicolumn domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

class great_expectations.execution_engine.PandasExecutionEngine(*args, **kwargs)

Bases: great_expectations.execution_engine.ExecutionEngine

PandasExecutionEngine instantiates the great_expectations Expectations API as a subclass of a pandas.DataFrame.

For the full API reference, please see Dataset

Notes

  1. Samples and Subsets of a PandasDataset have ALL the expectations of the original data frame unless the user specifies the discard_subset_failing_expectations = True property on the original data frame.

  2. Concatenations, joins, and merges of PandasDatasets contain NO expectations (since no autoinspection is performed by default).

Feature Maturity

Validation Engine - Pandas - How-to Guide
Use Pandas DataFrame to validate data
Maturity: Production
Details:
API Stability: Stable
Implementation Completeness: Complete
Unit Test Coverage: Complete
Integration Infrastructure/Test Coverage: N/A -> see relevant Datasource evaluation
Documentation Completeness: Complete
Bug Risk: Low
Expectation Completeness: Complete
recognized_batch_spec_defaults
_instantiate_azure_client(self)
_instantiate_s3_client(self)
_instantiate_gcs_client(self)

Helper method for instantiating GCS client when GCSBatchSpec is passed in.

The method accounts for three ways that a GCS connection can be configured (see the sketch after this list):
  1. setting an environment variable, which is typically GOOGLE_APPLICATION_CREDENTIALS

  2. passing in explicit credentials via gcs_options

  3. running Great Expectations from within a GCP container, in which case a Client can be created without passing an additional environment variable or explicit credentials
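
A hedged sketch of the three configuration paths described above, using the google-cloud-storage client library directly rather than Great Expectations internals; all paths and values are placeholders.

    import os

    from google.cloud import storage
    from google.oauth2 import service_account

    # 1. Environment variable: the default client picks up GOOGLE_APPLICATION_CREDENTIALS.
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/service-account.json"
    client = storage.Client()

    # 2. Explicit credentials, roughly what passing gcs_options amounts to.
    credentials = service_account.Credentials.from_service_account_file(
        "/path/to/service-account.json"
    )
    client = storage.Client(credentials=credentials)

    # 3. Inside a GCP workload (GCE, GKE, Cloud Run), the ambient service account is used,
    #    so a bare storage.Client() needs no extra configuration.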

configure_validator(self, validator)

Optionally configure the validator as appropriate for the execution engine.

load_batch_data(self, batch_id: str, batch_data: Union[PandasBatchData, pd.DataFrame])
get_batch_data_and_markers(self, batch_spec: BatchSpec)
_apply_splitting_and_sampling_methods(self, batch_spec, batch_data)
property dataframe(self)

Tests whether or not a Batch has been loaded. If the loaded batch does not exist, raises a ValueError.

static guess_reader_method_from_path(path)

Helper method for deciding which reader to use to read in a certain path.

Parameters

path (str) – the path to use to guess the reader method

Returns

ReaderMethod to use for the filepath

_get_reader_fn(self, reader_method=None, path=None)

Static helper for parsing reader types. If reader_method is not provided, path will be used to guess the correct reader_method.

Parameters
  • reader_method (str) – the name of the reader method to use, if available.

  • path (str) – the path used to guess

Returns

ReaderMethod to use for the filepath
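
A hypothetical illustration of extension-based dispatch (not the library's exact lookup table); the helper name and mapping below are invented for the example.

    import pandas as pd

    _READER_BY_SUFFIX = {
        ".csv": pd.read_csv,
        ".tsv": pd.read_csv,        # typically combined with sep="\t" in reader_options
        ".parquet": pd.read_parquet,
        ".xlsx": pd.read_excel,
        ".json": pd.read_json,
    }

    def guess_reader(path: str):
        """Pick a pandas reader callable from the file extension."""
        for suffix, reader in _READER_BY_SUFFIX.items():
            if path.endswith(suffix):
                return reader
        raise ValueError(f"Unable to determine reader method for path: {path}")

    # guess_reader("data/trips.csv") -> pandas.read_csv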

resolve_metric_bundle(self, metric_fn_bundle)

Resolve a bundle of metrics with the same compute domain as part of a single trip to the compute engine.

get_domain_records(self, domain_kwargs: dict)

Uses the given domain kwargs (which include row_condition, condition_parser, and ignore_row_if directives) to obtain and/or query a batch. Returns in the format of a Pandas DataFrame.

Parameters

domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

Returns

A DataFrame (the data on which to compute)
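
A hedged sketch of row filtering via domain kwargs; condition_parser "pandas" is assumed to route the condition through DataFrame.query.

    import pandas as pd

    from great_expectations.execution_engine import PandasExecutionEngine

    engine = PandasExecutionEngine()
    engine.load_batch_data("my_batch", pd.DataFrame({"a": [1, 2, 3], "b": [10, 0, 7]}))

    filtered = engine.get_domain_records(
        {
            "batch_id": "my_batch",
            "row_condition": "b > 0",
            "condition_parser": "pandas",
        }
    )
    # filtered contains only the rows where b > 0.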

get_compute_domain(self, domain_kwargs: dict, domain_type: Union[str, MetricDomainTypes], accessor_keys: Optional[Iterable[str]] = None)

Uses the given domain kwargs (which include row_condition, condition_parser, and ignore_row_if directives) to obtain and/or query a batch. Returns in the format of a Pandas DataFrame. If the domain is a single column, this is added to ‘accessor domain kwargs’ and used for later access

Parameters
  • domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type (str or MetricDomainTypes) – an Enum value indicating which metric domain the user would like to be using, or a corresponding string value representing it. String types include "column", "column_pair", "table", and "other". Enum types include capitalized versions of these from the class MetricDomainTypes.

  • accessor_keys (str iterable) – keys that are part of the compute domain but should be ignored when describing the domain and simply transferred with their associated values into accessor_domain_kwargs.

Returns

  • a DataFrame (the data on which to compute)

  • a dictionary of compute_domain_kwargs, describing the DataFrame

  • a dictionary of accessor_domain_kwargs, describing any accessors needed to identify the domain within the compute domain

Return type

A tuple including

class great_expectations.execution_engine.SparkDFExecutionEngine(*args, persist=True, spark_config=None, force_reuse_spark_context=False, **kwargs)

Bases: great_expectations.execution_engine.ExecutionEngine

This class holds an attribute spark_df which is a spark.sql.DataFrame.
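
A hedged construction sketch, assuming pyspark is installed and a local session is acceptable; the configuration keys shown are ordinary Spark properties, not values required by this class.

    from great_expectations.execution_engine import SparkDFExecutionEngine

    engine = SparkDFExecutionEngine(
        persist=True,  # cache loaded DataFrames between metric computations
        spark_config={
            "spark.master": "local[*]",
            "spark.app.name": "great_expectations_validation",
        },
        force_reuse_spark_context=True,  # attach to an already-running SparkContext if present
    )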

Feature Maturity

Validation Engine - pyspark - Self-Managed - How-to Guide
Use Spark DataFrame to validate data
Maturity: Production
Details:
API Stability: Stable
Implementation Completeness: Moderate
Unit Test Coverage: Complete
Integration Infrastructure/Test Coverage: N/A -> see relevant Datasource evaluation
Documentation Completeness: Complete
Bug Risk: Low/Moderate
Expectation Completeness: Moderate
Validation Engine - Databricks - How-to Guide
Use Spark DataFrame in a Databricks cluster to validate data
Maturity: Beta
Details:
API Stability: Stable
Implementation Completeness: Low (dbfs-specific handling)
Unit Test Coverage: N/A -> implementation not different
Integration Infrastructure/Test Coverage: Minimal (we’ve tested a bit, know others have used it)
Documentation Completeness: Moderate (need docs on managing project configuration via dbfs/etc.)
Bug Risk: Low/Moderate
Expectation Completeness: Moderate
Validation Engine - EMR - Spark - How-to Guide
Use Spark DataFrame in an EMR cluster to validate data
Maturity: Experimental
Details:
API Stability: Stable
Implementation Completeness: Low (need to provide guidance on “known good” paths, and we know there are many “knobs” to tune that we have not explored/tested)
Unit Test Coverage: N/A -> implementation not different
Integration Infrastructure/Test Coverage: Unknown
Documentation Completeness: Low (must install specific/latest version but do not have docs to that effect or of known useful paths)
Bug Risk: Low/Moderate
Expectation Completeness: Moderate
Validation Engine - Spark - Other - How-to Guide
Use Spark DataFrame to validate data
Maturity: Experimental
Details:
API Stability: Stable
Implementation Completeness: Other (we haven’t tested possibility, known glue deployment)
Unit Test Coverage: N/A -> implementation not different
Integration Infrastructure/Test Coverage: Unknown
Documentation Completeness: Low (must install specific/latest version but do not have docs to that effect or of known useful paths)
Bug Risk: Low/Moderate
Expectation Completeness: Moderate
recognized_batch_definition_keys
recognized_batch_spec_defaults
property dataframe(self)

If a batch has been loaded, returns a Spark Dataframe containing the data within the loaded batch

load_batch_data(self, batch_id: str, batch_data: Union[SparkDFBatchData, DataFrame])
get_batch_data_and_markers(self, batch_spec: BatchSpec)
_apply_splitting_and_sampling_methods(self, batch_spec, batch_data)
static guess_reader_method_from_path(path)

Based on a given filepath, decides a reader method. Currently supports tsv, csv, and parquet. If none of these file extensions is used, raises an ExecutionEngineError stating that the reader method could not be determined from the path.

Parameters

path – A given file path

Returns

A dictionary entry of format {'reader_method': reader_method}

_get_reader_fn(self, reader, reader_method=None, path=None)

Static helper for providing reader_fn

Parameters
  • reader – the base spark reader to use; this should have had reader_options applied already

  • reader_method – the name of the reader_method to use, if specified

  • path (str) – the path to use to guess reader_method if it was not specified

Returns

ReaderMethod to use for the filepath

get_domain_records(self, domain_kwargs: dict)

Uses the given domain kwargs (which include row_condition, condition_parser, and ignore_row_if directives) to obtain and/or query a batch. Returns in the format of a Spark DataFrame.

Parameters

domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

Returns

A DataFrame (the data on which to compute)

static _combine_row_conditions(row_conditions: List[RowCondition])

Combine row conditions using AND if condition_type is SPARK_SQL

Note: although this method does not currently use self internally, it is not marked as @staticmethod because it is meant to be called only internally within this class.

Parameters

row_conditions – Row conditions of type Spark

Returns

Single Row Condition combined
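
A hypothetical illustration of the combination rule for Spark SQL conditions (not the library's internal code):

    conditions = ["col_a IS NOT NULL", "col_b > 0"]

    # Parenthesize each condition and join with AND, yielding a single Spark SQL filter.
    combined = " AND ".join(f"({condition})" for condition in conditions)
    # combined == "(col_a IS NOT NULL) AND (col_b > 0)"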

get_compute_domain(self, domain_kwargs: dict, domain_type: Union[str, MetricDomainTypes], accessor_keys: Optional[Iterable[str]] = None)

Uses a given batch dictionary and domain kwargs (which include a row condition and a condition parser) to obtain and/or query a batch. Returns in the format of a Spark DataFrame.

Parameters
  • domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type (str or MetricDomainTypes) – an Enum value indicating which metric domain the user would like to be using, or a corresponding string value representing it. String types include "identity", "column", "column_pair", "table" and "other". Enum types include capitalized versions of these from the class MetricDomainTypes.

  • accessor_keys (str iterable) – keys that are part of the compute domain but should be ignored when describing the domain and simply transferred with their associated values into accessor_domain_kwargs.

Returns

  • a DataFrame (the data on which to compute)

  • a dictionary of compute_domain_kwargs, describing the DataFrame

  • a dictionary of accessor_domain_kwargs, describing any accessors needed to identify the domain within the compute domain

Return type

A tuple including

add_column_row_condition(self, domain_kwargs, column_name=None, filter_null=True, filter_nan=False)

EXPERIMENTAL

Add a row condition for handling null filter.

Parameters
  • domain_kwargs – the domain kwargs to use as the base and to which to add the condition

  • column_name – if provided, use this name to add the condition; otherwise, will use “column” key from table_domain_kwargs

  • filter_null – if true, add a filter for null values

  • filter_nan – if true, add a filter for nan values

resolve_metric_bundle(self, metric_fn_bundle: Iterable[BundledMetricConfiguration])

For every metric in a set of Metrics to resolve, obtains necessary metric keyword arguments and builds bundles of the metrics into one large query dictionary so that they are all executed simultaneously. Will fail if bundling the metrics together is not possible.

Parameters

metric_fn_bundle (Iterable[BundledMetricConfiguration]) – a “BundledMetricConfiguration” contains a MetricProvider’s MetricConfiguration (its unique identifier), its metric provider function (the function that actually executes the metric), and the arguments to pass to that function (a dictionary of metrics defined in the registry and their corresponding arguments).

Returns

A dictionary of “MetricConfiguration” IDs and their corresponding fully resolved values for domains.

head(self, n=5)

Returns dataframe head. Default is 5

class great_expectations.execution_engine.SqlAlchemyExecutionEngine(name: Optional[str] = None, credentials: Optional[dict] = None, data_context: Optional[Any] = None, engine: Optional[SaEngine] = None, connection_string: Optional[str] = None, url: Optional[str] = None, batch_data_dict: Optional[dict] = None, create_temp_table: bool = True, concurrency: Optional[ConcurrencyConfig] = None, **kwargs)

Bases: great_expectations.execution_engine.ExecutionEngine

Helper class that provides a standard way to create an ABC using inheritance.
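
A minimal construction sketch using an in-memory SQLite database; any connection string supported by your SQLAlchemy installation works the same way.

    from great_expectations.execution_engine import SqlAlchemyExecutionEngine

    engine = SqlAlchemyExecutionEngine(
        connection_string="sqlite:///:memory:",
        create_temp_table=False,  # skip temp-table creation where permissions disallow it
    )
    print(engine.dialect_name)  # "sqlite"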

property credentials(self)
property connection_string(self)
property url(self)
property dialect(self)
property dialect_name(self)

Retrieve the string name of the engine dialect in lowercase e.g. “postgresql”.

Returns

String representation of the sql dialect.

_build_engine(self, credentials: dict, **kwargs)

Using a set of given credentials, constructs an ExecutionEngine, connecting to a database using a URL or a private key path.

static _get_sqlalchemy_key_pair_auth_url(drivername: str, credentials: dict)

Utilizing a private key path and a passphrase in a given credentials dictionary, attempts to encode the provided values into a private key. If passphrase is incorrect, this will fail and an exception is raised.

Parameters
  • drivername (str) –

  • credentials (dict) –

Returns

a tuple consisting of a url with the serialized key-pair authentication, and a dictionary of engine kwargs.
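
A hedged sketch of a credentials dictionary for key-pair authentication. The key names follow the Snowflake-style configuration commonly used with Great Expectations; verify them against your version's documentation, and note that every value below is a placeholder.

    credentials = {
        "drivername": "snowflake",
        "username": "MY_USER",
        "host": "my_account",
        "database": "MY_DATABASE",
        "query": {"schema": "PUBLIC", "warehouse": "MY_WAREHOUSE", "role": "MY_ROLE"},
        "private_key_path": "/path/to/rsa_key.p8",
        "private_key_passphrase": "my-passphrase",
    }
    # When private_key_path is present, _build_engine is expected to route through
    # _get_sqlalchemy_key_pair_auth_url to assemble the URL and engine kwargs.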

get_domain_records(self, domain_kwargs: dict)

Uses the given domain kwargs (which include row_condition, condition_parser, and ignore_row_if directives) to obtain and/or query a batch. Returns in the format of an SqlAlchemy table/column(s) object.

Parameters

domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

Returns

An SqlAlchemy table/column(s) (the selectable object for obtaining data on which to compute)

get_compute_domain(self, domain_kwargs: dict, domain_type: Union[str, MetricDomainTypes], accessor_keys: Optional[Iterable[str]] = None)

Uses a given batch dictionary and domain kwargs to obtain a SqlAlchemy column object.

Parameters
  • domain_kwargs (dict) – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type (str or MetricDomainTypes) – an Enum value indicating which metric domain the user would like to be using, or a corresponding string value representing it. String types include "identity", "column", "column_pair", "table" and "other". Enum types include capitalized versions of these from the class MetricDomainTypes.

  • accessor_keys (str iterable) – keys that are part of the compute domain but should be ignored when describing the domain and simply transferred with their associated values into accessor_domain_kwargs.

Returns

SqlAlchemy column

_split_column_metric_domain_kwargs(self, domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for column domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

_split_column_pair_metric_domain_kwargs(self, domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for column pair domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

_split_multi_column_metric_domain_kwargs(self, domain_kwargs: dict, domain_type: MetricDomainTypes)

Split domain_kwargs for multicolumn domain types into compute and accessor domain kwargs.

Parameters
  • domain_kwargs – A dictionary consisting of the domain kwargs specifying which data to obtain

  • domain_type – an Enum value indicating which metric domain the user would like to be using.

Returns

compute_domain_kwargs, accessor_domain_kwargs split from domain_kwargs. The union of compute_domain_kwargs and accessor_domain_kwargs is the input domain_kwargs.

resolve_metric_bundle(self, metric_fn_bundle: Iterable[BundledMetricConfiguration])

For every metric in a set of Metrics to resolve, obtains necessary metric keyword arguments and builds bundles of the metrics into one large query dictionary so that they are all executed simultaneously. Will fail if bundling the metrics together is not possible.

Parameters

metric_fn_bundle (Iterable[BundledMetricConfiguration]) – a “BundledMetricConfiguration” contains a MetricProvider’s MetricConfiguration (its unique identifier), its metric provider function (the function that actually executes the metric), and the arguments to pass to that function (a dictionary of metrics defined in the registry and their corresponding arguments).

Returns

A dictionary of “MetricConfiguration” IDs and their corresponding now-queried (fully resolved) values.

close(self)

Note (Will, 2021-07-29):

This is a helper function that will close and dispose SQLAlchemy objects that are used to connect to a database. Databases like Snowflake require the connection and engine to be instantiated and closed separately, and not doing so has caused problems with hanging connections.

Currently the ExecutionEngine does not support handling the connection and the engine separately, and will actually override the engine with a connection in some cases, obfuscating which object is actually used by the ExecutionEngine to connect to the external database. This will be handled in an upcoming refactor, which will allow this function to eventually become:

self.connection.close()
self.engine.dispose()

More background can be found here: https://github.com/great-expectations/great_expectations/pull/3104/
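
A hedged usage sketch: dispose of the underlying connection/engine when finished, which matters for warehouses such as Snowflake that otherwise hold connections open.

    from great_expectations.execution_engine import SqlAlchemyExecutionEngine

    engine = SqlAlchemyExecutionEngine(connection_string="sqlite:///:memory:")
    try:
        pass  # load batches, resolve metrics, etc.
    finally:
        engine.close()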

_get_splitter_method(self, splitter_method_name: str)

Get the appropriate splitter method from the method name.

Parameters

splitter_method_name – name of the splitter to retrieve.

Returns

splitter method.

execute_split_query(self, split_query: Selectable)

Use the execution engine to run the split query and fetch all of the results.

Parameters

split_query – Query to be executed as a sqlalchemy Selectable.

Returns

List of row results.

get_data_for_batch_identifiers(self, table_name: str, splitter_method_name: str, splitter_kwargs: dict)

Build data used to construct batch identifiers for the input table using the provided splitter config.

Sql splitter configurations yield the unique values that comprise a batch by introspecting your data.

Parameters
  • table_name – Table to split.

  • splitter_method_name – Desired splitter method to use.

  • splitter_kwargs – Dict of directives used by the splitter method as keyword arguments of key=value.

Returns

List of dicts of the form [{column_name: {"key": value}}]
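
A hedged sketch, assuming a table named "events" with a "date" column exists in the target database and that the splitter method name shown is available in your version (the connection string is a placeholder):

    from great_expectations.execution_engine import SqlAlchemyExecutionEngine

    engine = SqlAlchemyExecutionEngine(connection_string="postgresql://user:pass@host/db")  # placeholder URL

    batch_identifier_data = engine.get_data_for_batch_identifiers(
        table_name="events",
        splitter_method_name="split_on_year_and_month",
        splitter_kwargs={"column_name": "date"},
    )
    # e.g. [{"date": {"year": 2023, "month": 1}}, {"date": {"year": 2023, "month": 2}}, ...]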

_build_selectable_from_batch_spec(self, batch_spec: BatchSpec)
get_batch_data_and_markers(self, batch_spec: BatchSpec)