Funnel Rocket: A Cloud Native Query Engine

For more context on why this project came to be, see: All Clusters Must Serve, All Clusters Must Die. To learn if cloud native actually means anything, read on.

The Challenge

For a SaaS vendor, success means getting your product installed in websites and apps that many millions of people visit daily. Whether you’re selling tools for analytics, recommendations, personalization, guided onboarding, surveys, media serving or whatnot, one crucial feature for your customers is the ability to freely explore the complexities of end-user behavior: to understand the makeup and value of distinct user groups and evolve the site accordingly, potentially tailoring different experiences on the site to different user groups. (A surprising note: that is also achievable without tracking people across the web or storing personal details.)

Dynamic Yield’s Audience Explorer (shown at 0:27)
In building Funnel Rocket, I had two main goals:
  • I wanted an engine optimized for the type of complex user-centric queries I needed to support, with the ability to find bottlenecks, optimize them and add functionality natively.
  • I wanted a tool that’s suited for fast on-demand interactive requests but also for big batches of scheduled queries, optimizing each use case for cost and being in control of the SLA.

Building Blocks

Assorted toy parts
Photo by Vanessa Bucceri on Unsplash
  1. Serverless as a first-class option to support fast scaling of workers and paying only for actual compute time (currently supporting AWS Lambda). The solution should also happily run on old-school machines or containers.
  2. Redis for assisting with orchestration and keeping the code as environment-agnostic as possible, and no less so for metadata storage.
  3. Cloud Storage (currently S3 or compatible stores) to hold datasets. Funnel Rocket does not have an “ingestion” phase: it reads datasets that were prepared and written to S3 by other components, at their own pace (a minimal sketch of preparing such a dataset follows the diagram below).
Funnel Rocket and Your System. Most icons by the Noun Project.
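
To make the third building block concrete, here is a minimal sketch of how another component might prepare a dataset: writing it as multiple Parquet parts under an S3 prefix. The bucket, prefix and column names are made up, writing to s3:// paths assumes s3fs (or pyarrow’s S3 support) is installed, and the per-user partitioning shown is my reading of the one-file-per-worker model rather than the project’s exact format.

```python
import pandas as pd

def write_part(part_df: pd.DataFrame, part_no: int) -> None:
    # Presumably each part holds all rows of a given subset of users, so a single
    # worker can later process that part without any larger context.
    part_df.to_parquet(f"s3://my-bucket/behavior-dataset/part-{part_no:04d}.parquet")

# Example: split a big events DataFrame into parts by a hash of the user id.
events = pd.read_parquet("s3://my-bucket/raw/events.parquet")  # hypothetical source
num_parts = 100
part_ids = pd.util.hash_pandas_object(events["user_id"], index=False) % num_parts
for part_no, part_df in events.groupby(part_ids):
    write_part(part_df, int(part_no))
```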

A. Pandas-based workers

Pandas and its concept of DataFrames are widely used in the data engineering/science communities. Pandas itself is a library embedded within a single process, and we planned for each worker process to handle one file of the whole dataset at a time, without worrying about any larger context.
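
As a rough illustration (not the project’s actual code), a single worker task might look something like the sketch below; the file path, column names and result shape are invented, and reading s3:// paths directly with pandas assumes s3fs is installed.

```python
import pandas as pd

def run_task(s3_path: str) -> dict:
    # Load exactly one part of the dataset into a DataFrame, e.g.
    # "s3://my-bucket/behavior-dataset/part-0042.parquet"
    df = pd.read_parquet(s3_path)
    # Run the (user-centric) query conditions over this part only...
    purchasers = df.loc[df["event_type"] == "purchase", "user_id"].nunique()
    # ...and return a small partial result for the API server to aggregate.
    return {"matching_users": int(purchasers), "total_rows": len(df)}
```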

B. Running Serverless — or Server-full

Serverless is quite polarizing, I know: the whole paradigm, the proprietary implementations, the latency, the constraints… I did not come into this project sold on the idea, at all. For this use-case, however, AWS Lambda did prove itself to be reliable, fast and (gasp) cost efficient, while 99% of the code is agnostic to it and can be easily extended to support other implementations.
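
For illustration, a minimal Lambda entry point around the same per-file task could look like this; the event shape and handler name are hypothetical, not Funnel Rocket’s actual interface.

```python
import json

# Re-using the hypothetical run_task() from the previous sketch.
def lambda_handler(event, context):
    # The invoker passes the file this instance should process; the task code
    # itself neither knows nor cares that it is running inside Lambda.
    result = run_task(event["s3_path"])
    return {"statusCode": 200, "body": json.dumps(result)}
```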

C. Managing it All with Redis

Two traits of Redis, taken together, make it a natural choice for a wide range of functionality:

  1. Operations are atomic by nature, giving you a pretty strong guarantee that only one consumer will pop a specific element from a list or set (I realize there are always some nuanced caveats and edge cases, particularly around fail-overs).
  2. Its small set of versatile data structures (lists, sets, hashes, atomic counters) maps naturally to queues, status tracking and other bookkeeping.
Here are some of the ways Funnel Rocket puts this to use:
  • Reporting the current status and results of all tasks in an in-flight query: instead of communicating with the API server directly, workers write all status updates to Redis as tasks progress, and the API server polls for them. The API server knows whether a task succeeded, failed or got “lost in space” through its status in Redis; it does not need to rely on AWS Lambda or any other runtime environment to get this information.
  • Atomic counters for numbering consecutive attempts at running a failed/lost task. Each attempt of a task has its own unique attempt number. If multiple attempts for the same task end up succeeding, the API server will take care to only use the results of one.
  • Enqueuing work in the non-serverless (“classic process”) mode: each worker simply blocks on a Redis list to get work to do. When a query starts, the API server enqueues task requests in a logical queue, from which workers pop them. Statuses and retries are handled through Redis as above, regardless of the task invocation mode. No load balancer is required; workers don’t know of each other, nor does the API server know of them directly.
  • Best-effort optimization for data locality: in the case of repeated queries over the same dataset, how do we make warm functions work on the same file they’ve downloaded before?
    There is no direct way to call a specific warm instance. Instead, the API server lets the workers choose for themselves which file to work on. When a query starts, it publishes the list of files as a Redis set, then invokes workers, asking each to take one file off the set. Workers holding a matching local file will attempt to grab that same file again; others will grab a part at random. This mechanism is not guaranteed to always be maximally efficient, but it does keep orchestration needs minimal. The API server is a Jon Snow here: it knows nothing.
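
To make the mechanisms above a bit more tangible, here is a compact sketch of the Redis-side bookkeeping using redis-py. The key names, status values and function boundaries are illustrative, not Funnel Rocket’s actual schema.

```python
import redis

r = redis.Redis(decode_responses=True)

def publish_files(query_id: str, files: list[str]) -> None:
    """API server: publish the set of files workers should claim for this query."""
    r.sadd(f"query:{query_id}:files", *files)

def claim_file(query_id: str, cached_file: str | None) -> str | None:
    """Worker: prefer a file already in the local cache, else take any remaining one."""
    key = f"query:{query_id}:files"
    if cached_file and r.srem(key, cached_file):
        return cached_file          # our cached file was still unclaimed
    return r.spop(key)              # otherwise grab a random remaining part

def report_status(query_id: str, task_id: str, status: str) -> None:
    """Worker: write progress to a hash which the API server polls."""
    r.hset(f"query:{query_id}:status", task_id, status)

def next_attempt_no(query_id: str, task_id: str) -> int:
    """API server: atomic counter per task, for numbering retry attempts."""
    return r.incr(f"query:{query_id}:attempts:{task_id}")

def classic_worker_loop() -> None:
    """Non-serverless mode: block on a logical queue instead of being invoked."""
    while True:
        _queue, task_json = r.blpop("tasks:pending")
        # ...deserialize the task, run it, report status as above...
```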

Benchmarking

As a basic benchmark I’ve created a dataset which mimics user behavioral data (pageviews, events, clicks on campaigns, …), as similar as possible to the real data we collect at Dynamic Yield.

  • Each file weighs a bit less than 40 MB. The small dataset weighs 3.8 GB in total; the larger one clocks in at 19.6 GB.
    However, that can be very misleading: Parquet files are typically highly compressed. The exact same files saved as CSV are about 10x the size, which would make the datasets weigh 38 and 196 GB.
  • The Lambda function was configured to use 1768 MB RAM (per instance) so that each instance gets one full vCPU. It doesn’t necessarily need all that RAM, but with less it will simply run more slowly, as it gets a smaller CPU share (see here).
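
For reference, setting the function’s memory size (and hence its CPU share) can be done with boto3 as below; the function name is made up.

```python
import boto3

lambda_client = boto3.client("lambda")
lambda_client.update_function_configuration(
    FunctionName="funnel-rocket-worker",  # hypothetical function name
    MemorySize=1768,                      # MB, as used in this benchmark
)
```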

Results

Notes

  1. Time is the total time to run the query from the viewpoint of the API server. Cost, however, is the total billed amount for the Lambda runs, based directly on total milliseconds of actual handling time * units of allocated RAM * cost per unit per millisecond (see the worked example below). I did not include per-request Lambda and S3 fees, which for this use case are much lower.
  2. Admittedly, these are preliminary results. More scenarios will be added, with more iterations each, so that histograms of total duration, cost and task completion over time can be included.
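
As a worked example of the cost formula in note 1, here is a small sketch. The numbers are illustrative, not the benchmark’s actual figures, and the per-GB-second rate is roughly AWS Lambda’s published x86 price at the time of writing; check current pricing before relying on it.

```python
PRICE_PER_GB_SECOND = 0.0000166667   # USD, illustrative

def lambda_compute_cost(total_billed_ms: float, memory_mb: int) -> float:
    gb_seconds = (total_billed_ms / 1000.0) * (memory_mb / 1024.0)
    return gb_seconds * PRICE_PER_GB_SECOND

# e.g. 100 tasks of ~2,000 billed ms each, at 1768 MB per instance:
print(round(lambda_compute_cost(100 * 2000, 1768), 4))   # ~0.0058 USD
```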

Cloud Native: Is That a Thing?

It was personally exciting for me to see all this working in practice. There are, of course, a lot of missing features to wish for, surely a few bugs, and a bunch of optimizations and improvements yet to make (see the high-level roadmap). Still, to me a few principles make a tool genuinely cloud native:

  • Relying on existing, managed constructs to scale and orchestrate. “The Cloud” may be someone else’s computer, but it also offers many other services to build with: cloud storage, managed databases, serverless/containerized environments, and more.
  • Each component should have modest requirements (compared to a Spark executor, in this case). It should start quickly, take (relatively) little RAM and just do its work. It should report metrics for observability.
  • Lastly, it should also work fully on your computer. We use docker-compose, MinIO and docker-lambda to support that.
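
As an illustration of that last point, the same pandas/s3fs code can be pointed at a local MinIO container instead of S3; the endpoint, credentials and path below are typical defaults of a local docker-compose setup and are purely illustrative.

```python
import pandas as pd

df = pd.read_parquet(
    "s3://test-bucket/dataset/part-0000.parquet",
    storage_options={
        "key": "minioadmin",
        "secret": "minioadmin",
        "client_kwargs": {"endpoint_url": "http://localhost:9000"},
    },
)
```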
Last industrial-themed image! (photo by Marcin Jozwiak)
