Skip to content

Commit 526449b

Browse files
committed
feat(backend/sdk):Add download_to_workspace option to dsl.importer
Signed-off-by: VaniHaripriya <[email protected]>
1 parent 4af58d4 commit 526449b

File tree

23 files changed

+1830
-54
lines changed

23 files changed

+1830
-54
lines changed

api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Lines changed: 21 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/v2alpha1/pipeline_spec.proto

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -868,11 +868,14 @@ message PipelineDeploymentConfig {
868868
map<string, ValueOrRuntimeParameter> custom_properties = 4
869869
[deprecated = true];
870870

871+
// Whether or not import an artifact regardless it has been imported before.
872+
bool reimport = 5;
873+
871874
// Properties of the Artifact.
872875
google.protobuf.Struct metadata = 6;
873876

874-
// Whether or not import an artifact regardless it has been imported before.
875-
bool reimport = 5;
877+
// If true, download artifact into the pipeline workspace.
878+
bool download_to_workspace = 7;
876879
}
877880

878881
// ResolverSpec resolves artifacts from historical metadata and returns them

backend/src/v2/compiler/argocompiler/dag.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
328328
// it's impossible to add a when condition based on driver outputs.
329329
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
330330
}
331-
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
331+
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
332332
if err != nil {
333333
return nil, err
334334
}

backend/src/v2/compiler/argocompiler/importer.go

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
3434
return c.saveComponentImpl(name, importer)
3535
}
3636

37-
func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
37+
func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
3838
componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
3939
if err != nil {
4040
return nil, err
@@ -45,7 +45,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
4545
}
4646
return &wfapi.DAGTask{
4747
Name: name,
48-
Template: c.addImporterTemplate(),
48+
Template: c.addImporterTemplate(downloadToWorkspace),
4949
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
5050
Name: paramTask,
5151
Value: wfapi.AnyStringPtr(taskJSON),
@@ -62,8 +62,11 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
6262
}, nil
6363
}
6464

65-
func (c *workflowCompiler) addImporterTemplate() string {
65+
func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
6666
name := "system-importer"
67+
if downloadToWorkspace {
68+
name += "-workspace"
69+
}
6770
if _, alreadyExists := c.templates[name]; alreadyExists {
6871
return name
6972
}
@@ -104,6 +107,24 @@ func (c *workflowCompiler) addImporterTemplate() string {
104107
if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
105108
args = append(args, "--publish_logs", value)
106109
}
110+
111+
var volumeMounts []k8score.VolumeMount
112+
var volumes []k8score.Volume
113+
if downloadToWorkspace {
114+
volumeMounts = append(volumeMounts, k8score.VolumeMount{
115+
Name: workspaceVolumeName,
116+
MountPath: component.WorkspaceMountPath,
117+
})
118+
volumes = append(volumes, k8score.Volume{
119+
Name: workspaceVolumeName,
120+
VolumeSource: k8score.VolumeSource{
121+
PersistentVolumeClaim: &k8score.PersistentVolumeClaimVolumeSource{
122+
ClaimName: fmt.Sprintf("{{workflow.name}}-%s", workspaceVolumeName),
123+
},
124+
},
125+
})
126+
}
127+
107128
importerTemplate := &wfapi.Template{
108129
Name: name,
109130
Inputs: wfapi.Inputs{
@@ -115,13 +136,15 @@ func (c *workflowCompiler) addImporterTemplate() string {
115136
},
116137
},
117138
Container: &k8score.Container{
118-
Image: c.launcherImage,
119-
Command: c.launcherCommand,
120-
Args: args,
121-
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
122-
Env: commonEnvs,
123-
Resources: driverResources,
139+
Image: c.launcherImage,
140+
Command: c.launcherCommand,
141+
Args: args,
142+
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
143+
Env: commonEnvs,
144+
Resources: driverResources,
145+
VolumeMounts: volumeMounts,
124146
},
147+
Volumes: volumes,
125148
}
126149

127150
// If TLS is enabled (apiserver or metadata), add the custom CA bundle to the importer template.

backend/src/v2/component/importer_launcher.go

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/kubeflow/pipelines/backend/src/common/util"
1010

11+
"github.com/kubeflow/pipelines/backend/src/v2/config"
1112
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"
1213

1314
pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
@@ -125,12 +126,18 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) {
125126
if err != nil {
126127
return err
127128
}
129+
// Determine execution type
130+
executionType := metadata.ExecutionType(metadata.ImporterExecutionTypeName)
131+
if l.importer.GetDownloadToWorkspace() {
132+
executionType = metadata.ExecutionType(metadata.ImporterWorkspaceExecutionTypeName)
133+
}
134+
128135
ecfg := &metadata.ExecutionConfig{
129136
TaskName: l.task.GetTaskInfo().GetName(),
130137
PodName: l.launcherV2Options.PodName,
131138
PodUID: l.launcherV2Options.PodUID,
132139
Namespace: l.launcherV2Options.Namespace,
133-
ExecutionType: metadata.ImporterExecutionTypeName,
140+
ExecutionType: executionType,
134141
ParentDagID: l.importerLauncherOptions.ParentDagID,
135142
}
136143
createdExecution, err := l.metadataClient.CreateExecution(ctx, pipeline, ecfg)
@@ -253,15 +260,17 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
253260
}
254261

255262
if strings.HasPrefix(artifactUri, "oci://") {
263+
// OCI artifacts are not supported when workspace is used
264+
if l.importer.GetDownloadToWorkspace() {
265+
return nil, fmt.Errorf("importer workspace download does not support OCI registries")
266+
}
256267
artifactType, err := metadata.SchemaToArtifactType(schema)
257268
if err != nil {
258269
return nil, fmt.Errorf("converting schema to artifact type failed: %w", err)
259270
}
260-
261271
if *artifactType.Name != "system.Model" {
262272
return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name)
263273
}
264-
265274
return artifact, nil
266275
}
267276

@@ -283,9 +292,54 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
283292
}
284293
storeSessionInfoStr := string(storeSessionInfoJSON)
285294
artifact.CustomProperties["store_session_info"] = metadata.StringValue(storeSessionInfoStr)
295+
296+
// Download the artifact into the workspace
297+
if l.importer.GetDownloadToWorkspace() {
298+
bucketConfig, err := l.resolveBucketConfigForURI(ctx, artifactUri)
299+
if err != nil {
300+
return nil, err
301+
}
302+
localPath, err := LocalWorkspacePathForURI(artifactUri)
303+
if err != nil {
304+
return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
305+
}
306+
blobKey, err := bucketConfig.KeyFromURI(artifactUri)
307+
if err != nil {
308+
return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
309+
}
310+
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
311+
if err != nil {
312+
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
313+
}
314+
defer bucket.Close()
315+
glog.Infof("Downloading artifact %q (blob key %q) to workspace path %q", artifactUri, blobKey, localPath)
316+
if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
317+
return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
318+
}
319+
}
286320
return artifact, nil
287321
}
288322

323+
// resolveBucketConfigForURI parses bucket configuration for a given artifact URI and
324+
// attaches session information from the kfp-launcher ConfigMap when available.
325+
func (l *ImportLauncher) resolveBucketConfigForURI(ctx context.Context, uri string) (*objectstore.Config, error) {
326+
bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(uri)
327+
if err != nil {
328+
return nil, fmt.Errorf("failed to parse bucket config while resolving uri %q: %w", uri, err)
329+
}
330+
// Resolve and attach session info from kfp-launcher config for the artifact provider
331+
if cfg, err := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); err != nil {
332+
glog.Warningf("failed to load launcher config while resolving bucket config: %v", err)
333+
} else if cfg != nil {
334+
if sess, err := cfg.GetStoreSessionInfo(uri); err != nil {
335+
glog.Warningf("failed to resolve store session info for %q: %v", uri, err)
336+
} else {
337+
bucketConfig.SessionInfo = &sess
338+
}
339+
}
340+
return bucketConfig, nil
341+
}
342+
289343
func (l *ImportLauncher) getOutPutArtifactName() (string, error) {
290344
outPutNames := make([]string, 0, len(l.component.GetOutputDefinitions().GetArtifacts()))
291345
for name := range l.component.GetOutputDefinitions().GetArtifacts() {

backend/src/v2/component/launcher_v2.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -715,6 +715,12 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
715715
for _, artifact := range artifactList.Artifacts {
716716
// Iterating through the artifact list allows for collected artifacts to be properly consumed.
717717
inputArtifact := artifact
718+
// Skip downloading if the artifact is flagged as already present in the workspace
719+
if inputArtifact.GetMetadata() != nil {
720+
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
721+
continue
722+
}
723+
}
718724
localPath, err := LocalPathForURI(inputArtifact.Uri)
719725
if err != nil {
720726
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
@@ -858,6 +864,20 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
858864
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
859865
placeholders[key] = inputArtifact.Uri
860866

867+
// If the artifact is marked as already in the workspace, map to the workspace path
868+
// with the same shape as LocalPathForURI, but rebased under the workspace mount.
869+
if inputArtifact.GetMetadata() != nil {
870+
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
871+
localPath, lerr := LocalWorkspacePathForURI(inputArtifact.Uri)
872+
if lerr != nil {
873+
return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, lerr)
874+
}
875+
key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
876+
placeholders[key] = localPath
877+
continue
878+
}
879+
}
880+
861881
localPath, err := LocalPathForURI(inputArtifact.Uri)
862882
if err != nil {
863883
// Input Artifact does not have a recognized storage URI
@@ -1010,6 +1030,21 @@ func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error
10101030
}
10111031
}
10121032

1033+
// LocalWorkspacePathForURI returns the local workspace path for a given artifact URI.
1034+
// It preserves the same path shape as LocalPathForURI, but rebases it under the
1035+
// workspace artifacts directory: /kfp-workspace/.artifacts/...
1036+
func LocalWorkspacePathForURI(uri string) (string, error) {
1037+
if strings.HasPrefix(uri, "oci://") {
1038+
return "", fmt.Errorf("failed to generate workspace path for URI %s: OCI not supported for workspace artifacts", uri)
1039+
}
1040+
localPath, err := LocalPathForURI(uri)
1041+
if err != nil {
1042+
return "", err
1043+
}
1044+
// Rebase under the workspace mount, stripping the leading '/'
1045+
return filepath.Join(WorkspaceMountPath, ".artifacts", strings.TrimPrefix(localPath, "/")), nil
1046+
}
1047+
10131048
func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
10141049
for name, parameter := range executorInput.GetOutputs().GetParameters() {
10151050
dir := filepath.Dir(parameter.OutputFile)

backend/src/v2/component/launcher_v2_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"errors"
2121
"io"
2222
"os"
23+
"path/filepath"
2324
"testing"
2425

2526
"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
@@ -176,6 +177,29 @@ func Test_executeV2_publishLogs(t *testing.T) {
176177
}
177178
}
178179

180+
func Test_getPlaceholders_WorkspaceArtifactPath(t *testing.T) {
181+
execIn := &pipelinespec.ExecutorInput{
182+
Inputs: &pipelinespec.ExecutorInput_Inputs{
183+
Artifacts: map[string]*pipelinespec.ArtifactList{
184+
"data": {
185+
Artifacts: []*pipelinespec.RuntimeArtifact{
186+
{Uri: "minio://mlpipeline/sample/sample.txt", Metadata: &structpb.Struct{Fields: map[string]*structpb.Value{"_kfp_workspace": structpb.NewBoolValue(true)}}},
187+
},
188+
},
189+
},
190+
},
191+
}
192+
ph, err := getPlaceholders(execIn)
193+
if err != nil {
194+
t.Fatalf("getPlaceholders error: %v", err)
195+
}
196+
actual := ph["{{$.inputs.artifacts['data'].path}}"]
197+
expected := filepath.Join(WorkspaceMountPath, ".artifacts", "minio", "mlpipeline", "sample", "sample.txt")
198+
if actual != expected {
199+
t.Fatalf("placeholder path mismatch: actual=%q expected=%q", actual, expected)
200+
}
201+
}
202+
179203
func Test_executorInput_compileCmdAndArgs(t *testing.T) {
180204
executorInputJSON := `{
181205
"inputs": {

backend/src/v2/driver/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ func needsWorkspaceMount(executorInput *pipelinespec.ExecutorInput) bool {
461461
return true
462462
}
463463

464-
if strings.HasPrefix(strVal.StringValue, component.WorkspaceMountPath) {
464+
if strings.Contains(strVal.StringValue, component.WorkspaceMountPath) {
465465
return true
466466
}
467467
}

0 commit comments

Comments
 (0)