SparkMLlib
Table des Matières
Spark ML¶
pyspark.ml est la librairie de machine learning de Spark. Son objectif est de rendre l’apprentissage automatique scalable et accessible. Elle fournit notamment :
- Algorithmes ML : classification, régression, clustering, filtrage collaboratif ;
- Vectorisation : extraction de features, transformations, réduction de dimension, sélection ;
- Pipelines : outils pour construire, évaluer et régler des pipelines ML ;
- Persistance : sauvegarde et chargement des modèles et des pipelines ;
- Utilitaires : algèbre linéaire, statistiques, manipulation de données.
Pourquoi pyspark.ml et non pyspark.mllib ?
L’ancienne API pyspark.mllib, basée sur les RDD, est en mode maintenance depuis Spark 2.0. La nouvelle API pyspark.ml, basée sur les DataFrame, est la seule API recommandée et activement développée par Apache. C’est celle utilisée dans ce TP.
Placez-vous dans le sous-répertoire MLlib :
Prise en main¶
Trois catégories usuelles d’apprentissage artificiel sont : classification, clustering et filtrage collaboratif.
- Classification : Gmail utilise la classification pour décider si un message reçu est un spam ou non, à partir des données du message (expéditeur, destinataires, sujet, corps). La classification apprend à étiqueter de nouveaux exemples à partir d’un jeu d’exemples déjà étiquetés.
- Clustering : Google News utilise le clustering pour regrouper les articles d’actualité en catégories, à partir du titre et du contenu. Les algorithmes de clustering découvrent les regroupements présents dans une collection de données.
- Filtrage collaboratif : Amazon utilise le filtrage collaboratif (couramment appelé recommandation) pour déterminer les produits qui plairont à un utilisateur, à partir de son historique et de sa similarité avec d’autres utilisateurs.
Structure de donnees¶
Avec pyspark.ml, les jeux de données d’apprentissage sont des DataFrames (et non plus des RDD comme dans l’ancienne pyspark.mllib). Deux colonnes sont attendues par la plupart des estimateurs :
features: unVectorcontenant les attributs d’un exemple ;label: la classe (classification) ou la valeur cible (régression), de typedouble.
Construction de vecteurs
Création d’un vecteur dense (toutes les valeurs sont stockées, y compris les 0.0) :
Création d’un vecteur creux : seules les valeurs non nulles sont stockées. On précise la taille du vecteur, puis les coordonnées des valeurs non nulles, soit via un dictionnaire, soit via deux listes (indices, valeurs) :
sparseVec1 = Vectors.sparse(5, {0: 1.0, 2: 2.0, 3: 4.0})
sparseVec2 = Vectors.sparse(5, [0, 2, 3], [1.0, 2.0, 4.0])
Assembler des colonnes en features
Si les attributs sont répartis sur plusieurs colonnes d’un DataFrame, l’utilitaire VectorAssembler regroupe ces colonnes en une seule colonne features :
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["x", "y", "z"], outputCol="features")
df_features = assembler.transform(df)
Estimateurs, modèles et pipelines
Un Estimator (algorithme d’apprentissage) expose une méthode fit(df) qui retourne un Model. Le Model est lui-même un transformer qui dispose d’une méthode transform(df) ajoutant une colonne prediction au DataFrame. Plusieurs étages (vectorisation, transformation, apprentissage) peuvent être enchaînés dans un Pipeline, lui-même un Estimator :
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(training_df)
predictions = model.transform(test_df)
Utilisation de la librairie¶
La préparation des données est une étape essentielle à la qualité des analyses et modélisations qui en découlent. Extraction, filtrage, échantillonnage, complétion des données manquantes, correction, détection d’anomalies ou atypiques, jointures, agrégations ou cumuls, transformations (recodage, discrétisation, réduction, “normalisation”…), sélection des variables ou des features, recalages d’images ou de signaux… sont les principales procédures à mettre en œuvre de façon itérative. Par principe, la plupart de ces étapes se distribuent naturellement sur les nœuds d’un cluster en exécutant des fonctions MapReduce.
Utilisez si nécessaire la documentation pour trouver la signification des paramètres transmis aux transformations et aux actions.
K-Means
L’algorithme de classification non supervisé le plus utilisé, car le plus facile à passer à l’échelle, est le K-Means. Visualisez le fichier kmeans_data.txt dans le répertoire courant : il contient des lignes de valeurs ; on peut considérer que chaque donnée est un point de coordonnée 3D. L’algorithme des K-Means cherche à classer les 3 premières lignes dans la première classe et les 3 dernières dans la seconde classe.
Modifiez si besoin le fichier MLlib_Kmeans.py pour pointer vers l’endroit où vous avez sauvegardé le fichier kmeans_data.txt. Si vous l’avez laissé sur le Namenode, vous pouvez utiliser file:///root/TP_Spark/MLlib/kmeans_data.txt. Si vous l’avez déplacé sur HDFS, alors vous pouvez utiliser input/kmeans_data.txt. Lancez alors :
features, puis appel à KMeans.fit(...).
Verbosité de Spark — Depuis la version 5 de l’image, la verbosité de Spark (et de Hadoop) est déjà réduite par défaut au niveau
WARN. Si vous voyez quand même deswarnings, n’y prêtez pas attention. Pour repasser temporairement en mode verbeux (utile en cas de débogage), voir la FAQ Dépannage.
Régression logistique
Un exemple trivial de fouille de textes est la détection de pourriels. Le fichier spam.txt contient un spam par ligne, le fichier ham.txt un message normal par ligne. Il s’agit ensuite de prévoir le statut spam ou non d’un nouveau message.
Observez le contenu du fichier MLlib_LogReg.py :
- Les deux fichiers sont chargés en DataFrame, avec une colonne
label(1.0pourspam,0.0pournormal) ettextpour le message brut. - Un
Pipelineenchaîne trois étapes :Tokenizer(découpe le texte en mots),HashingTF(vectorise les mots dans un espace de hachage de 10000 dimensions, en comptant les fréquences), puisLogisticRegression. - La méthode
pipeline.fit(...)produit un modèle entraîné, qu’on applique ensuite à deux messages tests pour prédire leur classe.
Lancez la commande :
Remarque : un traitement préalable supprimant les mots charnières (articles, conjonctions…) et résumant chaque mot à sa racine aurait été nécessaire pour une application opérationnelle.
Forêt aléatoire
Un autre système de classification qui passe à l’échelle : les forêts aléatoires.
Analysez le code. Notez que les données ne sont pas extraites d’un fichier mais directement codées dans le script source, sous la forme d’un DataFrame à 2 colonnes label et features.
Organisation des moyens de lutte contre les feux de foret¶
L’objectif de ce TP est d’analyser des données pour permettre d’organiser au mieux les moyens humains et matériels pour réagir le plus rapidement en cas de départ de feu en forêt. Pour cela, on cherche à regrouper les feux de forêt ayant eu lieu les années précédentes en clusters géographiques afin de disposer de moyens de lutte aux endroits les mieux adaptés.
Les données de départ de feu proviennent des services forestiers américains et canadiens (Fire Information for Resource Management System US/Canada).
- Les fichiers getshapefiles.sh, convert_shp_to_csv.py et train_Kmeans.py sont déjà disponibles dans
~/TP_Spark/MLlib/du master (montés depuis le repo). - Si nécessaire (utilisateurs Windows) :
dos2unix getshapefiles.sh. - Exécutez le script getshapefiles.sh pour récupérer des fichiers shapefile (GIS — Système d’information Géographique) nécessaires au TP :
Le téléchargement correspond à 4 années de données. D’autres données sont disponibles sur le même serveur pour des années antérieures.
Depuis le répertoire ~/TP_Spark/MLlib/, appliquez le script convert_shp_to_csv.py sur chacun des 4 fichiers rapatriés de la manière suivante :
python3 convert_shp_to_csv.py modis_fire_2015_365_conus.dbf modis_fire_2015_365_conus.csv
python3 convert_shp_to_csv.py modis_fire_2016_366_conus.dbf modis_fire_2016_366_conus.csv
python3 convert_shp_to_csv.py modis_fire_2017_365_conus.dbf modis_fire_2017_365_conus.csv
python3 convert_shp_to_csv.py modis_fire_2018_365_conus.dbf modis_fire_2018_365_conus.csv
Votre travail consiste à entraîner un classifieur K-Means avec les données de longitude et de latitude pour :
- d’une part, obtenir le centre des clusters de départ de feu, permettant aux pompiers de disposer des moyens humains et matériels pour réagir au plus vite ;
- d’autre part, informer les pompiers des moyens à utiliser en cas d’un nouveau départ de feu (dont on connaît la longitude et la latitude), par prédiction du cluster le plus proche.
Bien sûr, les réponses que vous apporterez à ces questions sont nécessairement simplistes, car bien d’autres facteurs entrent en considération dans la vraie vie (notamment la morphologie du terrain) !
Avant de poursuivre, quelques remarques importantes :
- Commencez à travailler avec un seul fichier de données, puis étendez à tous les fichiers lorsque vous aurez validé votre algorithme.
- Lisez le CSV avec
spark.read.option("header", True).csv(...): la première ligne est automatiquement utilisée comme noms de colonnes (plus besoin de la filtrer manuellement). - Convertissez les colonnes
LATITUDEetLONGITUDEendoubleavec.cast("double")avant de filtrer. - Pour le K-Means, utilisez la classe
pyspark.ml.clustering.KMeans. - Pour vous aider, le fichier train_Kmeans.py contient la lecture d’un fichier de données et l’affichage des 5 premières lignes. Pour le tester :
- Ne conservez que les données dont la latitude est comprise entre 42 et 50, et dont la longitude est comprise entre -124 et -110.
- Assemblez les 2 colonnes en un vecteur
featuresviaVectorAssembler. - Pour l’apprentissage du classifieur, prenez un échantillon de 50% des données lues avec la méthode
randomSplit(...), l’autre partie sera utilisée pour la prédiction.