Get started with Great Expectations and Databricks
Use the information provided here to learn how you can use Great Expectations (GX) with Databricks.
To use GX with Databricks, you'll complete the following tasks:
- Load data
- Instantiate a Data Context (the primary entry point for a Great Expectations deployment, with configurations and methods for all supporting components)
- Create a Data Source (provides a standard API for accessing and interacting with data from a wide variety of source systems) and a Data Asset (a collection of records within a Data Source, usually named after the underlying data system and sliced to correspond to a desired specification)
- Create an Expectation Suite (a collection of verifiable assertions about data)
- Validate data using a Checkpoint (the primary means for validating data in a production deployment of Great Expectations)
The information provided here is intended to get you started quickly. To validate files stored in the DBFS, select the File tab. If you have an existing Spark DataFrame loaded, select one of the DataFrame tabs. See the specific integration guides if you're using a different file store, such as Amazon S3, Google Cloud Storage (GCS), or Microsoft Azure Blob Storage (ABS).
The complete code used in the following examples is available on GitHub.
Prerequisites
- A complete Databricks setup, including a running Databricks cluster with an attached notebook
- Access to DBFS
Install GX
- Run the following command in your notebook to install GX as a notebook-scoped library:
%pip install great-expectations
A notebook-scoped library is a custom Python environment that is specific to a notebook. You can also install a library at the cluster or workspace level. See Databricks Libraries.
- Run the following command to import the Python libraries you'll use in the following steps:
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint
Set up GX
To avoid configuring external resources, you'll use the Databricks File System (DBFS) for your Metadata Stores and Data Docs store. Data Docs are human-readable documentation generated from Great Expectations metadata, detailing Expectations, Validation Results, and so on.
DBFS is a distributed file system mounted in a Databricks workspace and available on Databricks clusters. Files on DBFS can be written and read as if they were on a local filesystem by adding the /dbfs/ prefix to the path. It also persists in object storage, so you won’t lose data after terminating a cluster. See the Databricks documentation for best practices, including mounting object stores.
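For example, a file written through the /dbfs/ prefix with standard Python file APIs is the same object you can see through dbutils. The following is a quick illustration; the path is hypothetical and not used elsewhere in this guide:
# Write a file using the local-style /dbfs/ path (illustrative path only)
with open("/dbfs/tmp/gx_dbfs_check.txt", "w") as f:
    f.write("DBFS behaves like a local filesystem under /dbfs/")

# The same file is visible through the dbfs:/ scheme
display(dbutils.fs.ls("dbfs:/tmp/"))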
- Run the following code to set the root directory where your Data Context will store its configuration:
context_root_dir = "/dbfs/great_expectations/"
- Run the following code to instantiate your Data Context:
context = gx.get_context(context_root_dir=context_root_dir)
Prepare your data
- File
- DataFrame
Run the following command with dbutils to copy existing example .csv taxi data to your DBFS folder:
# Copy 3 months of data
for month in range(1, 4):
    dbutils.fs.cp(
        f"/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz",
        f"/example_data/nyctaxi/tripdata/yellow/yellow_tripdata_2019-0{month}.csv.gz"
    )
Run the following code in your notebook to load a month of existing example taxi data as a DataFrame:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("/databricks-datasets/nyctaxi/tripdata/yellow/yellow_tripdata_2019-01.csv.gz")
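Optionally, confirm the DataFrame loaded as expected before connecting it to GX:
# Check the inferred schema and preview a few rows
df.printSchema()
display(df.limit(5))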
Connect to your data
- File
- DataFrame
- Run the following command to set the base directory that contains the data:
base_directory = "/dbfs/example_data/nyctaxi/tripdata/yellow/"
- Run the following command to create your Data Source:
dbfs_datasource = context.sources.add_or_update_spark_dbfs(
    name="my_spark_dbfs_datasource",
    base_directory=base_directory,
)
- Run the following command to set the batching regex:
batching_regex = r"yellow_tripdata_(?P<year>\d{4})-(?P<month>\d{2})\.csv\.gz"
- Run the following command to create a Data Asset with the Data Source:
csv_asset = dbfs_datasource.add_csv_asset(
    name="yellow_tripdata",
    batching_regex=batching_regex,
    header=True,
    infer_schema=True,
)
- Run the following command to build a Batch Request (provided to a Data Source in order to create a Batch) with the Data Asset you configured earlier:
batch_request = csv_asset.build_batch_request()
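The batching regex defines named groups (year and month), so each matching file becomes its own Batch; a Batch Request built with no arguments matches all of them, and the Validator created later typically works against the most recent Batch. If you want to target one specific month instead, recent GX releases let you pass the regex group names as options. The following is a sketch; the variable name is illustrative and assumes the 2019-01 file copied earlier:
# List the option keys derived from the batching regex (for example "year" and "month")
print(csv_asset.batch_request_options)

# Build a Batch Request for a single month; values are strings matched against the file names
january_batch_request = csv_asset.build_batch_request(options={"year": "2019", "month": "01"})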
- Run the following command to create the Data Source:
dataframe_datasource = context.sources.add_or_update_spark(
    name="my_spark_in_memory_datasource",
)
- Run the following command to create a Data Asset with the Data Source:
csv_file_path = "/path/to/data/directory/yellow_tripdata_2020-08.csv"
df = spark.read.csv(csv_file_path, header=True)
dataframe_asset = dataframe_datasource.add_dataframe_asset(
    name="yellow_tripdata",
    dataframe=df,
)
- Run the following command to build a Batch Request with the Data Asset you configured earlier:
batch_request = dataframe_asset.build_batch_request()
Create Expectations
You'll use a Validator (used to run an Expectation Suite against data) to interact with your batch of data and generate an Expectation Suite.
Every time you evaluate an Expectation with validator.expect_*, it is immediately Validated against your data. This instant feedback helps you identify unexpected data and removes the guesswork from data exploration. The Expectation configuration is stored in the Validator. When you are finished running Expectations on the dataset, you can use validator.save_expectation_suite() to save all of your Expectation configurations into an Expectation Suite for later use in a Checkpoint. A short sketch of inspecting the result returned by an individual expect_* call follows the steps below.
- Run the following command to create the suite and get a Validator:
expectation_suite_name = "insert_your_expectation_suite_name_here"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
print(validator.head())
- Run the following command to use the Validator to add a few Expectations:
validator.expect_column_values_to_not_be_null(column="passenger_count")
validator.expect_column_values_to_be_between(
    column="congestion_surcharge", min_value=0, max_value=1000
)
- Run the following command to save your Expectation Suite (all the unique Expectation Configurations from each run of validator.expect_*) to your Expectation Store:
validator.save_expectation_suite(discard_failed_expectations=False)
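As noted above, each validator.expect_* call returns its validation result immediately. The following is a minimal sketch of inspecting one of those results, with attribute names as in recent GX releases:
# Each expect_* call returns an ExpectationValidationResult for the current Batch
result = validator.expect_column_values_to_not_be_null(column="passenger_count")

# True if the Expectation passed; the result dict carries details such as the unexpected count
print(result.success)
print(result.result.get("unexpected_count"))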
Validate your data
You'll create and store a Checkpoint for your batch, which you can use to validate your data and run post-validation actions.
- Run the following command to create the Checkpoint configuration that uses your Data Context, passes in your Batch Request (your data) and your Expectation Suite (your tests):
my_checkpoint_name = "my_databricks_checkpoint"
checkpoint = Checkpoint(
    name=my_checkpoint_name,
    run_name_template="%Y%m%d-%H%M%S-my-run-name-template",
    data_context=context,
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
    action_list=[
        {
            "name": "store_validation_result",
            "action": {"class_name": "StoreValidationResultAction"},
        },
        {"name": "update_data_docs", "action": {"class_name": "UpdateDataDocsAction"}},
    ],
)
- Run the following command to save the Checkpoint:
context.add_or_update_checkpoint(checkpoint=checkpoint)
- Run the following command to run the Checkpoint:
checkpoint_result = checkpoint.run()
Your Checkpoint configuration includes the store_validation_result and update_data_docs actions. The store_validation_result action saves your validation results from the Checkpoint run and allows the results to be persisted for future use. The update_data_docs action builds Data Docs files for the validations run in the Checkpoint.
To learn more about data validation and customizing Checkpoints, see Validate Data: Overview.
To view the full Checkpoint configuration, run print(checkpoint.get_config().to_yaml_str()).
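If you run the Checkpoint from a scheduled job, you can also act on the overall outcome programmatically. The following is a minimal sketch based on the success flag exposed by the Checkpoint result:
# Overall pass/fail across all validations run by the Checkpoint
print(checkpoint_result.success)

# Optionally fail the notebook run when validation does not pass
if not checkpoint_result.success:
    raise ValueError("Great Expectations validation failed")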
Build and view Data Docs
Your Checkpoint contained an UpdateDataDocsAction, so your Data Docs have already been built from the validation you ran, and your Data Docs store contains a new rendered validation result.
Because you used the DBFS for your Data Docs store, you need to download your Data Docs locally to view them. If you use a different store, you can host your data docs in a place where they can be accessed directly by your organization.
If you have the Databricks CLI installed and configured locally, run the following commands from your local terminal to download your Data Docs and open the local copy of index.html to view your updated Data Docs:
databricks fs cp -r dbfs:/great_expectations/uncommitted/data_docs/local_site/ great_expectations/uncommitted/data_docs/local_site/
cd great_expectations/uncommitted/data_docs/local_site
open -a "<YOUR_PREFERRED_BROWSER_NAME_HERE>" index.html
The displayHTML command is another option you can use to display Data Docs in a Databricks notebook. However, when you use this option, an empty page is returned when you click a link in the displayed documentation. To view validation results, use the following method:
# Read the rendered Data Docs index page from DBFS and display it inline
html = '/dbfs/great_expectations/uncommitted/data_docs/local_site/index.html'
with open(html, "r") as f:
    data = "".join([l for l in f])
displayHTML(data)
Next steps
Now that you've created and saved a Data Context, Data Source, Data Asset, Expectation Suite, and Checkpoint, see Validate data with Expectations and Checkpoints to create a script to run the Checkpoint without the need to recreate your Data Assets and Expectations. To move Databricks notebooks to production, see Software Engineering Best Practices With Databricks Notebooks from Databricks.