Dans cette partie nous allons nous intéresser aux fichiers de données et leur utilisation sous Spark 3, le tout en analysant les données de vaccination du COVID-19.

De nombreuses fonctionnalités importantes de Spark vont être découvertes et expliquées, alors c’est parti !

[Part 1] Installation de Spark

[Part 3] Spark Dataframes

[Part 4] Spark Streaming

[Part 5] Spark ML sur AWS

Qu’est-ce qu’un RDD ?

RDD signifie « Resilient Distributed Datasets » ou « jeu de données distribué résilient » en français. Il s’agit d’un jeu de données (un fichier .csv par exemple) découpé entre plusieurs ordinateurs auquel on peut accéder via spark pour effectuer des calculs.

Les RDD sont les documents de base de spark exécutés dans la mémoire vive (RAM) pour aller plus vite. Ils ne peuvent pas être modifiés (seul l’accès en lecture est autorisé) et spark se charge de gérer l’assemblage des différentes parties du document pour fournir les résultats.

Parmi les autres types principaux de documents (objets) on compte :

  • les DataFrame
  • les DStream
  • les Graph RDD

Premiers pas avec Spark 3 et les RDD

Afin de découvrir Spark 3 par la pratique, nous allons utiliser des open data. Il s’agit de données fournies par des entreprises ou des gouvernements pour permettre aux développeurs et aux entrepreneurs de les exploiter (les conditions varient mais généralement c’est gratuit).

Spark 3 étant un outil de Big Data, j’ai choisi de prendre un dataset comprenant plusieurs milliers de lignes : l’état de la vaccination par pays au fil du temps. Ce sujet va nous permettre de voir comment utiliser Spark et de nous mettre à la place d’un organisme de santé souhaitant suivre l’avancement de la vaccination pour identifier les pays à « risque ».

Les données que l’on va utiliser

Nous allons partir du fichier « vaccinations.csv » du github Our World in Data.

Ce document ainsi que le code de cet article seront à récupérer directement à cette adresse et à placer dans le dossier de votre choix (dans la suite, on considèrera qu’il s’agit du dossier « D:\workspaces\penseeartif_tutorials_pyspark« .

Créez à présent votre fichier python de code pour cette partie 2, la correction étant dans part2_rdd_spark.py

Dans la suite, les méthodes en vertes sont celles pour lesquelles spark n’effectue pas les calculs (cf explication plus bas) et celles en bleues sont agrégées automatiquement.

Import de pyspark

Afin d’utiliser Spark nous allons avoir besoin de deux classes du module pyspark :

  • SparkConf = sert à configurer le framework, par exemple en indiquant quelle est l’adresse du serveur spark principal
  • SparkContext = sert à se connecter et à effectuer toutes les opérations de spark

On commence par les importer dans notre code via

from pyspark import SparkConf, SparkContext

Puis on configure en indiquant que le maître de spark est « local » (donc notre ordinateur) et on donne un nom quelconque à l’instance de spark

# Initialization
conf = SparkConf().setMaster("local").setAppName("[Part 2] Spark RDD")
sc = SparkContext(conf=conf)

On est prêts à utiliser spark pour analyser nos données !

Chargement de données avec textFile

Afin de charger un fichier de données on utilise la méthode textFile qui prend en entrée le chemin du fichier.

# Loading dataset
data = sc.textFile("datasets/vaccinations.csv")

Il est bon de noter dès à présent que spark n’exécute pas tous les codes immédiatement.

En effet, spark est un logiciel qui va optimiser les traitements que l’on souhaite effectuer. Imaginons que l’on écrive le code suivant :

  • Extraire les deux premières colonnes
  • Ne garder ensuite que la première colonne

Si on exécutait chaque ligne et qu’on récupérait le résultat à chaque fois, le coût en calculs serait lourd (n’oublions pas qu’on parle de Big Data, avec plusieurs millions/milliards de lignes !). Spark va donc attendre qu’on lui demande d’effectuer le calculs (certaines méthodes le font automatiquement) pour optimiser les travaux à faire. Ici, spark enlèverait la première demande qui n’apporte rien.

Point clé : les méthodes qui n’entraînent pas de calculs s’appellent des transformations, tandis que celles qui provoquent un résultat sont les actions. Dans la pratique, les transformations servent à construire ce qu’on appelle le DAG (Directed Acyclic Graph) qui est l’arbre des opérations à réaliser pour parvenir au résultat final demandé (et qui est optimisé par spark).

« textFile » est une méthode qui, justement, n’entraîne pas de calcul immédiat. Si vous regardez ce que vaut data, vous obtiendrez ceci « datasets/vaccinations.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0″

Nous allons voir à présent comment faire pour récupérer quelques données du fichier.

NB : il existe une méthode parallelize qui permet de créer un RDD à partir d’une liste (donc comme s’il s’agissait d’un fichier). Cette méthode est utile pour faire des tests rapides.

# Loading data from list
data_list = sc.parallelize([("France",5),("Espagne",3),("Angleterre",8)])

Extraction de quelques données : sample

La méthode sample permet d’extraire aléatoirement des données d’une « variable » spark. Elle n’entraîne pas de calcul, aussi faut-il la combiner avec collect qui demande à spark d’effectuer les calculs et de renvoyer le résultat.

Sample s’utilise avec deux valeurs :

  • withReplacements qui autorise spark a collecter éventuellement plusieurs fois une ligne si on le met à True (i.e. tirage avec remise)
  • fraction qui donne la proportion de données à extraire (doit être entre 0 et 1)

Fraction va nécessiter que nous connaissions le nombre de lignes au total dans nos données. Pour connaitre le nombre de lignes, on utilise la méthode count.

Grâce à ça, nous pouvons écrire notre première méthode qui va sélectionner aléatoirement environ 5 lignes de données et les afficher

# Show some data
def show_some_data(data,nb_samples=5):
    print(data.sample(withReplacement=False,fraction=nb_samples/data.count(),seed=0).collect())

Il ne nous reste qu’à appeler cette méthode pour voir ce que « sc.textFile » a récupéré exactement

## Sample of what is loaded from textFile
show_some_data(data)

Ce qui devrait donner les informations sur les Bahamas, la République Dominicaine, le Mozambique, la Tanzanie et les Etats Unis (deux fois).

Visualisation de quelques lignes de données au hasard

Alternative à sample : first et take pour récupérer les premières lignes (sans calculs supplémentaires)

On vient de voir « sample » qui récupère des lignes aléatoires, mais pourquoi ne pas « juste » regarder les premières lignes de données ?

La plupart du temps, lorsqu’on travaille avec spark on utilise la méthode first qui renvoie la première ligne de données (sans faire tous les calculs sur les autres données) ou la méthode take qui renvoie les N premières lignes de données ! L’intérêt de sample est d’avoir un échantillon aléatoire de données.

A noter que first et take sont des actions i.e. elles ne nécessitent pas d’utiliser collect pour afficher le résultat !

# Show first data
def show_first_data(data):
    print(data.first())
## First row of the textFile
show_first_data(data)
# Show first rows of data
def show_nfirst_datas(data,nb_samples=5):
    print(data.take(nb_samples))
## 5 first rows of the textFile
show_nfirst_datas(data)

Exécuter les calculs avec collect

La méthode collect a été décrite ci-dessus.

A noter, encore une fois, qu’il n’est pas nécessaire de l’appeler à chaque fois puisque certaines méthodes déclenchent automatiquement les calculs.

Travail sur les données avec map

La fonction map sert à prendre en entrée une ligne et à renvoyer une autre donnée (de format différent ou non).

Par exemple, si la donnée est une string, on peut renvoyer en sortie la liste des mots (en faisant un split sur les espaces). Si la donnée est un tuple, on peut enlever un élément du tuple, etc…

Pour être utilisée, il faut donner à « map » une fonction d’un seul paramètre (qui sera la fonction appliquée à chaque ligne ensuite).

Voici la structure des données qui vont nous servir avec leur numéro de colonne :

  • Nom du pays (col 0)
  • Date à laquelle a été faite l’observation (col 2)
  • Nombre total de vaccinations (col 3)
  • % de vaccination dans le pays (col 4)
  • Nombre de nouvelles doses par jour (col 6)
  • Nombre de personnes totalement vaccinées (col 10)

Servons-nous en pour transformer nos lignes de texte en tuples avec le nom du pays, la date et le nombre de vaccinations. En Python lorsqu’une fonction est fournie en paramètre on peut soit l’écrire avant (idéal lorsque le code est complexe et prend plusieurs lignes) ou utiliser une fonction lambda (idéal si l’opération est très simple et courte).

# Parse the dataset
def parse_data(data):
    line = data.split(",")
    country_name = line[0]
    date = line[2]
    total_vacc = float(line[3] if line[3] != "" else -1)
    percent_vacc = float(line[4] if line[4] != "" else -1)
    nb_new_vacc_per_day = float(line[6] if line[6] != "" else -1)
    nb_fully_vacc = float(line[10] if line[10] != "" else -1)
    return (country_name,date,total_vacc,percent_vacc,nb_new_vacc_per_day,nb_fully_vacc)
data_vaccination_countries = data.map(parse_data)
show_first_data(data_vaccination_countries)

Ce qui renvoie l’information du Mozambique du 17 octobre 2021. On remarque alors la présence des -1 signifiant que les données sont absentes.

On va se concentrer sur le nombre total de personnes complètement vaccinées dans les pays. Conservons uniquement cette information :

# Extract number of vaccinations in each country
data_nb_fully_vacc = data_vaccination_countries.map(lambda x:(x[0],x[1],x[5]))
show_first_data(data_nb_fully_vacc)

Se faciliter certains mapping avec mapValues

Voici une petite astuce utile dans certains cas : utiliser mapValues plutôt que map.

mapValues est une fonction qui fonctionne comme map sauf que son entrée est directement la valeur et qu’elle renvoie la nouvelle valeur associée à la clé en cours.

ATTENTION : comme précédemment, mapValues part du principe que les données sont au format (clé, valeur). Pour remplacer le code du paragraphe au-dessus si on écrit (notez bien qu’on déplace les index de la liste) :

# Same as MAP, not working because of the format
data_nb_fully_vacc2 = data_vaccination_countries.mapValues(lambda x: (x[0],x[4]))
show_first_data(data_nb_fully_vacc2)

On obtient « (‘Afghanistan’, (‘2’, ‘-‘)) » car x vaut… ‘2021-02-22’ et non (‘2021-02-22’, 0.0).

Il faut donc transformer nos données pour les avoir dans le bon format :

# Same as MAP, corrected
data_nb_fully_vacc3 = data_vaccination_countries.map(lambda x: (x[0],(x[1],x[2],x[3],x[4],x[5]))).mapValues(lambda x: (x[0],x[4]))
show_first_data(data_nb_fully_vacc3)

Ce qui nous permet d’avoir « (‘Afghanistan’, (‘2021-02-22’, 0.0)) » comme précédemment !

Supprimer les lignes dont la donnée de vaccination est inconnue via « filter »

La fonction filter permet d’enlever toutes les données ne répondant pas au critère indiqué.

Pour l’utiliser, il suffit de lui fournir en argument une fonction qui, pour chaque ligne, renvoie True (= on garde la ligne) ou False (= on la retire).

# Filter datas where nb_fully_vacc is not known (ie = -1)
filtered_data = data_nb_fully_vacc.filter(lambda x : x[2] != -1.0)
show_nfirst_datas(filtered_data)

Compter le nombre de lignes avec count

La fonction count renvoie le nombre de données. Servons-nous en pour compter le nombre total d’enregistrements :

# Number of datas in total
nb_datas_total = data_nb_fully_vacc.count()
nb_datas_not_empty = filtered_data.count()
print(f"{nb_datas_not_empty}/{nb_datas_total}")

On obtient 42 052 lignes non vides pour 79 850 lignes.

Compteurs plus précis : countByKey, countByValue

La fonction countByKey permet de compter le nombre d’enregistrements pour chaque clé.

# Number of datas per country
count_per_country = filtered_data.countByKey()
for key,val in count_per_country.items():
    print(key,val)

La fonction countByValue permet de compter le nombre d’enregistrements ayant la même valeur.

Dans notre cas, les valeurs sont composées de la date et du nombre de vaccinations : si on utilise directement countByValues on n’aura quasiment que des valeurs différentes puisque pour chaque date le nombre de vaccinations va être différent. On va donc utiliser la fonction map pour ne garder que les dates et compter les enregistrements de chaque date !

# Count per date
data_per_date = filtered_data.map(lambda x: x[1])
count_date = data_per_date.countByValue()
for key,val in sorted(count_date.items(),key=lambda x:x[1]):
    print(key,val)

On trouve qu’il y a 145 enregistrements pour le 9 août 2021.

NB : countByKey est recodée dans le paragraphe suivant grâce à reduceByKey pour l’exemple.

Visualiser toutes les valeurs d’un pays avec lookup

La méthode lookup permet d’afficher toutes les valeurs associées à une clé.

On va l’utiliser pour observer les données à disposition sur la France :

# Visualize datas for key France
dates_France = sorted(filtered_data.lookup("France"))
print(f"Nb dates for France : {len(dates_France)}, latest date = {dates_France[-1]}")

Ainsi on voit qu’il y a 404 enregistrements non-vides pour la France, le dernier datant du 03 février 2022.

On peut noter que la valeur retournée est basée sur la représentation (clé, valeur). Puisque nos données étaient au format (clé, valeur1, valeur2) au lieu de (clé, (valeur1, valeur2)), seule la valeur 1 est récupérée !

Regrouper les dates par pays avec groupBy et groupByKey

Ces deux fonctions sont plus complexes à utiliser car :

  • il faut que les données soient au format (clé, valeur) comme pour lookup
  • la sortie de la fonction n’est pas « directement lisible », en général elle est faite pour être enchaînée avec une autre fonction

Concrètement, groupBy permet de regrouper des données sur le critère indiqué en vue d’effectuer un traitement dessus (par exemple additionner des valeurs). La méthode groupByKey fait la même chose avec comme critère « clé identique » (ce qui revient à groupBy(lambda x : x[0])).

Afin de bien comprendre groupByKey, voici un exemple : on souhaite regrouper les pays entre eux afin d’avoir toutes les dates d’un pays.

La première étape est de passer au format (clé, valeur)

# Visualize dates per country
countries_and_datse = filtered_data.map(lambda x: (x[0],x[1]))

Si on appelle groupByKey ensuite on obtient « (‘Afghanistan’, <pyspark.resultiterable.ResultIterable at 0x2d35cd2f370>) »

show_first_data(countries_and_dates.groupByKey())

En effet, l’opération de groupement renvoie un objet pyspark de type « ResultIterable ». Pour le convertir en objet Python classique, il est indispensable de passer par une méthode comme mapValues.

Généralement, on appelle la méthode mapValues avec « list », « sorted » ou « set » pour obtenir respectivement ici toutes les dates, les dates triées, les dates uniques par pays.


show_first_data(countries_and_dates.groupByKey().mapValues(list))
show_first_data(countries_and_dates.groupByKey().mapValues(sorted))
show_first_data(countries_and_dates.groupByKey().mapValues(set))

La méthode groupBy peut être utilisée pour grouper, par exemple sur les dates.

# Visualize countries per date GROUPBY
show_first_data(countries_and_dates.groupBy(lambda x:x[1]).mapValues(list))

Compter le nombre de fois qu’apparait chaque pays avec reduceByKey

reduceByKey permet de rassembler deux données ayant la même clé et de renvoyer une nouvelle donnée avec cette clé (la clé = le premier élément du tuple).

ATTENTION la méthode reduceByKey nécessite deux conditions pour fonctionner :

  • Les données qu’on lui donne doivent être au format (clé, valeur). Par conséquent si les données sont (country_name, date, nb_fully_vacc) la fonctionne renverra une erreur. A la place, il faut les mettre sous la forme (country_name, (date, nb_fully_vacc))
  • La fonction reduceByKey fonctionne par itérations : la sortie à l’itération précédente est utilisée dans l’itération suivante. Il faut donc faire très attention à ce que ce qu’on renvoie avec notre fonction soit compatible avec les autres données !

La fonction que l’on donne à reduceByKey possède deux arguments notés x et y qui correspondent à deux lignes ayant la même clé (ce sont donc les valeurs uniquement). La clé n’est pas accessible, donc si elle est nécessaire il faut l’ajouter d’abord aux valeurs grâce à « map ».

Notre objectif ici est de compter le nombre de fois que chaque pays apparait grâce à reduceByKey. La technique est d’associer à chaque clé un compteur qui vaut 1 (grâce à map), puis grâce à reduceByKey on va additionner ces 1 pour obtenir le total.

On commence par créer les données (pays, 1) à partir des données filtrées :

# Add a counter to the keys
counting_datas = filtered_data.map(lambda x: (x[0],1))
show_nfirst_datas(counting_datas)

Puis on réduit le nombre de données en sommant sur les valeurs :

# Summing the counters
count_countries = counting_datas.reduceByKey(lambda x,y : x+y)
show_nfirst_datas(count_countries)

On peut récupérer l’ensemble des résultats grâce à la méthode collect vue précédemment.

## Aggregate results and display
results = count_countries.collect()
for r in results:
    print("{} : {} days of data".format(*r))

On trie par compteur et on note le min et le max :

## Find min and max
results_sorted = sorted(results,key=lambda x : x[1])
print(f"{results_sorted[0][0]} ({results_sorted[0][1]}) - {results_sorted[-1][0]} ({results_sorted[-1][1]})")

Ce qui donne seulement 2 données pour le pays « Bonaire » et 431 données pour le regroupement « World ».

Petit a parte sur le tri :

Il est possible de trier les résultats pour qu’ils apparaissent dans l’ordre croissant. Cependant, il est déconseillé de le faire après le collect car, en général, le nombre de résultats est très grand. On préfèrera utiliser la méthode sortByKey de spark qui est distribuée (il faudra au préalable inverser la clé et la valeur grâce à map car pour l’instant la clé est le pays !).

Astuce : la fonction « sort » trie une liste et renvoie None tandis que « sorted » trie une liste et renvoie le résultat, donc afin de réduire les lignes de code on peut écrire par exemple « for r in sorted(results) »

Créer de nouvelles lignes avec flatMap

Pour finir, on va découvrir la fonction flatMap qui permet de créer de nouvelles lignes de données.

Si on l’utilise généralement en traitement du langage naturel pour découper une ligne de phrase en plusieurs lignes de mots, ici on va s’en servir pour écrire autant de lignes qu’il y a de vaccinés.

# Create one row for each vaccination
show_nfirst_datas(filtered_data)

def split_datas(data):
    return [(data[0],data[1])]*int(data[2])
splitted = filtered_data.flatMap(split_datas)
show_nfirst_datas(splitted)

Conclusion

Dans cette partie nous avons vu de nombreuses fonctions de spark afin de résoudre un problème concret. Toutes les fonctions n’ont pas été présentées afin de limiter les informations, car dorénavant nous allons utiliser les DataFrame de spark !

Crédit de l’image de couverture : AbelEscobar via Pixabay – Pixabay License