Aller au contenu

SparkStreaming wc


Spark Streaming

Spark Structured Streaming est l’API de traitement de flux continus de données de Spark. Tolérante aux pannes, elle est exprimée en termes de DataFrame et permet d’appliquer les mêmes transformations que sur des données statiques — select, groupBy, join, window — sur un flux entrant en temps réel. Les résultats peuvent être écrits sur la console, dans des fichiers, des bases de données ou des tableaux de bord.

streaming-arch

Spark reçoit les flux de données et les divise en micro-batchs traités par le moteur Spark pour produire le résultat de manière incrémentale.

streaming-flow

Migration DStream → Structured Streaming

L’ancienne API StreamingContext/DStream (Spark 2.x) a été supprimée dans Spark 4.0. Le script SparkStreaming_wc.py fourni utilise l’API actuelle readStream/writeStream basée sur les DataFrames.

wordcount en streaming

Si le cluster n’est pas encore lancé : docker compose up -d (puis attends que check-cluster.sh soit ).

Ouvrez deux Terminaux connectés au master — chacun ouvre son propre terminal et lance :

docker compose exec hadoop-master bash
Mettez les deux fenêtres côte à côte. Dans le premier, déplacez-vous dans ce répertoire :
cd ~/TP_Spark/sparklib
et lancez
spark-submit --deploy-mode client --master local[2] SparkStreaming_wc.py localhost 9999

Dans le second Terminal, lancez le serveur de données netcat sur le port 9999 de la manière suivante :

nc -l -p 9999
et tapez des mots rapidement ; vous devriez voir le comptage de mot apparaître sur le Terminal 1, chaque seconde.

Remarques : pour entrer un série de mots d’un coup sur le second Terminal, vous pouvez soit - écrire une ligne de mots séparés par des espaces dans le second Terminal. Envoyez alors la ligne d’un coup, en appuyant sur la touche entrée de votre clavier. - copier en mémoire une ligne d’un texte quelconque, et la coller dans le second Terminal.

Les paquets de données correspondent à une durée d’une seconde. Pour allonger l’intervalle de mise à jour, modifiez l’option trigger dans le script :

.trigger(processingTime="2 seconds")