Il existe plusieurs systèmes de calculs distribués permettant de traiter une volumétrie importante de données: Big Data, le tout en temps réel. L’une des solutions les plus connus c’est Spark.
Apache Spark est un framework open source totalement écrit en Scala, permettant le traitement de données volumineuses à travers des calculs distribués en mémoire.
L’écosystème de Spark se compose de Spark Core qui est le projet principal effectuant les divers calculs distribués et puis on retrouve un ensemble de modules ajoutant des nouvelles fonctionnalités :
Spark excelle dans l’analyse en temps réel grâce au module Spark Streaming.
Spark Streaming permet ainsi d’analyser des flux de données en temps réel. Inversement à Apache Storm qui gère le flux de données d’un coup (one at a time), Spark Streaming découpe le flux de données en plusieurs petits batches à intervalle de temps régulier avant de les traiter. Il faut noter que avec Storm Trident, il possible aujourd’hui de faire du micro-batch sur un flux de données (stream) en s’assurant d’avoir une source fiable alors que Spark Streaming gère très bien le cas d’une source de données avec des échecs.
Spark Streaming se base sur une abstraction appelé DStream pour Discritized Stream. DStream représente un batche de RDD (Resilient Distributed Datasets).
RDD est l’abstraction de collection utilisé par Spark pour le traitement des données et sur laquelle on réalise les diverses opérations de calcul que ce soit des transformations tel que :
Ou des actions tel que :
Spark Streaming s’intègre avec plusieurs sources de données tel que :
Spark Streaming propose aussi une intégration pour les autres data sources qui ne bénéficient pas d’une intégration prédéfinie. Il suffit d’implémenter la classe Receiver.
Apache Spark supporte plusieurs langages: Scala, Java, Python et R.
Pour les exemples suivants, le langage utilisé est Scala.
Scala est un langage de programmation multi-paradigme qui s’intègre avec les paradigmes de programmation objet et de programmation fonctionnelle. Il se caractérise par un typage statique, l’inférence de type ou les conversions implicites qui rendent l’utilisation d’Apache Spark plus simple.
L’objectif de cet exemple est de calculer le nombre d’occurrence de mots en temps réel.
La première étape est d’initialiser le contexte Spark Streaming:
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf() .setAppName("myFirstApp") .setMaster(local[*]) val ssc = new StreamingContext(conf, Seconds(2))
On crée le contexte Spark en précisant le nom de notre application ainsi que le cluster manager. Dans cet exemple, le mode de lancement de Spark est en standalone. C’est la solution la plus simple qui se base sur Akka pour les échanges et sur Zookeeper pour la haute disponibilité du nœud master. L’exécuteur a été initialisé avec [*] ce qui veut dire autant de thread que possible sachant qu’il faudra toujours avoir au moins deux threads : un pour la lecture des données et un autre pour le traitement des données.
Spark peut faire appel à d’autres gestionnaires de cluster tel que Mesos ou YARN.
Après l’initialisation du contexte Spark, on initialise le spark streaming context à partir de ce dernier et on précise la période de streaming qui est de 2 secondes pour cet exemple.
L’étape suivante est de définir le flux de données à utiliser. Il existe plusieurs possibilités de flux de données. Ici, on va présenter quelques types de flux pour notre exemple simple : le calcul du nombre d’occurrence de chaque mot d’un texte.
Les sockets sont des connecteurs réseau qui permettent d’utiliser le protocole TCP ou UDP.
Ici, ça sera plutôt le protocole TCP (Transmission Control Protocol). L’idée est de récupérer le flux de données en temps réel. Les données récupérées sont sous la forme d’un texte.
L’idée ici est de calculer le nombre d’occurrence de chacun des mots toutes les deux secondes.
val lines = ssc.socketTextStream(host, port, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()
Après avoir décrit les actions à faire pour le flux, on démarre le streaming en appelant l’action ssc.start(). Ensuite, le traitement est lancé sur des threads différents. On peut arrêter le thread principal soit en précisant un événement particulier ou en attendant la fin de l’exécution ssc.awaitTermination().
Kafka est système de messagerie distribué. Il a pour but de conserver les messages de façon temporaire (comme pour un système de Queuing) permettant de garantir, lors de forts pics de charge, l’intégrité du message. Je vous conseille de lire l’article dessus qui est inclus dans ce numéro pour avoir plus d’informations.
// Create direct kafka stream with brokers and topics val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) // Get the lines, split them into words, count the words and print val lines = messages.map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _) wordCounts.print() // Start the computation ssc.start() ssc.awaitTermination()
Pour écouter les messages, il faut tout d’abord configurer kafka : spécifier les brokers et la liste des topics. Ensuite, pour chaque message, on récupère les mots et on calcule le nombre d’occurrence de chaque mot.
Custom receiver
Si on souhaite récupérer des données dont il n’existe pas d’intégration directe dans Spark Streaming, on peut créer notre propre Receiver.
object CustomReceiver { val sparkConf = new SparkConf().setAppName("CustomReceiver") val ssc = new StreamingContext(sparkConf, Seconds(2)) val lines = ssc.receiverStream(new CustomReceiver(myHostname, port)) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination() }
L’initialisation est la même que précédemment sauf que l’on fait appel à la méthode receiverStream de Spark Streaming qui prend en paramètre le Custom receiver. Le Custom receiver nous renvoie comme résultat des lignes de texte sur lesquelles on applique les mêmes transformations que précédemment pour obtenir le nombre d’occurrence des mots.
Si on regarde de plus près le Custom Receiver:
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { } }
La classe Custom Receiver étend l’interface Receiver fournit par Spark Streaming et qui contient les deux méthodes : start() et stop().
private def receive() { var socket: Socket = null var userInput: String = null try { socket = new Socket(host, port) val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() logInfo("Stopped receiving") restart("Trying to connect again") } catch { case e: java.net.ConnectException => restart("Error connecting to " + host + ":" + port, e) case t: Throwable => restart("Error receiving data", t) } }
Dans la méthode start(), on précise la source de données qui sera utilisé. Ici, dans la méthode receive(), on lit les données d’un InputStream et on sauvegarde chacune des lignes lues pendant la durée du batche qui est de 2 secondes.
Les lignes récupérés via le Custom Receiver seront traitées et un nouveau batche va aussitôt se lancer pour récupérer des nouvelles lignes et les analyser et ainsi de suite.
En plus la lecture de flux de données en temps réel, Spark Streaming peut aussi récupérer un flux de données et le transformer pour le stocker dans une base de données. Dans ce cas, on initialise en plus un Spark SQL context en se basant sur Spark Context et on crée un Spark SQL Dataframe.
val lines = ssc.socketTextStream(hostname, port, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) // Convert RDDs of the words DStream to DataFrame and run SQL query words.foreachRDD((rdd: RDD[String], time: Time) => { // Get the singleton instance of SQLContext val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext) import sqlContext.implicits._ // Convert RDD[String] to RDD[case class] to DataFrame val wordsDataFrame = rdd.map(w => Record(w)).toDF() // Register as table wordsDataFrame.registerTempTable("words") // Do word count on table using SQL and print it val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") println(s"========= $time =========") wordCountsDataFrame.show() }) ssc.start() ssc.awaitTermination()
On transforme ainsi les données en Dataframe et ensuite on effectue le calcul du nombre d’occurrence en faisant appel à une simple requête SQL.
Attention à un détail, le Spark SQL context doit être un singleton :
/** Lazily instantiated singleton instance of SQLContext */ object SQLContextSingleton { @transient private var instance: SQLContext = _ def getInstance(sparkContext: SparkContext): SQLContext = { if (instance == null) { instance = new SQLContext(sparkContext) } instance } }
Le cas d’usage peut être mis en place dans le cas ou on a besoin de récupérer des informations et les sauvegarder pour servir dans des traitements ultérieurs.
Apache Flume est un système distribué qui permet d’aggréger un grand volume de données en temps réels. Les données peuvent être des logs ou des événements.
On change d’exemple précédent qui était le calcul du nombre d’occurrence d’un mot pour calculer le nombre d’événement reçus. Les étapes initiales sont les mêmes: on initialise un context spark et spark streaming comme précédemment en précisant l’intervalle de batche.
// Create the context and set the batch size val sparkConf = new SparkConf().setAppName("FlumeEventCount") val ssc = new StreamingContext(sparkConf, Milliseconds(2000)) // Create a flume stream val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() ssc.start() ssc.awaitTermination()
Ensuite, on fait appel à FlumeUtils pour créer le flux de données et réaliser le traitement souhaité : le calcul du nombre d’événements reçus.
Spark Streaming est utilisé dans différents cas:
Spark Streaming est l’une des meilleures solutions actuelles pour le traitement de flux de données en temps réel et en continu. La prise en main est très facile car Spark Streaming propose déjà l’intégration de plusieurs types de sources de données.
En continuant à utiliser le site, vous acceptez l’utilisation des cookies. Plus d’informations
Les paramètres des cookies sur ce site sont définis sur « accepter les cookies » pour vous offrir la meilleure expérience de navigation possible. Si vous continuez à utiliser ce site sans changer vos paramètres de cookies ou si vous cliquez sur "Accepter" ci-dessous, vous consentez à cela.