Funnel Rocket: A Cloud Native Query Engine
For more context on why this project came to be, see: All Clusters Must Serve, All Clusters Must Die. To learn if cloud native actually means anything, read on.
As a SaaS vendor, success is getting your product installed in websites and apps which many millions of people frequent daily. Whether you’re selling tools for analytics, recommendations, personalization, guided onboarding, surveys, media serving or whatnot, one crucial feature for your customers is the ability to freely explore the complexities of end-user behavior, to understand the makeup and value of distinct user groups and evolve the site accordingly — potentially tailoring different experiences on their site to different user groups (a surprise note: that is also achievable without tracking people across the web or storing personal details).
The customer expects a UI for ad-hoc exploration with which they can look for users who’ve met a nuanced set of conditions over time, perhaps completed a specific sequence of actions in order, then get back detailed analytics showing not only the size of that group but also details metrics on how this audience group stands out from the whole of their user base.
For a glimpse of how such a feature looks like in Dynamic Yield (the company I work for), jump to minute 0:27 in the video. Please do forgive me for the linking to a marketing video. In my defense, it’s the nicest clip I’ve found…
There’s one problem, though.
Such interactive queries are a challenge to execute in with existing tools — either SQL or NoSQL. Getting the query syntax right is cumbersome but essentially a one-time effort. The main issue is performance at scale. Consider what it takes to implement this as a GROUP BY <USER ID> query:
No query engine I know will let you define in one step the whole sequence of actions that each group (i.e. each user) should match, in order. You will need to break this into multiple sub-queries, passing intermediate data between them, meaning multiple passes by the engine. In each pass, the engine will search (per each user) for a row whose timestamp is later than the row found in the last step, for that specific user. Such complex cases are typically expressed via composition, by design; it’s not for the the query engine to offer the simplest shortcut for every imaginable need.
There’s a performance cost to that complexity, however. At a small scale, it will work fine. However, take a dataset of 20 million users (for just a single customer!) and that’s 20 million groups per query.
Is it doable? yes.
(assuming the user activity data is bucketed in the right way and the engine utilizes that fact, otherwise you’re basically screwed).
Is it any fun, fast, or cheap? not really. Remember, I’ve set some lofty goals at the end of my previous post: I wanted maximum processing speed just when I need it, paying for exactly the needed compute resources and then going back immediately to zero resources used and paid for.
Managed query engines (e.g. Amazon’s Athena, Google’s BigQuery) do offer on-demand pricing by the amount of data actually scanned. Your queries are executed behind the scenes by a small slice of their vast army of workers. This option was simply not available when we started, and thus we’ve built our homegrown solution to this problem with Elasticsearch + custom plugins. It was cool for a time, but became an operational and maintenance burden.
Considering the managed tools available to us now, there are some things still left to be desired:
- While these managed offerings are no doubt ultra-scalable and offer an impressively rich syntax, they’re simply not meant to serve interactive end-user queries: query latency can widely fluctuate, with some occasional hiccups — which is fine for your internal BI needs or a pipeline, but a real problem if you want to ensure that your users get latencies in the low single-digits.
- I wanted an engine optimized for the type of complex user-centric queries I needed to support, with the ability to find bottlenecks, optimize them and add functionality natively.
- I wanted a tool that’s suited for fast on-demand interactive requests but also for big batches of scheduled queries, optimizing each use case for cost and being in control of the SLA.
The user-centric query feature is a core element in our product, so it was important to us to have control of our capabilities — meaning a “build” option was on the table. Of course, we didn’t set out to match the extensive feature-set of general purpose query engines. We started with solving one problem well, and the result is Funnel Rocket.
Here’s my part list:
- Pandas for data crunching at the single-task level, coupled with a few helpers — notably Apache Arrow, with perhaps Numba for critical performance pain points. The general idea was to have Pandas-based workers processing one Parquet file at a time out of a much bigger dataset.
- Serverless as a first-class option to support fast scaling of workers and paying only for actual compute time (currently supporting AWS Lambda). The solution should also happily run on old-school machines or containers.
- Redis for assisting with orchestration and making the code environment-agnostic as possible, no less than for metadata storage.
- Cloud Storage (currently S3 or compatible stores) to hold datasets. Funnel Rocket does not have an “ingestion” phase — it reads datasets that were prepared and written to S3 by other components at their own pace.
To bind it all, we wrote an API server and task worker components in Python 3 that are pretty lightweight. The API server has endpoints for managing datasets and running queries, expressed in JSON format. It takes care to validate a given query, invoke the needed workers, track their progress, retry tasks as necessary, and aggregate the results back to the client. Optionally the client can ask for progress updates via HTTP streaming.
Here’s how it works, from the ground up:
A. Pandas-based workers
Pandas and its concept of DataFrames are widely used in the data engineering/science communities. Pandas itself is library embedded within a single process, and we planned for each worker process to handle one file at a time out of the whole dataset, not worrying about any larger context.
Pandas itself is pretty performant and feature rich out of the box. If it doesn’t offer some needed functionality, you can have it run Python code (though that’s pretty slow). On the other end, you can use Numba to generate optimized LLVM-compiled code from carefully-written Python functions. In my experience, that does deliver the (performance) goods.
We use Apache Arrow to augment Pandas with excellent support for Parquet files, and will probably use it much more going forward. Unlike Pandas, it supports nested data structures natively, and has a growing feature set for computation and querying of its own. I can only thank Wes McKinney (and the other industrious contributors!) for creating both Pandas and Arrow.
When a worker process is allocated a task, it transforms the JSON query syntax into a series of in-memory operations over a DataFrame, and returns the aggregated results for the single file it was assigned to process.
But wait, I’m cheating a bit.
For each worker to be independent in its processing, files in the dataset must already be partitioned (or “bucketed”) by user ID into a set of n files, so that each file holds data on a distinct set of users, and each user’s data is guaranteed to be in a single file only. That shuffling stage is arguably the biggest performance pain in big data, and I rely on it being performed beforehand.
In my defense, that’s only executed when a dataset is being created/updated, rather than on each query. Plus, it’s really a precondition for running the relevant queries with anything nearing decent performance on any tool.
B. Running Serverless — or Server-full
Serverless is quite polarizing, I know: the whole paradigm, the proprietary implementations, the latency, the constraints… I did not come into this project sold on the idea, at all. For this use-case, however, AWS Lambda did prove itself to be reliable, fast and (gasp) cost efficient, while 99% of the code is agnostic to it and can be easily extended to support other implementations.
There are several reasons why it works well for Funnel Rocket:
Queries in Funnel Rocket are measured in seconds, not milliseconds.
When Lambda function instances are cold, they will normally take ±3–5 seconds to get to the point where the handler function starts running (that time does include running all imports in the handler’s source file).
Luckily, Funnel Rocket serves a rather “power user” feature, so while there tend to be few customers concurrently using the feature, each user typically runs multiple iterations of a query over their own dataset, progressively tweaking conditions to zoom in on the user population. This means that for most queries not only are Lambda functions warm, but performance can further benefit from local caching of data files — if you can make that work for serverless. More on that in a bit…
The per-second cost is indeed higher than all the less-managed options. It’s a spectrum, really: from spot instances, through reserved VMs to on-demand ones, to Fargate, etc. — the more you need to worry about, the less you pay in compute (and more in operations).
However, you pay only for actual processing time: from the exact millisecond your entrypoint started execution till the millisecond it ends. That excludes the bootstrap time and the function instance staying warm for a while to be re-used. We found this model to fit our query patterns well: maximum scale when there’s a query, zero resources and cost at all other times. As a side effect of that, the cost is directly correlated to milliseconds of compute used — for each query request Funnel Rocket returns the exact cost back.
Ease of invocation
Funnel Rocket uses the asynchronous Lambda invocation API, making it easier to launch hundreds of jobs (or more) quickly, without needing to block waiting on all API calls to complete. Internally AWS manages such calls with a queue, but we’ve found that it adds no meaningful latency in normal operation. This mode has an important extra benefit: in moments of momentary pressure, you mostly avoid rate limiting on concurrent function executions.
Serverless, however, is not the only way to fly. I wanted the ability to have a small army of workers that can run locally, on physical machines, VMs or containers — while still making it easy to scale up or down, distribute work, handle failures, etc.
Luckily, there’s Redis. Yes, the key-value store.
C. Managing it All with Redis
Two traits of Redis, taken together, make it a natural choice for a wide range of functionality:
- It has a wealth of data structures over its basic key-value abstraction: lists, sets and ordered sets, atomic increments, HyperLogLogs, streams, and nowadays much more.
- Operations are atomic by nature, giving you a pretty strong guarantee that only one consumer will pop a specific element from a list or set (I realize there are always some nuanced caveats and edge cases, particularly around fail-overs).
Redis is used by Funnel Rocket for:
- Holding metadata on registered datasets and their schema. Easy.
- Reporting the current status and results of all tasks in an in-flight query: instead of needing to communicate directly, workers write all updates are to Redis by workers as tasks progress, and these updates are polled by the API server. The API server knows whether a task succeeded, failed or got “lost in space” through its status in Redis. It does not need to rely on AWS Lambda or any other runtime environment to get this information.
- Atomic counters for numbering consecutive attempts at running a failed/lost task. Each attempt of a task has its own unique attempt number. If multiple attempts for the same task end up succeeding, the API server will take care to only use the results of one.
- Enqueuing work in the non-serverless mode: in the “classic process” mode, each worker is simply blocking on a Redis list to get work to do. In this mode, when a query starts the API server enqueues task requests in a logical queue, from which tasks are popped by workers. Statuses and retries are handled as above — through Redis as well, but regardless of the task invocation mode. No load balancer required; workers don’t know of each other, nor does the API server know of them directly.
- Best-effort optimization for data locality: in the case of repeated queries over the same dataset, how do we make warm functions work on the same file they’ve downloaded before?
There is no direct way to call a specific warm instance. Instead, the API server lets the workers choose for themselves which file to work on. When a query starts, it publishes the list of files as a Redis set. Then it invokes workers, asking them to each take take one file off the set. Those with matching local files will attempt to grab that same file again. Others will grab a part at random.This mechanism is not guaranteed to always have maximum efficiency, but it does make orchestration needs minimal. The API server is a Jon Snow here: it knows nothing.
Multiple API server instances can run concurrently, only sharing the list of registered datasets between them. A portion may run ad-hoc queries via serverless workers, while others run batch work through cheap spot-instances at scheduled times. Both deployment options push much of the complexity into battle-tested tools and have Redis as their single stateful component.
As a basic benchmark I’ve created a dataset which mimics user behavioral data (pageviews, events, clicks on campaigns, …), similar as possible to real data we collect in Dynamic Yield.
I’ve made two versions of the dataset: a smaller one with 100 million rows, the other with 500 million. Each row holds a user ID, a timestamp, the type of activity and related data fields (URLs, browser type, country, event type, product SKU, etc.).
- The datasets are split to 100 and 500 Parquet files stored in S3, respectively.
- Each file weights a bit less than 40mb. The small dataset weighs 3.8 GB in total, the larger one clocks in at 19.6GB.
However, that can be very misleading: Parquet files are typically highly-compressed. The exact same files saved as CSV are about 10x the size, which would make the datasets weigh 38 & 196 GB.
- The Lambda function was configured to use 1768 MB RAM (per instance) so that each instance gets one full vCPU. It doesn’t need all that RAM necessarily, but will simply run more slowly with less as it’ll get a smaller CPU share (see here).
I’ve run a basic funnel query: find the users who have performed a pageview, then added a product to cart, then made a purchase — and return the matching user count at each step.
For both datasets, I measured the time to run and compute cost in two scenarios: when all Lambda function instances are cold, and when all are warm. Each scenario was run 10 times.
For 100 million rows:
For 500 million rows:
- Time is the total time to run the query from the viewpoint of the API server. However, cost is the total billed amount for the Lambda runs, based directly on total milliseconds of actual handling time * units of allocated RAM * cost per unit/ms. I did not include per-request Lambda and S3 fees, which are for this use case much lower.
- Admittedly, these are preliminary results. More scenarios would be added here, with more iterations per each so that histograms of total duration, cost and task completion over time can be added.
Cloud Native: Is That a Thing?
It was personally exciting for me to see all this working in practice. There are of course a lot of missing features to wish for, surely a few bugs, and a bunch of optimizations and improvements yet to do (see the high-level roadmap).
IMO, the software that resulted encapsulates what “cloud native” is about:
- Having a large functional task broken down to smaller pieces of work that can be run in multiple runtime environments, so that availability and cost are optimized depending on what you need (interactive or batch? more compute cost or more operations?)
- Relying on existing, managed constructs to scale and orchestrate. “The Cloud” may be someone else’s computer, but it also offers many other services to build with: cloud storage, managed databases, serverless/containerized environments, and more.
- Each component should have modest requirements (compared to a Spark executor, in this case). It should start quickly, take (relatively) little RAM and just do its work. It should report metrics for observability.
- Lastly, it should also work fully on your computer. We use docker-compose, MinIO and docker-lambda to support that.
I now think cloud-native is actually a thing.
Go to Funnel Rocket on GitHub.