Python Async for Data Pipelines: From 2 Hours to 15 Minutes
How to use asyncio and aiohttp to parallelize API fetches and dramatically speed up your pipelines.
March 22, 2026 · 9 min read
python · async · performance
Traditional data pipelines are synchronous and sequential: they extract, transform, and load, waiting at every step. When you need to pull data from thousands of sources, all that waiting becomes the bottleneck. For I/O-bound work such as calling external APIs, Python async (asyncio) can turn a 2-hour pipeline into a 15-minute one.
The problem with synchronous pipelines
A typical ETL pipeline that queries 50 external APIs, one after another:

```python
import requests

# Synchronous: 50 APIs x ~2 seconds average = ~100 seconds
def extract_all_sources(sources: list[str]) -> list[dict]:
    results = []
    for source in sources:
        # Each call blocks until its response arrives before the next one starts
        response = requests.get(source, timeout=10)
        results.append(response.json())
    return results
```
With asyncio:

```python
import asyncio

import aiohttp

# Async: all APIs in parallel = ~2 seconds (roughly the slowest single request)
async def extract_all_sources(sources: list[str]) -> list[dict]:
    async with aiohttp.ClientSession() as session:
        # One coroutine per URL; gather runs them concurrently and keeps input order
        tasks = [fetch_source(session, url) for url in sources]
        return await asyncio.gather(*tasks)

async def fetch_source(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
        return await response.json()

# `sources` is the list of API URLs to query
results = asyncio.run(extract_all_sources(sources))
```
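To see the difference without hitting real APIs, here is a minimal sketch that replaces the HTTP call with a hypothetical `fake_fetch` coroutine that just sleeps for 0.1 s. The sequential version awaits each call before starting the next; the `gather` version starts them all first. The URLs are placeholders.

```python
import asyncio
import time

async def fake_fetch(url: str, latency: float = 0.1) -> dict:
    # Hypothetical stand-in for an HTTP call: awaits instead of blocking
    await asyncio.sleep(latency)
    return {"url": url}

async def sequential(urls: list[str]) -> list[dict]:
    # Awaits each call before starting the next one: total ~ sum of latencies
    return [await fake_fetch(u) for u in urls]

async def concurrent(urls: list[str]) -> list[dict]:
    # Starts every call, then waits for all of them: total ~ slowest single latency
    return await asyncio.gather(*(fake_fetch(u) for u in urls))

def timed(coro_fn, urls: list[str]) -> tuple[list[dict], float]:
    start = time.perf_counter()
    results = asyncio.run(coro_fn(urls))
    return results, time.perf_counter() - start

urls = [f"https://api-{i}.example/data" for i in range(10)]
seq_results, seq_elapsed = timed(sequential, urls)
con_results, con_elapsed = timed(concurrent, urls)
print(f"sequential: {seq_elapsed:.2f}s, concurrent: {con_elapsed:.2f}s")
```

With 10 simulated calls, the sequential run should take about one second and the concurrent one about a tenth of that. The gain exists only because the work is waiting, not computing: CPU-bound transforms running on the event loop get nothing from `asyncio`.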
Limitations and rate limits
The problem with unbounded `gather`: with 500 sources you fire 500 simultaneous requests, which can:
- Saturate the network or the target API
- Violate rate limits and get 429 responses
- Exhaust available memory
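An `asyncio.Semaphore` caps how many requests are in flight at once. A single `ClientSession` is shared across all requests rather than opened per URL, so its connection pool is reused:

```python
import asyncio
from asyncio import Semaphore

import aiohttp

async def extract_with_rate_limit(
    sources: list[str],
    max_concurrent: int = 20,
) -> list[dict | BaseException]:
    # At most `max_concurrent` requests are in flight at any moment
    semaphore = Semaphore(max_concurrent)

    # One shared session for every request (aiohttp recommends reusing sessions)
    async with aiohttp.ClientSession() as session:

        async def fetch_with_limit(url: str) -> dict:
            async with semaphore:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                    return await response.json()

        tasks = [fetch_with_limit(url) for url in sources]
        # return_exceptions=True: failures come back as exception objects
        # in the result list instead of the first one being raised
        return await asyncio.gather(*tasks, return_exceptions=True)
```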
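The semaphore's effect is easy to check offline. This sketch (the names `run_with_limit` and `estimated_seconds` are illustrative, and `asyncio.sleep` stands in for network I/O) counts how many simulated requests are in flight at once and adds a back-of-envelope estimate: with equal latencies, N requests capped at C concurrent take about ceil(N / C) × latency.

```python
import asyncio

async def run_with_limit(n_requests: int, max_concurrent: int, latency: float = 0.05) -> int:
    """Simulate n_requests fake fetches behind a semaphore; return the peak in-flight count."""
    semaphore = asyncio.Semaphore(max_concurrent)
    in_flight = 0
    peak = 0

    async def fake_fetch(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(latency)  # stand-in for network I/O
            in_flight -= 1
        return i

    results = await asyncio.gather(*(fake_fetch(i) for i in range(n_requests)))
    assert results == list(range(n_requests))  # gather preserves input order
    return peak

def estimated_seconds(n_requests: int, max_concurrent: int, latency: float) -> float:
    """Rough wall time when every request takes the same latency."""
    waves = -(-n_requests // max_concurrent)  # ceiling division
    return waves * latency

peak = asyncio.run(run_with_limit(n_requests=40, max_concurrent=10))
print(peak)                             # 10
print(estimated_seconds(500, 20, 2.0))  # 50.0
```

For 500 sources, a cap of 20, and ~2 s per request, that works out to roughly 50 s. Keep in mind that a semaphore bounds concurrency, not requests per second: if the API enforces a per-second quota, you need a time-based limiter on top of it.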
Async + retry logic
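tenacity's `@retry` decorator also works on coroutines, so transient failures can be retried with exponential backoff:

```python
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),                          # give up after 3 attempts in total
    wait=wait_exponential(multiplier=1, min=2, max=30),  # exponential backoff, clamped to 2-30 s
    reraise=True,  # after the last attempt, raise the original error instead of tenacity.RetryError
)
async def fetch_with_retry(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        # Turn 4xx/5xx responses (e.g. 429) into exceptions so they trigger a retry
        resp.raise_for_status()
        return await resp.json()
```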
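If you'd rather not add a dependency, a similar capped exponential schedule can be sketched with the standard library alone. `retry_with_backoff` below is a hypothetical helper, not part of tenacity or aiohttp. It takes a zero-argument factory so each attempt creates a fresh coroutine (a coroutine object can only be awaited once), and it doubles the delay after every failure up to `max_delay`.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    make_call: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 30.0,
) -> T:
    """Await make_call() up to `attempts` times with capped exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return await make_call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error unchanged
            # Delays: base_delay, 2*base_delay, 4*base_delay, ... never above max_delay
            await asyncio.sleep(min(max_delay, base_delay * 2**attempt))
    raise AssertionError("unreachable")

# Usage with a simulated flaky call that fails twice, then succeeds
calls = 0

async def flaky() -> str:
    global calls
    calls += 1
    if calls < 3:
        raise ConnectionError("transient failure")
    return "ok"

result = asyncio.run(retry_with_backoff(flaky, attempts=3, base_delay=0.01))
print(result, calls)  # ok 3
```

With aiohttp you would call it as `await retry_with_backoff(lambda: fetch_source(session, url))`. Like the decorator above, this sketch retries on any `Exception`; in practice you'd narrow that to transient errors such as timeouts and 429/5xx responses.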
Complete async pipeline
```python
import asyncio
import logging
from asyncio import Semaphore

import aiohttp

logger = logging.getLogger(__name__)

class AsyncDataPipeline:
    def __init__(self, max_concurrent: int = 20):
        # Caps requests in flight across the whole pipeline
        # (Python 3.10+ allows creating asyncio primitives outside a running loop)
        self.semaphore = Semaphore(max_concurrent)

    async def extract(self, sources: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self._fetch(session, url) for url in sources]
            # Failures come back as exception objects instead of aborting the batch
            results = await asyncio.gather(*tasks, return_exceptions=True)

        # BaseException also covers asyncio.CancelledError, which is not an Exception
        successful = [r for r in results if not isinstance(r, BaseException)]
        failed = [r for r in results if isinstance(r, BaseException)]
        if failed:
            logger.warning("%d of %d sources failed", len(failed), len(sources))
        return successful

    async def _fetch(self, session: aiohttp.ClientSession, url: str) -> dict:
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    return {"url": url, "data": data, "status": "ok"}
            except Exception as e:
                logger.error("Failed to fetch %s: %s", url, e)
                raise  # re-raise so gather records it as a failure

    def transform(self, item: dict) -> dict:
        # Hypothetical placeholder: identity transform; replace with your own logic
        return item

    async def run(self, sources: list[str]) -> dict:
        raw_data = await self.extract(sources)
        transformed = [self.transform(item) for item in raw_data]
        return {"records": len(transformed), "data": transformed}

# `sources` is the list of API URLs to query
pipeline = AsyncDataPipeline(max_concurrent=15)
result = asyncio.run(pipeline.run(sources))
```
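Because `extract` relies on `gather(..., return_exceptions=True)`, one bad source doesn't sink the batch. The sketch below reproduces that success/failure split offline; `fake_fetch` is a hypothetical stand-in for `_fetch` that fails for any URL containing "bad", and the URLs are placeholders.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)

async def fake_fetch(url: str) -> dict:
    # Hypothetical stand-in for AsyncDataPipeline._fetch: no network involved
    await asyncio.sleep(0.01)
    if "bad" in url:
        raise ConnectionError(f"simulated failure for {url}")
    return {"url": url, "data": {"length": len(url)}, "status": "ok"}

async def extract(sources: list[str]) -> tuple[list[dict], list[BaseException]]:
    results = await asyncio.gather(*(fake_fetch(u) for u in sources), return_exceptions=True)
    # Partition exactly as the pipeline does: exception objects vs. normal results
    successful = [r for r in results if not isinstance(r, BaseException)]
    failed = [r for r in results if isinstance(r, BaseException)]
    if failed:
        logger.warning("%d of %d sources failed", len(failed), len(sources))
    return successful, failed

sources = ["https://a.example/api", "https://bad.example/api", "https://c.example/api"]
ok, failed = asyncio.run(extract(sources))
print(len(ok), len(failed))  # 2 1
```

Returning the exceptions alongside the successes, rather than only logging a count, lets the caller inspect them or decide which sources to retry.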
Result: I/O-bound pipelines that used to take hours can now run in minutes.
Written by Mariano Gobea Alcoba