-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat(backend/sdk): Add download_to_workspace option to dsl.importer #12353
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(backend/sdk): Add download_to_workspace option to dsl.importer #12353
Conversation
|
Skipping CI for Draft Pull Request. |
be5f8f5 to
9866919
Compare
9866919 to
60212b1
Compare
| return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err) | ||
| } | ||
| defer bucket.Close() | ||
| if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think the workspace paths are quite correct. This is more in line with the design of having the same path but just under /kfp-workspace/.artifacts:
diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go
index 7c11f9fbd..78e8a6d03 100644
--- a/backend/src/v2/component/importer_launcher.go
+++ b/backend/src/v2/component/importer_launcher.go
@@ -4,8 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
- "os"
- "path/filepath"
"strings"
"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -310,20 +308,22 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
bucketConfig.SessionInfo = &sess
}
}
+
+ localPath, err := LocalWorkspacePathForURI(artifactUri)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
+ }
+
blobKey, err := bucketConfig.KeyFromURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
}
- workspaceRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
- if err := os.MkdirAll(workspaceRoot, 0755); err != nil {
- return nil, fmt.Errorf("failed to create workspace directory %q: %w", workspaceRoot, err)
- }
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
if err != nil {
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
- if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
+ if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
}
}
diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go
index 95caf375b..ac670ca2d 100644
--- a/backend/src/v2/component/launcher_v2.go
+++ b/backend/src/v2/component/launcher_v2.go
@@ -865,16 +865,13 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
// If the artifact is marked as already in the workspace, map the workspace path.
if inputArtifact.GetMetadata() != nil {
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
- bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
- if err == nil {
- blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
- if err == nil {
- localPath := filepath.Join(WorkspaceMountPath, ".artifacts", blobKey)
- key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
- placeholders[key] = localPath
- continue
- }
+ localPath, err := LocalWorkspacePathForURI(inputArtifact.Uri)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, err)
}
+ key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
+ placeholders[key] = localPath
+ continue
}
}
diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py
index ae737b483..39ae42adb 100644
--- a/sdk/python/kfp/dsl/types/artifact_types.py
+++ b/sdk/python/kfp/dsl/types/artifact_types.py
@@ -94,39 +94,26 @@ class Artifact:
self._set_path(path)
def _get_path(self) -> Optional[str]:
- # If the artifact is already present in the pipeline workspace, map to the workspace path.
- # This is indicated by backend setting metadata['_kfp_workspace'] = True.
- if self.metadata.get('_kfp_workspace') is True:
- uri = self.uri or ''
- for prefix in (RemotePrefix.GCS.value, RemotePrefix.MINIO.value,
- RemotePrefix.S3.value):
- if uri.startswith(prefix):
- # Derive the object key relative to the bucket:
- # "<bucket>/<key>" -> blob_key == "<key>"
- without_scheme = uri[len(prefix):]
- parts = without_scheme.split('/', 1)
- blob_key = parts[1] if len(parts) == 2 else ''
- if blob_key:
- return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
- blob_key)
-
- return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts')
+ local_path = self.uri
if self.uri.startswith(RemotePrefix.GCS.value):
- return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
- ):]
- if self.uri.startswith(RemotePrefix.MINIO.value):
- return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO
- .value):]
- if self.uri.startswith(RemotePrefix.S3.value):
- return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value
- ):]
- if self.uri.startswith(RemotePrefix.OCI.value):
+        local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
+    elif self.uri.startswith(RemotePrefix.MINIO.value):
+        local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
+    elif self.uri.startswith(RemotePrefix.S3.value):
+        local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
+ elif self.uri.startswith(RemotePrefix.OCI.value):
escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
'/', '_')
- return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+ local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
+
+ # If the artifact is already present in the pipeline workspace, map to the workspace path.
+ # This is indicated by backend setting metadata['_kfp_workspace'] = True.
+ if self.metadata.get('_kfp_workspace') is True:
+ local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
+
# uri == path for local execution
- return self.uri
+ return local_path
def _set_path(self, path: str) -> None:
self.uri = convert_local_path_to_remote_path(path)| } | ||
| // Add workspace volume only if the workflow defines a workspace PVC | ||
| hasWorkspacePVC := false | ||
| for _, pvc := range c.wf.Spec.VolumeClaimTemplates { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This always mounts the PVC for all importers if the pipeline uses a workspace. Let's make this more conditional to avoid unnecessary PVC mounting which can cause pod scheduling issues. Something like this:
diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go
index 73e3efc8b..c57193222 100644
--- a/backend/src/v2/compiler/argocompiler/dag.go
+++ b/backend/src/v2/compiler/argocompiler/dag.go
@@ -326,7 +326,8 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
// it's impossible to add a when condition based on driver outputs.
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
}
- importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
+
+ importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
if err != nil {
return nil, err
}
diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go
index 5c509bf4e..096481ee6 100644
--- a/backend/src/v2/compiler/argocompiler/importer.go
+++ b/backend/src/v2/compiler/argocompiler/importer.go
@@ -32,7 +32,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
return c.saveComponentImpl(name, importer)
}
-func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
+func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
if err != nil {
return nil, err
@@ -43,7 +43,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}
return &wfapi.DAGTask{
Name: name,
- Template: c.addImporterTemplate(),
+ Template: c.addImporterTemplate(downloadToWorkspace),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
Name: paramTask,
Value: wfapi.AnyStringPtr(taskJSON),
@@ -60,11 +60,16 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}, nil
}
-func (c *workflowCompiler) addImporterTemplate() string {
+func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
name := "system-importer"
+ if downloadToWorkspace {
+ name += "-workspace"
+ }
+
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
+
args := []string{
"--executor_type", "importer",
"--task_spec", inputValue(paramTask),
@@ -91,18 +96,10 @@ func (c *workflowCompiler) addImporterTemplate() string {
if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
args = append(args, "--publish_logs", value)
}
- // Add workspace volume only if the workflow defines a workspace PVC
- hasWorkspacePVC := false
- for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
- if pvc.Name == workspaceVolumeName {
- hasWorkspacePVC = true
- break
- }
- }
var volumeMounts []k8score.VolumeMount
var volumes []k8score.Volume
- if hasWorkspacePVC {
+ if downloadToWorkspace {
volumeMounts = append(volumeMounts, k8score.VolumeMount{
Name: workspaceVolumeName,
MountPath: component.WorkspaceMountPath,
| bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to parse bucket config while downloading artifact into workspace with uri %q: %w", artifactUri, err) | ||
| } | ||
| // Resolve and attach session info from kfp-launcher config for the artifact provider | ||
| if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil { | ||
| glog.Warningf("failed to load launcher config for workspace download: %v", cfgErr) | ||
| } else if cfg != nil { | ||
| if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil { | ||
| glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr) | ||
| } else { | ||
| bucketConfig.SessionInfo = &sess | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's avoid duplicating this code and move the code from the Execute method to a helper function.
| return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err) | ||
| } | ||
| defer bucket.Close() | ||
| if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also log a message indicating the artifact is being downloaded?
| ) | ||
| def pipeline_with_importer_workspace() -> str: | ||
| ds = importer( | ||
| artifact_uri='minio://mlpipeline/sample/sample.txt', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test case that downloads a directory as well.
c92e63f to
9eaea2c
Compare
9c1df93 to
9f12059
Compare
|
The CI should be fixed by #12404 |
58b0d6f to
538dfaa
Compare
| try: | ||
| host_path = _copy_local_artifact_to_workspace( | ||
| source_path, workspace_root, component_name) | ||
| except PermissionError: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran it locally with this small diff and I didn't encounter any permission errors:
diff --git a/sdk/python/test/local_execution/local_execution_test.py b/sdk/python/test/local_execution/local_execution_test.py
index bf43a8c64..2487d7279 100644
--- a/sdk/python/test/local_execution/local_execution_test.py
+++ b/sdk/python/test/local_execution/local_execution_test.py
@@ -234,7 +234,7 @@ class TestDockerRunner:
Path(ws_root).mkdir(exist_ok=True)
Path(pipeline_root).mkdir(exist_ok=True)
local.init(
- runner=local.DockerRunner(),
+ runner=local.DockerRunner(user=f'{os.getuid()}'),
raise_on_error=True,
workspace_root=ws_root,
pipeline_root=pipeline_root)
I think let's apply that diff and then rather than fallback, we just provide the user with a good error message indicating that there may be a mismatch between the host user and container, and they can either explicitly set the Docker container UID like above or provide more permissive permissions on the host folder. I don't want the behavior to be inconsistent for how workspace artifacts are downloaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @mprahl !! this looks better, updated it.
de9ae88 to
137345d
Compare
| try: | ||
| workspace_root = _get_workspace_root() | ||
| cfg = config.LocalExecutionConfig.instance | ||
| used_base = 'workspace' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need the used_base variable anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. Removed the flag and updated the code.
2b101a8 to
e476956
Compare
| pipeline_root = f'{pipeline_root_base}_docker' | ||
| if os.path.isdir(ws_root): | ||
| shutil.rmtree(ws_root, ignore_errors=True) | ||
| Path(ws_root).mkdir(parents=True, exist_ok=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is still handled in setup_and_teardown so this might be duplicating.
| shutil.rmtree(ws_root, ignore_errors=True) | ||
| Path(ws_root).mkdir(parents=True, exist_ok=True) | ||
| local.init( | ||
| runner=local.DockerRunner(user=f'{os.getuid()}'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we enhance TestData to accept a new optional arg of docker_runner_args to make this reusable?
01dc425 to
ed47c02
Compare
Signed-off-by: VaniHaripriya <[email protected]>
ed47c02 to
526449b
Compare
|
/retest |
mprahl
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/approve
/lgtm
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: mprahl The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
ad30461
into
kubeflow:master
Description of your changes:
Resolves #12352.
This PR adds download_to_workspace option to dsl.importer to download artifacts into the pipeline workspace and consume them downstream without re-downloading.
Checklist: