Skip to content

Commit c92e63f

Browse files
committed
updated workspace path logic
1 parent fb7b3ea commit c92e63f

File tree

5 files changed

+73
-71
lines changed

5 files changed

+73
-71
lines changed

backend/src/v2/compiler/argocompiler/dag.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -326,7 +326,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
326326
// it's impossible to add a when condition based on driver outputs.
327327
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
328328
}
329-
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
329+
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
330330
if err != nil {
331331
return nil, err
332332
}

backend/src/v2/compiler/argocompiler/importer.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
3333
return c.saveComponentImpl(name, importer)
3434
}
3535

36-
func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
36+
func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
3737
componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
3838
if err != nil {
3939
return nil, err
@@ -44,7 +44,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
4444
}
4545
return &wfapi.DAGTask{
4646
Name: name,
47-
Template: c.addImporterTemplate(),
47+
Template: c.addImporterTemplate(downloadToWorkspace),
4848
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
4949
Name: paramTask,
5050
Value: wfapi.AnyStringPtr(taskJSON),
@@ -61,8 +61,11 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
6161
}, nil
6262
}
6363

64-
func (c *workflowCompiler) addImporterTemplate() string {
64+
func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
6565
name := "system-importer"
66+
if downloadToWorkspace {
67+
name += "-workspace"
68+
}
6669
if _, alreadyExists := c.templates[name]; alreadyExists {
6770
return name
6871
}
@@ -105,18 +108,10 @@ func (c *workflowCompiler) addImporterTemplate() string {
105108
if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
106109
args = append(args, "--publish_logs", value)
107110
}
108-
// Add workspace volume only if the workflow defines a workspace PVC
109-
hasWorkspacePVC := false
110-
for _, pvc := range c.wf.Spec.VolumeClaimTemplates {
111-
if pvc.Name == workspaceVolumeName {
112-
hasWorkspacePVC = true
113-
break
114-
}
115-
}
116111

117112
var volumeMounts []k8score.VolumeMount
118113
var volumes []k8score.Volume
119-
if hasWorkspacePVC {
114+
if downloadToWorkspace {
120115
volumeMounts = append(volumeMounts, k8score.VolumeMount{
121116
Name: workspaceVolumeName,
122117
MountPath: component.WorkspaceMountPath,

backend/src/v2/component/importer_launcher.go

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7-
"os"
8-
"path/filepath"
97
"strings"
108

119
"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -296,40 +294,51 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
296294

297295
// Download the artifact into the workspace
298296
if l.importer.GetDownloadToWorkspace() {
299-
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(artifactUri)
297+
bucketConfig, err := l.resolveBucketConfigForURI(ctx, artifactUri)
300298
if err != nil {
301-
return nil, fmt.Errorf("failed to parse bucket config while downloading artifact into workspace with uri %q: %w", artifactUri, err)
299+
return nil, err
302300
}
303-
// Resolve and attach session info from kfp-launcher config for the artifact provider
304-
if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil {
305-
glog.Warningf("failed to load launcher config for workspace download: %v", cfgErr)
306-
} else if cfg != nil {
307-
if sess, sessErr := cfg.GetStoreSessionInfo(artifactUri); sessErr != nil {
308-
glog.Warningf("failed to resolve store session info for %q: %v", artifactUri, sessErr)
309-
} else {
310-
bucketConfig.SessionInfo = &sess
311-
}
301+
localPath, err := LocalWorkspacePathForURI(artifactUri)
302+
if err != nil {
303+
return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
312304
}
313305
blobKey, err := bucketConfig.KeyFromURI(artifactUri)
314306
if err != nil {
315307
return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
316308
}
317-
workspaceRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
318-
if err := os.MkdirAll(workspaceRoot, 0755); err != nil {
319-
return nil, fmt.Errorf("failed to create workspace directory %q: %w", workspaceRoot, err)
320-
}
321309
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
322310
if err != nil {
323311
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
324312
}
325313
defer bucket.Close()
326-
if err := objectstore.DownloadBlob(ctx, bucket, workspaceRoot, blobKey); err != nil {
314+
glog.Infof("Downloading artifact %q (blob key %q) to workspace path %q", artifactUri, blobKey, localPath)
315+
if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
327316
return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
328317
}
329318
}
330319
return artifact, nil
331320
}
332321

322+
// resolveBucketConfigForURI parses bucket configuration for a given artifact URI and
323+
// attaches session information from the kfp-launcher ConfigMap when available.
324+
func (l *ImportLauncher) resolveBucketConfigForURI(ctx context.Context, uri string) (*objectstore.Config, error) {
325+
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(uri)
326+
if err != nil {
327+
return nil, fmt.Errorf("failed to parse bucket config while resolving uri %q: %w", uri, err)
328+
}
329+
// Resolve and attach session info from kfp-launcher config for the artifact provider
330+
if cfg, cfgErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); cfgErr != nil {
331+
glog.Warningf("failed to load launcher config while resolving bucket config: %v", cfgErr)
332+
} else if cfg != nil {
333+
if sess, sessErr := cfg.GetStoreSessionInfo(uri); sessErr != nil {
334+
glog.Warningf("failed to resolve store session info for %q: %v", uri, sessErr)
335+
} else {
336+
bucketConfig.SessionInfo = &sess
337+
}
338+
}
339+
return bucketConfig, nil
340+
}
341+
333342
func (l *ImportLauncher) getOutPutArtifactName() (string, error) {
334343
outPutNames := make([]string, 0, len(l.component.GetOutputDefinitions().GetArtifacts()))
335344
for name := range l.component.GetOutputDefinitions().GetArtifacts() {

backend/src/v2/component/launcher_v2.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -862,19 +862,17 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
862862
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
863863
placeholders[key] = inputArtifact.Uri
864864

865-
// If the artifact is marked as already in the workspace, map the workspace path.
865+
// If the artifact is marked as already in the workspace, map to the workspace path
866+
// with the same shape as LocalPathForURI, but rebased under the workspace mount.
866867
if inputArtifact.GetMetadata() != nil {
867868
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
868-
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(inputArtifact.Uri)
869-
if err == nil {
870-
blobKey, err := bucketConfig.KeyFromURI(inputArtifact.Uri)
871-
if err == nil {
872-
localPath := filepath.Join(WorkspaceMountPath, ".artifacts", blobKey)
873-
key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
874-
placeholders[key] = localPath
875-
continue
876-
}
869+
localPath, lerr := LocalWorkspacePathForURI(inputArtifact.Uri)
870+
if lerr != nil {
871+
return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, lerr)
877872
}
873+
key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
874+
placeholders[key] = localPath
875+
continue
878876
}
879877
}
880878

@@ -1016,6 +1014,21 @@ func LocalPathForURI(uri string) (string, error) {
10161014
return "", fmt.Errorf("failed to generate local path for URI %s: unsupported storage scheme", uri)
10171015
}
10181016

1017+
// LocalWorkspacePathForURI returns the local workspace path for a given artifact URI.
1018+
// It preserves the same path shape as LocalPathForURI, but rebases it under the
1019+
// workspace artifacts directory: /kfp-workspace/.artifacts/...
1020+
func LocalWorkspacePathForURI(uri string) (string, error) {
1021+
if strings.HasPrefix(uri, "oci://") {
1022+
return "", fmt.Errorf("failed to generate workspace path for URI %s: OCI not supported for workspace artifacts", uri)
1023+
}
1024+
localPath, err := LocalPathForURI(uri)
1025+
if err != nil {
1026+
return "", err
1027+
}
1028+
// Rebase under the workspace mount, stripping the leading '/'
1029+
return filepath.Join(WorkspaceMountPath, ".artifacts", strings.TrimPrefix(localPath, "/")), nil
1030+
}
1031+
10191032
func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10201033
for name, parameter := range executorInput.GetOutputs().GetParameters() {
10211034
dir := filepath.Dir(parameter.OutputFile)

sdk/python/kfp/dsl/types/artifact_types.py

Lines changed: 15 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -94,39 +94,24 @@ def path(self, path: str) -> None:
9494
self._set_path(path)
9595

9696
def _get_path(self) -> Optional[str]:
97-
# If the artifact is already present in the pipeline workspace, map to the workspace path.
98-
# This is indicated by backend setting metadata['_kfp_workspace'] = True.
99-
if self.metadata.get('_kfp_workspace') is True:
100-
uri = self.uri or ''
101-
for prefix in (RemotePrefix.GCS.value, RemotePrefix.MINIO.value,
102-
RemotePrefix.S3.value):
103-
if uri.startswith(prefix):
104-
# Derive the object key relative to the bucket:
105-
# "<bucket>/<key>" -> blob_key == "<key>"
106-
without_scheme = uri[len(prefix):]
107-
parts = without_scheme.split('/', 1)
108-
blob_key = parts[1] if len(parts) == 2 else ''
109-
if blob_key:
110-
return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts',
111-
blob_key)
112-
113-
return os.path.join(WORKSPACE_MOUNT_PATH, '.artifacts')
114-
97+
local_path = self.uri
11598
if self.uri.startswith(RemotePrefix.GCS.value):
116-
return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value
117-
):]
118-
if self.uri.startswith(RemotePrefix.MINIO.value):
119-
return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO
120-
.value):]
121-
if self.uri.startswith(RemotePrefix.S3.value):
122-
return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value
123-
):]
124-
if self.uri.startswith(RemotePrefix.OCI.value):
99+
local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value):]
100+
elif self.uri.startswith(RemotePrefix.MINIO.value):
101+
local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO.value):]
102+
elif self.uri.startswith(RemotePrefix.S3.value):
103+
local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value):]
104+
elif self.uri.startswith(RemotePrefix.OCI.value):
125105
escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace(
126106
'/', '_')
127-
return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
128-
# uri == path for local execution
129-
return self.uri
107+
local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri
108+
109+
# If the artifact is already present in the pipeline workspace, map to the workspace path.
110+
# This is indicated by backend setting metadata['_kfp_workspace'] = True.
111+
if self.metadata.get('_kfp_workspace') is True:
112+
local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", local_path.lstrip("/"))
113+
114+
return local_path
130115

131116
def _set_path(self, path: str) -> None:
132117
self.uri = convert_local_path_to_remote_path(path)

0 commit comments

Comments
 (0)