Skip to main content

Command Palette

Search for a command to run...

Análisis del precio de las criptomonedas en tiempo real con Microsoft Fabric – Parte 1: Ingesta de datos con un Eventstream

Updated
8 min read
Análisis del precio de las criptomonedas en tiempo real con Microsoft Fabric – Parte 1: Ingesta de datos con un Eventstream

🎯 Objetivo

En este primer paso diseñaremos la arquitectura general del proyecto, crearemos los elementos necesarios en Microsoft Fabric para recibir y almacenar datos en tiempo real, y desarrollaremos un script en Python para capturar datos de criptomonedas desde la API pública de Binance.


🧱 Arquitectura del proyecto

A continuación, se muestra un esquema de alto nivel de la solución:

Componentes principales:

  • Eventstream: canal de ingesta en tiempo real que recibe eventos externos.

  • Eventhouse: base de datos KQL optimizada para almacenar eventos.

  • Script Python: encargado de consultar la API de Binance y enviar eventos al Eventstream.


⚙️ Crear los componentes en Microsoft Fabric

1. Crear un Eventstream y configurarlo

  1. Ve a tu workspace en Microsoft Fabric.

  2. Haz clic en Nuevo > Eventstream.

  3. Asigna un nombre, por ejemplo: es_Crypto.

  4. En la pantalla principal, selecciona el origen de datos Custom endpoint. Este tipo de origen permite recibir datos desde fuentes personalizadas a través de protocolos compatibles como Azure Event Hub, Kafka o AMQP.

  5. Asigna un nombre al origen (por ejemplo, binance-input) y añádelo.

  6. Publica los cambios del eventstream.


2. Crear un Eventhouse

  1. En el mismo workspace, selecciona Nuevo > Eventhouse. Este será el destino donde se almacenarán los eventos recibidos desde el Eventstream.

  2. Dale un nombre, como eh_Crypto.

  3. No es necesario crear manualmente una tabla aún, ya que se generará automáticamente al conectar el Eventstream en el siguiente paso.


🐍 Script Python para obtener y enviar datos

Este script permite simular eventos financieros en tiempo real conectándose a la API pública de Binance y enviando datos a un Azure Event Hub.

Cada segundo realiza las siguientes acciones:

  • 🔄 Consulta los datos de precios (ticker/price) y la hora del servidor desde Binance.

  • 🧱 Reestructura los datos en formato JSON incluyendo symbol, price y serverTime.

  • 📤 Agrupa los eventos en lotes y los envía al Azure Event Hub especificado.

  • 🛡️ Maneja errores de red, datos incompletos o problemas al construir lotes.

  • 🔁 Ejecuta estas acciones de manera continua dentro de un bucle asincrónico (asyncio).

💡 Ideal para pruebas o demostraciones de ingesta de datos en tiempo real desde una fuente externa.

🔒 Asegúrate de actualizar las variables EVENT_HUB_NAME y EVENT_HUB_CONNECTION_STR con los valores proporcionados por Fabric al crear el custom endpoint.

Al configurar el origen del eventstream, Microsoft Fabric genera automáticamente:

  • El nombre del Event Hub

  • El Connection string (primary key) con autenticación SAS Key

📦 Prerequisitos

pip install azure-eventhub

El script lo tienes disponible aquí:

import urllib.request
from urllib.error import URLError, HTTPError
import json
import asyncio
from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
import time
from datetime import datetime

# --- Configuración ---
# URLs de la API de Binance
ticker_price_url = "https://api.binance.com/api/v3/ticker/price"
server_time_url = "https://api.binance.com/api/v3/time"

# --- Configuración de Azure Event Hubs ---
# **IMPORTANTE:** Reemplaza estos valores con tu cadena de conexión y nombre de Event Hub.
EVENT_HUB_NAME = "es_XXX"
EVENT_HUB_CONNECTION_STR = "Endpoint=sb://XXX.servicebus.windows.net/;SharedAccessKeyName=key_XXX;SharedAccessKey=XXX;EntityPath=es_XXX"

# --- Configuración del Intervalo ---
# Define cada cuántos segundos quieres ejecutar el proceso
SEND_INTERVAL_SECONDS = 1

# --- Función para obtener datos de la API ---
def fetch_api_data(url):
    """Obtiene datos de una URL y los decodifica como JSON."""
    try:
        # print(f"Fetching data from: {url}") # Descomentar si necesitas logs detallados
        with urllib.request.urlopen(url, timeout=10) as response:
            if response.status == 200:
                return json.loads(response.read().decode('utf-8'))
            else:
                print(f"Error: Received status code {response.status} from {url}")
                return None
    except (HTTPError, URLError) as e:
        print(f"Error fetching data from {url}: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred while fetching data from {url}: {e}")
        return None


# --- Función principal asíncrona para procesar y enviar datos (sin cambios) ---
async def process_and_send_data(producer):
    # 1. Obtener la hora del servidor y los precios
    server_time_data = fetch_api_data(server_time_url)
    ticker_price_data = fetch_api_data(ticker_price_url)

    # Validar que se obtuvieron los datos
    if not server_time_data or 'serverTime' not in server_time_data:
        print("Error: No se pudo obtener la hora del servidor de Binance.")
        return False
    if not ticker_price_data or not isinstance(ticker_price_data, list):
        print("Error: No se pudieron obtener los datos de precios o el formato es incorrecto.")
        return False

    server_time = server_time_data['serverTime']
    # print(f"Server time obtained: {server_time}") # Log menos verboso
    # print(f"Number of tickers received: {len(ticker_price_data)}")

    # 2. Reestructurar los datos
    events_to_send = []
    for ticker in ticker_price_data:
        if 'symbol' in ticker and 'price' in ticker:
            restructured_event = {
                "serverTime": server_time,
                "tickerInfo": {
                    "symbol": ticker['symbol'],
                    "price": ticker['price']
                }
            }
            events_to_send.append(restructured_event)
        # else: # No loguear cada ticker inválido para no llenar la consola
            # print(f"Skipping invalid ticker data: {ticker}")

    if not events_to_send:
        print("No valid events were restructured to be sent in this cycle.")
        return False # Considerar esto un éxito parcial o fallo según el caso; aquí lo marcamos como no exitoso

    # print(f"Successfully restructured {len(events_to_send)} events.")

    # 3. Enviar datos a Event Hub usando el productor existente
    try:
        # El productor se crea y gestiona en main_loop
        event_data_batch = await producer.create_batch()
        event_count_in_batch = 0

        for event_payload in events_to_send:
            event_body_str = json.dumps(event_payload)
            event_data = EventData(event_body_str)
            try:
                event_data_batch.add(event_data)
                event_count_in_batch += 1
            except ValueError: # Batch full
                if event_count_in_batch > 0:
                    # print(f"Batch full ({event_count_in_batch} events). Sending batch...")
                    await producer.send_batch(event_data_batch)
                    # print("Batch sent.")
                else:
                     print(f"Warning: Single event is too large to fit in a batch: {len(event_body_str)} bytes.")
                     # Decide si quieres saltar este evento o manejarlo de otra forma
                     continue # Saltar este evento y continuar con el siguiente

                # Crear nuevo lote y añadir el evento actual
                event_data_batch = await producer.create_batch()
                event_data_batch.add(event_data)
                event_count_in_batch = 1

        # Enviar el último lote si contiene eventos
        if event_count_in_batch > 0:
            # print(f"Sending final batch ({event_count_in_batch} events)...")
            await producer.send_batch(event_data_batch)
            # print("Final batch sent.")

        print(f"Successfully sent {len(events_to_send)} events to Event Hub '{EVENT_HUB_NAME}'.")
        return True

    except Exception as e:
        print(f"An error occurred during Event Hub send: {e}")
        # Aquí podrías añadir lógica para reintentar o manejar errores específicos de Event Hubs
        return False


# --- Bucle principal asíncrono ---
async def main_loop():
    """Bucle principal que ejecuta el proceso cada X segundos."""

    # Crear el productor una vez fuera del bucle para reutilizar la conexión
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR,
        eventhub_name=EVENT_HUB_NAME
    )

    while True:
        start_time = time.time()
        print(f"--- [{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] Starting data fetch and send cycle ---")

        try:
            # Pasar el productor a la función
            success = await process_and_send_data(producer)
            if success:
                print(f"--- Cycle finished successfully ---")
            else:
                print(f"--- Cycle finished with errors (check logs above) ---")

        except Exception as e:
            # Captura errores inesperados en el ciclo principal
            print(f"--- FATAL ERROR in cycle: {e} ---")
            # Podrías decidir salir del bucle o reintentar crear el productor aquí
            # Por ahora, solo logueamos y continuamos esperando el intervalo

        end_time = time.time()
        elapsed_time = end_time - start_time
        wait_time = max(0, SEND_INTERVAL_SECONDS - elapsed_time) # Calcular cuánto esperar realmente

        print(f"--- Cycle duration: {elapsed_time:.2f} seconds. Waiting for {wait_time:.2f} seconds before next cycle ---")
        await asyncio.sleep(wait_time) # Esperar el tiempo restante del intervalo

# --- Punto de entrada ---
if __name__ == "__main__":
    if EVENT_HUB_CONNECTION_STR == "TU_CADENA_DE_CONEXION_EVENT_HUB" or EVENT_HUB_NAME == "TU_NOMBRE_DE_EVENT_HUB":
        print("ERROR: Por favor, actualiza las variables EVENT_HUB_CONNECTION_STR y EVENT_HUB_NAME con tus valores reales.")
    else:
        try:
            print(f"Starting continuous data sending every {SEND_INTERVAL_SECONDS} seconds...")
            print(f"Target Event Hub: {EVENT_HUB_NAME}")
            print("Press Ctrl+C to stop.")
            asyncio.run(main_loop())
        except KeyboardInterrupt:
            print("\nExecution stopped by user (Ctrl+C).")
        except Exception as e:
            print(f"\nAn unexpected error stopped the execution: {e}")
            exit(1)

🚀 Ejecución del script y conexión con el Eventstream

▶️ Ejecución del script en local

Una vez configurado correctamente el script, se puede ejecutar desde consola:

python src/scripts/binance-api-ticker-price.py

El script comenzará a realizar ciclos cada segundo, extrayendo datos de la API pública de Binance y enviándolos al Azure Event Hub configurado. Se mostrará información por consola sobre los ciclos y el número de eventos enviados.


✅ Comprobación de recepción de eventos en Microsoft Fabric

Para comprobar que los datos están llegando correctamente:

  1. Accede a tu Eventstream en Microsoft Fabric.

  2. En el menú superior, haz clic en "En directo / Live".

  3. Deberías comenzar a ver eventos con la siguiente estructura:


🔗 Conexión del Eventstream con un Eventhouse (KQL database)

Para persistir y consultar los datos, se puede conectar el Eventstream con un Eventhouse de la siguiente manera:

  1. En el Eventstream, haz clic en "Add destination". Lo puedes hacer desde el menú superior o desde la interfaz gráfica.

  2. Selecciona Eventhouse.

  3. Configuramos el destino:

    • Data ingestion mode: Direct ingestion

    • Destination name: Crypto-Eventhouse

    • Seleccionamos el workspace y eventhouse creado previamente.

  4. Guarda la configuración.

  5. Publica los cambios del eventstream.

    Una vez se hayan publicado los cambios, aparecerá el destino Eventhouse en rojo indicando que no está configurado.

  6. Clicamos en configurar.

  7. Creamos una nueva tabla con el nombre Crypto_RAW

  8. En la siguiente ventana, veremos una previsualización de los datos y como serán almacenados en la tabla del Eventhouse.

    Si cambiamos el valor de Nested levels a 2, podemos ver como los datos que vienen en formato JSON son extraídos correctamente, es decir, el precio de la criptomoneda por un lado y el identificador de la criptomoneda por otro. Además, el valor de serverTime también se ha convertido automaticamente a formato datetime.

    Esto es interesante conocerlo porque supone una modificación de los datos en tiempo real conforme se van recibiendo para almacenarlos correctamente en el Eventhouse. En nuestro caso, queremos almacenar los datos en crudo para limpiarlos y modificarlos más adelante en la capa silver.

  9. Volvemos a modificar el valor de Nested Levels a 1

  10. Clicamos en el icono del lápiz de la esquina derecha para modificar las columnas.

  11. Configuramos de la siguiente forma:

  12. Aplicamos los cambios.

  13. Finalizamos la configuración.

A partir de este momento, todos los eventos que lleguen al Eventstream se almacenarán en la tabla definida dentro del Eventhouse.


📊 Visualización de los datos en el Eventhouse

Una vez conectado el Eventstream con un Eventhouse y creada la tabla, es fundamental validar que los datos se están recibiendo y almacenando correctamente. Esto puede hacerse fácilmente mediante una consulta KQL desde la base de datos.

Desde el área de trabajo de Microsoft Fabric:

  1. Accede a tu KQL Database (Eventhouse).

  2. Selecciona la tabla Crypto_RAW.

  3. Verificamos que los datos se están almacenando correctamente desde la previsualización.

More from this blog

D

DataGym | Microsoft Fabric

36 posts