Tag: Apache Spark

Data Night – Cassandra, Spark, Scalability on Tuesday, January 19

The Go workshop has not even taken place yet and we are already announcing the next meetup, a data special! We have gone big this time, with two speakers and a 400-seat room, so we can welcome everyone: spread the word!

Getting started with Spark in practice

A few months ago, Sam Bessalah and I organized a workshop via Duchess France to introduce Apache Spark and its ecosystem.

This post quickly recaps the basics of the Apache Spark framework and describes the exercises provided in this workshop (see the Exercises part) to get started with Spark (1.4), Spark Streaming and DataFrames in practice.

If you want to get started with Spark and some of its components, the workshop exercises are available in both Java and Scala on this github account. You just have to clone the project and go! If you need help, take a look at the solution branch.


With MapReduce/Hadoop frameworks, the way to reuse data between computations is to write it to an external stable storage system like HDFS. This is not very efficient when you iterate, because every iteration pays the I/O overhead.

If you want to reuse data, and in particular if you want to use machine learning algorithms, you need a more efficient solution. This was one of the motivations behind the creation of the Spark framework: to develop a framework that works well with data reuse.

Other goals of Apache Spark were to design a programming model that supports more than MapReduce patterns, and to maintain its automatic fault tolerance.

In a nutshell Apache Spark is a large-scale in-memory data processing framework, just like Hadoop, but faster and more flexible.

Furthermore, Spark 1.4.0 includes standard components: Spark Streaming, Spark SQL & DataFrames, GraphX and MLlib (the machine learning library). These components can be combined seamlessly in the same application.

Here is an example on how to use Spark and MLlib on data coming from an accelerometer.

 

RDD

Spark has one main abstraction: Resilient Distributed Datasets or RDDs.

An RDD is an immutable collection partitioned across the nodes of the cluster which can be operated on in parallel.

You can control the persistence of an RDD:

  • RDDs can be kept in memory between queries if you need to reuse them (which improves performance)
  • you can also persist an RDD on disk

RDDs support two types of operations (transformations and actions):

  • a transformation creates another RDD and is a lazy operation (for example: map, flatMap, filter, groupBy…)
  • an action returns a value after running a computation (for example: count, first, take, collect…)

You can chain operations together, but keep in mind that the computation only runs when you call an action.
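To make this concrete, here is a minimal Java 8 sketch (not taken from the workshop; the local master and the input.txt path are placeholders): nothing is read or computed until count() is called.

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("lazy-demo").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> lines = sc.textFile("input.txt");                               // transformation: lazy
    JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")));  // transformation: lazy
    JavaRDD<String> longWords = words.filter(word -> word.length() > 5);            // transformation: lazy

    long result = longWords.count();  // action: the whole chain is executed here
    System.out.println(result);

    sc.stop();
  }
}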

 

Operations on RDDs: a global view

Here is a general diagram to understand the data flow.


  1. On the left, the input data comes from external storage. The data is loaded into Spark and an RDD is created.
  2. The big orange box represents an RDD with its partitions (small orange boxes). You can chain transformations on RDDs. Since transformations are lazy, the partitions are only sent across the nodes of the cluster when you call an action on the RDD.
  3. Once a partition is located on a node, you can continue to operate on it.

N.B.: all operations applied to an RDD are recorded in a DAG (directed acyclic graph): this is the lineage principle. If a partition is lost, Spark can automatically rebuild it thanks to this DAG.

 

An example: the wordcount

Here is the wordcount example: it is the “hello world” for MapReduce.

The goal is to count how many times each word appears in a file, using the MapReduce pattern.

https://gist.github.com/nivdul/0b84c5184ae42278b02f#file-wordcount

First the mapper step: we assign the value 1 to each word using the map transformation.

Then the reducer step: here the key is a word, and reduceByKey, which is a transformation, returns the total for each word (the result is only computed once an action is called).

https://gist.github.com/nivdul/0b84c5184ae42278b02f
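The workshop solution lives in the gist and repository above; as a rough reference, a Java 8 version of the same pattern could look like the following sketch (the local master and input path are placeholders).

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class WordCount {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("wordcount").setMaster("local[*]"));

    JavaPairRDD<String, Integer> counts = sc.textFile("input.txt")
        .flatMap(line -> Arrays.asList(line.split(" ")))   // split each line into words
        .mapToPair(word -> new Tuple2<>(word, 1))          // mapper step: (word, 1)
        .reduceByKey((a, b) -> a + b);                     // reducer step: sum the 1s per word

    // collect() is the action that triggers the whole computation
    counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
    sc.stop();
  }
}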

Exercises

In this workshop the exercises focus on the Spark core and Spark Streaming APIs, as well as DataFrames for data processing. The workshop is available in Java (1.8) and Scala (2.10). To help you implement each class, unit tests are included and there are a lot of comments.



Prerequisites

To do the exercises below, you need Java 8 installed (so you can use lambda expressions). Spark 1.4.0 uses Scala 2.10, so you will need a compatible Scala version (2.10.x); here we use 2.10.4.
As build tool, this hands-on uses Maven for the Java part and sbt for the Scala one. For unit testing, we use JUnit (Java) and ScalaTest (Scala).

All exercises run in local mode as standalone programs.

To work on the hands-on, retrieve the code via the following command line:
#Scala
$ git clone https://github.com/nivdul/spark-in-practice-scala.git

#Java
$ git clone https://github.com/nivdul/spark-in-practice.git

Then you can import the project into IntelliJ or Eclipse (add the SBT and Scala plugins for Scala), or use Sublime Text, for example.

If you want to use the spark-shell (Scala/Python only), you need to download the binary Spark distribution (see the Spark download page).

# Go to the Spark directory
$ cd /spark-1.4.0

# First build the project
$ build/mvn -DskipTests clean package

# Launch the spark-shell
$ ./bin/spark-shell
scala >

 

Part 1: Spark core API

To be more familiar with the Spark API, you will start by implementing the wordcount example (Ex0).

After that, you will use reduced tweets in JSON format as the data for data mining (Ex1-Ex3). This will give you a good overview of the basic functions provided by the Spark API.

https://gist.github.com/nivdul/b41f54f02b983bc0bf05#file-reduced_tweets-json

You will have to:

  • Find all the tweets by user
  • Find how many tweets each user has
  • Find all the persons mentioned on tweets
  • Count how many times each person is mentioned
  • Find the 10 most mentioned persons
  • Find all the hashtags mentioned on a tweet
  • Count how many times each hashtag is mentioned
  • Find the 10 most popular hashtags (a rough sketch of the hashtag counting follows this list)
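As a hint for the hashtag exercises, here is a rough Java 8 sketch (not the workshop solution; it assumes the tweet texts have already been loaded into a JavaRDD<String>):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import scala.Tuple2;

public class HashtagStats {

  public static List<Tuple2<Integer, String>> top10Hashtags(JavaRDD<String> tweetTexts) {
    JavaPairRDD<String, Integer> counts = tweetTexts
        .flatMap(text -> Arrays.asList(text.split(" ")))
        .filter(word -> word.startsWith("#") && word.length() > 1)  // keep hashtags only
        .mapToPair(tag -> new Tuple2<>(tag, 1))
        .reduceByKey((a, b) -> a + b);

    // Swap to (count, hashtag) so we can sort by count, then take the first 10.
    return counts
        .mapToPair(t -> new Tuple2<>(t._2(), t._1()))
        .sortByKey(false)   // descending order
        .take(10);
  }
}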

The last exercise (Ex4) is way more complicated: the goal is to build an inverted index, knowing that an inverted index is the data structure used to build search engines.

Assuming #spark is a hashtag that appears in tweet1, tweet3, tweet39, the inverted index will be a Map that contains a (key, value) pair as (#spark, List(tweet1,tweet3, tweet39)).
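A minimal sketch of that idea, assuming you have already extracted a JavaPairRDD of (hashtag, tweet id) pairs (the extraction step is left out and the names are illustrative, not the workshop solution):

import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;

public class InvertedIndex {

  public static Map<String, Iterable<String>> build(JavaPairRDD<String, String> hashtagToTweetId) {
    return hashtagToTweetId
        .groupByKey()        // (#spark, [tweet1, tweet3, tweet39])
        .collectAsMap();     // bring the index back to the driver as a Map
  }
}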

Part 2: streaming analytics with Spark Streaming

Spark Streaming is a component of Spark to process live data streams in a scalable, high-throughput and fault-tolerant way.


In fact Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.

The abstraction that represents a continuous stream of data is the DStream (discretized stream).

In the workshop, Spark Streaming is used to process a live stream of Tweets using twitter4j, a library for the Twitter API.

To be able to read the firehose, you will first need to create a Twitter application at http://apps.twitter.com, get your credentials, and add them to the StreamUtils class.

In this exercise you will:

  • Print the status text of some of the tweets
  • Find the 10 most popular hashtags over a 1-minute window (sketched below)
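As an illustration of the second item, here is a rough Java 8 sketch (not the workshop code; it assumes the Twitter OAuth credentials are already configured for twitter4j, which is what the StreamUtils class is meant to help with):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.twitter.TwitterUtils;
import scala.Tuple2;
import twitter4j.Status;

public class PopularHashtags {
  public static void main(String[] args) throws Exception {
    // local[2]: at least one thread for the receiver and one for the processing
    SparkConf conf = new SparkConf().setAppName("popular-hashtags").setMaster("local[2]");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    JavaReceiverInputDStream<Status> tweets = TwitterUtils.createStream(jssc);

    JavaDStream<String> hashtags = tweets
        .flatMap(status -> Arrays.asList(status.getText().split(" ")))
        .filter(word -> word.startsWith("#"));

    JavaPairDStream<String, Integer> countsOverWindow = hashtags
        .mapToPair(tag -> new Tuple2<>(tag, 1))
        .reduceByKeyAndWindow((a, b) -> a + b, Durations.minutes(1), Durations.seconds(10));

    // Swap to (count, hashtag) and sort each windowed RDD by count, descending.
    JavaPairDStream<Integer, String> sortedCounts = countsOverWindow
        .mapToPair(t -> new Tuple2<>(t._2(), t._1()))
        .transformToPair(rdd -> rdd.sortByKey(false));

    sortedCounts.print();  // print() shows the first elements of each RDD, i.e. the current top hashtags

    jssc.start();
    jssc.awaitTermination();
  }
}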

 

Part 3: structured data with the DataFrame

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.
In fact, the Spark SQL/DataFrame component provides a SQL-like interface, so you can apply SQL-like queries directly to your structured data.

DataFrames can be constructed from different sources such as: structured data files, tables in Hive, external databases, or existing RDDs.


In the exercise you will:

  • Print the dataframe
  • Print the schema of the dataframe
  • Find people who are located in Paris
  • Find the user who tweets the most (a sketch of these queries follows this list)
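As a hint, these queries could look like the following Java sketch (the JSON path and the column names place and user are assumptions, not necessarily those of the workshop data):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class TweetDataFrame {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("tweet-dataframe").setMaster("local[*]"));
    SQLContext sqlContext = new SQLContext(sc);

    // Load the reduced tweets as a DataFrame (path and column names are placeholders).
    DataFrame tweets = sqlContext.read().json("data/reduced-tweets.json");

    tweets.show();          // print the dataframe
    tweets.printSchema();   // print its schema

    // People located in Paris
    tweets.filter(tweets.col("place").equalTo("Paris")).show();

    // The user who tweets the most
    tweets.groupBy(tweets.col("user"))
          .count()
          .orderBy(col("count").desc())
          .show(1);

    sc.stop();
  }
}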

 

Conclusion

If you find a better way or implementation, do not hesitate to send a pull request or open an issue on github.

Here are some useful links around Spark and its ecosystem:

Cassandra and Spark, friends for life…


Duy Hai Doan and Gérald Quintana will be in Lyon on Monday for a session about the Apache Spark and Cassandra projects. They tell us more about using these two tools together, an interesting technical alternative to the classic Hadoop ecosystem.

This interview was prepared by Agnès CREPET and Alexis HASSLER from the Lyon JUG team.

Agnès and Alexis: Gérald and Duy Hai, can you introduce yourselves?

Duy Hai: My name is Duy Hai and I am a technical evangelist for Datastax, the commercial company behind Apache Cassandra.

I split my time between giving presentations/meetups/talks about Cassandra, developing on open-source projects for the community, and helping projects that use Cassandra. Before Datastax, I was a freelance Java/Cassandra developer.

Gérald: I have been a Java developer for a long time. I am interested in persistence and data processing, in both SQL and NoSQL.

Agnès and Alexis: You are going to talk to us about Apache Spark and Cassandra. Can you explain what these tools do? What are their use cases?

Gérald: Cassandra is a distributed database: its strong points are scalability and fault tolerance. Spark lets you process data in a distributed way, both in batch and as a stream.

Duy Hai: To sum up, Apache Spark is a distributed data processing framework that speeds up computation by keeping intermediate data in memory. Apache Cassandra is a distributed-table NoSQL database that favours high availability and resilience to failures at the expense of strong data consistency.

Apache Spark is well suited to cases where you have a lot of data to process in parallel and you can process it incrementally in small batches that fit in memory. Indeed, if your data exceeds the available memory, Spark temporarily writes to disk, which slows processing down considerably.

Apache Cassandra is suited to the following cases: a need for very high availability, linear scalability, multi-site deployment, and operational simplicity.

Agnès and Alexis: Why do you find it interesting to use these two tools together?

Duy Hai: Cassandra gives you a distributed database but offers few tools for data analysis, an area where Spark excels. Both solutions already work in a distributed fashion; combining them lets you get the best of both worlds: Cassandra's high availability and resilience to failures, and the richness of Spark's analysis tools.

Gérald: Spark brings Cassandra a tool for processing large volumes of data (ETL-style table transformations, machine learning…) and additional querying capabilities: joins, aggregations… Cassandra gives Spark the ability to work with structured data without necessarily going through files (HDFS), all the more so since Cassandra's column-oriented model is very close to the Spark SQL model.

Agnès and Alexis: In which situations is Cassandra a better fit than other databases?

Duy Hai: Cassandra particularly excels at time series and immutable data. By design, its storage engine optimizes writes to disk for sequential read access to the data.

Gérald: Cassandra is particularly indicated when the data no longer fits on a single server and when traditional master-slave systems can no longer take the load, especially for writes.

Agnès and Alexis: Same question for Spark. In which situations is it a better fit than Hadoop (or other similar tools)?

Duy Hai: Because of its very general design, there is no Hadoop job that cannot be written in Spark. And Spark offers more than Map/Reduce. Whether you already have a Hadoop installation or you want to start a Big Data project, Spark has enough modules and extensions to cover most of your needs. Spark's strong point is bringing together different types of data processing (streaming, batch, SQL, …) in a coherent and extensible architecture.

Gérald: It brings a simpler API than Hadoop M/R (even if it is Scala :troll: ), the tooling is more integrated (Spark, Spark SQL, Spark Streaming in a single package) and performance is better (although with Tez…). To get started in the world of "big data" processing, the first step is more accessible.

Agnès and Alexis: Does using the Hadoop ecosystem seem more complicated to you today?

Duy Hai: Saying that the Hadoop ecosystem is complicated is a gentle understatement. People often forget that Hadoop is already 10 years old. At the time, the ecosystem consisted of only 2 components: HDFS (a distributed file system) and a job manager (MRv1). Over time, an improbable number of heterogeneous components/frameworks have been grafted on: Pig, Hive, Cascading, Tez, Parquet, ZooKeeper, Impala …

Each of these components is a different technology; I am thinking in particular of Pig, Hive and Cascading, which do not share the same philosophy at all. Their only common point: generating Map/Reduce code on behalf of the user. Moreover, the very fact of needing an abstraction layer to "write" Map/Reduce jobs is symptomatic of Hadoop's complexity.

On the operational side, although big efforts have been made to simplify the administration of the ecosystem (Apache Ambari), debugging a Hadoop job remains complicated today because you have to analyse the logs of every layer (HDFS, YARN, Pig/Hive/Cascading, …).

Gérald: Distributing a computation and processing large volumes of data robustly is a complex problem. But developing a batch that chains map/reduce jobs could be simpler, as Hadoop layers such as Cascading prove.

Agnès and Alexis: Does Cassandra-Spark look like the winning duo to you? What other tools could we add to get a killer combo?

Duy Hai: The Cassandra-Spark pair lets you get the best out of each solution, but it is not necessarily the answer to every problem. To be complete, you would add Apache Kafka to the Spark/Cassandra ecosystem to get a highly scalable and resilient ESB.

Gérald: To start with, Ansible or something similar to easily deploy onto a cluster. Then Spark JobServer to drive batches through a REST API, and OpsCenter to monitor Cassandra. Finally, a notebook such as Spark Notebook or Zeppelin to explore the data visually.

Agnès and Alexis: Do you have to learn Scala to use Spark, or can you use it with Java? With Java, does the API let you write good code?

Duy Hai: The creators of Spark thought about developers when designing the framework; you are not tied to any particular language. You can work with Spark in Scala, Java or Python. Databricks, the company behind Spark, is even introducing a version of Spark for R, the data scientists' favourite language.
It all depends on what you call "good code". With Java it is possible to write code that works, is well tested and well designed. Nevertheless, it is fairly obvious that in terms of conciseness, even with the arrival of lambdas in Java 8, Spark code written in Scala remains more concise and slightly more readable.

Gérald: A minimal knowledge of Scala or Python seems necessary to me in order to use the Spark shells (while waiting for the Java 9 REPL), and that complicates learning Spark a bit, but in reality you do not need to be a Scala expert to get by. Besides, the shell is only useful for experimentation or exploration; to develop real jobs, Spark works very well with Java 8 (with plenty of lambdas). Most Spark extensions (including the Cassandra one) provide a Java API.

Thanks to Duy Hai and Gérald for this interview! Sign up for their session at the Lyon JUG on June 15!

Predict your activity using your Android, Cassandra and Spark

Lately, I started a new sport activity: running. When you run, you get really curious about the acceleration, the distance, the elevation and other metrics you can analyse when you practice this kind of sport. As a runner, I started using a phone application (RunKeeper) and recently I bought a Garmin watch so I can get more information about my running sessions.
But how does this kind of application analyse the data and compute all these metrics?
Let's focus on one metric: proper acceleration.

What is proper acceleration?

Proper acceleration, or physical acceleration, is the acceleration an object experiences relative to free fall; it is the acceleration felt by people and objects. It is measured by an accelerometer.
The accelerometer data consist of successive measurements made over a time interval: that is what we call a time series.


How can I get an accelerometer?

Luckily, most smartphones contain an accelerometer sensor.
The sensor measures 3 values along 3 different axes (x, y and z).

As an Android fan, I implemented an Android app, Basic Accelerometer, which shows the values for each axis and the current date as a timestamp.

Let’s create Basic Accelerometer Android App!

All source code is available on my Github repository here.
First step, I implemented the start activity:
https://gist.github.com/MiraLak/924090ad709098284d6c

After creating the starting menu, I have to collect the sensor values in a new activity: “AccelerometerActivity”.
To use the sensor, the activity class must implement SensorEventListener.
https://gist.github.com/MiraLak/cd7832d742d530598754
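For reference, a minimal accelerometer activity can look like the sketch below (an illustration, not the exact code from the gist): it registers for TYPE_ACCELEROMETER updates and reads the x/y/z values plus a timestamp.

import android.app.Activity;
import android.hardware.Sensor;
import android.hardware.SensorEvent;
import android.hardware.SensorEventListener;
import android.hardware.SensorManager;
import android.os.Bundle;

public class AccelerometerActivity extends Activity implements SensorEventListener {

  private SensorManager sensorManager;
  private Sensor accelerometer;

  @Override
  protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    sensorManager = (SensorManager) getSystemService(SENSOR_SERVICE);
    accelerometer = sensorManager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER);
  }

  @Override
  protected void onResume() {
    super.onResume();
    sensorManager.registerListener(this, accelerometer, SensorManager.SENSOR_DELAY_NORMAL);
  }

  @Override
  protected void onPause() {
    super.onPause();
    sensorManager.unregisterListener(this);  // stop listening when the activity is paused
  }

  @Override
  public void onSensorChanged(SensorEvent event) {
    float x = event.values[0];
    float y = event.values[1];
    float z = event.values[2];
    long timestamp = System.currentTimeMillis();
    // here the real app builds an acceleration bean and posts it to the REST service
  }

  @Override
  public void onAccuracyChanged(Sensor sensor, int accuracy) {
    // not used in this sketch
  }
}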

Now I am able to get information from the sensor and post it to an online REST service.
I used Retrofit, a REST client for Android and Java:
https://gist.github.com/MiraLak/5a86bb3586204b8b290f
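With Retrofit 1.x, the client interface can be sketched as follows (the endpoint path comes from the URL used later in this post; the interface name and the Acceleration bean usage are illustrative, not the exact code from the gist):

import retrofit.Callback;
import retrofit.RestAdapter;
import retrofit.client.Response;
import retrofit.http.Body;
import retrofit.http.POST;

public interface AccelerometerService {

  // Posts one acceleration sample asynchronously to the REST app.
  @POST("/accelerometer-api")
  void postAcceleration(@Body Acceleration acceleration, Callback<Response> callback);
}

// Building the service once, with the REST app base URL (placeholder):
// AccelerometerService service = new RestAdapter.Builder()
//     .setEndpoint("http://myLocalIp:8080")
//     .build()
//     .create(AccelerometerService.class);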

After that, I added an asynchronous task to post the sensor values on each sensor update:
https://gist.github.com/MiraLak/0242d2caa0df24bee28f

Now we’re able to launch our app!

How to install the app on your phone?

  • Download Android Studio
  • Clone the BasicAccelerometer project and open it on Android Studio
  • Activate developer mode on your Android phone (it must run version 4.0.3 or above).
  • Plug in your phone, run the app and choose your phone as the target.

The application will start automatically on your phone and display its start screen.


Now as the application is started, we will focus on the REST Service.

REST Service and Cassandra DB

The Android app is ready to send us real-time data: a time series of our acceleration.
As you may have noticed, I used an acceleration bean in my Android app:
https://gist.github.com/MiraLak/63261abb04f17fc62e5f
The acceleration is posted to a REST service.
The REST API receives the accelerometer data and stores it in Cassandra. Each acceleration contains:

  • acceleration capture date as a timestamp (eg, 1428773040488)
  • acceleration force along the x axis (unit is m/s²)
  • acceleration force along the y axis (unit is m/s²)
  • acceleration force along the z axis (unit is m/s²)

The REST API sources are available on my GitHub here. All data is saved in a Cassandra database.

Apache Cassandra is a NoSQL database. When writing data to Cassandra, data is sorted and written sequentially to disk. When retrieving data by row key and then by range, you get a fast and efficient access pattern due to minimal disk seeks – time series data is an excellent fit for this type of pattern.

To start Cassandra data base:

  • download the Cassandra 2.1.4 archive
  • extract it
  • run this command: sh bin/cassandra

In the REST application, I used Spring Data Cassandra, which relies on the DataStax Java driver, so I can easily interact with the Cassandra database to perform different operations: write, read, update or delete.
Spring Data Cassandra helps to configure the Cassandra cluster and to create my keyspace:
https://gist.github.com/MiraLak/14d835992cf2980ceebe

After configuration, I created my application model:

https://gist.github.com/MiraLak/1d2bf2d8e28bff7a3a98

We are storing historical data, so I used a compound key (user_id and timestamp), since together they are unique:

https://gist.github.com/MiraLak/ac995c21ee05387cd893

Then, I added the REST controller.
The controller receives POST requests with an acceleration and inserts the values into the Cassandra database.
https://gist.github.com/MiraLak/fc187ed3a25d7de5d1e6
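Such a controller can be sketched as follows with Spring MVC and Spring Data Cassandra's CassandraOperations (an illustration, not the exact code from the gist; the Acceleration entity is the model defined above):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class AccelerationController {

  @Autowired
  private CassandraOperations cassandraOperations;  // provided by the Spring Data Cassandra configuration

  @RequestMapping(value = "/accelerometer-api", method = RequestMethod.POST)
  public void addAcceleration(@RequestBody Acceleration acceleration) {
    // write the acceleration received from the Android app into Cassandra
    cassandraOperations.insert(acceleration);
  }
}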

The acceleration bean used in the controller is the same as the one defined for the Android app, with an extra attribute: userID (I'll explain its usage later).

After defining the REST controller and the Cassandra configuration, we're able to run the application:

https://gist.github.com/MiraLak/a97d9b9375660b932e50

Spring Boot starts a Jetty server here, but you can use a Tomcat server instead. You'll have to update the project dependencies:
dependencies {
    compile("org.springframework.boot:spring-boot-starter-web") {
        exclude module: "spring-boot-starter-tomcat"
    }
    compile("org.springframework.boot:spring-boot-starter-jetty")
    compile("org.springframework.boot:spring-boot-starter-actuator")
    compile("org.springframework.data:spring-data-cassandra:1.2.0.RELEASE")
    testCompile("junit:junit")
}

Let’s launch REST app!

Our Android app is ready. Now we run the REST app. Then you have to set the REST app URL in the Android app:
http://myLocalIp:8080/accelerometer-api

As soon as you click the start button, the Basic Accelerometer app begins to send acceleration data to the REST service.


Then we start to see insertion logs in the REST app.


And to check the Cassandra database, we launch a CQL terminal to run some queries:
sh apache-cassandra-2.1.4/bin/cqlsh

The queries show the acceleration rows inserted from the app.

At this point, we collect data from the accelerometer and store it in the Cassandra database.

How to analyse accelerometer data?

Remember, we aim to analyse our acceleration data. We need some reference data to be able to create a decision tree model.

Luckily this model already exists, and there is an interesting article here explaining how to create a decision tree model based on acceleration data from Cassandra, using Spark and MLlib.

Apache Spark is a fast and general engine for large-scale data processing. MLlib is a standard component of Spark providing machine learning primitives on top of Spark: common algorithms, as well as basic statistics and feature extraction functions.

The source code to run predictions with an existing model is available on my GitHub here. [Update: this is the latest version, using Scala]

We now want to guess, just by analysing our acceleration, whether we are walking, jogging, standing up, sitting down, going upstairs or going downstairs.
The decision tree model is built from a Resilient Distributed Dataset (RDD) of labeled points based on some features.

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark: it represents an immutable, partitioned collection of elements that can be operated on in parallel.

The features include different values:
https://gist.github.com/MiraLak/1cc323a08e3880da4695

So, to analyse the data collected from the BasicAccelerometer application, we have to compute the features as defined in our decision tree model.
We initialize our Spark context:
https://gist.github.com/MiraLak/74be7d0db0eb81099fb9

Then, we read data from the Cassandra database (the user ID « TEST_USER » is hard-coded in the REST service application; you can update it or add it to the Android app).

The Spark-Cassandra connector enables lightning-fast cluster computing with Spark and Cassandra. This library lets you expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries in your Spark applications.

The connector transforms data written into Cassandra into Spark’s RDDs:
https://gist.github.com/MiraLak/e9276bb60145245f359f
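With the connector's Java API, reading the acceleration table back as an RDD of beans can be sketched like this (the keyspace, table, column and bean names are assumptions based on this post, not the exact code from the gist):

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class AccelerationReader {

  public static JavaRDD<Acceleration> readAccelerations(JavaSparkContext sc, String userId) {
    return javaFunctions(sc)
        .cassandraTable("accelerometer", "acceleration", mapRowTo(Acceleration.class))
        .where("user_id = ?", userId);  // push the filter down to Cassandra
  }
}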

After creating our features and computing them into vectors, we can call the model and try to predict our activity.
You must use spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT or above to be able to save and load models:
https://gist.github.com/MiraLak/a130849a9fa7a1916d55
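Loading a saved decision tree model and predicting from a feature vector can be sketched as follows (the model path is a placeholder, and the feature layout must match the one used at training time):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;

public class ActivityPredictor {

  public static double predictActivity(JavaSparkContext sc, double[] features) {
    // Load the model saved during training (placeholder path).
    DecisionTreeModel model = DecisionTreeModel.load(sc.sc(), "path/to/decision-tree-model");
    Vector vector = Vectors.dense(features);
    // Returns the predicted class index; the index-to-activity mapping is the one defined at training time.
    return model.predict(vector);
  }
}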

Final step: prediction

Now we can launch our prediction to see if we can predict the activity based on acceleration:

  1. Launch the REST application
  2. Start the Android app with the REST application URL
  3. Perform an activity for 30 seconds (sitting, standing up, walking, jogging, or going up or down stairs) while holding the phone in one hand.
  4. Stop the Android app
  5. Launch the prediction activity:

https://gist.github.com/MiraLak/b213be868d61f29d227d

Then you will see the predicted activity as a result.


Conclusion

We have seen how to use a connected object (a smartphone) to collect time series data and store it in Cassandra.
Then we used the Spark-Cassandra connector to transform the data into RDDs, and we analysed those RDDs using a decision tree model created with Spark.
This is just a small sample of the infinite possibilities we have nowadays with connected devices.

Analyze accelerometer data with Apache Spark and MLlib

Over the past months I have grown interested in Apache Spark, machine learning and time series, and I thought I would play around with them.

In this post I will explain how to predict user’s physical activity (like walking, jogging, sitting…) using Spark, the Spark-Cassandra connector and MLlib.

The entire code and data sets are available on my github account.

This post is inspired by the WISDM Lab's study, and the (uncleaned) data comes from here.

 


A FEW WORDS ABOUT APACHE SPARK & CASSANDRA

Apache Spark started as a research project at the University of California, Berkeley in 2009, and it is an open source project written mostly in Scala. In a nutshell, Apache Spark is a fast and general engine for large-scale data processing.
Spark's main property is in-memory processing, but you can also process data on disk, and it can be fully integrated with Hadoop to process data from HDFS. Spark provides three main APIs, in Java, Scala and Python. In this post I chose the Java API.
Spark offers an abstraction called resilient distributed datasets (RDDs), which are immutable and lazy data collections partitioned across the nodes of a cluster.

MLlib is a standard component of Spark providing machine learning primitives on top of Spark: common algorithms (regression, classification, recommendation, optimization, clustering…), as well as basic statistics and feature extraction functions.

If you want to get a better look at Apache Spark and its ecosystem, just check out the Apache Spark website and its documentation.

Finally, the Spark-Cassandra connector lets you expose Cassandra tables as Spark RDDs, persist Spark RDDs into Cassandra tables, and execute arbitrary CQL queries within your Spark applications.

AN EXAMPLE: USER’S PHYSICAL ACTIVITY RECOGNITION

The availability of acceleration sensors creates exciting new opportunities for data mining and predictive analytics applications. In this post, I will consider data from accelerometers to perform activity recognition.

The data in my GitHub account is already cleaned.
The data comes from 37 different users. Each user recorded the activity they were performing, which is why some of the data is not relevant and needs to be cleaned: some rows in the original file are empty, and others are misrecorded.

DATA DESCRIPTION

I used labeled accelerometer data collected from users who carried a device in their pocket during different activities (walking, sitting, jogging, ascending stairs, descending stairs, and standing).

The accelerometer measures acceleration in all three spatial dimensions, as follows:

  • Z-axis captures the forward movement of the leg
  • Y-axis captures the upward and downward movement of the leg
  • X-axis captures the horizontal movement of the leg

The plots below show characteristics for each activity. Because of the periodicity of such activities, a window of a few seconds is sufficient to find characteristics specific to each activity.

Plots: walking and jogging; ascending and descending stairs; standing and sitting.

We observe repeating waves and peaks for the repetitive activities: walking, jogging, ascending stairs and descending stairs. The upstairs and downstairs activities are very similar. There is no periodic behaviour for the more static activities such as standing or sitting, but their amplitudes differ.

 

DATA INTO CASSANDRA

I have pushed my data into Cassandra using the cql shell.

https://gist.github.com/nivdul/88d1dbb944f75c8bf612

Because I need to group my data by (user_id, activity) and then sort it by timestamp, I decided to use the pair (user_id, activity) together with the timestamp as the primary key.

Each row of my data contains a user id, an activity label, a timestamp, and the x, y and z acceleration values.


And now, how to retrieve the data from Cassandra with the Spark-Cassandra connector:

 

https://gist.github.com/nivdul/b5a3654488886cd36dc5

 

PREPARE MY DATA

As you can imagine, my data was not clean, and I needed to prepare it in order to extract my features from it. This is certainly the most time-consuming part of the work, but also the most exciting one for me.

My data is contained in a CSV file, and it was acquired over different sequential days. So I needed to define the different recording intervals for each user and each activity. From these intervals, I extracted the windows over which I computed my features.

The code below shows what I did.

 


 

First, retrieve the data for each (user, activity) pair, sorted by timestamp.

 

https://gist.github.com/nivdul/6424b9b21745d8718036

 

Then search for the jumps between records in order to define the recording intervals and the number of windows per interval.

https://gist.github.com/nivdul/84b324f883dc86991332

DETERMINE AND COMPUTE FEATURES FOR THE MODEL

Each of these activities demonstrates characteristics that we will use to define the features of the model. For example, the plot for walking shows a series of high peaks on the y-axis spaced at approximately 0.5-second intervals, while for jogging the interval is closer to 0.25 seconds. We also notice that the range of the y-axis acceleration for jogging is greater than for walking, and so on. This analysis step is essential, and it takes time (again) to determine the best features to use for our model.

After several tests with different feature combinations, the ones I chose are described below (they are basic statistics):

  • Average acceleration (for each axis)
  • Variance (for each axis)
  • Average absolute difference (for each axis)
  • Average resultant acceleration (1/n * sum [√(x² + y² + z²)]) (sketched just after this list)
  • Average time between peaks (max) (Y-axis)
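As an example, the average resultant acceleration, 1/n * sum(sqrt(x² + y² + z²)), can be sketched like this in Java, assuming the samples of a window are available as a JavaRDD<double[]> of (x, y, z) triples (an illustration, not the exact code of the gists below):

import org.apache.spark.api.java.JavaRDD;

public class ResultantAcceleration {

  public static double average(JavaRDD<double[]> xyz) {
    long n = xyz.count();
    double sum = xyz
        .map(v -> Math.sqrt(v[0] * v[0] + v[1] * v[1] + v[2] * v[2]))  // magnitude of each sample
        .reduce((a, b) -> a + b);
    return sum / n;
  }
}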

 

FEATURES COMPUTATION USING SPARK AND MLLIB

Now let’s compute the features to build the predictive model!

AVERAGE ACCELERATION AND VARIANCE

https://gist.github.com/nivdul/0ff01e13ba05135df09d

AVERAGE ABSOLUTE DIFFERENCE

https://gist.github.com/nivdul/1ee82f923991fea93bc6

AVERAGE RESULTANT ACCELERATION

https://gist.github.com/nivdul/666310c767cb6ef97503

AVERAGE TIME BETWEEN PEAKS

https://gist.github.com/nivdul/77225c0efee45a860d30

THE MODEL: DECISION TREES

Just to recap, we want to determine the user’s activity from data where the possible activities are: walking, jogging, sitting, standing, downstairs and upstairs. So it is a classification problem.

Here I chose the Decision Tree implementation from MLlib to create my model and then predict the activity performed by the users.

You could also use other algorithms available in MLlib, such as Random Forest or Multinomial Logistic Regression (from Spark 1.3).
Remark: with the chosen features, predictions for the « up » and « down » activities are pretty bad. One trick would be to define more relevant features to get a better prediction model.

Below is the code that shows how to load our dataset and split it into training and testing datasets.

 

https://gist.github.com/nivdul/246dbe803a2345b7bf5b

 

Let’s use DecisionTree.trainClassifier to fit our model. After that the model is evaluated against the test dataset and an error is calculated to measure the algorithm accuracy.

https://gist.github.com/nivdul/f380586bfefc39b05f0c
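For reference, the training and evaluation step can be sketched as follows with MLlib's Java API (the hyperparameters are illustrative, and trainingData and testData are the JavaRDD<LabeledPoint> produced by the split above; this is not the exact code from the gist):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import scala.Tuple2;

public class ActivityModel {

  public static DecisionTreeModel trainAndEvaluate(JavaRDD<LabeledPoint> trainingData,
                                                   JavaRDD<LabeledPoint> testData) {
    int numClasses = 6;                                              // walking, jogging, sitting, standing, up, down
    Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); // empty: all features are continuous
    String impurity = "gini";
    int maxDepth = 9;   // illustrative values
    int maxBins = 32;

    DecisionTreeModel model = DecisionTree.trainClassifier(
        trainingData, numClasses, categoricalFeaturesInfo, impurity, maxDepth, maxBins);

    // Compare predictions with the true labels to compute the test error.
    JavaPairRDD<Double, Double> predictionAndLabel =
        testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
    double testError = predictionAndLabel.filter(t -> !t._1().equals(t._2())).count()
        / (double) testData.count();
    System.out.println("Test error: " + testError);

    return model;
  }
}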

RESULTS

  • 4 classes (4902 samples): mean error 1.3% (Random Forest), 1.5% (Decision Tree)
  • 6 classes (6100 samples): mean error 13.4% (Random Forest), 13.2% (Decision Tree)

 

CONCLUSION

In this post we have demonstrated how to use Apache Spark and MLlib to predict a user's physical activity.

The feature extraction step is pretty long, because you need to test and experiment to find the best possible features. We also have to prepare the data, and the data processing is long too, but exciting.

If you find a better way or implementation to prepare the data or compute the features, do not hesitate to send a pull request or open an issue on GitHub.
