Aller au contenu

SparkMLlib

Table des Matières


Spark ML

pyspark.ml est la librairie de machine learning de Spark. Son objectif est de rendre l’apprentissage automatique scalable et accessible. Elle fournit notamment :

  • Algorithmes ML : classification, régression, clustering, filtrage collaboratif ;
  • Vectorisation : extraction de features, transformations, réduction de dimension, sélection ;
  • Pipelines : outils pour construire, évaluer et régler des pipelines ML ;
  • Persistance : sauvegarde et chargement des modèles et des pipelines ;
  • Utilitaires : algèbre linéaire, statistiques, manipulation de données.

Pourquoi pyspark.ml et non pyspark.mllib ?

L’ancienne API pyspark.mllib, basée sur les RDD, est en mode maintenance depuis Spark 2.0. La nouvelle API pyspark.ml, basée sur les DataFrame, est la seule API recommandée et activement développée par Apache. C’est celle utilisée dans ce TP.

Placez-vous dans le sous-répertoire MLlib :

cd ~/TP_Spark/MLlib/

Prise en main

Trois catégories usuelles d’apprentissage artificiel sont : classification, clustering et filtrage collaboratif.

  • Classification : Gmail utilise la classification pour décider si un message reçu est un spam ou non, à partir des données du message (expéditeur, destinataires, sujet, corps). La classification apprend à étiqueter de nouveaux exemples à partir d’un jeu d’exemples déjà étiquetés.
  • Clustering : Google News utilise le clustering pour regrouper les articles d’actualité en catégories, à partir du titre et du contenu. Les algorithmes de clustering découvrent les regroupements présents dans une collection de données.
  • Filtrage collaboratif : Amazon utilise le filtrage collaboratif (couramment appelé recommandation) pour déterminer les produits qui plairont à un utilisateur, à partir de son historique et de sa similarité avec d’autres utilisateurs.

Structure de donnees

Avec pyspark.ml, les jeux de données d’apprentissage sont des DataFrames (et non plus des RDD comme dans l’ancienne pyspark.mllib). Deux colonnes sont attendues par la plupart des estimateurs :

  • features : un Vector contenant les attributs d’un exemple ;
  • label : la classe (classification) ou la valeur cible (régression), de type double.

Construction de vecteurs

Création d’un vecteur dense (toutes les valeurs sont stockées, y compris les 0.0) :

from pyspark.ml.linalg import Vectors
denseVec = Vectors.dense([1.0, 0.0, 2.0, 4.0, 0.0])

Création d’un vecteur creux : seules les valeurs non nulles sont stockées. On précise la taille du vecteur, puis les coordonnées des valeurs non nulles, soit via un dictionnaire, soit via deux listes (indices, valeurs) :

sparseVec1 = Vectors.sparse(5, {0: 1.0, 2: 2.0, 3: 4.0})
sparseVec2 = Vectors.sparse(5, [0, 2, 3], [1.0, 2.0, 4.0])

Assembler des colonnes en features

Si les attributs sont répartis sur plusieurs colonnes d’un DataFrame, l’utilitaire VectorAssembler regroupe ces colonnes en une seule colonne features :

from pyspark.ml.feature import VectorAssembler
assembler   = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
df_features = assembler.transform(df)

Estimateurs, modèles et pipelines

Un Estimator (algorithme d’apprentissage) expose une méthode fit(df) qui retourne un Model. Le Model est lui-même un transformer qui dispose d’une méthode transform(df) ajoutant une colonne prediction au DataFrame. Plusieurs étages (vectorisation, transformation, apprentissage) peuvent être enchaînés dans un Pipeline, lui-même un Estimator :

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model    = pipeline.fit(training_df)
predictions = model.transform(test_df)

Utilisation de la librairie

La préparation des données est une étape essentielle à la qualité des analyses et modélisations qui en découlent. Extraction, filtrage, échantillonnage, complétion des données manquantes, correction, détection d’anomalies ou atypiques, jointures, agrégations ou cumuls, transformations (recodage, discrétisation, réduction, “normalisation”…), sélection des variables ou des features, recalages d’images ou de signaux… sont les principales procédures à mettre en œuvre de façon itérative. Par principe, la plupart de ces étapes se distribuent naturellement sur les nœuds d’un cluster en exécutant des fonctions MapReduce.

Utilisez si nécessaire la documentation pour trouver la signification des paramètres transmis aux transformations et aux actions.

K-Means

L’algorithme de classification non supervisé le plus utilisé, car le plus facile à passer à l’échelle, est le K-Means. Visualisez le fichier kmeans_data.txt dans le répertoire courant : il contient des lignes de valeurs ; on peut considérer que chaque donnée est un point de coordonnée 3D. L’algorithme des K-Means cherche à classer les 3 premières lignes dans la première classe et les 3 dernières dans la seconde classe.

Modifiez si besoin le fichier MLlib_Kmeans.py pour pointer vers l’endroit où vous avez sauvegardé le fichier kmeans_data.txt. Si vous l’avez laissé sur le Namenode, vous pouvez utiliser file:///root/TP_Spark/MLlib/kmeans_data.txt. Si vous l’avez déplacé sur HDFS, alors vous pouvez utiliser input/kmeans_data.txt. Lancez alors :

spark-submit --deploy-mode client --master local[2] MLlib_Kmeans.py
Interprétez le résultat affiché sur le Terminal à la vue du code. Notez les trois étapes : lecture du fichier en DataFrame, assemblage des 3 colonnes en un vecteur features, puis appel à KMeans.fit(...).

ℹ Verbosité de Spark — Depuis la version 5 de l’image, la verbosité de Spark (et de Hadoop) est déjà réduite par défaut au niveau WARN. Si vous voyez quand même des warnings, n’y prêtez pas attention. Pour repasser temporairement en mode verbeux (utile en cas de débogage), voir la FAQ Dépannage.

Régression logistique

Un exemple trivial de fouille de textes est la détection de pourriels. Le fichier spam.txt contient un spam par ligne, le fichier ham.txt un message normal par ligne. Il s’agit ensuite de prévoir le statut spam ou non d’un nouveau message.

Observez le contenu du fichier MLlib_LogReg.py :

  • Les deux fichiers sont chargés en DataFrame, avec une colonne label (1.0 pour spam, 0.0 pour normal) et text pour le message brut.
  • Un Pipeline enchaîne trois étapes : Tokenizer (découpe le texte en mots), HashingTF (vectorise les mots dans un espace de hachage de 10000 dimensions, en comptant les fréquences), puis LogisticRegression.
  • La méthode pipeline.fit(...) produit un modèle entraîné, qu’on applique ensuite à deux messages tests pour prédire leur classe.

Lancez la commande :

spark-submit --deploy-mode client --master local[2] MLlib_LogReg.py

Remarque : un traitement préalable supprimant les mots charnières (articles, conjonctions…) et résumant chaque mot à sa racine aurait été nécessaire pour une application opérationnelle.

Forêt aléatoire

Un autre système de classification qui passe à l’échelle : les forêts aléatoires.

spark-submit --deploy-mode client --master local[2] MLlib_ForetA.py

Analysez le code. Notez que les données ne sont pas extraites d’un fichier mais directement codées dans le script source, sous la forme d’un DataFrame à 2 colonnes label et features.

Organisation des moyens de lutte contre les feux de foret

L’objectif de ce TP est d’analyser des données pour permettre d’organiser au mieux les moyens humains et matériels pour réagir le plus rapidement en cas de départ de feu en forêt. Pour cela, on cherche à regrouper les feux de forêt ayant eu lieu les années précédentes en clusters géographiques afin de disposer de moyens de lutte aux endroits les mieux adaptés.

Les données de départ de feu proviennent des services forestiers américains et canadiens (Fire Information for Resource Management System US/Canada).

  1. Les fichiers getshapefiles.sh, convert_shp_to_csv.py et train_Kmeans.py sont déjà disponibles dans ~/TP_Spark/MLlib/ du master (montés depuis le repo).
  2. Si nécessaire (utilisateurs Windows) : dos2unix getshapefiles.sh.
  3. Exécutez le script getshapefiles.sh pour récupérer des fichiers shapefile (GIS — Système d’information Géographique) nécessaires au TP :
chmod +x getshapefiles.sh
./getshapefiles.sh # cela peut prendre 30 secondes

Le téléchargement correspond à 4 années de données. D’autres données sont disponibles sur le même serveur pour des années antérieures.

Depuis le répertoire ~/TP_Spark/MLlib/, appliquez le script convert_shp_to_csv.py sur chacun des 4 fichiers rapatriés de la manière suivante :

python3 convert_shp_to_csv.py modis_fire_2015_365_conus.dbf modis_fire_2015_365_conus.csv
python3 convert_shp_to_csv.py modis_fire_2016_366_conus.dbf modis_fire_2016_366_conus.csv
python3 convert_shp_to_csv.py modis_fire_2017_365_conus.dbf modis_fire_2017_365_conus.csv
python3 convert_shp_to_csv.py modis_fire_2018_365_conus.dbf modis_fire_2018_365_conus.csv

Votre travail consiste à entraîner un classifieur K-Means avec les données de longitude et de latitude pour :

  • d’une part, obtenir le centre des clusters de départ de feu, permettant aux pompiers de disposer des moyens humains et matériels pour réagir au plus vite ;
  • d’autre part, informer les pompiers des moyens à utiliser en cas d’un nouveau départ de feu (dont on connaît la longitude et la latitude), par prédiction du cluster le plus proche.

Bien sûr, les réponses que vous apporterez à ces questions sont nécessairement simplistes, car bien d’autres facteurs entrent en considération dans la vraie vie (notamment la morphologie du terrain) !

Avant de poursuivre, quelques remarques importantes :

  • Commencez à travailler avec un seul fichier de données, puis étendez à tous les fichiers lorsque vous aurez validé votre algorithme.
  • Lisez le CSV avec spark.read.option("header", True).csv(...) : la première ligne est automatiquement utilisée comme noms de colonnes (plus besoin de la filtrer manuellement).
  • Convertissez les colonnes LATITUDE et LONGITUDE en double avec .cast("double") avant de filtrer.
  • Pour le K-Means, utilisez la classe pyspark.ml.clustering.KMeans.
  • Pour vous aider, le fichier train_Kmeans.py contient la lecture d’un fichier de données et l’affichage des 5 premières lignes. Pour le tester :
spark-submit --deploy-mode client --master local[2] train_Kmeans.py
  • Ne conservez que les données dont la latitude est comprise entre 42 et 50, et dont la longitude est comprise entre -124 et -110.
  • Assemblez les 2 colonnes en un vecteur features via VectorAssembler.
  • Pour l’apprentissage du classifieur, prenez un échantillon de 50% des données lues avec la méthode randomSplit(...), l’autre partie sera utilisée pour la prédiction.