Audience Solutions Powered by your Customers

Datacratic and iPerceptions join forces

MapReduce avec parallel, cat et une redirection

Un projet sur lequel notre équipe d'apprentissage machine travaille actuellement est un engin de recommandation qui sert à générer des courriels personnalisés pour les clients d'un magasin en ligne. Le modèle que nous avons développé utilise l'historique de navigation et d'achats des utilisateurs du site. Chaque utilisateur est représenté par la combinaison de l'ensemble des produits avec lesquels il a interagi ainsi que leur relation avec chacun des produits que nous pouvons lui recommander.

Vers la fin du projet, un problème auquel nous avons fait face était de rendre possible l'exécution de notre modèle sur des centaines de milliers d'utilisateurs dans un temps très court. Nous aurions pu passer du temps à accélérer le modèle lui-même. Il y avait par contre un gain beaucoup plus grand à réaliser en changeant simplement la manière dont nous traitions chaque utilisateur.

Méthode séquentielle

À l'origine, chaque utilisateur était traité de manière séquentielle, c'est-à-dire que nous passions simplement à travers la liste des utilisateurs un après l'autre. En supposant une méthode score() prenant un identifiant d'utilisateur uid en paramètre et retournant la liste des scores pour l'utilisateur, chaque ligne de notre fichier de sortie a la forme suivante:

uid,s1,s2,s3,...,sN

L'algorithme séquentiel en Python ressemble à ceci:

with open("scores.csv", "w") as writer:
    for uid in uids:
        writer.write(score(uid)+"\n")

Après l'exécution, on se retrouvera donc avec un fichier scores.csv qui contiendra chaque utilisateur sur une ligne, suivi de tous ses scores associés.

Méthode MapReduce

Comment accélérer la génération des scores? Puisque les scores pour chaque utilisateur sont indépendants de ceux des autres, une manière simple est de générer les scores en parallèle selon le patron MapReduce et en utilisant de simples outils UNIX. La machine sur laquelle notre modèle est exécuté possédant 32 coeurs, nous pouvions tout exécuter en parallèle localement et potentiellement diviser le temps de calcul par 32 si chacun des coeurs générait les scores pour 1/32 des utilisateurs.

Rappelons que MapReduce est un patron qui comprend deux étapes: l'étape map consite à séparer un grand problème en n plus petits problèmes et de les traiter en parallèle, et l'étape reduce consiste à recombiner les résultats de l'étape map en un seul résultat.

Pour exécuter cette tâche en MapReduce, en plus du script générant les scores (appelons le score.py), nous aurons besoin d'un deuxième script qui aura comme tâche de démarrer plusieurs instances de score.py en leur disant quelle partie du travail effectuer.

Commençons par modifier score.py pour qu'il implémente la fonctionnalité map. Défnissons deux variables, total_nodes et node_num, respectivement le nombre total de noeuds de travail ainsi que le noeud actuel.

for i, uid in enumerate(uids):
    if Math.abs(i - node_num) % total_nodes != 0:
        continue
    print score(uid)

Nous avons fait deux petites modifications comparativement à la version séquentielle:

  1. Au lieu d'écrire les scores dans un fichier, le script les retourne sur la sortie standard.
  2. Une condition dans la boucle permet de sauter tous les utilisateurs pour lesquels nous ne sommes pas le noeud responsable.

Comme dit précédemment, le deuxième script est le contrôleur et est responsable de démarrer les noeuds de travail (étape map) ainsi que de combiner leur résultat (étape reduce). On peut exécuter le premier script en appelant simplement score.py et en passant en argument les variables total_nodes et node_num.

Pour exécuter les tâches en parallèle, nous utilisons GNU Parallel, dont nous avons déjà parlé dans un article précédent. Son utilisation se résume à spécifier une commande à exécuter où un argument sera substitué à partir d'une liste. Par exemple la commande suivante compressera en parallèle avec 12 coeurs tous les fichiers commençant par a_compresser.

parallel -j12 "gz {}" ::: a_compresser*

Pendant l'exécution de parallel, chacun des scripts score.py écrira son propre fichier de résultats. Pour les combiner, nous n'avons qu'à exécuter cat sur l'ensemble des fichiers et de faire une redirection dans un autre fichier, ce qui combinera tous les fichiers dans un. Cette logique en bash ressemble à ceci:

C=32
parallel -j$C "score.py $C {} > scores-{}.csv" ::: `seq 0 $[C-1]`
cat scores-*.csv > scores-combined.csv

 

Adopter cette stratégie nous a donc permis de dramatiquement accélérer la génération des scores en utilisant pleinement les capacités de notre machine. La réduction en temps était linéaire puisque le goulot d'étranglement était le processeur. L'exécution parallèle prenait environ 40 minutes avec 32 coeurs, au lieu d'environ 21 heures de manière séquentielle. Cette accélération a pu être faite avec de simples outils UNIX tout en ne faisant presque aucune modification de code. Notons qu'il aurait aussi été possible d'utiliser le module multiprocessing de Python pour réaliser le MapReduce.

La leçon que nous en tirons est qu'il est facile de dépenser beaucoup de temps à faire des microoptimisations quand on peut simplement paralléliser certains problèmes très facilement, sans nécessairement avoir à utiliser des librairies plus lourdes comme Hadoop.

Category: 

Comments

Merci François, c'est en effet un article très pratique et détaillé. À propos du dernier paragraphe, je pense que la raison principale pour laquelle on utilise un framework comme Hadoop c'est pour les cas où la donnée est distribuée sur des machines partout, et puisque ce n'est pas forcement facile de la copier sur une seule machine, plutôt on distribue la processus de l'étape map sur toutes les machines qui contiennent les pièces de la donnée. Mais, si on a toute la donnée sur une seule machine Hadoop n'est pas sans doute nécessaire, comme tu as mentionné.

Submitted by Pouria Fewzee on

Add new comment