
Calculating Pi with Apache Spark

17/04/16 by Daniel Pape


Apache Spark is a system for cluster computing and part of the increasingly popular SMACK stack. The aim of this blog post is to provide a beginner's introduction on how to set up a mini Spark cluster of virtual machines (VMs) using Vagrant and to run a small example application on it that approximates π.

The cluster

To set up the Vagrant cluster on your local machine you need to first install Oracle VirtualBox on your system. After this it suffices to clone the Git repository from here to a working directory of your choice.

Once in the working directory, we can spin up the cluster using the console command vagrant up . The cluster is deployed in standalone mode and will consist of a designated master node named sparkmaster and a configurable number of worker nodes. The nodes are assigned consecutive static IP addresses and the workers are accessible via password-less SSH from the master node.

The following table summarizes, for later reference, the hostnames and IP addresses of the nodes. It also includes the URLs of the web UIs provided by Spark on the nodes once the cluster is running:

Node name        IP address        Web UI
sparkmaster      192.168.33.100    http://192.168.33.100:8080
sparkworker-01   192.168.33.101    http://192.168.33.101:8081
sparkworker-02   192.168.33.102    http://192.168.33.102:8082
...              ...               ...

After the cluster is up, we can use the command vagrant ssh <nodename> to connect to the node with name nodename . For example, let us connect to the master node via vagrant ssh sparkmaster and have a look at its Spark installation directory:

vagrant@sparkmaster:~$ ls -F $SPARK_HOME
CHANGES.txt  NOTICE  README.md  bin/   data/  examples/  licenses/  python/
LICENSE      R/      RELEASE    conf/  ec2/   lib/       logs/      sbin/

Spark comes with a couple of important directories containing executables and configuration files:

  • First of all, the directory SPARK_HOME/bin contains the spark-shell script for running Spark's REPL (read-evaluate-print-loop), which allows interactive data exploration (see the sketch after this list). But our main character here is the spark-submit script: it can be used to submit Spark applications packaged as a JAR to the cluster.
  • Next, SPARK_HOME/conf contains the configuration files slaves and spark-env.sh. The first lists the hostnames of all VMs to be used as slaves, while the second sets environment variables used by Spark.
  • Finally, the directory SPARK_HOME/sbin will be important as it contains the shell scripts for starting and stopping the master as well as worker instances on the designated machines, either individually or in one go via the  start-all.sh and stop-all.sh scripts.
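As a small illustration of the REPL mentioned in the first item, here is a minimal sketch of an interactive spark-shell session. It assumes the cluster is running and that spark-shell was started against the master URL from the table above; in that case a SparkContext is already available as sc:

// inside spark-shell the SparkContext is already available as `sc`
val numbers = sc.parallelize(1 to 100)        // distribute a local Scala collection over the cluster
val sumOfSquares = numbers.map(x => x * x)    // square each element on the workers
                          .reduce(_ + _)      // and add everything up
println(sumOfSquares)                         // prints 338350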

We will start the master on the VM named sparkmaster while all the other VMs will be used as slaves. This can be achieved by running the start-all.sh script on sparkmaster:

vagrant@sparkmaster:~$ $SPARK_HOME/sbin/start-all.sh

We can check that (hopefully) everything went smoothly by inspecting the log files in the SPARK_HOME/logs directory on each individual machine. As mentioned, the master and slave instances can be stopped by running the stop-all.sh script on sparkmaster.

Inspecting the web UI

More information is available from the Spark master's web UI:

[Screenshot: the Spark master web UI]

Here we find the following information:

  • A list of all workers in the cluster under the section heading Workers.
  • Information on Running Applications and Completed Applications.

The UI remains reachable as long as we do not deliberately stop the master by invoking one of the scripts for stopping it.

Submitting an application to the cluster

To actually submit an application to our cluster we make use of the SPARK_HOME/bin/spark-submit script. To test this, and also that our cluster is set up properly, we will use the example application that ships with the Spark installation for computing an approximation of π via a Monte Carlo method (code: GitHub).

For convenience the shared vagrant folder contains a shell script for submitting the example application to the cluster:

spark-submit \
  --class de.codecentric.SparkPi \
  --master spark://192.168.33.100:7077 \
  --conf spark.eventLog.enabled=true \
  /vagrant/jars/spark-pi-example-1.0.jar 10

Besides a reference to the main class in the JAR and the path to the latter, we pass the IP address and port of the Spark master instance and enable event logging. The latter will allow us to look at specific information in the web UI even after the application has finished. The argument 10 determines the size of the random sample used and also the degree of parallelism; see below.

If we invoke this script we get the result of the computation printed to the console. Also note the corresponding completed application after switching to the master web UI in our browser:

vagrant@sparkmaster:~$ /vagrant/scripts/submit-script-pi.sh
Pi is roughly 3.13918

How is π approximated here?

This computation is based on the following heuristic: By definition, π is the area A of a circle with radius r = 1 (in general, A = πr² is the area of a circle of radius r).

  • One then circumscribes this unit circle with a square whose area equals 4. The ratio of these two areas thus equals π/4 and gives the geometric probability of a point inside the square to lie inside the circle.
  • Now let us assume that we pick a huge number N of points randomly inside the circumscribed square, for example, by throwing darts or dropping rain drops onto it. A certain number N_inner of these points will end up inside the area described by the circle while the remaining number N_outer of these points will lie outside of it (but inside the square). Thus N = N_inner + N_outer, and the probability of a point lying inside the circle is N_inner / N.
  • Heuristically, one has π/4 ≈ N_inner / N and hence π ≈ 4 · N_inner / N.

It goes without saying that this algorithm is non-deterministic; the result will likely change with each run.

To wrap things up: The beauty of this is that it paves the way to approximate π simply by counting which fraction of a total population of points, thrown randomly at the circumscribed square, ends up inside the circle. This is something that can be distributed in a trivial fashion, and it is exactly what the mentioned Spark application does! An interactive visualization of the above may be found here: Link.
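To make the arithmetic concrete before looking at the distributed version, here is a minimal single-machine sketch of the same estimate in plain Scala; the sample size of 100000 is an arbitrary choice for illustration:

import scala.math.random

object SequentialPi {

  def main(args: Array[String]): Unit = {
    val n = 100000                                // total number N of random points
    val nInner = (1 to n).count { _ =>
      val x = random * 2 - 1                      // x uniformly distributed in [-1, 1]
      val y = random * 2 - 1                      // y uniformly distributed in [-1, 1]
      x * x + y * y < 1                           // true if the point falls inside the unit circle
    }
    println("Pi is roughly " + 4.0 * nInner / n)  // pi ≈ 4 * N_inner / N
  }
}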

Subsequently, we drill down into some of the basic concepts of Spark by looking at the code of SparkPi. This includes discussing the concept of an RDD, Spark's abstract data type for handling data distributed on a cluster.

Resilient Distributed Datasets (RDDs)

Within the Spark world the core abstraction is that of a Resilient Distributed Dataset (RDD). The rationale is that we want to create, distribute and process data within a cluster, where that data comes from various input sources, e.g. text files or plain Java/Scala collections. These input data are structured by Spark into RDDs, which one can basically think of as Java/Scala collections that are distributed over the cluster into partitions. Spark provides a functional-programming style API for Java/Scala that allows us to either

  • create new RDDs from various input sources, like files residing in HDFS, etc.
  • create new RDDs from already existing ones by so-called transformations, or
  • create final Java/Scala values from existing RDDs by so-called actions.

To make these distributed data sets resilient, i.e. fault-tolerant, Spark keeps track of the dependencies between the input data and the intermediate RDDs created from it in an RDD dependency graph. In case of failure this graph allows Spark to replay the parts of the computation that are necessary to recreate the RDD at hand. It is important to note that RDDs are computed in a lazy fashion: only creating a final Java/Scala value via an action triggers the actual execution of a computation. Since the dependency graph in Spark is a directed acyclic graph (DAG), this name is frequently used to refer to it, for example in the web UI.
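As a minimal sketch of this behaviour, assuming a SparkContext sc like the one created in the next section, the map and filter calls below merely extend the dependency graph; only the final count action triggers an actual computation:

val words   = sc.parallelize(Seq("spark", "pi", "rdd", "dag"))  // initial RDD from a plain Scala collection
val lengths = words.map(_.length)                               // transformation: lazy, only records the dependency
val short   = lengths.filter(_ <= 3)                            // another transformation, still nothing is executed
println(short.count())                                          // action: triggers the computation and prints 3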

Writing a simple Spark application

To illustrate the ideas outlined in the previous section, let us rewrite the application SparkPi step by step. We will follow the original source but allow ourselves to deviate a little from it in order to stress where and how RDDs are created and transformed. To begin with, the basic skeleton of the main application looks as follows:

import scala.math.random
import org.apache.spark._

object SparkApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark Pi")
    val sc = new SparkContext(conf)

    // Application code goes here...

    sc.stop()
  }
}

The main entry point of every Spark application is a SparkContext object. It provides a connection to the Spark cluster, carries context information about the cluster as well as the application itself, and is used to create RDDs from input data. For example, we are able to set the name of the application, which will also appear in the Spark web UIs, to "Spark Pi". Further parameters can be passed to the Spark context at runtime, as already happened in the above usage of the submit script, where the IP address of the master node is passed in.
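As an aside, for quick local experiments it can be convenient to hard-code a master in the configuration instead of passing it via spark-submit. The following variation of the skeleton is only a sketch of that option; the setMaster call and the "local[2]" master are our own additions and not part of the original SparkPi:

import org.apache.spark.{SparkConf, SparkContext}

object SparkAppLocal {

  def main(args: Array[String]): Unit = {
    // "local[2]" runs Spark in-process with two worker threads; on the cluster the
    // master URL (spark://192.168.33.100:7077) is supplied by spark-submit instead.
    val conf = new SparkConf()
      .setAppName("Spark Pi (local)")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Application code goes here...

    sc.stop()
  }
}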

The main step in the application code is to create a huge number n of random sample points by using the parallelize method provided by the Spark context sc. It allows us to create an initial RDD from any Scala collection. In our case this collection, xs, consists of the first n consecutive numbers. The resulting RDD is divided into a number of partitions given by slices. Next, this RDD is transformed via map into the RDD sample, which contains n random points (x, y) inside the square [-1, 1] × [-1, 1]. Finally, we filter the sample for the points that lie in the interior of the unit disc and count these in order to obtain an approximate value for π. Here counting represents the final action that triggers the evaluation of all previous RDDs along the dependency graph.

val slices = if (args.length > 0) args(0).toInt else 2

val n = math.min(100000L * slices, Int.MaxValue).toInt

val xs = 1 until n

val rdd = sc.parallelize(xs, slices)
            .setName("'Initial rdd'")

val sample = rdd.map { i =>
  val x = random * 2 - 1
  val y = random * 2 - 1
  (x, y)
}.setName("'Random points sample'")

val inside = sample.filter { case (x, y) => (x * x + y * y < 1) }
                   .setName("'Random points inside circle'")

val count = inside.count()

println("Pi is roughly " + 4.0 * count / n)

We can find a visual representation of the dependency graph of the final RDD inside after running the application by clicking either the corresponding application ID or name (here "Spark Pi") in the master web UI under the section "Completed Applications". There one finds a link labeled "Application Detail UI", which leads to more detailed information about the jobs and stages involved in the application. Our application consists of only one job with a single stage, and by clicking on the corresponding link in the "Application Detail UI", we finally find a representation of the dependency graph:

[Screenshot: visualization of the RDD dependency graph (DAG) in the Application Detail UI]

Notice that we were able to set names for debugging/monitoring purposes in the application code by using the setName method provided by the RDD class, and that these names also appear in the visual representation of the dependency graph. This is helpful, for example, when it comes to identifying performance bottlenecks in larger applications that involve more intricate ways of creating and transforming RDDs.

That’s all! If you want, you can stop the cluster using vagrant halt or can completely get rid of it with vagrant destroy -f after exiting from the master’s shell.

Summary

In conclusion, we described how to set up a small Spark cluster using Vagrant, and how to write and submit a simple application to the cluster. Finally, we saw how to make basic use of the web UI for monitoring purposes.
