Guide de l’exploration distribuée du Web

Découvrez les stratégies, les architectures et les exemples concrets d’exploration distribuée du web pour faire fonctionner des scraps évolutifs sur plusieurs machines.
31 min de lecture
Guide to Distributed Web Crawling blog image

L’exploration distribuée du web est une stratégie qui permet d’étendre les scrapeurs web à plusieurs machines, surmontant ainsi les limites des crawlers à un seul nœud. Dans cet article, nous allons explorer :

  • Exploration distribuée du web vs exploration d’un seul nœud
  • L’architecture de base de l’exploration distribuée du web
  • Exemples concrets d’exploration distribuée du web
  • Stratégies de mise en œuvre et meilleures pratiques
  • Les pièges les plus courants et la façon de les éviter

TL;DR : L ‘exploration distribuée du web utilise une grappe de machines pour explorer les sites web en parallèle, ce qui permet de résoudre les problèmes d’évolutivité et de vitesse que les robots d’exploration à un seul nœud ne peuvent pas gérer. Il offre un débit et une fiabilité accrus (pas de goulot d’étranglement unique) au prix d’une complexité architecturale et d’une surcharge accrues.

Recherche distribuée ou à nœud unique

La plupart des projets d’exploration n’ont pas besoin de systèmes distribués, mais les équipes perdent régulièrement des mois à construire des architectures distribuées complexes alors qu’un seul serveur suffirait.

Dans un crawler à nœud unique, une seule machine se charge de l’extraction, de l’analyse et du stockage des données. Ce type de système est plus facile à développer et à maintenir, et il permet de réaliser des économies. Il convient parfaitement à la recherche de 60 à 500 pages par minute, mais au fur et à mesure que vos besoins en matière de recherche augmentent, un nœud unique devient un goulot d’étranglement, car vous serez limité par les contraintes liées à l’unité centrale, à la mémoire et au réseau.

En revanche, les crawlers distribués répartissent le travail sur plusieurs nœuds, ce qui permet d’effectuer des recherches simultanées à grande échelle, à grande vitesse et avec une meilleure tolérance aux pannes. Si un travailleur tombe en panne, les autres continuent à fonctionner, ce qui améliore la fiabilité. En contrepartie, les systèmes distribués nécessitent des files d’attente de messages, la synchronisation d’une frontière d’URL et une conception minutieuse pour éviter la duplication ou la saturation des sites cibles.

Comparaison complète

Aspect Mono-nœud Distribué
Performance 4 secondes/page en moyenne, 60-120 pages/minute 30x plus rapide, 50 000+ requêtes/seconde
Évolutivité Limité par les ressources d’une seule machine Mise à l’échelle linéaire des nœuds
Tolérance de panne Point de défaillance unique Basculement automatique, auto-réparation
Répartition géographique Lieu fixe Déploiement multirégional
Utilisation des ressources Mise à l’échelle verticale uniquement Optimisation de l’échelle horizontale
Complexité Installation simple, frais généraux minimes Orchestration complexe, coûts opérationnels plus élevés
Coût Investissement initial moins élevé Coûts d’infrastructure plus élevés, meilleur retour sur investissement à grande échelle
Maintenance Charge opérationnelle minimale Nécessite une expertise en matière de systèmes distribués
Traitement des données Traitement local uniquement Traitement parallèle entre les nœuds
Antidétection Rotation limitée des PI Gestion avancée du proxy, empreintes digitales

Devriez-vous opter pour la distribution ? (Un arbre de décision)

Un arbre de décision indiquant si la distribution est la bonne approche

Éléments de base et architecture

Une fois que vous avez décidé d’opter pour le crawling distribué, l’étape suivante consiste à décomposer ce que vous êtes en train de construire. Pensez-y comme à une équipe de course de haute performance, où chaque composant a une tâche spécifique et doit fonctionner ensemble de manière transparente. Voici les principaux éléments nécessaires à la construction d’un système de recherche distribuée :

Planificateur / file d’attente (le cerveau)

Au cœur d’un robot d’exploration distribué se trouve un planificateur ou une file d’attente de tâches qui coordonne le travail entre les nœuds, et c’est là que vivent vos URL avant d’être explorées. Un planificateur peut également gérer la politesse (timing) et les tentatives. Par exemple, vous pouvez mettre en place des files d’attente spécifiques à un domaine afin de vous assurer qu’un site n’est pas parcouru par tous les travailleurs en même temps.

Les programmateurs offrent trois options principales, chacune ayant sa propre personnalité :

  • Kafka: C’est un peu le champion des poids lourds. Il est conçu pour un débit massif et ne perd pas de temps à gérer des millions de messages par seconde. Sa beauté réside dans sa conception basée sur les journaux, qui est parfaite pour gérer votre frontière d’URL. Vous pouvez partitionner par domaine pour garder votre crawling poli.
  • RabbitMQ : c’est un peu comme un couteau suisse. Il offre un routage plus souple que Kafka, avec des fonctionnalités telles que les files d’attente prioritaires. RabbitMQ dispose d’un stockage en mémoire, ce qui le rend plus rapide pour les petites charges de travail. Idéal lorsque vous avez besoin de différentes stratégies d’exploration pour différents types de contenu.
  • Celery: Le meilleur ami du développeur Python. Cette option n’est pas aussi efficace que les autres, mais elle est facile à utiliser. Celery est parfait pour le prototypage ou le crawling à moyenne échelle lorsque vous avez besoin de faire fonctionner quelque chose rapidement.

Frontière des URL et déduplication : La mémoire du crawler

Vous est-il déjà arrivé de parcourir accidentellement la même page 1 000 fois ? C’est là que la déduplication vous sauve la mise. Vous devez suivre ce que vous avez vu tout en respectant la politesse du serveur, afin de ne pas marteler le même domaine à plusieurs reprises.

Les ensembles Redis peuvent vous donner une précision parfaite, mais ils consomment beaucoup de mémoire. Les filtres de Bloom utilisent 90 % de mémoire en moins (1,2 Go contre plus de 12 Go pour un milliard d’URL), mais ils ont parfois des faux positifs (ils peuvent dire que vous n’avez pas vu une URL alors que vous l’avez vue), vous pouvez donc opter pour cette implémentation de Redis :

class DistributedURLFrontier:
    def __init__(self, redis_client):
        self.redis = redis_client

    def add_url(self, url, priority=0):
        domain = urlparse(url).netloc

        # Skip if already seen
        if self.redis.sismember("seen_urls", url):
            return

        # Mark as seen and queue by domain
        self.redis.sadd("seen_urls", url)
        self.redis.lpush(f"queue:{domain}", url)
        self.redis.zadd("priority_queue", {url: priority})

    def get_next_url(self):
        # Get highest priority URL
        result = self.redis.zrevrange("priority_queue", 0, 0)
        if not result:
            return None

        url = result[0]
        domain = urlparse(url).netloc

        # Respect crawl delay (1 second between requests per domain)
        last_crawl = self.redis.get(f"last_crawl:{domain}")
        if last_crawl and time.time() - float(last_crawl) < 1.0:
            return None

        # Remove from queues and update last crawl time
        self.redis.zrem("priority_queue", url)
        self.redis.rpop(f"queue:{domain}")
        self.redis.set(f"last_crawl:{domain}", time.time())

        return url

Nœuds de travail (le muscle)

Les nœuds de travail sont les bêtes de somme de l’exploration. Ce sont les processus ou les machines qui effectuent le travail d’exploration, comme la récupération des URL et le traitement du contenu. Chaque travailleur exécute une logique d’exploration identique (par exemple, le même script ou la même application Python), mais ils fonctionnent en parallèle sur différentes URL de la file d’attente.

Pour tirer le meilleur parti de vos travailleurs, vous devez les maintenir sans état, de sorte que tout état (URL visitées, résultats, etc.) soit stocké dans un espace de stockage partagé ou transmis par le biais de messages. De cette façon, n’importe quel travailleur peut prendre n’importe quelle tâche, et lorsqu’un travailleur meurt, les autres prennent instantanément le relais sans perdre de temps.

class DistributedWorker:
    def __init__(self, worker_id, max_concurrent=50):
        self.worker_id = worker_id
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100)
        )

    async def crawl_batch(self, urls):
        tasks = [self.crawl_url(url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def crawl_url(self, url):
        async with self.semaphore:
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    return {'url': url, 'content': content, 'status': response.status}
            except Exception as e:
                return {'url': url, 'error': str(e)}

Conseil de pro : avec les travailleurs, il est important de ne pas utiliser un marteau de forgeron pour tout. Vous devriez utiliser des workers HTTP légers pour le HTML statique et des workers Puppeteer lourds pour les pages rendues en JavaScript. Différents outils, différents pools de travailleurs. Vous pouvez facilement choisir les bons types de proxy pour votre flotte de travailleurs grâce à notre guide complet de sélection de proxy.

Couche de stockage (l’entrepôt)

La couche de stockage est l’endroit où vous enregistrez les données et les métadonnées explorées, et elle se compose souvent de deux parties :

  • Lestockage de contenu gère le gros du HTML brut, les réponses JSON, les images et les PDF. Il s’agit en quelque sorte de votre entrepôt numérique. Les magasins d’objets tels que S3, Google Cloud Storage ou HDFS excellent dans ce domaine, car ils s’étendent à l’infini et gèrent les écritures simultanées de plusieurs travailleurs sans se casser la tête.
  • Lestockage des métadonnées contient l’or structuré que vous avez extrait – champs analysés, relations entre les entités, horodatage de l’exploration et état de réussite ou d’échec. Ces données sont stockées dans des bases de données optimisées pour les requêtes et les mises à jour, et pas seulement pour le volume de stockage.

Les robots d’exploration distribués ont besoin d’un système de stockage capable de gérer des écritures massives et simultanées sans s’étouffer. Les magasins d’objets tels que S3 ou Google Cloud Storage excellent dans le domaine du contenu brut parce qu’ils sont extensibles à l’infini, tandis que les bases de données NoSQL (MongoDB, Cassandra) ou SQL gèrent efficacement les métadonnées structurées.

Surveillance et alerte

L’exploitation d’un crawler distribué nécessite une visibilité sur les performances du système. Vous pouvez utiliser Prometheus et Grafana pour créer des tableaux de bord de surveillance complets qui suivent les taux d’exploration, les taux de réussite, les temps de réponse et la profondeur des files d’attente. Les mesures clés comprennent les demandes par seconde par domaine, les temps de réponse du 95e centile et les tendances de la taille de la file d’attente.

Couche anti-bot et évasion

L’exploration du Web à grande échelle implique un jeu constant du chat et de la souris avec les systèmes anti-bots. Vous avez besoin de trois couches de défense : la rotation des IP à travers des milliers de proxys résidentiels et de centres de données, la randomisation des empreintes digitales des agents utilisateurs et des signatures de navigateur, et le mimétisme comportemental pour éviter les schémas de détection.

Bright Data Web Unlocker offre des capacités anti-détection de niveau professionnel avec un taux de réussite de plus de 99 %, grâce à la résolution automatique des CAPTCHA, à la rotation des IP et à l’empreinte digitale du navigateur. Son approche basée sur l’API simplifie l’intégration tout en gérant des défis anti-bots complexes.

class BrightDataWebUnlocker:
    def crawl_url(self, url: str, options: Dict = None) -> Dict:
        payload = {
            "url": url,
            "zone": self.zone,
            "format": "raw",
            "country": "US",
            "render_js": True,
            "wait_for_selector": ".content"
        }

        response = requests.post(
            self.base_url,
            headers={"Authorization": f"Bearer {self.api_key}"},
            json=payload,
            timeout=60
        )

La rotation de proxy avancée met en œuvre le contrôle de la santé, l’optimisation géographique et la récupération des pannes dans les pools de proxy résidentiels, de centres de données et mobiles. Une gestion réussie du proxy nécessite plus de 1000 IP avec des algorithmes de rotation intelligents.

L’évitement de l’empreinte digitale randomise les agents utilisateurs, les empreintes digitales des navigateurs et les caractéristiques des réseaux afin d’empêcher la détection par des systèmes anti-bots sophistiqués. Cela comprend la rotation des empreintes TLS, l’usurpation d’empreintes de toile et la simulation de modèles comportementaux.

Cas d’utilisation réels avec exemples de code

Explorons deux cas d’utilisation courants pour les crawlers distribués, et décrivons comment les mettre en œuvre à l’aide d’extraits de code. Nous utiliserons Python et Celery dans les exemples pour des raisons de simplicité, mais les principes s’appliquent de manière générale.

Cas d’utilisation 1 : Surveillance des prix du commerce électronique

Imaginez que vous suiviez les prix des concurrents sur 50 000 pages de produits chaque jour. Si vous essayez d’utiliser une seule machine pour accéder à toutes ces URL, vous devrez passer plus de 12 heures à les explorer, en supposant que rien ne se passe. De plus, la plupart des sites de commerce électronique commenceront à vous bloquer après quelques milliers de requêtes rapides provenant de la même adresse IP.

C’est là que le crawling distribué est utile. Au lieu d’une machine débordée, vous répartissez ces 50 000 URL sur des dizaines de travailleurs, chacun utilisant des adresses IP différentes. Ce qui prenait une demi-journée est maintenant terminé en 2 ou 3 heures, et vous passez sous le radar des systèmes anti-bots.

L’installation est simple. Vous devez tenir à jour les listes d’URL de vos concurrents (à partir de sitemaps ou d’explorations), puis utiliser quelque chose comme Celery avec Redis pour distribuer le travail. Chaque matin, vous mettez en file d’attente les 50 000 URL et votre armée de travailleurs se met au travail. Le travailleur 1 s’occupe des chaussures de course Nike, le travailleur 2 s’occupe des baskets Adidas, le travailleur 3 s’occupe des prix Puma. Tout cela simultanément, à partir d’adresses IP différentes.

from celery import Celery
import requests
from bs4 import BeautifulSoup
import random
import time
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Initialize Celery app with Redis as broker
app = Celery('price_monitor', broker='redis://localhost:6379/0')

# Realistic user agents for rotation
USER_AGENTS = [
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
   "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
   "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15"
]

# Proxy pool (replace with your actual proxy service)
PROXY_POOL = [
   "<http://proxy1:8080>",
   "<http://proxy2:8080>",
   "<http://proxy3:8080>",
   # Add your proxy endpoints here
]

def get_session_with_retries():
   """Create a session with retry strategy and random proxy."""
   session = requests.Session()

   # Retry strategy for resilience
   retry_strategy = Retry(
       total=3,
       backoff_factor=1,
       status_forcelist=[429, 500, 502, 503, 504],
   )
   adapter = HTTPAdapter(max_retries=retry_strategy)
   session.mount("http://", adapter)
   session.mount("https://", adapter)

   # Random proxy rotation
   if PROXY_POOL:
       proxy = random.choice(PROXY_POOL)
       session.proxies = {"http": proxy, "https": proxy}

   return session

@app.task(bind=True, max_retries=3)
def fetch_product_price(self, url, site_config=None):
   """Fetches product price with full anti-detection measures."""

   # Human-like delay before starting
   time.sleep(random.uniform(2, 8))

   # Randomized headers to avoid fingerprinting
   headers = {
       "User-Agent": random.choice(USER_AGENTS),
       "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
       "Accept-Language": "en-US,en;q=0.9",
       "Accept-Encoding": "gzip, deflate, br",
       "Connection": "keep-alive",
       "Upgrade-Insecure-Requests": "1",
       "Sec-Fetch-Dest": "document",
       "Sec-Fetch-Mode": "navigate",
       "Sec-Fetch-Site": "none",
       "Cache-Control": "max-age=0"
   }

   try:
       session = get_session_with_retries()
       resp = session.get(url, headers=headers, timeout=30)
       resp.raise_for_status()

       # Parse the page for price
       soup = BeautifulSoup(resp.text, 'html.parser')
       price_value = extract_price(soup, url, site_config)

       if price_value:
           # Store in database (implement your storage logic here)
           store_price_data(url, price_value, resp.status_code)
           return {"url": url, "price": price_value, "status": "success"}
       else:
           return {"url": url, "error": "Price not found", "status": "failed"}

   except requests.exceptions.RequestException as e:
       print(f"Request failed for {url}: {e}")

       # Retry with exponential backoff
       if self.request.retries < self.max_retries:
           raise self.retry(countdown=60 * (2 ** self.request.retries))

       return {"url": url, "error": str(e), "status": "failed"}

def extract_price(soup, url, site_config=None):
   """Extract price using multiple strategies."""

   # Site-specific selectors (customize for each competitor)
   price_selectors = [
       ".price", ".product-price", ".current-price", ".sale-price",
       "[data-price]", ".price-current", ".price-now", ".offer-price"
   ]

   # Try configured selectors first
   if site_config and site_config.get('price_selector'):
       price_selectors.insert(0, site_config['price_selector'])

   price_text = None
   for selector in price_selectors:
       price_elem = soup.select_one(selector)
       if price_elem:
           price_text = price_elem.get_text(strip=True)
           break

   # Try data attributes as fallback
   if not price_text:
       price_elem = soup.find(attrs={"data-price": True})
       if price_elem:
           price_text = price_elem.get("data-price")

   if not price_text:
       return None

   # Clean and parse price
   return parse_price(price_text)

def parse_price(price_text):
   """Parse price from various formats."""
   # Remove common currency symbols and whitespace
   cleaned = re.sub(r'[^\\d.,]', '', price_text)

   # Handle formats like "1,299.99" or "1299.99"
   try:
       # Remove commas and convert to float
       if ',' in cleaned and '.' in cleaned:
           # Format: 1,299.99
           price_value = float(cleaned.replace(',', ''))
       elif ',' in cleaned:
           # Could be European format: 1299,99
           if cleaned.count(',') == 1 and len(cleaned.split(',')[1]) == 2:
               price_value = float(cleaned.replace(',', '.'))
           else:
               # Format: 1,299 (no cents)
               price_value = float(cleaned.replace(',', ''))
       else:
           price_value = float(cleaned)

       return price_value

   except ValueError:
       print(f"Could not parse price from: {price_text}")
       return None

def store_price_data(url, price, status_code):
   """Store price data in your database."""
   # Implement your storage logic here
   # Could be PostgreSQL, MongoDB, or any other database
   print(f"Storing: {url} -> ${price} (Status: {status_code})")

# Site-specific configurations for better accuracy
SITE_CONFIGS = {
   "competitor1.com": {"price_selector": ".price-box .price"},
   "competitor2.com": {"price_selector": "[data-testid='price']"},
   "competitor3.com": {"price_selector": ".product-price-value"},
}

def get_site_config(url):
   """Get site-specific configuration."""
   for domain, config in SITE_CONFIGS.items():
       if domain in url:
           return config
   return None

# Load your 50k product URLs (from database, file, or API)
def load_product_urls():
   """Load URLs from your data source."""
   # Replace with your actual data loading logic
   urls = [
       "<https://competitor1.com/product/123>",
       "<https://competitor2.com/product/456>",
       # ... 49,998 more URLs
   ]
   return urls

# Main execution: dispatch all crawling tasks
def start_daily_price_monitoring():
   """Start the daily price monitoring job."""
   product_urls = load_product_urls()

   print(f"Starting crawl for {len(product_urls)} URLs...")

   for url in product_urls:
       site_config = get_site_config(url)
       fetch_product_price.delay(url, site_config)

   print("All tasks queued successfully!")

# Run with: python -m celery worker -A price_monitor --loglevel=info
# Start monitoring with: start_daily_price_monitoring()

Dans le code amélioré ci-dessus, fetch_product_price est une tâche Celery robuste conçue pour la surveillance des prix à l’échelle de l’entreprise. En appelant delay(url, site_config) pour chaque URL, nous mettons les tâches en file d’attente dans Redis où plus de 100 travailleurs peuvent les saisir instantanément. L’approche distribuée transforme un crawl de 12 heures sur une seule machine en une opération de 2 à 3 heures sur l’ensemble de votre flotte de travailleurs.

Principales considérations relatives à la production :

  • La gestion du proxy est essentielle : cet exemple inclut un PROXY_POOL qui fait tourner les IP par requête, ce qui est essentiel lorsque l’on touche 50 000 URL. Sans cela, les sites ciblés sont essentiellement victimes de DoS à partir d’une seule IP, ce qui garantit des blocages.
  • Limitation du taux par domaine : Même avec la distribution, 50 000 URL d’un site concurrent déclencheront des alarmes s’ils sont tous atteints en quelques minutes. Nous incluons des délais de type humain(time.sleep(random.uniform(2, 8))), mais envisageons un étranglement spécifique au domaine.
  • Planification et suivi. Utilisez Celery Beat pour la programmation quotidienne ou intégrez Airflow pour les flux de travail complexes. La fonction start_daily_price_monitoring() peut être déclenchée via cron ou votre plateforme d’orchestration.
  • Intégration du pipeline de données. Après chaque exploration, la fonction store_price_data() enregistre les résultats dans votre base de données.
  • Résistance aux pannes. Le code comprend une logique de réessai avec un backoff exponentiel, mais prévoyez des échecs partiels. Si 5 % des URL échouent systématiquement, il convient de vérifier si ces produits ont été abandonnés ou déplacés, ou si les mesures anti-bots de ces sites spécifiques sont plus strictes et nécessitent des approches différentes.

Cas d’utilisation 2 : Référencement et études de marché

Le référencement et les études de marché nécessitent de parcourir des millions de pages à travers deux flux critiques : l’analyse de contenu et la surveillance des moteurs de recherche. Il ne s’agit pas d’une simple récupération, mais d’une veille concurrentielle qui exige rapidité, discrétion et précision.

Si vous souhaitez suivre les mentions de mots clés sur 1 million de pages de concurrents tout en surveillant simultanément les classements dans les SERP pour des centaines de mots clés cibles chaque jour, une seule machine prendrait des semaines et serait bloquée en quelques heures. Une architecture distribuée s’impose donc.

L’approche de l’exploration distribuée du web peut être divisée en deux flux :

  • Intelligence du contenu : Parcourez les sites des concurrents, les médias et les blogs du secteur pour suivre la densité des mots clés, les lacunes du contenu et les tendances du marché.
  • Surveillance des SERP : Surveillez les classements Google/Bing pour vos mots-clés cibles, en suivant les positions des concurrents et les changements de caractéristiques des SERP.
from celery import Celery
import requests
from bs4 import BeautifulSoup
import redis
import hashlib
import json
import time
import random
import re
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from typing import List, Dict, Optional
import logging

# Initialize Celery and Redis
app = Celery('seo_intelligence', broker='redis://localhost:6379/0')
redis_client = redis.Redis(host='localhost', port=6379, db=1)

# Anti-detection configurations
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 Safari/605.1.15"
]

PROXY_POOL = [
    "<http://user:[email protected]:8080>",
    "<http://user:[email protected]:8080>",
    # Add your proxy endpoints
]

@dataclass
class KeywordData:
    keyword: str
    frequency: int
    context: List[str]  # Surrounding text snippets
    url: str
    domain: str

@dataclass
class SERPResult:
    keyword: str
    position: int
    title: str
    url: str
    snippet: str
    domain: str

class SEOCrawler:
    def __init__(self):
        self.session = self._create_session()
        
    def _create_session(self):
        session = requests.Session()
        if PROXY_POOL:
            proxy = random.choice(PROXY_POOL)
            session.proxies = {"http": proxy, "https": proxy}
        return session
    
    def _get_headers(self):
        return {
            "User-Agent": random.choice(USER_AGENTS),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "Sec-Fetch-Dest": "document",
            "Sec-Fetch-Mode": "navigate",
            "Cache-Control": "max-age=0"
        }

# Deduplication utilities
def get_url_hash(url: str) -> str:
    """Generate consistent hash for URL deduplication."""
    return hashlib.md5(url.encode()).hexdigest()

def is_url_processed(url: str) -> bool:
    """Check if URL was already processed today."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    return redis_client.exists(f"processed:{today}:{url_hash}")

def mark_url_processed(url: str):
    """Mark URL as processed with 24h expiry."""
    url_hash = get_url_hash(url)
    today = time.strftime("%Y-%m-%d")
    redis_client.setex(f"processed:{today}:{url_hash}", 86400, 1)

# Stream 1: Content Intelligence Crawling
@app.task(bind=True, max_retries=3)
def crawl_content_for_keywords(self, url: str, target_keywords: List[str]):
    """Crawl a page and extract keyword intelligence."""
    
    # Skip if already processed today
    if is_url_processed(url):
        return {"status": "skipped", "reason": "already_processed", "url": url}
    
    # Human-like delay
    time.sleep(random.uniform(3, 7))
    
    try:
        crawler = SEOCrawler()
        response = crawler.session.get(
            url, 
            headers=crawler._get_headers(), 
            timeout=30
        )
        response.raise_for_status()
        
        # Extract content and analyze keywords
        soup = BeautifulSoup(response.text, 'html.parser')
        content_data = extract_keyword_intelligence(soup, url, target_keywords)
        
        # Store results
        store_keyword_data(content_data)
        mark_url_processed(url)
        
        return {
            "status": "success",
            "url": url,
            "keywords_found": len(content_data),
            "total_mentions": sum(kd.frequency for kd in content_data)
        }
        
    except Exception as e:
        logging.error(f"Content crawl failed for {url}: {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        return {"status": "failed", "url": url, "error": str(e)}

def extract_keyword_intelligence(soup: BeautifulSoup, url: str, keywords: List[str]) -> List[KeywordData]:
    """Extract keyword data from page content."""
    # Remove script and style elements
    for script in soup(["script", "style", "nav", "footer", "header"]):
        script.decompose()
    
    # Get clean text content
    text = soup.get_text()
    text = re.sub(r'\\s+', ' ', text).strip().lower()
    
    domain = urlparse(url).netloc
    keyword_data = []
    
    for keyword in keywords:
        keyword_lower = keyword.lower()
        
        # Find all occurrences
        pattern = r'\\b' + re.escape(keyword_lower) + r'\\b'
        matches = list(re.finditer(pattern, text))
        
        if matches:
            # Extract context around each match
            contexts = []
            for match in matches[:5]:  # Limit to first 5 for performance
                start = max(0, match.start() - 100)
                end = min(len(text), match.end() + 100)
                context = text[start:end].strip()
                contexts.append(context)
            
            keyword_data.append(KeywordData(
                keyword=keyword,
                frequency=len(matches),
                context=contexts,
                url=url,
                domain=domain
            ))
    
    return keyword_data

# Stream 2: SERP Tracking
@app.task(bind=True, max_retries=3)
def track_serp_rankings(self, keyword: str, search_engine: str = "google"):
    """Track SERP positions for a keyword."""
    
    time.sleep(random.uniform(5, 10))  # Longer delay for search engines
    
    try:
        crawler = SEOCrawler()
        
        if search_engine == "google":
            search_url = f"<https://www.google.com/search?q={keyword}&num=20>"
        else:  # Bing
            search_url = f"<https://www.bing.com/search?q={keyword}&count=20>"
        
        # Special headers for search engines
        headers = crawler._get_headers()
        headers.update({
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Referer": "<https://www.google.com/>" if search_engine == "google" else "<https://www.bing.com/>"
        })
        
        response = crawler.session.get(search_url, headers=headers, timeout=30)
        response.raise_for_status()
        
        # Parse SERP results
        soup = BeautifulSoup(response.text, 'html.parser')
        serp_data = parse_serp_results(soup, keyword, search_engine)
        
        # Store SERP data
        store_serp_data(serp_data)
        
        return {
            "status": "success",
            "keyword": keyword,
            "results_found": len(serp_data),
            "search_engine": search_engine
        }
        
    except Exception as e:
        logging.error(f"SERP tracking failed for '{keyword}': {e}")
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=120 * (2 ** self.request.retries))
        return {"status": "failed", "keyword": keyword, "error": str(e)}

def parse_serp_results(soup: BeautifulSoup, keyword: str, search_engine: str) -> List[SERPResult]:
    """Parse search engine results page."""
    results = []
    position = 1
    
    if search_engine == "google":
        # Google result selectors
        result_elements = soup.select('div.g')
        
        for element in result_elements:
            title_elem = element.select_one('h3')
            link_elem = element.select_one('a[href]')
            snippet_elem = element.select_one('.VwiC3b, .s3v9rd')
            
            if title_elem and link_elem:
                url = link_elem.get('href', '')
                if url.startswith('/url?q='):
                    url = url.split('/url?q=')[1].split('&')[0]
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:  # Limit to top 20
                    break
    
    else:  # Bing
        result_elements = soup.select('.b_algo')
        
        for element in result_elements:
            title_elem = element.select_one('h2 a')
            snippet_elem = element.select_one('.b_caption p')
            
            if title_elem:
                url = title_elem.get('href', '')
                
                results.append(SERPResult(
                    keyword=keyword,
                    position=position,
                    title=title_elem.get_text(strip=True),
                    url=url,
                    snippet=snippet_elem.get_text(strip=True) if snippet_elem else "",
                    domain=urlparse(url).netloc if url else ""
                ))
                position += 1
                
                if position > 20:
                    break
    
    return results

# Data storage functions
def store_keyword_data(keyword_data: List[KeywordData]):
    """Store keyword intelligence in database."""
    for kd in keyword_data:
        data = {
            "keyword": kd.keyword,
            "frequency": kd.frequency,
            "context": kd.context,
            "url": kd.url,
            "domain": kd.domain,
            "crawled_at": time.time()
        }
        # Store in your preferred database (PostgreSQL, MongoDB, etc.)
        redis_client.lpush(f"keyword_data:{kd.keyword}", json.dumps(data))
        print(f"Stored: {kd.keyword} found {kd.frequency} times on {kd.domain}")

def store_serp_data(serp_data: List[SERPResult]):
    """Store SERP tracking data."""
    for result in serp_data:
        data = {
            "keyword": result.keyword,
            "position": result.position,
            "title": result.title,
            "url": result.url,
            "snippet": result.snippet,
            "domain": result.domain,
            "tracked_at": time.time()
        }
        redis_client.lpush(f"serp_data:{result.keyword}", json.dumps(data))
        print(f"SERP: '{result.keyword}' -> #{result.position} {result.domain}")

# Orchestration functions
def start_content_intelligence_crawl(urls: List[str], keywords: List[str]):
    """Launch content crawling across 1M+ URLs."""
    print(f"Starting content intelligence crawl for {len(urls)} URLs...")
    
    for url in urls:
        crawl_content_for_keywords.delay(url, keywords)
    
    print(f"Queued {len(urls)} content crawling tasks")

def start_serp_tracking(keywords: List[str], search_engines: List[str] = ["google", "bing"]):
    """Launch SERP tracking for target keywords."""
    print(f"Starting SERP tracking for {len(keywords)} keywords...")
    
    for keyword in keywords:
        for engine in search_engines:
            track_serp_rankings.delay(keyword, engine)
    
    print(f"Queued {len(keywords) * len(search_engines)} SERP tracking tasks")

# Example usage
if __name__ == "__main__":
    # Target keywords for analysis
    target_keywords = [
        "artificial intelligence", "machine learning", "data science",
        "cloud computing", "cybersecurity", "digital transformation"
    ]
    
    # URLs to crawl for content intelligence (load from your database)
    content_urls = [
        "<https://techcrunch.com/ai>",
        "<https://venturebeat.com/ai>",
        "<https://competitor-blog.com/insights>",
        # ... 999,997 more URLs
    ]
    
    # Keywords to track in SERPs
    serp_keywords = [
        "best AI tools 2025", "enterprise machine learning",
        "data analytics platform", "cloud security solutions"
    ]
    
    # Launch both crawling streams
    start_content_intelligence_crawl(content_urls, target_keywords)
    start_serp_tracking(serp_keywords)

Principales considérations en matière de production :

  • Déduplication intelligente : Le système utilise Redis avec une expiration de 24 heures pour éviter d’explorer à nouveau le même contenu tous les jours. Pour une déduplication plus poussée, envisagez le hachage de contenu afin de détecter les pages dont l’URL a changé mais dont le contenu est resté le même.
  • Limitation du débit en fonction du domaine : L’exploration des SERP doit faire l’objet d’une attention particulière, car les moteurs de recherche sont plus agressifs en matière de blocage. Notre exemple inclut des délais plus longs (5-10 secondes) pour les requêtes de recherche par rapport à l’exploration du contenu (3-7 secondes).
  • Suivi des fonctionnalités SERP : L’analyseur traite les résultats de Google et de Bing, mais vous pouvez l’étendre pour suivre les featured snippets, les packs locaux et d’autres fonctionnalités SERP qui ont un impact sur votre stratégie de visibilité.
  • Intégration du pipeline de données : Stockez les résultats dans la base de données de votre choix (PostgreSQL pour l’analyse relationnelle, MongoDB pour les schémas flexibles).

Meilleures pratiques

Respecter robots.txt ou en subir les conséquences

Examinez le fichier robots.txt avant de mettre les URL en file d’attente et respectez scrupuleusement les directives relatives au délai d’exécution du crawl. Si vous les ignorez, toute votre plage d’adresses IP sera mise sur liste noire plus rapidement que vous ne pouvez dire “crawler distribué”. Intégrez la vérification de robots.txt directement dans votre frontière d’URL, et n’en faites pas la responsabilité du nœud de travail.

Au-delà de la conformité à robots.txt, vous devez également mettre en œuvre des stratégies complètes d’évitement de la détection dans l’ensemble de votre flotte distribuée.

Toujours enregistrer pour le débogage de 3 heures du matin

Lorsque votre crawl meurt à minuit, vous avez besoin de métadonnées : URL, statut HTTP, latence, ID du proxy, ID du travailleur et horodatage pour chaque requête. Les journaux structurés en JSON vous sauvent la mise. La question n’est pas de savoir si vous aurez besoin de déboguer une panne de production, mais quand.

Validez tout, ne faites confiance à rien

La validation du schéma des données extraites est nécessaire à la survie de vos robots d’indexation distribués, car une seule réponse malformée peut empoisonner l’ensemble de vos données. Vérifiez les types de champs, les champs obligatoires et la fraîcheur des données lors de l’ingestion. Il s’agit d’attraper les déchets à temps ou de les voir corrompre votre analyse des mois plus tard.

Lutter sans relâche contre la dette de vitesse

Les systèmes distribués pourrissent rapidement. Vous devez planifier un nettoyage mensuel des clés Redis périmées, des files d’attente de tâches défaillantes et des processus de travail orphelins. Les URL mortes s’accumulent, les pools de proxy sont pollués par des IP bloquées et les fuites de mémoire des travailleurs s’aggravent avec le temps. La maintenance n’est pas très glamour, mais elle permet à votre crawler de rester en bonne santé. La dette technique des crawlers s’accroît de manière exponentielle, il faut donc s’en préoccuper avant qu’elle n’entraîne l’effondrement de votre système.

Les pièges courants du crawling distribué et comment les éviter

De nombreux écueils sont rencontrés lors de l’utilisation de l’exploration distribuée du web, c’est pourquoi la plupart des ingénieurs recherchent des alternatives, telles que les ensembles de données de Bright Data. Voici quelques-uns de ces écueils :

Le piège du “point unique de défaillance

Construire tout autour d’une instance Redis ou d’un coordinateur principal est une mauvaise idée. Lorsqu’il meurt, c’est l’ensemble de votre crawl qui s’arrête.

Correction : Utiliser Redis Cluster ou plusieurs instances de courtiers. Concevoir le coordinateur pour qu’il disparaisse, de sorte que les travailleurs doivent gérer les pannes du courtier de manière gracieuse et se reconnecter automatiquement.

La spirale de la mort

Lorsque les URL qui échouent sont immédiatement renvoyées dans la file d’attente principale, une boucle infinie est créée, ce qui a pour effet d’endommager les points de terminaison et d’encombrer votre pipeline.

Correction : Separate retry queues with exponential backoff. Première tentative après 1 minute, puis 5, puis 30. Après 3 échecs, envoi dans une file d’attente de lettres mortes pour examen manuel.

L’erreur “Tous les travailleurs sont égaux

La distribution des tâches par round-robin suppose que chaque travailleur dispose de la même vitesse de réseau, de la même qualité de proxy et de la même puissance de traitement. La réalité est souvent plus désordonnée.

Correction : Mettez en place un système de notation des travailleurs basé sur le taux de réussite, la latence et le débit. Acheminez les tâches les plus difficiles vers vos travailleurs les plus performants.

La bombe à retardement des fuites de mémoire

Les travailleurs qui ne redémarrent jamais accumulent des fuites de mémoire, en particulier lorsqu’ils analysent du code HTML malformé ou traitent des réponses volumineuses. S’ils sont laissés à eux-mêmes, les performances de votre système distribué d’exploration du web se dégradent jusqu’à ce que les workers tombent en panne.

Correction : Redémarrer les travailleurs après le traitement de 1000 tâches ou toutes les 4 heures. Surveiller l’utilisation de la mémoire et mettre en place des coupe-circuits.

Conclusion

Vous disposez à présent d’un modèle de crawling distribué qui s’étend à des millions de pages. Pour approfondir votre compréhension des principes fondamentaux de l’exploration du web qui sous-tendent les systèmes distribués, lisez notre présentation complète des robots d’exploration du web.

L’architecture est simple, mais la vérité brutale est que 90 % des équipes échouent encore parce qu’elles sous-estiment la complexité de l’anti-détection d’un système distribué d’exploration du Web. La gestion de milliers de proxys, la rotation des empreintes digitales et le traitement des CAPTCHA deviennent un cauchemar technique à plein temps qui détourne l’attention de l’extraction de données précieuses.

C’est exactement la raison d’être de l’API Web Unlocker de Bright Data. Au lieu de passer des mois à construire une infrastructure de proxy qui tombe en panne toutes les semaines, vos travailleurs distribués acheminent simplement les demandes par le biais de l’API Web Unlocker, dont le taux de réussite est supérieur à 99 %.

Pas de gestion de proxy, pas de rotation d’empreintes digitales, pas de résolution de CAPTCHA, mais une extraction de données fiable à grande échelle. Votre équipe d’ingénieurs se concentre sur l’élaboration de la logique commerciale, tandis que Bright Data se charge du jeu du chat et de la souris avec les systèmes anti-bots.

Le calcul est simple : une anti-détection maison coûte des mois de temps d’ingénierie et des maux de tête en termes de maintenance, alors que Web Unlocker ne coûte qu’une fraction de ce coût tout en offrant une fiabilité de niveau entreprise. Cessez donc de réinventer la roue et commencez à extraire des informations. Obtenez votre compte Bright Data gratuit dès aujourd’hui et transformez votre crawler distribué en un avantage concurrentiel, plutôt qu’un fardeau de maintenance.