## March 25th, 2016

Daniel Blazevski

Insight Data Engineering

Program Director

During the seven-week Insight Data Engineering Fellows Program, experienced software engineers and recent grads learn the latest open source technologies by building a data platform to handle large, real-time datasets. Daniel Blazevski (now a Program Director and Data Engineer at Insight) discusses his project of building a quadtree data structure to improve the performance of Apache Flink’s k-nearest neighbors algorithm.

A young and exciting open source tool for distributed data processing, Apache Flink, has recently emerged as a player in the data engineering ecosystem. Similar data processing tools already exist, most notably Spark, Storm and Hadoop MapReduce. Compared to these technologies, Flink takes a distinctive approach, placing batch and stream processing into a unified streaming framework. In contrast, Spark is a batch processing tool, and Spark Streaming lumps relatively small amounts of data into "micro-batches". Storm can process data one record at a time in a purely streaming way, though it does not have a batch processing framework.


Flink, on the other hand, operates in a purely streaming framework and realizes Jay Kreps’s vision of the kappa architecture. Flink’s rise in popularity and pace of development are notable: it started as a university project in Berlin, and in a mere eight months it went from Incubator status to a Top-Level Apache project in December 2014. More recently, Yahoo published a blog post comparing their experience using Flink with other tools.

## k-Nearest Neighbors (kNN) for Flink

For my Insight project, I improved on an initial brute-force implementation of an exact k-nearest neighbors (kNN) algorithm by writing a quadtree data structure. I have since made a pull request, and the Flink community intends to merge the work into its main body of source code soon.

The kNN algorithm is one of the fundamental classification algorithms and has a wide range of applications in data science. Given a training set *A* and a test set *B*, the kNN query can formally be stated as follows: form the set of tuples *(b, A_b)*, where *b* is a point in *B* and *A_b* is the set of the *k* points in *A* that are geometrically closest to *b*.


The brute-force method computes the distance between a given test point and *all* the points in the training set. The diagram illustrates this method, where the blue dots represent training points and the gold star represents a single test point.

Even for a modest number of points — e.g. 50,000 test and training points — the brute-force method can take hours to run on a single CPU.
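To make the cost concrete, here is a minimal single-machine sketch of the brute-force query. The names `BruteForceKnn`, `sqDist` and `knn` are made up for this illustration and are not taken from the Flink code base; the sketch also assumes the squared Euclidean distance, which ranks neighbors in the same order as the Euclidean distance.

```scala
object BruteForceKnn {
  type Point = Vector[Double]

  // Squared Euclidean distance; it orders neighbors the same way as the true distance.
  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // For every test point b, measure the distance to ALL training points
  // and keep the k closest ones, giving the tuples (b, A_b).
  def knn(training: Seq[Point], test: Seq[Point], k: Int): Seq[(Point, Seq[Point])] =
    test.map { b =>
      (b, training.sortBy(a => sqDist(a, b)).take(k))
    }
}
```

Every test point touches every training point, so the work grows with the product of the two set sizes, which is why the running time climbs so quickly.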

## The kNN Query Using a Quadtree

A quadtree is a dynamically constructed object on the training set, and one starts by forming a bounding box around the training set. Once the bounding box holds more than some specified number *maxPerBox* of training points, the box is partitioned into equal sub-boxes. Whenever a sub-box in turn holds more than *maxPerBox* training points, it is partitioned further, as demonstrated by the green sub-boxes in the diagram.
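The construction can be sketched in a few lines of Scala. This is an illustrative sketch rather than the class from the pull request: `QuadTreeSketch`, `Node`, `insert` and `leaves` are hypothetical names, a box is assumed to be stored as a center plus a side length per axis, and the training points are assumed to be distinct (otherwise the splitting would never stop).

```scala
import scala.collection.mutable.ArrayBuffer

object QuadTreeSketch {
  type Point = Vector[Double]

  // A box is described by its center and its full side length along each axis.
  class Node(val center: Point, val width: Point) {
    val points = ArrayBuffer.empty[Point]
    var children: IndexedSeq[Node] = IndexedSeq.empty

    def isLeaf: Boolean = children.isEmpty

    // Index of the sub-box containing p: bit i is set when p lies on the upper side of axis i.
    def childIndex(p: Point): Int =
      p.indices.foldLeft(0)((mask, i) => if (p(i) >= center(i)) mask | (1 << i) else mask)

    // Partition the box into 2^d equal sub-boxes (4 in the plane).
    def split(): Unit = {
      val d = center.length
      children = (0 until (1 << d)).map { mask =>
        val c = Vector.tabulate(d) { i =>
          val sign = if ((mask & (1 << i)) != 0) 1.0 else -1.0
          center(i) + sign * width(i) / 4
        }
        new Node(c, width.map(_ / 2))
      }
    }
  }

  // Insert p below node, splitting a leaf once it holds more than maxPerBox points.
  def insert(node: Node, p: Point, maxPerBox: Int): Unit =
    if (node.isLeaf) {
      node.points += p
      if (node.points.size > maxPerBox) {
        node.split()
        val moved = node.points.toList
        node.points.clear()
        moved.foreach(q => insert(node, q, maxPerBox)) // push the points down into the sub-boxes
      }
    } else {
      insert(node.children(node.childIndex(p)), p, maxPerBox)
    }

  // All leaf boxes under node; only leaves hold training points.
  def leaves(node: Node): Seq[Node] =
    if (node.isLeaf) Seq(node) else node.children.flatMap(leaves)
}
```

Starting from a root `Node` that covers the bounding box of the training set, calling `insert` once per training point builds the tree incrementally.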


The intuitive idea of partitioning the training set into smaller sub-boxes is appealing, though there are some notable challenges in using the quadtree for the kNN query. In particular, some of the k-nearest neighbors may not lie in the minimal bounding box of the test point (the gold star). The following diagram, for example, shows in red the training points that are closest to the test point.

A clean, efficient way to search both in a test point’s minimal bounding box *and* in its surrounding area is needed. Defining the "surrounding area" of a test point is in fact the most delicate part. The leaf nodes of the quadtree are the only nodes containing a non-empty collection of training points. The key to defining the "surrounding area" is to construct a min-heap on the leaf nodes of the siblings of the minimal bounding box in which a given test point lies, where the priority of a node is given by a suitably defined notion of the distance from a point to a box, *minDist* [1].
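One common way to define such a distance, and to use it as a heap priority, is sketched below. This is not the code from the pull request: `Box`, `minDist` and `knn` are hypothetical names, the heap is built over all leaf boxes rather than only the siblings of the minimal bounding box (a simplification), the Euclidean metric is assumed, and `k` is assumed to be at least 1.

```scala
import scala.collection.mutable

object MinDistSearch {
  type Point = Vector[Double]

  // Hypothetical stand-in for a quadtree leaf: an axis-aligned box plus its training points.
  case class Box(center: Point, width: Point, points: Seq[Point])

  def dist(a: Point, b: Point): Double =
    math.sqrt(a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum)

  // Distance from p to the nearest point of the box; 0 when p lies inside the box.
  def minDist(p: Point, box: Box): Double =
    math.sqrt(p.indices.map { i =>
      val gap = math.max(0.0, math.abs(p(i) - box.center(i)) - box.width(i) / 2)
      gap * gap
    }.sum)

  def knn(p: Point, leaves: Seq[Box], k: Int): Seq[Point] = {
    // PriorityQueue is a max-heap, so the ordering is reversed to pop the closest box first.
    val heap = mutable.PriorityQueue.empty[Box](Ordering.by[Box, Double](minDist(p, _)).reverse)
    heap ++= leaves
    var best = Vector.empty[Point] // current candidates, sorted by distance to p
    // A box only needs to be opened while it could still contain a point
    // closer than the current k-th candidate.
    while (heap.nonEmpty && (best.size < k || minDist(p, heap.head) < dist(p, best.last))) {
      val box = heap.dequeue()
      best = (best ++ box.points).sortBy(dist(p, _)).take(k)
    }
    best
  }
}
```

Because no point inside a box can be closer to the test point than the box itself, the search can stop as soon as the nearest unopened box is farther away than the current k-th candidate.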

The machine learning library of Flink is written in Scala, and this project was my first production-level Scala code. The portion of the QuadTree class that I wrote contains the methods that recursively and efficiently search through the tree and construct a heap of boxes for a given test point.

## Distributing the Algorithm in a MapReduce Framework

The initial brute-force approach for kNN was already in a MapReduce framework, where the training and test sets were distributed to multiple threads, local kNN queries were made on each thread, and the results were ultimately gathered to a single node and sorted [2]. Here the training set *A* and test set *B* are assumed to be large and partitioned into blocks *A_i* and *B_j*.

How do we split up computation across nodes? Suppose that *b* is a test point in block *B_1*. One has to check every block *A_i* to find its k-nearest neighbors. To minimize data movement across the network, the block *B_1* should then be stored locally with each block *A_i*. The same goes for all the blocks *B_j*, meaning that if *A* and *B* are each broken into *n* blocks, we form *n^2* buckets, each containing one *A_i* and one *B_j*. This is achieved using the cross product between *A* and *B*, namely the set *A x B* of all tuples *(a, b)*. Flink’s DataSet API will automatically distribute the data stored in the variable *A x B* across multiple workers.

The set *A x B* can be quite large. For example, if *A* and *B* are both integer arrays with 100,000 entries, each is 0.4MB in size, yet the cross product has 10^10 pairs and is a whopping 40GB even at only 4 bytes per pair. Flink has built-in optimizers that allow a user to give a hint as to whether *A* or *B* is small, so that the smaller set is broadcast to the nodes. The cross product is then split up into a user-specified number of blocks, and the distance computation is carried out on each block.
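The block-wise strategy can be imitated on a single machine with plain Scala collections standing in for Flink’s distributed data sets. The sketch below is only an illustration under that assumption: `BlockedKnn`, `blocks` and the parameter `n` (the number of blocks per set) are hypothetical names, and the test points are assumed to be distinct.

```scala
object BlockedKnn {
  type Point = Vector[Double]

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def knn(training: Seq[Point], test: Seq[Point], k: Int, n: Int): Map[Point, Seq[Point]] = {
    // Split a set into (at most) n blocks of roughly equal size.
    def blocks(ps: Seq[Point]): Seq[Seq[Point]] =
      ps.grouped(math.max(1, math.ceil(ps.size.toDouble / n).toInt)).toSeq

    // Cross product of blocks: every (A_i, B_j) bucket is an independent unit of work.
    val buckets = for (a <- blocks(training); b <- blocks(test)) yield (a, b)

    // "Map" step: a local brute-force kNN query inside each bucket.
    val local = buckets.flatMap { case (a, b) =>
      b.map(t => (t, a.sortBy(sqDist(_, t)).take(k)))
    }

    // "Reduce" step: merge the local candidates of each test point and keep the k closest.
    local.groupBy(_._1).map { case (t, candidates) =>
      t -> candidates.flatMap(_._2).sortBy(sqDist(_, t)).take(k)
    }
  }
}
```

Each bucket yields at most *k* candidates per test point, so the final merge only has to sort a small candidate list rather than the whole training set.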

## Distributing the Algorithm Using a Quadtree

Care needs to be taken when distributing the k-nearest neighbors computation using a quadtree. For example, one might be tempted to create a global quadtree for the entire training set. However, to reduce the amount of communication between workers, we still form the cross product between the testing and training sets and form blocks. On each block, we then form a quadtree on the training points in that block. Thus, there are as many quadtrees formed as there are blocks, and a more efficient k-nearest neighbors search is done on each block.


The quadtree greatly improves the scaling of the k-nearest neighbors query with respect to the number of training points. This is intuitively expected, since we no longer compute the distance between all test and training points. It does, however, scale poorly with the spatial dimension *d*: a box in *d*-dimensional space has *2^d* sub-boxes, many of which will be empty for large *d*.
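A quick calculation shows how fast the number of sub-boxes grows. The helper below is a hypothetical one-liner written for this illustration only.

```scala
object SubBoxCount {
  // Number of sub-boxes created by a single split of a box in d dimensions.
  def subBoxes(d: Int): BigInt = BigInt(2).pow(d)
}
```

In the plane, `SubBoxCount.subBoxes(2)` is 4, while `SubBoxCount.subBoxes(20)` is already 1,048,576. With, say, 100,000 training points in 20 dimensions, a single split therefore creates more boxes than there are points, so most of them must be empty.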

To deal with this issue, a conservative complexity estimate was introduced to determine whether or not the quadtree would improve performance. The k-nearest neighbors implementation in Flink, written in a MapReduce framework, uses this estimate to decide whether or not to use a quadtree.

## Summary

The Apache Flink project is an exciting, bleeding-edge open source tool for distributed batch and stream processing of data. I was glad to have the opportunity to contribute a basic machine learning algorithm to the project. There are many additional ways to contribute to Flink, and I found it rewarding to make a meaningful contribution in a short amount of time as an Insight Data Engineering Fellow.

For more information about the implementation, the code for this project is available on GitHub.
