# Standalone driver

## Summary
In [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/interfaces/), each node in the pipeline graph is non-atomic and, at the Kubernetes level, consists of two components: a driver and an executor. Each of these runs in a separate Kubernetes pod. Additionally, every pipeline run spawns a root DAG driver pod.

Here is a simple diagram of the pods created during a KFP (Kubeflow Pipelines) run:

*(User-defined pipeline with 2 tasks: Task 1 and Task 2)*



This proposal explores approaches to replacing both the root DAG driver and all container drivers with a single standalone service, using Argo Workflows as the orchestration framework.


## Motivation
While using a separate pod for the executor makes sense - since it often handles heavy workloads and benefits from isolation and flexibility - the driver is a lightweight component. It typically performs just a few API calls: checking for cached results and creating an MLMD Execution.

However, running the driver in a separate pod causes several issues:

- **High overhead:** Launching a Kubernetes pod merely to execute a few API calls introduces significant latency. Often, pod scheduling and startup time outweighs the driver's actual processing time.

- **Resource availability problems:** There is no guarantee the Kubernetes cluster has sufficient resources to schedule the driver pod. If scheduling fails, the pipeline gets stuck. The UI currently doesn't surface driver pod scheduling failures, which makes it hard to debug and understand what is going on.

## Current state details

Let's take a look at the copy of [hello_world.yaml](hello_world.yaml) generated by the Argo compiler tests.

**Execution Order:**

1. **entrypoint**
   *Type:* DAG
   The root DAG represents the entire pipeline run.

2. **root-driver** *(template: system-dag-driver)*
   *Type:* Container
   *Purpose:* Initializes the root DAG. Creates an MLMD execution for the DAG.
   **Outputs:**
   - `execution-id` – ID of the DAG execution (created during root-driver execution)

   **Tasks inside:**

   - **hello-world-driver** *(template: system-container-driver)*
     *Purpose:* Checks for the existence of an execution in the cache. If it does not exist, prepares the MLMD execution of the hello-world container task and generates the appropriate pod-spec-patch.
     **Outputs:**
     - `pod-spec-patch` – patch for the system-container-executor pod; inserts the correct image and command for the main container
     - `cached-decision` – if true, the next step will be skipped

   - **hello-world** *(template: system-container-executor)*
     *Depends on:* `hello-world-driver.Succeeded`
     *Purpose:* Executes the hello-world component
     **Inputs:**
     - `pod-spec-patch` – patch for the pod, generated in the previous step
     - `cached-decision` – used as a skip condition

   The `system-container-executor` template defines the main container that runs the user-defined code.


Overview of the Argo workflow node structure for the container-driver:
```yaml
templates:
- name: system-container-driver
  container:
    args:
      ...
    command:
    - driver
    image: ghcr.io/kubeflow/kfp-driver
  outputs:
    parameters:
    - name: pod-spec-patch
      valueFrom:
        path: /tmp/outputs/pod-spec-patch
    - name: cached-decision
      valueFrom:
        path: /tmp/outputs/cached-decision
```
It creates a pod that launches the driver container using the kfp-driver image.

## Proposal

Instead of launching a new driver pod from a container template, configure the system to send requests to an already running server.
Something like this (showing both types of drivers):
```yaml
templates:
- name: system-container-driver
  request:
    args:
      ...
  outputs:
    parameters:
    - name: pod-spec-patch
      valueFrom:
        jsonPath: $.pod_spec_patch
    - name: cached-decision
      valueFrom:
        jsonPath: $.cached_decision
```
```yaml
- name: system-dag-driver
  request:
    args:
      ...
  outputs:
    parameters:
    - name: execution-id
      valueFrom:
        jsonPath: $.execution-id
    - name: iteration-count
      valueFrom:
        default: "0"
        jsonPath: $.iteration-count
    - name: condition
      valueFrom:
        default: "true"
        jsonPath: $.condition
```


### Requirements:
- Execute a remote call with parameters
- Read the response
- Extract parameters from the response
- Use the response in the next steps

*Extracting parameters from the response is important because the extracted values, specifically `cached-decision` and `pod-spec-patch`, are used in the Argo workflow itself: in `when` conditions and to patch the pod specification.*
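
For illustration, here is how those two parameters are consumed in the generated workflow today. This is an abridged, lightly reconstructed excerpt in the spirit of the hello_world.yaml sample above; the exact field layout in the compiled spec may differ:

```yaml
# DAG task: the driver's outputs gate and configure the executor step
- name: hello-world
  template: system-container-executor
  depends: hello-world-driver.Succeeded
  # skip the executor entirely when the driver reports a cache hit
  when: '{{tasks.hello-world-driver.outputs.parameters.cached-decision}} != true'
  arguments:
    parameters:
    - name: pod-spec-patch
      value: '{{tasks.hello-world-driver.outputs.parameters.pod-spec-patch}}'
---
# Executor template: the patch injects the user image and command
- name: system-container-executor
  podSpecPatch: '{{inputs.parameters.pod-spec-patch}}'
```

Whatever replaces the driver pod must therefore still surface `cached-decision` and `pod-spec-patch` as ordinary Argo output parameters.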
### Argo Workflows Features
Two similar features in Argo Workflows can be considered to meet these requirements:
- [HTTP Template](https://argo-workflows.readthedocs.io/en/latest/http-template)
- [Executor Plugin](https://argo-workflows.readthedocs.io/en/latest/executor_plugins/)

Comparison:

| Feature          | Supports Remote Call | Read the Response | Can Extract Parameters | Notes                        |
|------------------|----------------------|-------------------|------------------------|------------------------------|
| HTTP Template    | ✅                   | ✅                | ❌                     |                              |
| Executor Plugin  | ✅                   | ✅                | ✅                     | Requires plugin installation |

The HTTP template [is not able](https://github.com/argoproj/argo-workflows/issues/13955) to extract parameters from the response and can only use the full response as-is. As a result, its output cannot be used in `podSpecPatch: '{{inputs.parameters.pod-spec-patch}}'` or `when: '{{inputs.parameters.cached-decision}} != true'`.

There is a trade-off between running the standalone driver service as a single global pod or as one pod per workflow: a balance between better performance and avoiding a single point of failure. Both options are based on the Agent pod, which is currently started per workflow, so only the per-workflow option is available; this is a limitation of the current [implementation](https://github.com/argoproj/argo-workflows/issues/7891).

### Implementation Based on the Executor Plugin

Instead of creating a driver pod for each task, we can reuse a single agent pod via a plugin template.
The [Agent pod](https://github.com/argoproj/argo-workflows/issues/5544) is a unit designed for extension:
it can be extended by any server that implements the protocol.
This server (a plugin, in Executor Plugin terminology) runs as a sidecar container in the agent pod.

Below is a scheme where, instead of creating a pod for the driver's task, we reuse the Argo Workflow Agent via a plugin.



To move from the container template to the Executor Plugin template:
- Patch the [Argo compiler](https://github.com/kubeflow/pipelines/tree/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler) to generate a plugin template instead of a container template. Sample: hello-world [adapted](hello_world_plugin.yaml) (see `name: system-container-driver`).
- Namely, replace the templates used in the [container-driver](https://github.com/kubeflow/pipelines/blob/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler/container.go#L148) and [dag-driver](https://github.com/kubeflow/pipelines/blob/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/compiler/argocompiler/dag.go#L156) sections of the compiler.
- Extract the [driver](https://github.com/kubeflow/pipelines/tree/a870b1a325dae0c82c8b6f57941468ee1aea960b/backend/src/v2/driver) component into a standalone server.
- Implement the [plugin](plugin.md).
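To make the protocol concrete, below is a minimal sketch of what such a driver plugin server could look like. This is a hypothetical illustration, not the actual KFP driver: the endpoint path and reply shape follow the Argo Executor Plugin protocol (`POST /api/v1/template.execute` returning a node result), while the plugin name `driver-plugin`, the port, and `execute_template`'s stubbed driver result are assumptions of this sketch.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def execute_template(request_body: dict) -> dict:
    """Handle one Executor Plugin call and build the reply.

    The agent sends the workflow and the plugin template; the values
    returned here are what the template's output parameters read.
    """
    plugin_args = (
        request_body.get("template", {})
        .get("plugin", {})
        .get("driver-plugin", {})  # hypothetical plugin name from this proposal
    )
    # Placeholder for the real driver logic (cache lookup, MLMD execution).
    driver_result = {
        "pod-spec-patch": "",        # patch for the executor pod
        "cached-decision": "false",  # "true" would skip the executor step
        "condition": "true",
    }
    return {
        "node": {
            "phase": "Succeeded",
            "message": "driver finished for task %s" % plugin_args.get("task", "?"),
            "outputs": {
                "parameters": [
                    {"name": k, "value": v} for k, v in driver_result.items()
                ],
            },
        }
    }


class PluginHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        if self.path != "/api/v1/template.execute":
            self.send_error(404)
            return
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        reply = json.dumps(execute_template(body)).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(reply)))
        self.end_headers()
        self.wfile.write(reply)


def serve(port: int = 7492) -> None:
    """Start the sidecar server; the port here is illustrative."""
    HTTPServer(("127.0.0.1", port), PluginHandler).serve_forever()
```

In the real implementation the body of `execute_template` would be the extracted KFP driver code performing the cache check and MLMD calls.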
A sample of the Argo Workflow `system-container-driver` template based on the plugin:
```yaml
- name: system-container-driver
  inputs:
    parameters:
    - name: component
    - name: task
    - name: container
    - name: parent-dag-id
    - default: "-1"
      name: iteration-index
    - default: ""
      name: kubernetes-config
  metadata: {}
  # this is the key change that specifies use of the executor plugin
  plugin:
    driver-plugin:
      args:
        cached_decision_path: '{{outputs.parameters.cached-decision.path}}'
        component: '{{inputs.parameters.component}}'
        condition_path: '{{outputs.parameters.condition.path}}'
        container: '{{inputs.parameters.container}}'
        dag_execution_id: '{{inputs.parameters.parent-dag-id}}'
        iteration_index: '{{inputs.parameters.iteration-index}}'
        kubernetes_config: '{{inputs.parameters.kubernetes-config}}'
        pipeline_name: namespace/n1/pipeline/hello-world
        pod_spec_patch_path: '{{outputs.parameters.pod-spec-patch.path}}'
        run_id: '{{workflow.uid}}'
        task: '{{inputs.parameters.task}}'
        type: CONTAINER
  outputs:
    parameters:
    - name: pod-spec-patch
      valueFrom:
        default: ""
        jsonPath: $.pod-spec-patch
    - default: "false"
      name: cached-decision
      valueFrom:
        default: "false"
        jsonPath: $.cached-decision
    - name: condition
      valueFrom:
        default: "true"
        jsonPath: $.condition
```
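Assuming the plugin replies with a flat JSON document, the `jsonPath` expressions above (`$.pod-spec-patch`, `$.cached-decision`, `$.condition`) would read a response of roughly this shape (all values illustrative):

```json
{
  "pod-spec-patch": "{\"containers\":[{\"name\":\"main\",\"image\":\"python:3.9\"}]}",
  "cached-decision": "false",
  "condition": "true"
}
```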
## Test Plan
- [x] I/we understand the owners of the involved components may require updates to existing tests to make this code solid enough prior to committing the changes necessary to implement this enhancement.

**Unit tests**
Unit tests will primarily validate the compilation from KFP pipelines to Argo Workflow specs, while most other logic will be covered by integration tests.

**Integration tests**
Add an additional E2E test to verify the behavior of the global driver server.

Additionally, it is nice to have end-to-end (E2E) tests to verify basic functionality. Existing tests should be reused if available. The E2E tests should cover at least the following scenarios:
- A simple pipeline with a single component, waiting for successful completion of the run.
- A pipeline with a chain of components passing inputs and outputs between them, waiting for successful completion of the run.
- A pipeline designed to fail, waiting for the run to end with an error.
- A pipeline that fails but has retries enabled (at the pipeline and component level), waiting for the run to complete successfully.

## Conclusion
This proposal introduces an optimization for Kubeflow Pipelines (KFP) that replaces per-task driver pods with a lightweight standalone service based on Argo Workflows' Executor Plugin mechanism. It significantly reduces pipeline task startup time by eliminating the overhead of scheduling a separate driver pod for each task, which is particularly beneficial for large pipelines with many steps and caching enabled.
Instead of launching a new driver pod per task, the driver logic is offloaded to a shared agent pod that is scheduled per workflow and completes once the workflow ends. This reduces latency in cache lookups and metadata initialization.
However, this approach does not fully eliminate pod scheduling issues: the standalone driver is not a global service, but is instantiated per workflow. Thus, a pod still needs to be scheduled for each workflow run.

## Disadvantages:
A key limitation of this implementation is that it currently supports only the Argo Workflows backend. The Executor Plugin also adds some extra complexity to maintenance and deployment.

## Open Questions:
- Do we need a fallback mechanism to the per-task driver pods in case the Executor Plugin is not available in some installations? Should KFP continue supporting both execution flows (plugin-based and pod-based drivers) for compatibility?

## Follow-ups
- Implement a global agent pod. The community is [open](https://github.com/argoproj/argo-workflows/issues/7891) to it.