The hardest part of using existing stream processors is configuring and maintaining the multiple distributed systems required to keep them running — that’s why we created Concord.
Concord is an inclusive stream processing solution that takes care of systems’ configuration and management for you. That way, you can spend your time working on business logic rather than managing your infrastructure.
Just as important as the tech are the people building the product. We come from a background in Finance – counting things very fast – and Ad Tech – making people around the web click pretty buttons. Though stream processing is not limited to these use cases, it is important to note our background as it reflects our take on systems engineering.
Concord’s driving principles have always been: predictability, usability and high availability.
In our experience, predictable, simple systems yield the most reliable products overall. With simpler systems, we can deal with a process that crashes at midnight – by setting up supervision on this process. We can deal with eventually consistent databases if we know that the results will ‘at some point’ show up. We can monitor MySQL replication and restart it, throttle it, etc. In practice, engineering has been more about dealing with the mutable, fragile world than designing fancy algorithms.
When testing distributed systems, specifically streaming frameworks, we isolated processes by running them on cloud providers’ virtual machines, and we handled different security tiers by running multiple clusters of the same software. When we started experimenting with the ideas that would eventually form Concord, there weren’t many players in the market, and no system behaved well in practice: deploying more computational topologies adversely impacted already-running systems. Some of these problems remain even with Linux containerization (noisy neighbors on Amazon or Google, for example), but we are working on a solution built on technologies that make stream processing more predictable.
Building atop Apache Mesos was the first step towards composing a predictable system. One needs isolation in a production environment, and Mesos provided the most mature network, disk, memory, and CPU isolators. We wanted a way to change out individual components without disrupting the overall service of a system. This impulse came from our experience making frequent updates to click-fraud detection pipelines. Detecting fraudulent clicks quickly could save a lot of money for advertisers and publishers in the 30+ billion dollar market. To have the latest and greatest fraud detection models, we needed a fast compile-test-deploy cycle, and while moving fast we still wanted to maintain our SLAs. If you are down for some time, you have to spend money covering it up, usually via guaranteed payment.
We discovered that having a cluster abstracted as three numbers (CPU, memory, disk) per machine allows you to do much more than just isolate processes. There was now an abstraction that you could control programmatically to affect the overall runtime of your system. It was like discovering an intermediate language for expressing nothing but cluster optimizations. Now predictability meant cluster-wide guarantees.
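To make the three-number abstraction concrete, here is a minimal sketch of a first-fit placement check. The `Resources` struct and the `fits` and `placeFirstFit` functions are hypothetical names chosen for illustration; they are not Concord's or Mesos's API, and a real scheduler would weigh many more constraints.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical illustration: each machine is reduced to three numbers.
struct Resources {
  double cpus;
  double memMb;
  double diskMb;
};

// True when `need` can be carved out of what is still `free` on a machine.
bool fits(const Resources &free, const Resources &need) {
  return need.cpus <= free.cpus && need.memMb <= free.memMb &&
         need.diskMb <= free.diskMb;
}

// First-fit placement: returns the index of the first machine that can host
// the task and subtracts the task's share from it. Returns -1 if none fits.
int placeFirstFit(std::vector<Resources> &cluster, const Resources &need) {
  assert(need.cpus >= 0 && need.memMb >= 0 && need.diskMb >= 0);
  for (std::size_t i = 0; i < cluster.size(); ++i) {
    if (fits(cluster[i], need)) {
      cluster[i].cpus -= need.cpus;
      cluster[i].memMb -= need.memMb;
      cluster[i].diskMb -= need.diskMb;
      return static_cast<int>(i);
    }
  }
  return -1;
}
```

Because the whole cluster is just a vector of these triples, swapping first-fit for another strategy only means changing the loop, which is the kind of programmatic control described above.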
With programmatic access to cluster constraints solved, we needed to tackle the next big problem for any JVM shop. It is unacceptable that streaming frameworks generally take 10-15% of your resources. People spend countless hours optimizing garbage collection strategies per type of computation, per instance. The problems with JVM approaches fall into two camps. First, the framework itself causes long GC pauses; this eventually gets solved (Apache Flink uses off-heap techniques, for example). Second comes your application’s code: there is just no way for a framework launched cluster-wide to have good GC settings for every kind of workload.
In practice, dealing with GC in streaming frameworks ate a lot of development time and resources. If you don’t tune your GC settings per operator, you will end up with unpredictable latency behaviors. Writing the framework in C++ was a natural next step for us.
Usability & Debuggability
In an ideal world, you should be able to diagnose a problem in a distributed system as easily as you debug a local webapp: monitor the logs, inspect the core dump of a crashed process, tail your application log, manage log levels, kill the process, re-run it (schedule it), etc.
At its core, what we really get with these debugging abilities is fast iteration on the problems we are trying to solve. We haven’t made it 100% as transparent as developing a local webapp, but we’re pretty close and trying hard to achieve this.
# heroku like deployment
$ concord deploy config.json
# similar to kill -9 w/ a shutdown() hook
$ concord kill -id "process-id"
# get a graphical representation of your topology
$ concord graph
We weren’t the only ones to realize that once you get comfortable with streaming frameworks, or with thinking in streams, what you end up building looks more like microservices than like a faster version of MapReduce jobs. To paraphrase the confluent.io folks:
… these stream processing apps were most often software that implemented core functions in the business rather than computing analytics about the business.
We first saw the need for this approach while working at FactSet, translating financial query languages to a MapReduce-like framework built in ancient C++, written before the standard library even existed. At this point our focus shifted from optimizing existing queries to embracing the fact that we could do so much more than simple data crunching. We realized that you could react to the information on an event-by-event basis and have finer-grained alerts.
This realization reared its head again later in our careers while building a mobile ad exchange network at Yieldmo. We first started indexing our logs. Following that, we started adding monitoring jobs, eventually moving to a real time event notification system for detecting fraudulent clicks as the first use case.
In every case we realized that you end up deploying chained operators (à la ‘microservices’) and requiring the same tools that you needed for monitoring your webapps. To operationalize your cluster, you’ll want to see your latency, throughput, memory usage, CPU usage, etc. We embedded probabilistic sampling of messages, like Google Dapper. We monitor your /proc/ for CPU usage and memory usage. We count all the records and print your throughput. That’s the batteries-included approach that we thought would make stream processing easier for all developers.
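The sampling and record counting described above can be sketched roughly as follows. This is an illustration under our own assumptions, not Concord's implementation: `RecordMonitor` and its methods are hypothetical names, and the fixed per-record sampling rate is the simplest possible policy. Reading `/proc` is left out.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Hypothetical helper: decides per record whether to trace it, and counts
// every record so that throughput can be reported.
class RecordMonitor {
public:
  RecordMonitor(double sampleRate, std::uint64_t seed)
      : rate_(sampleRate), rng_(seed), dist_(0.0, 1.0) {
    assert(sampleRate >= 0.0 && sampleRate <= 1.0);
  }

  // Call once per record. Returns true if this record should be traced.
  bool observe() {
    ++seen_;
    const bool traced = dist_(rng_) < rate_;
    if (traced) ++traced_;
    return traced;
  }

  // Average records per second over a window of `elapsedSeconds`.
  double throughput(double elapsedSeconds) const {
    return elapsedSeconds > 0.0
               ? static_cast<double>(seen_) / elapsedSeconds
               : 0.0;
  }

  std::uint64_t seen() const { return seen_; }
  std::uint64_t traced() const { return traced_; }

private:
  double rate_;
  std::mt19937_64 rng_;
  std::uniform_real_distribution<double> dist_;
  std::uint64_t seen_ = 0;
  std::uint64_t traced_ = 0;
};
```

Sampling keeps tracing overhead proportional to the rate rather than to total traffic, while the counter still sees every record, so throughput stays exact.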
In practice, we found that the most useful thing we did to make stream processing approachable to all developers was to provide a single-threaded, callback-style mechanism. Not only did this help us iron out a few bugs with the framework, it gave the programmer a simple programming model. As a programmer you register a few callbacks and the framework guarantees single-threaded access to them. If you block, you propagate back-pressure. If you move fast, the framework moves fast. Our API is heavily inspired by the Google MillWheel paper.
void init(CtxPtr ctx);
void destroy();
void processRecord(CtxPtr ctx, FrameworkRecord &&r);
void processTimer(CtxPtr ctx, const string &key, int64_t time);
Metadata metadata();
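To illustrate the single-threaded guarantee and how back-pressure can fall out of it, here is a minimal sketch. It is not Concord's runtime: `SketchComputation`, `SingleThreadedRunner`, and `Collector` are hypothetical, the callback takes a plain string instead of a `FrameworkRecord`, and a full inbox is reported by a `false` return rather than by blocking the sender.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Simplified, hypothetical stand-in for an operator: one callback per record.
struct SketchComputation {
  virtual ~SketchComputation() = default;
  virtual void processRecord(const std::string &payload) = 0;
};

// Bounded inbox drained by exactly one caller, so callbacks never overlap.
class SingleThreadedRunner {
public:
  SingleThreadedRunner(SketchComputation &op, std::size_t capacity)
      : op_(op), capacity_(capacity) {
    assert(capacity > 0);
  }

  // Upstream calls this. A false return is the back-pressure signal: the
  // operator is behind and the sender should slow down or retry later.
  bool offer(std::string payload) {
    if (inbox_.size() >= capacity_) return false;
    inbox_.push_back(std::move(payload));
    return true;
  }

  // Runs queued callbacks one at a time, in arrival order.
  std::size_t drain() {
    std::size_t handled = 0;
    while (!inbox_.empty()) {
      std::string next = std::move(inbox_.front());
      inbox_.pop_front();
      op_.processRecord(next);
      ++handled;
    }
    return handled;
  }

private:
  SketchComputation &op_;
  std::size_t capacity_;
  std::deque<std::string> inbox_;
};

// Example operator: remembers what it saw. No locks are needed because the
// runner never invokes it concurrently.
struct Collector : SketchComputation {
  std::vector<std::string> seen;
  void processRecord(const std::string &payload) override {
    seen.push_back(payload);
  }
};
```

The operator author writes ordinary sequential code; a slow `processRecord` simply lets the inbox fill, and the pressure travels upstream through `offer`.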
This single-threaded API, combined with Concord’s dynamic topology mechanism, lets developers think about the parallelism of components through the abstraction of containers. We also support dedicating more CPUs to a container via Linux containerizers in case you decide to scale vertically.
We acknowledge that this is a more complex system than just adding a JAR to your classpath. However, the alternative is that you end up gluing together a set of disparate systems (Mesos/YARN + Marathon/Aurora + Monitoring + Alerting + Your code + … ).
On the positive side, you can build cluster-wide optimizations, like constraint solving, that would not otherwise be possible by gluing together existing open source projects. The optimizations that we have planned for future releases include host-based network optimizations, security constraint solving (deploying on a subset of a cluster), dynamic scaling to support backpressure on cloud environments, programmatic access to failure, TCP kernel bypass, etc.
Last, we strongly believe that stream processing should not be solely for the few distributed systems engineers at your company; interviewing dozens of companies during our customer discovery process reinforced this belief. In fact, the opposite should be true. Every developer should be able to build, deploy, and release software without worrying that it will take down the entire cluster. You should be able to deploy without fear that a JVM system will eat up all of the memory on every box and starve other processes. To serve this principle, we built first-class support for Python, C++, Java/Scala, Go and Ruby.
We still have much ground to cover.
We think of Concord as an execution engine. It is very low level as it stands today. It is an abstraction where each operator is a networked function with type-erased arguments and return values. It’s all bytes. However, this is the base building block on which to build new APIs that have rich semantics, like Apache Beam (previously Google Dataflow).
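As a rough illustration of "it's all bytes", the sketch below shows a byte-level operator shape and a thin typed wrapper layered on top. Everything here is hypothetical (`Bytes`, `RawOperator`, `encodeInt64`, `decodeInt64`, `liftInt64`), and the fixed-width host-byte-order encoding is only a stand-in for whatever serialization a richer API would choose.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <functional>
#include <string>

// At the lowest level an operator only sees bytes in and bytes out.
using Bytes = std::string;
using RawOperator = std::function<Bytes(const Bytes &)>;

// Hypothetical fixed-width encoding for a 64-bit integer (host byte order).
Bytes encodeInt64(std::int64_t v) {
  Bytes out(sizeof v, '\0');
  std::memcpy(&out[0], &v, sizeof v);
  return out;
}

std::int64_t decodeInt64(const Bytes &b) {
  assert(b.size() == sizeof(std::int64_t));
  std::int64_t v = 0;
  std::memcpy(&v, b.data(), sizeof v);
  return v;
}

// Lifts a typed function into the byte-level shape. A richer API would
// generate this glue so that users never touch raw bytes.
RawOperator liftInt64(std::function<std::int64_t(std::int64_t)> f) {
  return [f](const Bytes &in) { return encodeInt64(f(decodeInt64(in))); };
}
```

The engine only ever needs `RawOperator`; types live entirely in the wrapper, which is where a higher-level DSL could later be built.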
No one that has attempted to run a system without High Availability has come back for a second cup. For Concord this meant a series of design decisions both on the scheduler – our brain – and our executor – the main engine.
First, we want to clarify that Concord provides at-most-once, best-effort delivery. We acknowledge that this only works for a small subset of use cases. Finance companies accept this risk all the time with their market data products. Their networks tend to be relatively reliable per cluster and most are not doing multi-data-center operations. Every one of them has an active-active DC layout where everything is 100% replicated, ready for DNS hot swap if needed. We have found during our testing period that people are OK with this risk, which has thankfully allowed us to iron out some bugs that you can’t easily reach with unit testing. The real world is utterly unpredictable and filled with double-firewalls.
The biggest simplification for failure that we could reason about is fail-fast. This allows our central scheduler to perform cluster-wide, task-centric or instance-based Quality of Service. Our schedulers support High Availability through ZooKeeper and the Apache Curator library. With this tool in hand we set out to build a native fail-fast approach for all the containers launched in the cluster. In practice this is what we found to be the easiest way to explain Concord to clients.
Everything will fail. We generalize failure and fail fast. Everything that we can’t reason about leads to a failed process. You control how you react to failure.
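The fail-fast rule and the "you control how you react" half of it can be sketched in-process as below. This is only an analogy under stated assumptions: in a real deployment a failure would be a process exit observed by the scheduler, whereas here an exception stands in for it, and `Reaction`, `FailurePolicy`, and `superviseFailFast` are hypothetical names.

```cpp
#include <cassert>
#include <functional>

enum class Reaction { Restart, GiveUp };

// User-supplied policy: how to react after the n-th failure of a task.
using FailurePolicy = std::function<Reaction(int failures)>;

// Runs `task` under a fail-fast rule: any exception counts as a failed run
// and nothing is patched up in place. The policy alone decides what happens
// next. Returns the number of failures seen before success or give-up.
int superviseFailFast(const std::function<void()> &task,
                      const FailurePolicy &policy) {
  assert(task && policy);
  int failures = 0;
  for (;;) {
    try {
      task();
      return failures;
    } catch (...) {
      ++failures;
      if (policy(failures) == Reaction::GiveUp) return failures;
    }
  }
}
```

Generalizing every unexpected condition to "the run failed" keeps the supervisor trivial; all of the nuance moves into the policy the user supplies.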
Concord is a very young framework with a new approach to streaming that enables dynamic workflows still not possible with any alternative we know of today!
We believe in:
- First class citizenship for multiple languages – Ruby, Python, Go? We got you.
- Native language support
- Runtime modification of your computational topology
- Containerized environment – don’t be afraid to iterate fast
- Centralized scheduling mechanism for constraint solving
At this point we have most of the big pieces in place. However, we still have to partner with a replication system for improved message delivery guarantees, not to mention build a better DSL for data manipulation, like Apache Beam. Other milestones include decreasing byte-copying inside the framework, improved memory allocators, smarter crash analytics, simpler metrics and monitoring, and much, much more!
If this was interesting to you, give Concord a try. Get started here