Spark Scheduling in Kubernetes

Palantir · Published in Palantir Blog · May 16, 2019

This is the second post in our blog series on Rubix, our effort to rebuild our cloud architecture around Kubernetes.

In 2018, as we rapidly scaled up our usage of Spark on Kubernetes in production, we extended Kubernetes to add support for batch job scheduling through a scheduler extender. In particular, we sought to mitigate problems from partial scheduling during cluster oversubscription. This blog post introduces our open-source k8s-spark-scheduler extender, explains how it works, and provides insight into the results we’ve seen running it in production at Palantir.

Introduction

As long-term, enthusiastic participants of the Spark community, we were excited to use Spark as one of the frameworks for executing user-authored code within Palantir Foundry. Foundry provides our customers with a common data platform for complex data analyses and transformations. Foundry executes two different classes of workloads: short-lived dedicated-use Spark applications for batch processing, and long-lived multi-tenant Spark applications that serve interactive user queries. Today, we run tens of thousands of Spark applications daily on thousands of hosts across hundreds of environments.

The first post in this series tells the origin story of Rubix: Palantir’s deployment infrastructure team decided to migrate Foundry’s cloud architecture to Kubernetes and thus committed to supporting secure multi-tenant Spark clusters on top of Kubernetes. The first step was an open-source effort (in conjunction with engineers from multiple organizations) to add Kubernetes support to Spark. While this endeavor was successful in the sense that it merged to mainline Spark and worked well for individual Spark applications, Kubernetes support for multi-container batch workloads was still immature compared to YARN. We developed k8s-spark-scheduler to solve the two main problems we experienced when running Spark on Kubernetes in our production environments: unreliable executor scheduling and slow autoscaling.

Spark on Kubernetes

Kubernetes objects such as pods or services are brought to life by declaring the desired object state via the Kubernetes API. A Kubernetes controller continually observes the difference between desired and actual state and takes steps to achieve the desired state, for example, by launching new pods on machines with unused compute resources.
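
As a schematic illustration of that pattern (toy types, not actual Kubernetes controller code), the core of a controller is a reconcile loop that compares declared state against observed state and takes a step to close the gap:

package sketch

import "time"

// Schematic illustration of the controller pattern described above (not real
// Kubernetes code): a controller repeatedly compares the declared (desired)
// state against the observed state and acts to close the gap.
type Cluster interface {
	DesiredPods() []string        // pods declared via the Kubernetes API
	RunningPods() map[string]bool // pods actually running
	Launch(pod string) error      // e.g. bind the pod to a node with free resources
}

func reconcile(c Cluster) {
	for _, pod := range c.DesiredPods() {
		if !c.RunningPods()[pod] {
			_ = c.Launch(pod) // error handling omitted in this sketch
		}
	}
}

func run(c Cluster) {
	for {
		reconcile(c)
		time.Sleep(time.Second) // real controllers react to watch events rather than polling
	}
}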

Spark applications consist of a single driver process (aka “master”) and a variable number of executors (“workers”). When run on Kubernetes, the driver and the executors each run as separate pods, launched in a two-step process: first, the Spark client launches the driver pod, then the driver pod itself launches the executor pods.
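
For illustration, here is a small client-go sketch (the “spark” namespace is a placeholder) that reflects this two-step launch: Spark on Kubernetes labels its pods with spark-role=driver or spark-role=executor, and each executor pod carries an owner reference pointing back at the driver pod that created it.

package sparkpods

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// printSparkPods lists executor pods via the spark-role label and prints the
// driver pod that owns each one.
func printSparkPods(ctx context.Context) error {
	config, err := rest.InClusterConfig()
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	executors, err := clientset.CoreV1().Pods("spark").List(ctx, metav1.ListOptions{
		LabelSelector: "spark-role=executor",
	})
	if err != nil {
		return err
	}
	for _, pod := range executors.Items {
		for _, owner := range pod.OwnerReferences {
			// Each executor pod was created by (and is owned by) its driver pod.
			fmt.Printf("executor %s -> driver %s\n", pod.Name, owner.Name)
		}
	}
	return nil
}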

The upside of this design is that the driver itself is in charge of managing its executors, for instance in order to dynamically scale up the cluster. However, this design decision has a critical downside: the scheduler has no idea how many executors the driver will need until after the driver has successfully started. This doesn’t pose a problem in clusters with abundant resources, but when the cluster is “too small” for the number of Spark submissions, a client could theoretically saturate the cluster with drivers. In this scenario, none of the drivers would acquire executors, so no work could be completed. In practice, we observed a more localized version of the problem: occasional, unlucky applications would get executor-starved, leading to unpredictable performance degradation.

Additionally, because the pods from different Spark applications were scheduled individually and in (relatively) arbitrary order, it was possible for small applications to skip ahead of larger ones. This led to higher rates of resource starvation — and subsequent performance degradation or failure — amongst larger, more expensive workloads. Let’s look at a simple example:

Example: Two applications, A (blue) and B (pink), are to be launched. Note that pods for drivers and executors of different applications may occur in arbitrary order, e.g., executor A.3 occurs after driver B. Let’s assume that the cluster has sufficient free resources to launch pods for driver A, executors A.1 and A.2, as well as driver B. It is possible that the cluster now has insufficient resources for the expensive executor A.3, but sufficient resources for the cheaper executor B.1. Thus, the cheaper application B was launched successfully, while the more expensive application A is still waiting for executor A.3 before it can make progress.

Gang scheduling. A more desirable behavior for Spark applications is gang scheduling. Instead of scheduling pod-by-pod, we want to launch pods application-by-application:

Example: Using the same example as above, the goal of gang scheduling is to schedule either all of A’s pods before any of B’s pods, or vice-versa. Note that depending on the available resources in the cluster, this strategy may mean that no new pods get launched in a given scheduler cycle, even if the cluster has available resources for some of them. Here, the scheduler must not launch B’s driver or executor B.1 before A.3 has been launched.

First-in-first-out. Gang scheduling ensures that Kubernetes never launches partial applications. However, by itself gang scheduling does not solve the starvation problem. For example, if the scheduler continually launches small applications while small amounts of resources become available, larger applications may never get scheduled at all. In this post we discuss a simple solution to the starvation problem: we schedule applications in the order they were submitted, i.e., in FIFO order. In the example above, we launch application A before application B because driver A was submitted before driver B.

FIFO gang scheduling in Kubernetes

Let’s now see how to implement FIFO gang scheduling with the Kubernetes scheduler. In a nutshell, the Kubernetes scheduler maintains a queue of pods to be scheduled, pops a pod off the queue, and tries to find a suitable host for the pod. The extender API allows us to implement a custom filter for sub-selecting the pods that should be scheduled. Our filter implementation relies on driver pod annotations declaring the amount of memory and CPU resources required by the driver and the executors. Equipped with such annotations, the filter implementation for gang scheduling is straightforward:

// returns true if the given driver pod should be launched
shouldScheduleDriver(pod p):
    cpuAndMem = extract declared driver and executor resources from p.annotations
    if cluster has sufficient free cpuAndMem resources:
        for i in 0..p.numExecutors:
            create reservation object 'executor-{p.appId}-{i}'
        return true
    else:
        return false  // see "Cluster autoscaling" paragraph

// returns true iff the given executor pod should be launched
shouldScheduleExecutor(pod p):
    if there exists a reservation object 'executor-{p.appId}-{p.executorId}':
        return true
    else:
        return false

In order to schedule a driver pod, we check whether the cluster currently has sufficient free CPU and memory resources to launch the driver and all of the required executors. If this is the case, we launch the driver and create a custom resource reservation object for each executor. This reservation object is consumed when deciding whether to schedule a given executor pod in shouldScheduleExecutor: we only launch those executors for which a driver has previously been launched. Together, these two filters implement gang scheduling for Spark driver and executor pods.
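
For concreteness, here is a rough Go sketch of these two filters. It is not the actual k8s-spark-scheduler code: the resource bookkeeping, the Reservations type, and the annotation parsing are all simplified placeholders.

package extender

import "fmt"

// ClusterState and appResources are simplified placeholders; the real extender
// derives these from node capacities and from the driver pod's annotations.
type ClusterState struct {
	FreeCPU    int64 // millicores
	FreeMemory int64 // bytes
}

type appResources struct {
	appID        string
	numExecutors int
	totalCPU     int64 // driver + all executors, millicores
	totalMemory  int64 // driver + all executors, bytes
}

// Reservations stands in for the custom resource reservation objects; here it
// is just an in-memory set keyed by "executor-{appID}-{index}".
type Reservations map[string]bool

// shouldScheduleDriver admits a driver only if the whole application fits, and
// records one reservation per executor when it does.
func shouldScheduleDriver(res appResources, cluster ClusterState, reservations Reservations) bool {
	if res.totalCPU > cluster.FreeCPU || res.totalMemory > cluster.FreeMemory {
		return false // the real extender creates a demand object here (see "Cluster autoscaling")
	}
	for i := 0; i < res.numExecutors; i++ {
		reservations[fmt.Sprintf("executor-%s-%d", res.appID, i)] = true
	}
	return true
}

// shouldScheduleExecutor admits an executor only if its driver already
// reserved resources for it.
func shouldScheduleExecutor(appID string, executorID int, reservations Reservations) bool {
	return reservations[fmt.Sprintf("executor-%s-%d", appID, executorID)]
}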

To implement FIFO scheduling, we add an additional condition to shouldScheduleDriver: we only launch a driver if all previously submitted drivers have either been launched already, or if the cluster has sufficient resources to launch all of them and their executors. This means that drivers can get scheduled out of order, but only if we can guarantee that all previously submitted drivers can get launched right away. This slight departure from “true FIFO” is a performance tweak in that it avoids redundant empty scheduling loops.
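
Continuing the same sketch (appResources and ClusterState as defined above; placeholder types, not the production implementation), the FIFO condition could look like this:

package extender

import "time"

// pendingDriver pairs a driver's declared resources with its submission time.
type pendingDriver struct {
	resources   appResources
	submittedAt time.Time
	scheduled   bool
}

// fifoAllows reports whether driver may be scheduled now: every driver that
// was submitted before it must either be scheduled already, or the cluster
// must have enough free resources for all of them (and their executors) too.
func fifoAllows(driver pendingDriver, all []pendingDriver, cluster ClusterState) bool {
	needCPU, needMem := driver.resources.totalCPU, driver.resources.totalMemory
	for _, other := range all {
		if other.scheduled || !other.submittedAt.Before(driver.submittedAt) {
			continue
		}
		// An earlier, still-pending driver must also fit for us to jump ahead of it.
		needCPU += other.resources.totalCPU
		needMem += other.resources.totalMemory
	}
	return needCPU <= cluster.FreeCPU && needMem <= cluster.FreeMemory
}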

The following diagram depicts the whole sequence of events:

Cluster autoscaling

Existing autoscaler services such as Atlassian’s Escalator and Kubernetes Autoscaler use pod resource requests to infer the target cluster capacity. For Spark applications, this model would incur substantial scaling latencies since executor pods do not exist until after the driver has declared them. To solve this problem, we create custom resource demand objects whenever the cluster is too small to schedule a given Spark application; in the pseudo-code above, these objects are created in the false branch in shouldScheduleDriver, i.e., exactly in the case when the cluster has insufficient capacity for the driver and its executors. We run a custom autoscaling controller called Scaler that acts on these demand objects and adjusts cluster scale accordingly.
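
As a sketch of this interaction (again with placeholder types rather than the real custom resource definitions), the extender records a demand when an application does not fit, and an autoscaling controller sums outstanding demands into a target node count:

package extender

// Demand records the resources an application is waiting for. In the real
// system this is a Kubernetes custom resource created by the extender and
// watched by the Scaler controller; here it is a plain struct.
type Demand struct {
	AppID  string
	CPU    int64 // millicores
	Memory int64 // bytes
}

// recordDemand corresponds to the "return false" branch of shouldScheduleDriver:
// the cluster cannot fit the driver plus all of its executors right now.
func recordDemand(res appResources, demands map[string]Demand) {
	demands[res.appID] = Demand{AppID: res.appID, CPU: res.totalCPU, Memory: res.totalMemory}
}

// nodesNeeded is a toy version of what an autoscaling controller like Scaler
// computes from outstanding demands: how many additional nodes of a given
// size are required to satisfy every waiting application.
func nodesNeeded(demands map[string]Demand, nodeCPU, nodeMemory int64) int {
	var cpu, mem int64
	for _, d := range demands {
		cpu += d.CPU
		mem += d.Memory
	}
	byCPU := int((cpu + nodeCPU - 1) / nodeCPU)       // ceiling division
	byMem := int((mem + nodeMemory - 1) / nodeMemory) // ceiling division
	if byCPU > byMem {
		return byCPU
	}
	return byMem
}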

Conclusion and outlook

The combination of FIFO gang scheduling and resource demands has allowed us to provide tunable and reliable performance for Spark applications. We offer different “lanes” (implemented as AWS autoscaling groups and Kubernetes node selectors) for mission-critical or interactive compute jobs and thus put our customers in control of the cost-vs-performance tradeoff.

Today, we run the Spark Scheduler Extender in production in every Kubernetes cluster across our fleet. In our larger clusters, it processes scheduling requests for more than 10 pods per second and executes hundreds of concurrent Spark jobs across 1,000+ nodes. Additionally, we realize much better overall cluster utilization through efficient autoscaling, regularly seeing 100+ hosts spin up in minutes to handle demand surges.

We’re really just getting started with how we think about scheduling Spark on Kubernetes. As our customers continue to integrate Foundry into their business operations, we continue to find new challenges. Today, throughput is limited by the rate at which resource reservations can be created in k8s-spark-scheduler — this is especially critical as we look to double the size of our largest clusters over the next 12 months.

There is also much more we want to do towards making scheduling Spark on Kubernetes more intelligent. A few of the areas we are investigating:

Start-up performance. We often see variable startup delays due to a backlog in the Kubernetes scheduler. We hope to minimize the amount of time drivers are idling by prioritizing scheduling of their respective executors. We experimented with using k8s pod priorities in Kubernetes 1.10 and 1.11 but experienced issues with pods getting stuck when they couldn’t be scheduled. We will pick up this thread as soon as we upgrade our clusters to Kubernetes 1.14.

AZ-awareness. Today, we run autoscaling groups in multiple availability zones (AZs) in order to mitigate the risk of not having enough capacity in a single AZ. Unfortunately, this has the side effect of increased cross-zone data transfer costs. A possible mitigation is to make the node selection process availability-zone aware, preferring to place as many of a single job’s pods in one AZ as possible.

Spot instances. We’re exploring the use of spot instances for certain resource pools. Spot instances can provide massive cost savings but can also be interrupted at any time. If a driver pod gets interrupted, the entire job fails and has to be restarted from scratch. The impact of losing executor pods is less dramatic but still not free. The use of spot instances trades predictable performance for lower cost; this certainly won’t work for all user workloads, but our customers are often willing to accept this trade-off for workloads that aren’t mission-critical or time-sensitive.

The next blog posts in this series on Rubix will cover Rubix networking and security, so stay tuned!

Candidates who are interested in joining the Deployment Infrastructure team can apply here.

Our Spark & AI Summit talk is now available here.

Authors

Greg D., Onur S., Robert F., Will M.
