Volver al blog
data-engineering

Data Quality con Great Expectations: No más data corrupta

Cómo implementar data quality checks que atrapan problemas antes de producción.

7 de junio de 202611 min
data-qualitytestingvalidation

La calidad de datos es el problema silencioso que destruye la confianza en los sistemas de analytics. Un pipeline puede correr perfectamente mientras que los datos que produce están incorrectos. Great Expectations (GX) es el framework que usamos para automatizar las validaciones.

El problema

Sin validación de datos:

# Este pipeline "funciona" pero el resultado puede ser incorrecto
def compute_conversion_rate(df):
    return df[df['event'] == 'purchase'].count() / df.count()

# ¿Qué pasa si:
# - Hay filas duplicadas?
# - El campo 'event' tiene valores nulos?
# - Los timestamps son del futuro (bug en el source)?
# - El rate calculado es 150% (matemáticamente imposible)?

Los dashboards muestran números incorrectos. El equipo de negocio toma decisiones basadas en datos rotos. Nadie lo sabe hasta que es demasiado tarde.

Great Expectations: conceptos

Expectation: Una aserción sobre los datos. "La columna user_id no debe tener nulos."

Expectation Suite: Conjunto de expectations que definen la calidad esperada de un dataset.

Checkpoint: Valida un dataset contra un Expectation Suite y genera un reporte.

Setup básico

pip install great-expectations
gx init  # Crea el directorio gx/ con la configuración

Crear Expectations

import great_expectations as gx

context = gx.get_context()

# Definir una fuente de datos
datasource = context.sources.add_pandas("my_datasource")
data_asset = datasource.add_dataframe_asset("orders")

batch_request = data_asset.build_batch_request(dataframe=df_orders)
validator = context.get_validator(batch_request=batch_request)

# Expectations sobre columnas críticas
validator.expect_column_to_exist("order_id")
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_unique("order_id")

validator.expect_column_to_exist("gmv")
validator.expect_column_values_to_not_be_null("gmv")
validator.expect_column_values_to_be_between("gmv", min_value=0, max_value=1_000_000)

validator.expect_column_values_to_be_in_set(
    "status",
    ["pending", "completed", "cancelled", "refunded"]
)

validator.expect_column_values_to_match_regex(
    "seller_id",
    regex=r"^\d{6,12}$"  # IDs numéricos de 6-12 dígitos
)

# Expectations estadísticas
validator.expect_column_mean_to_be_between("gmv", min_value=50, max_value=5000)
validator.expect_column_proportion_of_unique_values_to_be_between(
    "buyer_id", min_value=0.3, max_value=1.0
)

# Guardar el Expectation Suite
validator.save_expectation_suite(discard_failed_expectations=False)

Integrar en el pipeline

import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest

def validate_and_load(df: pd.DataFrame, suite_name: str) -> bool:
    """Valida el dataframe antes de cargarlo. Devuelve False si falla la validación."""
    
    context = gx.get_context()
    
    checkpoint = context.add_or_update_checkpoint(
        name=f"{suite_name}_checkpoint",
        validations=[{
            "batch_request": {
                "datasource_name": "my_datasource",
                "data_asset_name": "orders",
                "runtime_parameters": {"batch_data": df},
                "batch_identifiers": {"default_identifier_name": "default_identifier"},
            },
            "expectation_suite_name": suite_name,
        }],
    )
    
    result = checkpoint.run()
    
    if not result.success:
        # Loggear qué falló
        for validation_result in result.run_results.values():
            for exp_result in validation_result["validation_result"]["results"]:
                if not exp_result["success"]:
                    logger.error(f"Data quality failure: {exp_result['expectation_config']}")
        
        return False
    
    return True

def etl_pipeline(date: str):
    df = extract_orders(date)
    
    if not validate_and_load(df, "orders_suite"):
        raise DataQualityException(f"Data quality check failed for {date}")
    
    transformed = transform(df)
    load(transformed)

Expectations personalizadas

from great_expectations.expectations.expectation import ColumnExpectation
from great_expectations.execution_engine import PandasExecutionEngine

class ExpectColumnValuesToBeValidArgentinaDate(ColumnExpectation):
    """Verifica que las fechas sean válidas para el contexto argentino."""
    
    metric_dependencies = ("column.values.to_be_valid_ar_date",)
    success_keys = ("min_date", "max_date")
    
    @classmethod
    def _validate(cls, configuration, metrics, *args, **kwargs):
        column_value_counts = metrics.get("column.value_counts.without_modifying_index")
        min_date = configuration.kwargs.get("min_date", "2020-01-01")
        max_date = configuration.kwargs.get("max_date")
        
        # Tu lógica de validación acá
        ...

Reporting automático

Great Expectations genera reportes HTML automáticamente:

context.build_data_docs()
context.open_data_docs()  # Abre en el navegador

Los reportes muestran qué expectativas pasaron, cuáles fallaron, ejemplos de valores que no cumplen, y estadísticas del dataset.

El impacto real

Después de implementar GX en nuestros pipelines principales:

  • 0 incidentes de datos incorrectos en producción en 8 meses
  • Detección temprana: los problemas se detectan en el pipeline, no cuando el equipo de negocio reporta números raros
  • Documentación viva: las expectations documentan qué propiedades se garantizan del dataset

Escrito por Mariano Gobea Alcoba