Conversation

@VaniHaripriya
Contributor

Description of your changes:

Resolves #12352.

This PR adds a download_to_workspace option to dsl.importer to download artifacts into the pipeline workspace and consume them downstream without re-downloading.
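
A minimal usage sketch (the workspace configuration for the pipeline is elided; the URI and consumer component are illustrative; only download_to_workspace is the new parameter from this PR):

from kfp import dsl

@dsl.component
def consume(dataset: dsl.Input[dsl.Dataset]) -> int:
    # Reads from the workspace copy; no re-download happens here.
    with open(dataset.path) as f:
        return len(f.read())

@dsl.pipeline()
def my_pipeline():
    ds = dsl.importer(
        artifact_uri='minio://mlpipeline/sample/sample.txt',  # illustrative URI
        artifact_class=dsl.Dataset,
        download_to_workspace=True)  # new option added by this PR
    consume(dataset=ds.output)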

Checklist:

@google-oss-prow

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from be5f8f5 to 9866919 on October 13, 2025 21:31
@VaniHaripriya marked this pull request as ready for review on October 14, 2025 05:15
@google-oss-prow bot requested a review from mprahl on October 14, 2025 05:15
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch from 9866919 to 60212b1 on October 14, 2025 15:05
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
Collaborator

I don't think the workspace paths are quite correct. This is more in line with the design of having the same path but just under /kfp-workspace/.artifacts:

diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go
index 7c11f9fbd..78e8a6d03 100644
--- a/backend/src/v2/component/importer_launcher.go
+++ b/backend/src/v2/component/importer_launcher.go
@@ -4,8 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"os"
-	"path/filepath"
 	"strings"
 
 	"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -310,20 +308,22 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
 				bucketConfig.SessionInfo = &sess
 			}
 		}
+
+		localPath, err := LocalWorkspacePathForURI(artifactUri)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
+		}
+
 		blobKey, err := bucketConfig.KeyFromURI(artifactUri)
 		if err != nil {
 			return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
 		}
-		workspaceRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
-		if err := os.MkdirAll(workspaceRoot, 0755); err != nil {
-			return nil, fmt.Errorf("failed to create workspace directory %q: %w", workspaceRoot, err)
-		}
 		bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
 		if err != nil {
 			return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
 		}
 		defer bucket.Close()
-		if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
+		if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
 			return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
 		}
 	}
diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go
index 95caf375b..ac670ca2d 100644
--- a/backend/src/v2/component/launcher_v2.go
+++ b/backend/src/v2/component/launcher_v2.go
@@ -865,16 +865,13 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
 		// If the artifact is marked as already in the workspace, map the workspace path.
 		if inputArtifact.GetMetadata() != nil {
 			if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
-				bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
-				if err == nil {
-					blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
-					if err == nil {
-						localPath := filepath.Join(WorkspaceMountPath, ".artifacts", blobKey)
-						key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
-						placeholders[key] = localPath
-						continue
-					}
+				localPath, err := LocalWorkspacePathForURI(inputArtifact.Uri)
+				if err != nil {
+					return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, err)
 				}
+				key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
+				placeholders[key] = localPath
+				continue
 			}
 		}
 
diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py
index ae737b483..39ae42adb 100644
--- a/sdk/python/kfp/dsl/types/artifact_types.py
+++ b/sdk/python/kfp/dsl/types/artifact_types.py
@@ -94,39 +94,26 @@ class Artifact:
         self._set_path(path)
 
     def _get_path(self) -> Optional[str]:
-        # If the artifact is already present in the pipeline workspace, map to the workspace path.
-        # This is indicated by backend setting metadata['_kfp_workspace'] = True.
-        if self.metadata.get('_kfp_workspace') is True:
-            uri = self.uri or ''
-            for prefix in (RemotePrefix.GCS.value, RemotePrefix.MINIO.value,
-                           RemotePrefix.S3.value):
-                if uri.startswith(prefix):
-                    # Derive the object key relative to the bucket:
-                    # "<bucket>/<key>" -> blob_key == "<key>"
-                    without_scheme = uri[len(prefix):]
-                    parts = without_scheme.split('/', 1)
-                    blob_key = parts[1] if len(parts) == 2 else ''
-                    if blob_key:
-                        return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
-                                            blob_key)
-
-                    return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts')
+        local_path = self.uri
 
         if self.uri.startswith(RemotePrefix.GCS.value):
-            return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
-                                                         ):]
-        if self.uri.startswith(RemotePrefix.MINIO.value):
-            return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO
-                                                            .value):]
-        if self.uri.startswith(RemotePrefix.S3.value):
-            return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value
-                                                        ):]
-        if self.uri.startswith(RemotePrefix.OCI.value):
+            local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
+        elif self.uri.startswith(RemotePrefix.MINIO.value):
+            local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
+        elif self.uri.startswith(RemotePrefix.S3.value):
+            local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
+        elif self.uri.startswith(RemotePrefix.OCI.value):
             escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
                 '/', '_')
-            return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+            local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+
+        # If the artifact is already present in the pipeline workspace, map to the workspace path.
+        # This is indicated by backend setting metadata['_kfp_workspace'] = True.
+        if self.metadata.get('_kfp_workspace') is True:
+            local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
+
         # uri == path for local execution
-        return self.uri
+        return local_path
 
     def _set_path(self, path: str) -> None:
         self.uri = convert_local_path_to_remote_path(path)
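
For reference, a minimal sketch of the LocalWorkspacePathForURI helper the hunks above assume (hypothetical; it simply consolidates the inline logic removed from launcher_v2.go, using the same objectstore helpers shown in the diff):

// LocalWorkspacePathForURI maps a remote artifact URI to its mirrored path
// under the workspace mount at WorkspaceMountPath/.artifacts/<blob key>.
func LocalWorkspacePathForURI(uri string) (string, error) {
	bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(uri)
	if err != nil {
		return "", fmt.Errorf("failed to parse bucket config for uri %q: %w", uri, err)
	}
	blobKey, err := bucketConfig.KeyFromURI(uri)
	if err != nil {
		return "", fmt.Errorf("failed to derive blob key from uri %q: %w", uri, err)
	}
	return filepath.Join(WorkspaceMountPath, ".artifacts", blobKey), nil
}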

}
// Add workspace volume only if the workflow defines a workspace PVC
hasWorkspacePVC := false
for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
Collaborator

This always mounts the PVC for all importers if the pipeline uses a workspace. Let's make this more conditional to avoid unnecessary PVC mounting, which can cause pod scheduling issues. Something like this:

diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 73e3efc8b..c57193222 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -326,7 +326,8 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
 				// it's impossible to add a when condition based on driver outputs.
 				return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
 			}
-			importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
+
+			importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
 			if err != nil {
 				return nil, err
 			}
diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go
index 5c509bf4e..096481ee6 100644
--- a/backend/src/v2/compiler/argocompiler/importer.go
+++ b/backend/src/v2/compiler/argocompiler/importer.go
@@ -32,7 +32,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
 	return c.saveComponentImpl(name, importer)
 }
 
-func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
+func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
 	componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
 	if err != nil {
 		return nil, err
@@ -43,7 +43,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
 	}
 	return &wfapi.DAGTask{
 		Name:     name,
-		Template: c.addImporterTemplate(),
+		Template: c.addImporterTemplate(downloadToWorkspace),
 		Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
 			Name:  paramTask,
 			Value: wfapi.AnyStringPtr(taskJSON),
@@ -60,11 +60,16 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
 	}, nil
 }
 
-func (c *workflowCompiler) addImporterTemplate() string {
+func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
 	name := "system-importer"
+	if downloadToWorkspace {
+		name += "-workspace"
+	}
+
 	if _, alreadyExists := c.templates[name]; alreadyExists {
 		return name
 	}
+
 	args := []string{
 		"--executor_type", "importer",
 		"--task_spec", inputValue(paramTask),
@@ -91,18 +96,10 @@ func (c *workflowCompiler) addImporterTemplate() string {
 	if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
 		args = append(args, "--publish_logs", value)
 	}
-	// Add workspace volume only if the workflow defines a workspace PVC
-	hasWorkspacePVC := false
-	for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
-		if pvc.Name == workspaceVolumeName {
-			hasWorkspacePVC = true
-			break
-		}
-	}
 
 	var volumeMounts []k8score.VolumeMount
 	var volumes []k8score.Volume
-	if hasWorkspacePVC {
+	if downloadToWorkspace {
 		volumeMounts = append(volumeMounts, k8score.VolumeMount{
 			Name:      workspaceVolumeName,
 			MountPath: component.WorkspaceMountPath,

Comment on lines 299 to 312
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to parse bucket config while downloading artifact into workspace with uri %q: %w", artifactUri, err)
}
// Resolve and attach session info from kfp-launcher config for the artifact provider
if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil {
glog.Warningf("failed to load launcher config for workspace download: %v", cfgErr)
} else if cfg != nil {
if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil {
glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr)
} else {
bucketConfig.SessionInfo = &sess
}
}
Collaborator

Let's avoid duplicating this code and move the code from the Execute method to a helper function.
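
A sketch of that extraction (helper name illustrative), reusing the exact calls from the snippet above:

// resolveBucketConfigWithSession consolidates the bucket-config and
// session-info resolution duplicated between Execute and the importer path.
func (l *ImportLauncher) resolveBucketConfigWithSession(ctx context.Context, artifactUri string) (*objectstore.Config, error) {
	bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri)
	if err != nil {
		return nil, fmt.Errorf("failed to parse bucket config for uri %q: %w", artifactUri, err)
	}
	// Best effort: attach session info from the kfp-launcher config when available.
	if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil {
		glog.Warningf("failed to load launcher config: %v", cfgErr)
	} else if cfg != nil {
		if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil {
			glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr)
		} else {
			bucketConfig.SessionInfo = &sess
		}
	}
	return bucketConfig, nil
}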

return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
Collaborator

Could you also log a message indicating the artifact is being downloaded?
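
For example, just before the download call (message wording illustrative; localPath per the earlier suggestion):

glog.Infof("Downloading artifact %q into workspace path %q", artifactUri, localPath)
if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
	return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
}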

)
def pipeline_with_importer_workspace() -> str:
ds = importer(
artifact_uri='minio://mlpipeline/sample/sample.txt',
Collaborator

Please add a test case that downloads a directory as well.
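
Something along these lines, perhaps (pipeline name, directory URI, and consumer component are all illustrative):

from kfp import dsl
from kfp.dsl import importer

@dsl.component
def list_dir(dataset: dsl.Input[dsl.Dataset]) -> str:
    import os
    # With download_to_workspace=True, dataset.path points at the workspace copy.
    return ','.join(sorted(os.listdir(dataset.path)))

@dsl.pipeline()
def pipeline_with_importer_workspace_dir() -> str:
    ds = importer(
        artifact_uri='minio://mlpipeline/sample-dir',  # illustrative directory URI
        artifact_class=dsl.Dataset,
        download_to_workspace=True)
    return list_dir(dataset=ds.output).output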

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from c92e63f to 9eaea2c on October 24, 2025 21:23
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from 9c1df93 to 9f12059 on October 28, 2025 18:31
@mprahl
Collaborator

mprahl commented Nov 1, 2025

The CI should be fixed by #12404

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 7 times, most recently from 58b0d6f to 538dfaa on November 14, 2025 17:48
try:
host_path = _copy_local_artifact_to_workspace(
source_path, workspace_root, component_name)
except PermissionError:
Collaborator

I ran it locally with this small diff and I didn't encounter any permission errors:

diff --git a/sdk/python/test/local_execution/local_execution_test.py b/sdk/python/test/local_execution/local_execution_test.py
index bf43a8c64..2487d7279 100644
--- a/sdk/python/test/local_execution/local_execution_test.py
+++ b/sdk/python/test/local_execution/local_execution_test.py
@@ -234,7 +234,7 @@ class TestDockerRunner:
         Path(ws_root).mkdir(exist_ok=True)
         Path(pipeline_root).mkdir(exist_ok=True)
         local.init(
-            runner=local.DockerRunner(),
+            runner=local.DockerRunner(user=f'{os.getuid()}'),
             raise_on_error=True,
             workspace_root=ws_root,
             pipeline_root=pipeline_root)

I think let's apply that diff, and then rather than falling back, we just provide the user with a good error message indicating that there may be a mismatch between the host user and the container user, and that they can either explicitly set the Docker container UID like above or grant more permissive permissions on the host folder. I don't want the behavior to be inconsistent for how workspace artifacts are downloaded.
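
A sketch of that error message, replacing the fallback (wording illustrative):

try:
    host_path = _copy_local_artifact_to_workspace(
        source_path, workspace_root, component_name)
except PermissionError as e:
    raise PermissionError(
        f'Failed to copy artifact {source_path!r} into the workspace at '
        f'{workspace_root!r}. This may indicate a mismatch between the host '
        'user and the container user. Either set the Docker container UID '
        "explicitly, e.g. local.DockerRunner(user=f'{os.getuid()}'), or grant "
        'more permissive permissions on the host workspace folder.') from e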

Contributor Author

Thank you @mprahl!! This looks better, updated it.

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 3 times, most recently from de9ae88 to 137345d on November 18, 2025 03:11
try:
workspace_root = _get_workspace_root()
cfg = config.LocalExecutionConfig.instance
used_base = 'workspace'
Collaborator

I don't think we need the used_base variable anymore.

Contributor Author

Agreed; removed the flag and updated the code.

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 4 times, most recently from 2b101a8 to e476956 on November 19, 2025 02:42
pipeline_root = f'{pipeline_root_base}_docker'
if os.path.isdir(ws_root):
shutil.rmtree(ws_root, ignore_errors=True)
Path(ws_root).mkdir(parents=True, exist_ok=True)
Collaborator

I think this is still handled in setup_and_teardown, so this might be duplicating it.

shutil.rmtree(ws_root, ignore_errors=True)
Path(ws_root).mkdir(parents=True, exist_ok=True)
local.init(
runner=local.DockerRunner(user=f'{os.getuid()}'),
Collaborator

Can we enhance TestData to accept a new optional arg of docker_runner_args to make this reusable?
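
Perhaps something like this (sketch; assumes TestData is a dataclass in the test module):

import dataclasses
from typing import Any, Dict, Optional

@dataclasses.dataclass
class TestData:
    # ...existing fields unchanged...
    docker_runner_args: Optional[Dict[str, Any]] = None

The test setup could then forward the args, e.g. runner = local.DockerRunner(**(test_data.docker_runner_args or {})).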

@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch 2 times, most recently from 01dc425 to ed47c02 on November 20, 2025 16:24
@VaniHaripriya force-pushed the dsl-importer-download-to-workspace branch from ed47c02 to 526449b on November 20, 2025 17:28
@VaniHaripriya
Contributor Author

/retest

Collaborator

@mprahl left a comment

/approve
/lgtm

@google-oss-prow

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: mprahl

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@google-oss-prow bot merged commit ad30461 into kubeflow:master on Nov 20, 2025
224 of 264 checks passed
Successfully merging this pull request may close these issues:

[feature] Add download_to_workspace option to dsl.importer (#12352)