Skip to main content

Command Palette

Search for a command to run...

Change Data Feed en Delta Lake: Captura incremental de cambios para pipelines modernos en Microsoft Fabric

Updated
17 min read

Change Data Feed es una funcionalidad de Delta Lake que registra los cambios a nivel de fila entre versiones de una tabla Delta. Al activarla, el runtime genera un evento de cambio por cada operación de escritura: inserciones, actualizaciones y eliminaciones.

Piénsalo como un CDC (Change Data Capture) nativo dentro de Delta Lake, sin necesidad de herramientas externas ni lectura de logs de base de datos. Los cambios quedan accesibles directamente vía Spark SQL o PySpark.


El esquema de eventos CDF

Al leer el CDF, obtienes las columnas de datos de tu tabla más tres columnas de metadatos automáticas:

Columna Tipo Descripción
_change_type String Tipo de operación: insert, update_preimage, update_postimage, delete
_commit_version Long Versión del log de Delta del commit
_commit_timestamp Timestamp Timestamp exacto del commit

Los cuatro valores posibles de _change_type son:

  • insert — Fila nueva insertada

  • update_preimage — Estado de la fila antes de la actualización

  • update_postimage — Estado de la fila después de la actualización

  • delete — Fila eliminada

Nota clave sobre updates: cada UPDATE genera dos filas: update_preimage y update_postimage. Esto es fundamental para SCD2, ya que tenemos el estado anterior y el nuevo con sus timestamps exactos.


Cómo habilitar el Change Data Feed en Fabric

CDF no está activo por defecto: debe activarse explícitamente. Además, solo captura cambios realizados después de su activación — el historial previo no queda registrado.

Hay tres formas de activarlo:

1. Al crear una tabla nueva

CREATE TABLE raw_productos (
    id       INT,
    nombre   STRING,
    precio   DECIMAL(10,2),
    stock    INT
)
TBLPROPERTIES (delta.enableChangeDataFeed = true);

2. En una tabla existente

ALTER TABLE raw_productos
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

3. Para todas las tablas nuevas de la sesión

spark.conf.set('spark.microsoft.delta.properties.defaults.enableChangeDataFeed', 'true')

Verificar que está habilitado

spark.sql("DESCRIBE DETAIL raw_productos") \
     .select('name', 'properties') \
     .show(truncate=False)

Resultado esperado en Fabric:

{delta.enableChangeDataFeed -> true, delta.stats.extended.collect -> true, ...}

Lectura de cambios

Puedes consultar los cambios por rango de versiones o por rango de timestamps. Ambos extremos del rango son inclusivos.

Spark SQL

-- Por rango de versiones
SELECT * FROM table_changes('raw_productos', 0, 10);

-- Desde una versión hasta la última disponible
SELECT * FROM table_changes('raw_productos', 0);

-- Por rango de timestamps
SELECT * FROM table_changes('raw_productos',
    '2026-01-01 00:00:00',
    '2026-03-05 09:30:55');

PySpark

# Por versiones
df = (spark.read
    .format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', 0)
    .option('endingVersion', 10)
    .table('raw_productos'))

# Por timestamps
df = (spark.read
    .format('delta')
    .option('readChangeFeed', 'true')
    .option('startingTimestamp', '2026-03-05 09:30:05')
    .option('endingTimestamp',   '2026-03-05 09:30:55')
    .table('raw_productos'))

# Desde una versión hasta la última disponible
df = (spark.read
    .format('delta')
    .option('readChangeFeed', 'true')
    .option('startingVersion', 0)
    .table('raw_productos'))

Ejemplo de salida real con una tabla de productos:


Almacenamiento de los datos de cambio

Delta Lake almacena los registros de cambio para operaciones UPDATE, DELETE y MERGE en la carpeta _change_datadentro del directorio de la tabla Delta en OneLake. Sin embargo, existen situaciones donde Delta Lake optimiza el proceso y no genera archivos en esta carpeta:

  • Operaciones de solo inserción (INSERT): los cambios se calculan directamente desde el transaction log.

  • Eliminaciones completas de partición: igualmente calculables sin archivos adicionales.

Los archivos en _change_data siguen la misma política de retención de la tabla. Si ejecutas VACUUM con el período de retención por defecto (7 días), también se limpiarán estos registros de cambio.


Caso de uso 1: Sincronización Silver → Gold

El primer caso de uso es el más frecuente en arquitecturas Medallion: propagar cambios desde la capa Silver a Gold procesando únicamente las filas que han cambiado.

Datos iniciales

Disponemos de dos tablas de productos (productos_silver y productos_gold) con los siguientes datos:

Y se generan los siguientes cambios sobre la tabla de silver:

# Actualizar precio del Laptop (bajada de precio)
spark.sql("""
    UPDATE productos_silver
    SET precio = 849.99, stock = 45
    WHERE producto_id = 1
""")

# Añadir nuevo producto al catálogo
nuevo = spark.createDataFrame([
    Row(5, 'Webcam HD', 'Perifericos', Decimal('59.99'), 180)
], schema)
nuevo.write.format('delta').mode('append').saveAsTable('productos_silver')

# Eliminar producto descatalogado
spark.sql("DELETE FROM productos_silver WHERE producto_id = 4")

Punto de partida: detectar la versión CDF inicial

Un problema práctico es saber desde qué versión empezar a leer el CDF. Consultar el historial de la tabla y buscar cuándo se habilitó delta.enableChangeDataFeed resuelve esto de forma robusta:

def get_cdf_start_version(table_name: str) -> int:
    """
    Obtiene la primera versión donde CDF fue habilitado en la tabla,
    consultando DESCRIBE HISTORY y buscando en operationParameters
    la propiedad delta.enableChangeDataFeed = true.
    """
    history_df = spark.sql(f"DESCRIBE HISTORY {table_name}")
    cdf_versions = []

    for row in history_df.collect():
        operation = row['operation'] or ''
        params    = row['operationParameters'] or {}

        if operation not in ('SET TBLPROPERTIES', 'CREATE TABLE',
                             'CREATE OR REPLACE TABLE AS SELECT'):
            continue

        raw_props = params.get('properties', '{}')
        try:
            props = json.loads(raw_props)
        except (json.JSONDecodeError, TypeError):
            props = {}

        if props.get('delta.enableChangeDataFeed') == 'true':
            cdf_versions.append(row['version'])

    if not cdf_versions:
        raise ValueError(
            f"CDF no está habilitado en '{table_name}'. "
            f"Ejecuta: ALTER TABLE {table_name} "
            f"SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
        )

    return min(cdf_versions)

El proceso de sincronización completo

Con los datos de Silver cargados y los cambios simulados (update de precio, insert de nuevo producto, delete de producto descatalogado), el proceso completo es:

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Obtener versión inicial del CDF
cdf_start_version = get_cdf_start_version('productos_silver')

# Leer todos los cambios
cambios_cdf = (spark.read
               .format('delta')
               .option('readChangeFeed', 'true')
               .option('startingVersion', cdf_start_version)
               .table('productos_silver'))

# Filtrar update_preimage (nos quedamos con el estado final)
cambios_filtrados = cambios_cdf.filter(
    F.col('_change_type') != 'update_preimage'
)

# Quedarse con la última operación por producto (en caso de múltiples cambios)
window = Window.partitionBy('producto_id').orderBy(F.col('_commit_version').desc())
ultimos_cambios = (cambios_filtrados
    .withColumn('rn', F.row_number().over(window))
    .filter(F.col('rn') == 1)
    .drop('rn'))

# Aplicar MERGE a Gold
gold   = DeltaTable.forName(spark, 'productos_gold')
upserts = ultimos_cambios.drop('_commit_version', '_commit_timestamp')

gold.alias('gold').merge(
    upserts.alias('silver'),
    'gold.producto_id = silver.producto_id'
).whenMatchedDelete(
    condition="silver._change_type = 'delete'"
).whenMatchedUpdateAll(
    condition="silver._change_type = 'update_postimage'"
).whenNotMatchedInsert(
    condition="silver._change_type = 'insert'",
    values={
        'producto_id': 'silver.producto_id',
        'nombre':      'silver.nombre',
        'categoria':   'silver.categoria',
        'precio':      'silver.precio',
        'stock':       'silver.stock',
    }
).execute()

El MERGE discrimina por _change_type dentro de cada cláusula whenMatched / whenNotMatched, propagando correctamente inserts, updates y deletes a Gold. Con row_number() resuelve el caso en que el mismo registro cambia varias veces entre dos ejecuciones — nos quedamos únicamente con su estado final.

select * from productos_gold

Puedes ver el notebook completo aquí: NB_cdf_sample.ipynb


Caso de uso 2: SCD Tipo 2 con CDF

El segundo caso de uso es más sofisticado: implementar Slowly Changing Dimension Type 2 (SCD2) aprovechando el preimage/postimage que proporciona CDF.

En SCD2, cada cambio genera una nueva fila en lugar de sobreescribir la existente, manteniendo el historial completo con columnas valid_from, valid_to e is_current.

CDF es ideal para SCD2 porque nos da exactamente lo que necesitamos: el estado anterior (update_preimage) y el nuevo (update_postimage) con el timestamp exacto del cambio.

Inicialización: snapshot inicial de Silver

En la primera ejecución no hay historial CDF que procesar — construimos Gold desde el snapshot de Silver en la versión donde se habilitó CDF:

SILVER_TABLE = 'productos_silver_scd2'
GOLD_TABLE   = 'productos_gold_scd2'

gold_exists = spark.catalog.tableExists(GOLD_TABLE)
cdf_start_version, cdf_start_timestamp = get_cdf_start_info(SILVER_TABLE)

if not gold_exists:
    # Primera ejecución: leemos el snapshot de silver exactamente en la versión
    # donde se habilitó CDF, usando Delta Time Travel. Así obtenemos el estado
    # inicial de la tabla sin depender de tipos de eventos CDF.
    silver_snapshot = (spark.read
                        .format('delta')
                        .option('versionAsOf', cdf_start_version)
                        .table(SILVER_TABLE))

    gold_df = (silver_snapshot
               .withColumn('valid_from', F.lit(cdf_start_timestamp).cast('timestamp'))
               .withColumn('valid_to',   F.lit(None).cast('timestamp'))
               .withColumn('is_current', F.lit(True)))

    gold_df.write.format('delta').mode('overwrite').saveAsTable(GOLD_TABLE)

    # Guardar watermark: la versión de silver que acabamos de procesar
    spark.sql(f"""
        ALTER TABLE {GOLD_TABLE}
        SET TBLPROPERTIES ('scd2.last_processed_version' = '{cdf_start_version}')
    """)

    print(f"[INIT] Tabla '{GOLD_TABLE}' creada desde snapshot v{cdf_start_version} ({cdf_start_timestamp}).")

else:
    print(f"[SKIP] Tabla '{GOLD_TABLE}' ya existe. Ejecuta el proceso incremental.")

display(spark.table(GOLD_TABLE).orderBy('producto_id'))

Patrón watermark: almacenar la última versión procesada como propiedad de la propia tabla Gold (scd2.last_processed_version) es una buena técnica — el estado del pipeline viaja junto con los datos, sin necesidad de tablas de control externas.


Simular cambios en Silver

# Actualizar precio del Laptop (bajada de precio)
spark.sql("""
    UPDATE productos_silver_scd2
    SET precio = 849.99, stock = 45
    WHERE producto_id = 1
""")

# Añadir nuevo producto al catálogo
nuevo = spark.createDataFrame([
    Row(5, 'Webcam HD', 'Perifericos', Decimal('59.99'), 180)
], schema)
nuevo.write.format('delta').mode('append').saveAsTable('productos_silver_scd2')

# Eliminar producto descatalogado
spark.sql("DELETE FROM productos_silver_scd2 WHERE producto_id = 4")

Proceso incremental SCD2

Una vez existe la tabla gold, el proceso incremental lee el CDF de silver desde la versión siguiente al watermark almacenado como propiedad de la tabla gold (scd2.last_processed_version).

Por cada lote de cambios aplica la lógica SCD2 en dos pasos:

  1. Cerrar registros activos (is_current = True) en gold para los registros que han sido actualizados (update_preimage) o eliminados (delete) — se les asigna valid_to = _commit_timestamp e is_current = False mediante un MERGE.

  2. Insertar nuevas versiones en gold para los registros nuevos (insert) o actualizados (update_postimage) — con valid_from = _commit_timestamp, valid_to = null e is_current = True.

Al finalizar, el watermark se actualiza con la última versión procesada.

SILVER_TABLE = 'productos_silver_scd2'
GOLD_TABLE   = 'productos_gold_scd2'

# Leer watermark: última versión de silver ya procesada
gold_props        = spark.sql(f"DESCRIBE DETAIL {GOLD_TABLE}").collect()[0]['properties']
last_version      = int(gold_props['scd2.last_processed_version'])
next_version      = last_version + 1

# Versión más reciente disponible en silver
silver_latest     = spark.sql(f"DESCRIBE HISTORY {SILVER_TABLE} LIMIT 1").collect()[0]['version']

if next_version > silver_latest:
    print(f"[SKIP] No hay cambios nuevos en silver desde la versión {last_version}.")
else:
    print(f"Procesando versiones {next_version} – {silver_latest} de silver...")

    cambios_df = (spark.read
                   .format('delta')
                   .option('readChangeFeed', 'true')
                   .option('startingVersion', next_version)
                   .table(SILVER_TABLE)
                   .cache())

    max_version = cambios_df.agg(F.max('_commit_version')).collect()[0][0]

    # ── Paso 1: cerrar registros activos en gold ──────────────────────────────
    # Para cada UPDATE (preimage) y DELETE buscamos el registro con is_current=True
    # y le ponemos valid_to = timestamp del commit en que dejó de ser válido.
    to_close = (cambios_df
                .filter(F.col('_change_type').isin(['update_preimage', 'delete']))
                .select('producto_id', '_commit_timestamp')
                .distinct())

    (DeltaTable.forName(spark, GOLD_TABLE).alias('gold')
     .merge(
         to_close.alias('c'),
         'gold.producto_id = c.producto_id AND gold.is_current = true'
     )
     .whenMatchedUpdate(set={
         'valid_to':   'c._commit_timestamp',
         'is_current': F.lit(False)
     })
     .execute())

    # ── Paso 2: insertar nuevas versiones en gold ─────────────────────────────
    # UPDATE postimage → nueva versión activa del registro modificado.
    # INSERT           → registro completamente nuevo en silver.
    new_records = (cambios_df
                   .filter(F.col('_change_type').isin(['update_postimage', 'insert']))
                   .withColumn('valid_from', F.col('_commit_timestamp'))
                   .withColumn('valid_to',   F.lit(None).cast('timestamp'))
                   .withColumn('is_current', F.lit(True))
                   .drop('_change_type', '_commit_version', '_commit_timestamp'))

    new_records.write.format('delta').mode('append').saveAsTable(GOLD_TABLE)

    # ── Paso 3: actualizar watermark ──────────────────────────────────────────
    spark.sql(f"""
        ALTER TABLE {GOLD_TABLE}
        SET TBLPROPERTIES ('scd2.last_processed_version' = '{max_version}')
    """)

    cambios_df.unpersist()
    print(f"[OK] Versiones {next_version}–{max_version} procesadas.")

display(spark.table(GOLD_TABLE).orderBy('producto_id', 'valid_from'))

Simular múltiples cambios sobre el mismo registro

El proceso incremental anterior funciona cuando cada producto cambia una sola vez por batch. Pero si el mismo producto_id recibe varios UPDATEs entre dos ejecuciones, el MERGE simple falla: después de cerrar el primer registro activo, el segundo preimage no encontrará ningún is_current = True y la cadena de versiones quedará rota.

Simulamos ese caso: dos bajadas de precio consecutivas sobre el producto 1 y un ajuste de stock en el producto 3, los tres en el mismo batch incremental.

# Primera bajada de precio del Laptop (commit independiente)
spark.sql("""
    UPDATE productos_silver_scd2
    SET precio = 799.99, stock = 30
    WHERE producto_id = 1
""")

# Segunda bajada de precio del Laptop (otro commit independiente)
spark.sql("""
    UPDATE productos_silver_scd2
    SET precio = 749.99, stock = 20
    WHERE producto_id = 1
""")

# Ajuste de stock en el Teclado
spark.sql("""
    UPDATE productos_silver_scd2
    SET stock = 180
    WHERE producto_id = 3
""")

Proceso incremental SCD2 con varias versiones

La solución al problema de múltiples cambios por producto es usar lead() sobre _commit_version en lugar del MERGE doble.

Lógica en dos pasos:

  1. Cerrar el registro activo en gold usando el timestamp del primer evento del batch para ese producto (mínimo _commit_timestamp de preimages/deletes). Esto cierra exactamente una vez independientemente de cuántos cambios haya.

    cambios_df = (spark.read
                    .format('delta')
                    .option('readChangeFeed', 'true')
                    .option('startingVersion', next_version)
                    .table(SILVER_TABLE)
                    .cache())
    
    max_version = cambios_df.agg(F.max('_commit_version')).collect()[0][0]
    
    first_event = (cambios_df
                    .filter(F.col('_change_type').isin(['update_preimage', 'delete']))
                    .groupBy('producto_id')
                    .agg(F.min('_commit_timestamp').alias('first_change_ts')))
    
    (DeltaTable.forName(spark, GOLD_TABLE).alias('gold')
         .merge(
             first_event.alias('c'),
             'gold.producto_id = c.producto_id AND gold.is_current = true'
         )
         .whenMatchedUpdate(set={
             'valid_to':   'c.first_change_ts',
             'is_current': F.lit(False)
         })
         .execute())
    
    display(cambios_df.orderBy('_commit_timestamp'))
    
    display(first_event)
    
  2. Construir todas las nuevas filas SCD2 a partir de los eventos update_postimage e insert, ordenados por _commit_version. Con lead(_commit_timestamp) calculamos el valid_to de cada fila: es el timestamp del siguiente evento para ese mismo producto_id. Si no hay siguiente evento (lead = null) es la versión activa (is_current = True). Los eventos delete se incluyen en la ventana para propagar su timestamp como valid_to de la última postimage, pero no generan fila propia.

    w = Window.partitionBy('producto_id').orderBy('_commit_version')
    
    new_rows = (cambios_df
                .filter(F.col('_change_type').isin(['insert', 'update_postimage', 'delete']))
                .withColumn('next_ts', F.lead('_commit_timestamp').over(w))
                .filter(F.col('_change_type') != 'delete')
                .withColumn('valid_from', F.col('_commit_timestamp'))
                .withColumn('valid_to',   F.col('next_ts'))
                .withColumn('is_current', F.col('next_ts').isNull())
                .drop('_change_type', '_commit_version', '_commit_timestamp', 'next_ts'))
    
    new_rows.write.format('delta').mode('append').saveAsTable(GOLD_TABLE)
    
    display(new_rows.orderBy('producto_id', 'valid_from'))
    

Resultado final


Reconstrucción completa de gold desde silver

En lugar de mantener el proceso incremental, a veces es útil (o necesario) reconstruir la tabla gold desde cero a partir de todo el historial CDF de silver.

La clave es que los eventos del CDF (insert, update_postimage, delete) forman una línea de tiempo ordenada por _commit_version para cada producto_id. Aplicando lead(_commit_timestamp) sobre esa ventana obtenemos directamente el valid_to de cada versión sin necesidad de MERGE ni watermark:

  • insert / update_postimage → generan una fila en gold con valid_from = _commit_timestamp y valid_to = siguiente evento del mismo producto

  • delete → no genera fila, pero su timestamp se propaga como valid_to de la última postimage gracias al lead()

  • update_preimage → descartado (es el espejo del postimage anterior, no aporta información nueva)

El resultado es idéntico al que produce el proceso incremental acumulado, pero calculado en un único scan del CDF.

SILVER_TABLE    = 'productos_silver_scd2'
GOLD_TABLE_FULL = 'productos_gold_scd2_full'

cdf_start_version, _ = get_cdf_start_info(SILVER_TABLE)

# Leer todo el historial CDF desde la versión inicial
all_events = (spark.read
               .format('delta')
               .option('readChangeFeed', 'true')
               .option('startingVersion', cdf_start_version)
               .table(SILVER_TABLE))

# Ventana por producto ordenada por versión de commit (orden real de los cambios)
w = Window.partitionBy('producto_id').orderBy('_commit_version')

gold_full_df = (all_events
    # Conservamos insert, update_postimage y delete.
    # delete entra en la ventana para que su timestamp llegue como valid_to
    # de la última postimage/insert, pero no generará fila propia.
    .filter(F.col('_change_type').isin(['insert', 'update_postimage', 'delete']))
    # valid_to = timestamp del siguiente evento para este producto_id (o null si es el último)
    .withColumn('next_ts', F.lead('_commit_timestamp').over(w))
    # Eliminar los delete (no generan fila en gold)
    .filter(F.col('_change_type') != 'delete')
    .withColumn('valid_from', F.col('_commit_timestamp'))
    .withColumn('valid_to',   F.col('next_ts'))
    .withColumn('is_current', F.col('next_ts').isNull())
    .drop('_change_type', '_commit_version', '_commit_timestamp', 'next_ts'))

gold_full_df.write.format('delta').mode('overwrite').saveAsTable(GOLD_TABLE_FULL)

print(f"Tabla '{GOLD_TABLE_FULL}' generada con {gold_full_df.count()} filas.")
display(spark.table(GOLD_TABLE_FULL).orderBy('producto_id', 'valid_from'))

Puedes ver el notebook completo aquí: NB_cdf_scd2.ipynb


VACUUM y CDF: una convivencia que hay que gestionar

Una de las operaciones de mantenimiento más habituales en Delta Lake es VACUUM, que elimina los archivos físicos que ya no son necesarios para la versión actual de la tabla. En tablas con CDF activo, esta operación tiene consecuencias criticas que hay que entender bien antes de ejecutarla.

⚠️El problema central

VACUUM elimina tanto los archivos de datos de versiones antiguas como los archivos de la carpeta changedata. Una vez ejecutado, es imposible recuperar esos cambios: no existe ninguna operacion de rollback para datos fisicamente borrados.

CDF y VACUUM compiten directamente: CDF necesita el historial para ser util; VACUUM lo elimina para ahorrar almacenamiento.

Que ocurre si VACUUM elimina versiones que necesita CDF

# Despues de ejecutar VACUUM, esta lectura puede FALLAR:
spark.read \
    .format('delta') \
    .option('readChangeFeed', 'true') \
    .option('startingVersion', 0) \
    .table('productos_silver')

# Error: Error getting change data for range [0, 5].
# The provided starting version 0 is older than the
# earliest available version for this table.

Controlar la retención: dos propiedades clave

Delta Lake tiene dos propiedades distintas que controlan la retención. Ambas deben estar alineadas: si solo amplias el log pero no los archivos fisicos, el log registrara versiones para las que ya no existen datos:

# Ver la configuracion actual
spark.sql('DESCRIBE DETAIL productos_silver') \
     .select('properties') \
     .show(truncate=False)

# Ampliar retencion a 90 dias
spark.sql("""
    ALTER TABLE productos_silver
    SET TBLPROPERTIES (
        delta.logRetentionDuration         = 'interval 90 days',
        delta.deletedFileRetentionDuration = 'interval 90 days'
    )
""")

# delta.logRetentionDuration      -> conserva entradas del transaction log
# delta.deletedFileRetentionDuration -> conserva archivos fisicos eliminados incluida la carpeta _change_data)

Validar la versión disponible antes de leer CDF

Es recomendable comprobar que la versión de inicio del CDF sigue disponible antes de lanzar el pipeline, especialmente en entornos donde VACUUM se ejecuta de forma automática:

def get_oldest_available_version(table_name: str) -> int:
    """
    Devuelve la version mas antigua disponible en el historial.
    Util para detectar si VACUUM ha eliminado parte del historial CDF.
    """
    history_df = spark.sql(f'DESCRIBE HISTORY {table_name}')
    return history_df.agg({'version': 'min'}).collect()[0][0]


oldest = get_oldest_available_version('productos_silver')
print(f'Version mas antigua disponible: {oldest}')

if cdf_start_version < oldest:
    raise ValueError(
        f'La version de inicio del CDF ({cdf_start_version}) ya no esta disponible. '
        f'La version mas antigua en el historial es {oldest}. '
        f'Es posible que VACUUM haya eliminado parte del historial.'
    )

Recomendaciones y buenas prácticas

1. Activa CDF desde el inicio del ciclo de vida de la tabla

CDF no captura el historial previo a su activación. Actívalo al crear la tabla o lo antes posible. En Fabric, usa la configuración global de sesión para que todas las tablas nuevas lo hereden:

spark.conf.set('spark.microsoft.delta.properties.defaults.enableChangeDataFeed', 'true')

O bien pásalo como opción al escribir:

df.write.format('delta').option('delta.enableChangeDataFeed', 'true').saveAsTable('mi_tabla')

2. Usa siempre startingVersion, no startingTimestamp, en procesos incrementales

Los timestamps pueden tener problemas de zona horaria o precisión. La versión del log de Delta es determinista e inequívoca. Guarda el watermark como número de versión.

3. Verifica siempre si hay cambios nuevos antes de leer el CDF

Antes de llamar a readChangeFeed, comprueba que realmente hay versiones nuevas:

silver_latest = spark.sql(f"DESCRIBE HISTORY {SILVER_TABLE} LIMIT 1").collect()[0]['version']
if next_version > silver_latest:
    print("[SKIP] No hay cambios nuevos.")

Esto evita errores cuando no hay nada que procesar.

4. Filtra update_preimage lo antes posible

En la mayoría de casos de uso (sincronización simple, upserts), el update_preimage no es necesario. Filtrarlo lo antes posible reduce el volumen de datos que Spark tiene que manejar. En SCD2 sí lo necesitas, pero solo para extraer el timestamp del primer evento.

5. Usa cache() cuando el DataFrame de CDF se lee múltiples veces

En el proceso SCD2, el DataFrame de cambios se usa en dos pasos (cerrar registros + insertar nuevos). Cachearlo evita releer el CDF dos veces:

cambios_df = (spark.read
               .format('delta')
               .option('readChangeFeed', 'true')
               .option('startingVersion', next_version)
               .table(SILVER_TABLE)
               .cache())

# resto de código

cambios_df.unpersist()

6. Usa el patrón lead() para manejar múltiples cambios en el mismo batch

El MERGE simple de SCD2 solo funciona cuando cada registro cambia una vez por batch. Para el caso general, el patrón con lead(_commit_timestamp) sobre una ventana ordenada por _commit_version es la solución correcta y escalable.

7. Ten en cuenta el coste de almacenamiento

CDF genera archivos adicionales de cambios en _change_data/ dentro del directorio de la tabla. En tablas con alta frecuencia de escritura, esto puede aumentar el almacenamiento significativamente. Evalúa si necesitas CDF en todas las tablas o solo en las que son fuente de otros procesos.

8. CDF y VACUUM: precaución con la retención

Si ejecutas VACUUM con una retención baja, puedes perder archivos de cambios que todavía no has procesado. Asegúrate de que la retención de VACUUM sea mayor que el intervalo máximo entre ejecuciones de tu pipeline.