diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index e91a354bb7d..d0a03de1abd 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -4638,12 +4638,14 @@ type PipelineDeploymentConfig_ImporterSpec struct { // // Deprecated: Marked as deprecated in pipeline_spec.proto. CustomProperties map[string]*ValueOrRuntimeParameter `protobuf:"bytes,4,rep,name=custom_properties,json=customProperties,proto3" json:"custom_properties,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // Whether or not import an artifact regardless it has been imported before. + Reimport bool `protobuf:"varint,5,opt,name=reimport,proto3" json:"reimport,omitempty"` // Properties of the Artifact. Metadata *structpb.Struct `protobuf:"bytes,6,opt,name=metadata,proto3" json:"metadata,omitempty"` - // Whether or not import an artifact regardless it has been imported before. - Reimport bool `protobuf:"varint,5,opt,name=reimport,proto3" json:"reimport,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // If true, download artifact into the pipeline workspace. + DownloadToWorkspace bool `protobuf:"varint,7,opt,name=download_to_workspace,json=downloadToWorkspace,proto3" json:"download_to_workspace,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *PipelineDeploymentConfig_ImporterSpec) Reset() { @@ -4706,6 +4708,13 @@ func (x *PipelineDeploymentConfig_ImporterSpec) GetCustomProperties() map[string return nil } +func (x *PipelineDeploymentConfig_ImporterSpec) GetReimport() bool { + if x != nil { + return x.Reimport + } + return false +} + func (x *PipelineDeploymentConfig_ImporterSpec) GetMetadata() *structpb.Struct { if x != nil { return x.Metadata @@ -4713,9 +4722,9 @@ func (x *PipelineDeploymentConfig_ImporterSpec) GetMetadata() *structpb.Struct { return nil } -func (x *PipelineDeploymentConfig_ImporterSpec) GetReimport() bool { +func (x *PipelineDeploymentConfig_ImporterSpec) GetDownloadToWorkspace() bool { if x != nil { - return x.Reimport + return x.DownloadToWorkspace } return false } @@ -5903,7 +5912,7 @@ const file_pipeline_spec_proto_rawDesc = "" + "\x0econstant_value\x18\x01 \x01(\v2\x13.ml_pipelines.ValueB\x02\x18\x01H\x00R\rconstantValue\x12-\n" + "\x11runtime_parameter\x18\x02 \x01(\tH\x00R\x10runtimeParameter\x124\n" + "\bconstant\x18\x03 \x01(\v2\x16.google.protobuf.ValueH\x00R\bconstantB\a\n" + - "\x05value\"\xcf\x17\n" + + "\x05value\"\x83\x18\n" + "\x18PipelineDeploymentConfig\x12S\n" + "\texecutors\x18\x01 \x03(\v25.ml_pipelines.PipelineDeploymentConfig.ExecutorsEntryR\texecutors\x1a\xfc\t\n" + "\x15PipelineContainerSpec\x12\x14\n" + @@ -5937,7 +5946,7 @@ const file_pipeline_spec_proto_rawDesc = "" + "\x0eresource_count\x18\x04 \x01(\tR\rresourceCountJ\x04\b\x04\x10\x05\x1a2\n" + "\x06EnvVar\x12\x12\n" + "\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n" + - "\x05value\x18\x02 \x01(\tR\x05value\x1a\xa3\x05\n" + + "\x05value\x18\x02 \x01(\tR\x05value\x1a\xd7\x05\n" + "\fImporterSpec\x12H\n" + "\fartifact_uri\x18\x01 \x01(\v2%.ml_pipelines.ValueOrRuntimeParameterR\vartifactUri\x12A\n" + "\vtype_schema\x18\x02 \x01(\v2 .ml_pipelines.ArtifactTypeSchemaR\n" + @@ -5945,9 +5954,10 @@ const file_pipeline_spec_proto_rawDesc = "" + "\n" + "properties\x18\x03 \x03(\v2C.ml_pipelines.PipelineDeploymentConfig.ImporterSpec.PropertiesEntryB\x02\x18\x01R\n" + "properties\x12z\n" + - 
"\x11custom_properties\x18\x04 \x03(\v2I.ml_pipelines.PipelineDeploymentConfig.ImporterSpec.CustomPropertiesEntryB\x02\x18\x01R\x10customProperties\x123\n" + - "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x12\x1a\n" + - "\breimport\x18\x05 \x01(\bR\breimport\x1ad\n" + + "\x11custom_properties\x18\x04 \x03(\v2I.ml_pipelines.PipelineDeploymentConfig.ImporterSpec.CustomPropertiesEntryB\x02\x18\x01R\x10customProperties\x12\x1a\n" + + "\breimport\x18\x05 \x01(\bR\breimport\x123\n" + + "\bmetadata\x18\x06 \x01(\v2\x17.google.protobuf.StructR\bmetadata\x122\n" + + "\x15download_to_workspace\x18\a \x01(\bR\x13downloadToWorkspace\x1ad\n" + "\x0fPropertiesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12;\n" + "\x05value\x18\x02 \x01(\v2%.ml_pipelines.ValueOrRuntimeParameterR\x05value:\x028\x01\x1aj\n" + diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto index eced0271fcb..437365ab635 100644 --- a/api/v2alpha1/pipeline_spec.proto +++ b/api/v2alpha1/pipeline_spec.proto @@ -868,11 +868,14 @@ message PipelineDeploymentConfig { map custom_properties = 4 [deprecated = true]; + // Whether or not import an artifact regardless it has been imported before. + bool reimport = 5; + // Properties of the Artifact. google.protobuf.Struct metadata = 6; - // Whether or not import an artifact regardless it has been imported before. - bool reimport = 5; + // If true, download artifact into the pipeline workspace. + bool download_to_workspace = 7; } // ResolverSpec resolves artifacts from historical metadata and returns them diff --git a/backend/src/v2/compiler/argocompiler/dag.go b/backend/src/v2/compiler/argocompiler/dag.go index f38be870812..e0074b1664c 100644 --- a/backend/src/v2/compiler/argocompiler/dag.go +++ b/backend/src/v2/compiler/argocompiler/dag.go @@ -328,7 +328,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec // it's impossible to add a when condition based on driver outputs. 
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported") } - importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID) + importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace()) if err != nil { return nil, err } diff --git a/backend/src/v2/compiler/argocompiler/importer.go b/backend/src/v2/compiler/argocompiler/importer.go index d9ea4f88b32..7b5a20ed046 100644 --- a/backend/src/v2/compiler/argocompiler/importer.go +++ b/backend/src/v2/compiler/argocompiler/importer.go @@ -34,7 +34,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com return c.saveComponentImpl(name, importer) } -func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) { +func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) { componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName()) if err != nil { return nil, err @@ -45,7 +45,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline } return &wfapi.DAGTask{ Name: name, - Template: c.addImporterTemplate(), + Template: c.addImporterTemplate(downloadToWorkspace), Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{ Name: paramTask, Value: wfapi.AnyStringPtr(taskJSON), @@ -62,8 +62,11 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline }, nil } -func (c *workflowCompiler) addImporterTemplate() string { +func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string { name := "system-importer" + if downloadToWorkspace { + name += "-workspace" + } if _, alreadyExists := c.templates[name]; alreadyExists { return name } @@ -104,6 +107,24 @@ func (c *workflowCompiler) addImporterTemplate() string { if value, ok := os.LookupEnv(PublishLogsEnvVar); ok { args = append(args, "--publish_logs", value) } + + var volumeMounts []k8score.VolumeMount + var volumes []k8score.Volume + if downloadToWorkspace { + volumeMounts = append(volumeMounts, k8score.VolumeMount{ + Name: workspaceVolumeName, + MountPath: component.WorkspaceMountPath, + }) + volumes = append(volumes, k8score.Volume{ + Name: workspaceVolumeName, + VolumeSource: k8score.VolumeSource{ + PersistentVolumeClaim: &k8score.PersistentVolumeClaimVolumeSource{ + ClaimName: fmt.Sprintf("{{workflow.name}}-%s", workspaceVolumeName), + }, + }, + }) + } + importerTemplate := &wfapi.Template{ Name: name, Inputs: wfapi.Inputs{ @@ -115,13 +136,15 @@ func (c *workflowCompiler) addImporterTemplate() string { }, }, Container: &k8score.Container{ - Image: c.launcherImage, - Command: c.launcherCommand, - Args: args, - EnvFrom: []k8score.EnvFromSource{metadataEnvFrom}, - Env: commonEnvs, - Resources: driverResources, + Image: c.launcherImage, + Command: c.launcherCommand, + Args: args, + EnvFrom: []k8score.EnvFromSource{metadataEnvFrom}, + Env: commonEnvs, + Resources: driverResources, + VolumeMounts: volumeMounts, }, + Volumes: volumes, } // If TLS is enabled (apiserver or metadata), add the custom CA bundle to the importer template. 
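Reviewer note (not part of the diff): a minimal end-to-end usage sketch of the new flag, pieced together from the SDK changes further down in this diff (dsl.importer's download_to_workspace parameter, dsl.PipelineConfig(workspace=...), dsl.WorkspaceConfig, and the workspace path mapping in artifact_types.py). The pipeline/component names, the gs:// URI, and the output path are illustrative only.

    # Sketch only: assumes the dsl.WorkspaceConfig / PipelineConfig(workspace=...)
    # API and the download_to_workspace flag introduced by this PR.
    from kfp import compiler, dsl

    @dsl.component
    def train(dataset: dsl.Input[dsl.Dataset]) -> str:
        # With download_to_workspace=True the importer has already materialized
        # the artifact under the workspace mount, so dataset.path resolves to a
        # path under /kfp-workspace/.artifacts/ instead of triggering a download.
        with open(dataset.path, encoding='utf-8') as f:
            return f.read()

    @dsl.pipeline(
        pipeline_config=dsl.PipelineConfig(
            workspace=dsl.WorkspaceConfig(size='1Gi')))
    def my_pipeline():
        imported = dsl.importer(
            artifact_uri='gs://bucket/file.txt',
            artifact_class=dsl.Dataset,
            download_to_workspace=True,
        )
        train(dataset=imported.output)

    compiler.Compiler().compile(
        pipeline_func=my_pipeline, package_path='pipeline.yaml')

At runtime, per the driver/launcher changes below, artifacts produced by the workspace importer are tagged with a transient _kfp_workspace metadata flag so downstream executors map the artifact path under the workspace mount rather than re-downloading it; compilation without a workspace config is rejected by the new validation in pipeline_spec_builder.py.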
diff --git a/backend/src/v2/component/importer_launcher.go b/backend/src/v2/component/importer_launcher.go index fa9a99fd0fb..c1745682d2d 100644 --- a/backend/src/v2/component/importer_launcher.go +++ b/backend/src/v2/component/importer_launcher.go @@ -8,6 +8,7 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/kubeflow/pipelines/backend/src/v2/config" "github.com/kubeflow/pipelines/backend/src/v2/objectstore" pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata" @@ -125,12 +126,18 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) { if err != nil { return err } + // Determine execution type + executionType := metadata.ExecutionType(metadata.ImporterExecutionTypeName) + if l.importer.GetDownloadToWorkspace() { + executionType = metadata.ExecutionType(metadata.ImporterWorkspaceExecutionTypeName) + } + ecfg := &metadata.ExecutionConfig{ TaskName: l.task.GetTaskInfo().GetName(), PodName: l.launcherV2Options.PodName, PodUID: l.launcherV2Options.PodUID, Namespace: l.launcherV2Options.Namespace, - ExecutionType: metadata.ImporterExecutionTypeName, + ExecutionType: executionType, ParentDagID: l.importerLauncherOptions.ParentDagID, } createdExecution, err := l.metadataClient.CreateExecution(ctx, pipeline, ecfg) @@ -253,15 +260,17 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact } if strings.HasPrefix(artifactUri, "oci://") { + // OCI artifacts are not supported when workspace is used + if l.importer.GetDownloadToWorkspace() { + return nil, fmt.Errorf("importer workspace download does not support OCI registries") + } artifactType, err := metadata.SchemaToArtifactType(schema) if err != nil { return nil, fmt.Errorf("converting schema to artifact type failed: %w", err) } - if *artifactType.Name != "system.Model" { return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name) } - return artifact, nil } @@ -283,9 +292,54 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact } storeSessionInfoStr := string(storeSessionInfoJSON) artifact.CustomProperties["store_session_info"] = metadata.StringValue(storeSessionInfoStr) + + // Download the artifact into the workspace + if l.importer.GetDownloadToWorkspace() { + bucketConfig, err := l.resolveBucketConfigForURI(ctx, artifactUri) + if err != nil { + return nil, err + } + localPath, err := LocalWorkspacePathForURI(artifactUri) + if err != nil { + return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err) + } + blobKey, err := bucketConfig.KeyFromURI(artifactUri) + if err != nil { + return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err) + } + bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig) + if err != nil { + return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err) + } + defer bucket.Close() + glog.Infof("Downloading artifact %q (blob key %q) to workspace path %q", artifactUri, blobKey, localPath) + if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil { + return nil, fmt.Errorf("failed to download artifact to workspace: %w", err) + } + } return artifact, nil } +// resolveBucketConfigForURI parses bucket configuration for a given artifact URI and +// attaches session information from the kfp-launcher ConfigMap when available. 
+func (l *ImportLauncher) resolveBucketConfigForURI(ctx context.Context, uri string) (*objectstore.Config, error) { + bucketConfig, err := objectstore.ParseBucketConfigForArtifactURI(uri) + if err != nil { + return nil, fmt.Errorf("failed to parse bucket config while resolving uri %q: %w", uri, err) + } + // Resolve and attach session info from kfp-launcher config for the artifact provider + if cfg, err := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace); err != nil { + glog.Warningf("failed to load launcher config while resolving bucket config: %v", err) + } else if cfg != nil { + if sess, err := cfg.GetStoreSessionInfo(uri); err != nil { + glog.Warningf("failed to resolve store session info for %q: %v", uri, err) + } else { + bucketConfig.SessionInfo = &sess + } + } + return bucketConfig, nil +} + func (l *ImportLauncher) getOutPutArtifactName() (string, error) { outPutNames := make([]string, 0, len(l.component.GetOutputDefinitions().GetArtifacts())) for name := range l.component.GetOutputDefinitions().GetArtifacts() { diff --git a/backend/src/v2/component/launcher_v2.go b/backend/src/v2/component/launcher_v2.go index ff2fd35684c..f6b74b911a8 100644 --- a/backend/src/v2/component/launcher_v2.go +++ b/backend/src/v2/component/launcher_v2.go @@ -715,6 +715,12 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor for _, artifact := range artifactList.Artifacts { // Iterating through the artifact list allows for collected artifacts to be properly consumed. inputArtifact := artifact + // Skip downloading if the artifact is flagged as already present in the workspace + if inputArtifact.GetMetadata() != nil { + if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() { + continue + } + } localPath, err := LocalPathForURI(inputArtifact.Uri) if err != nil { glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri) @@ -858,6 +864,20 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name) placeholders[key] = inputArtifact.Uri + // If the artifact is marked as already in the workspace, map to the workspace path + // with the same shape as LocalPathForURI, but rebased under the workspace mount. + if inputArtifact.GetMetadata() != nil { + if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() { + localPath, lerr := LocalWorkspacePathForURI(inputArtifact.Uri) + if lerr != nil { + return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, lerr) + } + key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name) + placeholders[key] = localPath + continue + } + } + localPath, err := LocalPathForURI(inputArtifact.Uri) if err != nil { // Input Artifact does not have a recognized storage URI @@ -1010,6 +1030,21 @@ func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error } } +// LocalWorkspacePathForURI returns the local workspace path for a given artifact URI. +// It preserves the same path shape as LocalPathForURI, but rebases it under the +// workspace artifacts directory: /kfp-workspace/.artifacts/... 
+func LocalWorkspacePathForURI(uri string) (string, error) { + if strings.HasPrefix(uri, "oci://") { + return "", fmt.Errorf("failed to generate workspace path for URI %s: OCI not supported for workspace artifacts", uri) + } + localPath, err := LocalPathForURI(uri) + if err != nil { + return "", err + } + // Rebase under the workspace mount, stripping the leading '/' + return filepath.Join(WorkspaceMountPath, ".artifacts", strings.TrimPrefix(localPath, "/")), nil +} + func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error { for name, parameter := range executorInput.GetOutputs().GetParameters() { dir := filepath.Dir(parameter.OutputFile) diff --git a/backend/src/v2/component/launcher_v2_test.go b/backend/src/v2/component/launcher_v2_test.go index a01c7126bd4..27435641ed7 100644 --- a/backend/src/v2/component/launcher_v2_test.go +++ b/backend/src/v2/component/launcher_v2_test.go @@ -20,6 +20,7 @@ import ( "errors" "io" "os" + "path/filepath" "testing" "github.com/kubeflow/pipelines/backend/src/v2/cacheutils" @@ -176,6 +177,29 @@ func Test_executeV2_publishLogs(t *testing.T) { } } +func Test_getPlaceholders_WorkspaceArtifactPath(t *testing.T) { + execIn := &pipelinespec.ExecutorInput{ + Inputs: &pipelinespec.ExecutorInput_Inputs{ + Artifacts: map[string]*pipelinespec.ArtifactList{ + "data": { + Artifacts: []*pipelinespec.RuntimeArtifact{ + {Uri: "minio://mlpipeline/sample/sample.txt", Metadata: &structpb.Struct{Fields: map[string]*structpb.Value{"_kfp_workspace": structpb.NewBoolValue(true)}}}, + }, + }, + }, + }, + } + ph, err := getPlaceholders(execIn) + if err != nil { + t.Fatalf("getPlaceholders error: %v", err) + } + actual := ph["{{$.inputs.artifacts['data'].path}}"] + expected := filepath.Join(WorkspaceMountPath, ".artifacts", "minio", "mlpipeline", "sample", "sample.txt") + if actual != expected { + t.Fatalf("placeholder path mismatch: actual=%q expected=%q", actual, expected) + } +} + func Test_executorInput_compileCmdAndArgs(t *testing.T) { executorInputJSON := `{ "inputs": { diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 478643b6e85..bc6645ddbe8 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -461,7 +461,7 @@ func needsWorkspaceMount(executorInput *pipelinespec.ExecutorInput) bool { return true } - if strings.HasPrefix(strVal.StringValue, component.WorkspaceMountPath) { + if strings.Contains(strVal.StringValue, component.WorkspaceMountPath) { return true } } diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index ef11fbcd499..b59447e76bb 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -1207,6 +1207,70 @@ func TestWorkspaceMount_PassthroughVolumes_ApplyAndCapture(t *testing.T) { } } +func TestWorkspaceMount_TriggeredByArtifactMetadata(t *testing.T) { + proxy.InitializeConfigWithEmptyForTests() + containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{Image: "python:3.9"} + componentSpec := &pipelinespec.ComponentSpec{ + TaskConfigPassthroughs: []*pipelinespec.TaskConfigPassthrough{ + { + Field: pipelinespec.TaskConfigPassthroughType_KUBERNETES_VOLUMES, + ApplyToTask: true, + }, + }, + } + + // Build an ExecutorInput that does NOT reference workspace path in params, + // but contains an input artifact marked as already in workspace. 
+ execInput := &pipelinespec.ExecutorInput{ + Inputs: &pipelinespec.ExecutorInput_Inputs{ + Artifacts: map[string]*pipelinespec.ArtifactList{ + "data": { + Artifacts: []*pipelinespec.RuntimeArtifact{ + { + Uri: "minio://mlpipeline/sample/sample.txt", + Metadata: &structpb.Struct{Fields: map[string]*structpb.Value{ + "_kfp_workspace": structpb.NewBoolValue(true), + }}, + }, + }, + }, + }, + }, + } + + taskCfg := &TaskConfig{} + podSpec, err := initPodSpecPatch( + containerSpec, componentSpec, execInput, + 27, "test", "run", "my-run-name", "1", "false", "false", taskCfg, false, false, "", + "metadata-grpc-service.kubeflow.svc.local", + "8080", + ) + assert.Nil(t, err) + + // Expect workspace volume mounted + if assert.Len(t, podSpec.Volumes, 1) { + assert.Equal(t, "kfp-workspace", podSpec.Volumes[0].Name) + if assert.NotNil(t, podSpec.Volumes[0].PersistentVolumeClaim) { + assert.Equal(t, "my-run-name-kfp-workspace", podSpec.Volumes[0].PersistentVolumeClaim.ClaimName) + } + } + if assert.Len(t, podSpec.Containers, 1) { + if assert.Len(t, podSpec.Containers[0].VolumeMounts, 1) { + assert.Equal(t, "kfp-workspace", podSpec.Containers[0].VolumeMounts[0].Name) + assert.Equal(t, "/kfp-workspace", podSpec.Containers[0].VolumeMounts[0].MountPath) + } + } + + // Expect volumes to be captured in TaskConfig + if assert.Len(t, taskCfg.Volumes, 1) { + assert.Equal(t, "kfp-workspace", taskCfg.Volumes[0].Name) + } + if assert.Len(t, taskCfg.VolumeMounts, 1) { + assert.Equal(t, "kfp-workspace", taskCfg.VolumeMounts[0].Name) + assert.Equal(t, "/kfp-workspace", taskCfg.VolumeMounts[0].MountPath) + } +} + func Test_initPodSpecPatch_TaskConfig_Env_Passthrough_CaptureOnly(t *testing.T) { proxy.InitializeConfigWithEmptyForTests() containerSpec := &pipelinespec.PipelineDeploymentConfig_PipelineContainerSpec{ diff --git a/backend/src/v2/driver/resolve.go b/backend/src/v2/driver/resolve.go index d630a81a602..23a50b5d48f 100644 --- a/backend/src/v2/driver/resolve.go +++ b/backend/src/v2/driver/resolve.go @@ -33,6 +33,17 @@ import ( var ErrResolvedParameterNull = errors.New("the resolved input parameter is null") +// setWorkspaceFlag sets the _kfp_workspace metadata flag on the provided +// runtime artifact when the producing execution is an ImporterWorkspace. +func setWorkspaceFlag(execution *metadata.Execution, runtimeArtifact *pipelinespec.RuntimeArtifact) { + if execution.GetExecution().GetType() == string(metadata.ImporterWorkspaceExecutionTypeName) { + if runtimeArtifact.Metadata == nil { + runtimeArtifact.Metadata = &structpb.Struct{Fields: map[string]*structpb.Value{}} + } + runtimeArtifact.Metadata.Fields["_kfp_workspace"] = structpb.NewBoolValue(true) + } +} + // resolveUpstreamOutputsConfig is just a config struct used to store the input // parameters of the resolveUpstreamParameters and resolveUpstreamArtifacts // functions. 
@@ -763,6 +774,8 @@ func resolveUpstreamArtifacts(cfg resolveUpstreamOutputsConfig) (*pipelinespec.A if err != nil { cfg.err(err) } + // If produced by workspace importer, Set _kfp_workspace=True + setWorkspaceFlag(currentTask, runtimeArtifact) // Base case return &pipelinespec.ArtifactList{ Artifacts: []*pipelinespec.RuntimeArtifact{runtimeArtifact}, @@ -980,6 +993,8 @@ func collectContainerOutput( if err != nil { return nil, nil, cfg.err(err) } + // If produced by workspace importer, Set _kfp_workspace=True + setWorkspaceFlag(currentTask, artifact) glog.V(4).Infof("runtimeArtifact: %v", artifact) } else { _, outputParameters, err := currentTask.GetParameters() diff --git a/backend/src/v2/metadata/client.go b/backend/src/v2/metadata/client.go index 57ee2b456fe..8435a2f21bf 100644 --- a/backend/src/v2/metadata/client.go +++ b/backend/src/v2/metadata/client.go @@ -50,10 +50,11 @@ import ( ) const ( - pipelineContextTypeName = "system.Pipeline" - pipelineRunContextTypeName = "system.PipelineRun" - ImporterExecutionTypeName = "system.ImporterExecution" - mlmdClientSideMaxRetries = 3 + pipelineContextTypeName = "system.Pipeline" + pipelineRunContextTypeName = "system.PipelineRun" + ImporterExecutionTypeName = "system.ImporterExecution" + ImporterWorkspaceExecutionTypeName = "system.ImporterWorkspaceExecution" + mlmdClientSideMaxRetries = 3 ) type ExecutionType string diff --git a/backend/src/v2/metadata/converter.go b/backend/src/v2/metadata/converter.go index 202c5c6b130..6779082507a 100644 --- a/backend/src/v2/metadata/converter.go +++ b/backend/src/v2/metadata/converter.go @@ -119,6 +119,11 @@ func toMLMDArtifact(runtimeArtifact *pipelinespec.RuntimeArtifact) (*pb.Artifact if runtimeArtifact.Metadata != nil { for k, v := range runtimeArtifact.Metadata.Fields { + // _kfp_workspace flag is needed only at runtime for the executor to know + // the correct path to the artifact; do not persist to MLMD + if k == "_kfp_workspace" { + continue + } value, err := StructValueToMLMDValue(v) if err != nil { return nil, errorF(err) diff --git a/sdk/python/kfp/compiler/compiler_test.py b/sdk/python/kfp/compiler/compiler_test.py index ccdd1216670..bf65142ad93 100644 --- a/sdk/python/kfp/compiler/compiler_test.py +++ b/sdk/python/kfp/compiler/compiler_test.py @@ -4334,6 +4334,62 @@ def my_pipeline(): compiler.Compiler().compile( pipeline_func=my_pipeline, package_path=output_yaml) + def test_compile_fails_when_importer_download_to_workspace_without_workspace_config( + self): + """Tests that compilation fails if importer uses download_to_workspace without workspace config.""" + + import os + import tempfile + + from kfp import compiler + from kfp import dsl + + # No PipelineConfig provided (i.e., no workspace configured) + with self.assertRaisesRegex( + ValueError, + r'dsl\.importer\(download_to_workspace=True\) requires PipelineConfig\(workspace=\.\.\.\) on the pipeline\.' 
+ ): + + @dsl.pipeline + def my_pipeline(): + dsl.importer( + artifact_uri='gs://bucket/file.txt', + artifact_class=dsl.Dataset, + download_to_workspace=True, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_yaml = os.path.join(tmpdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=output_yaml) + + def test_compile_succeeds_when_importer_download_to_workspace_with_workspace_config( + self): + """Tests that compilation succeeds with both download_to_workspace and workspace config.""" + + import os + import tempfile + + from kfp import compiler + from kfp import dsl + + @dsl.pipeline( + pipeline_config=dsl.PipelineConfig( + workspace=dsl.WorkspaceConfig(size='1Gi'))) + def my_pipeline(): + dsl.importer( + artifact_uri='gs://bucket/file.txt', + artifact_class=dsl.Dataset, + download_to_workspace=True, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + output_yaml = os.path.join(tmpdir, 'pipeline.yaml') + compiler.Compiler().compile( + pipeline_func=my_pipeline, package_path=output_yaml) + # Should not raise an error + self.assertTrue(os.path.exists(output_yaml)) + class ExtractInputOutputDescription(unittest.TestCase): diff --git a/sdk/python/kfp/compiler/pipeline_spec_builder.py b/sdk/python/kfp/compiler/pipeline_spec_builder.py index d3693aab816..3c77d4f730c 100644 --- a/sdk/python/kfp/compiler/pipeline_spec_builder.py +++ b/sdk/python/kfp/compiler/pipeline_spec_builder.py @@ -614,6 +614,10 @@ def build_importer_spec_for_task( metadata_protobuf_struct.update(task.importer_spec.metadata) importer_spec.metadata.CopyFrom(metadata_protobuf_struct) + # Emit download_to_workspace if set on the task + if getattr(task.importer_spec, 'download_to_workspace', False): + importer_spec.download_to_workspace = task.importer_spec.download_to_workspace + if isinstance(task.importer_spec.artifact_uri, pipeline_channel.PipelineChannel): importer_spec.artifact_uri.runtime_parameter = 'uri' @@ -2133,6 +2137,59 @@ def write_pipeline_spec_to_file( kubernetes_manifest_options: KubernetesManifestOptions object with manifest options. kubernetes_manifest_format: Output the compiled pipeline as a Kubernetes manifest. 
""" + + # Validate workspace requirement when download_to_workspace is used + def _uses_workspace_download(ps: pipeline_spec_pb2.PipelineSpec) -> bool: + ds = ps.deployment_spec + if ds is None: + return False + executors = getattr(ds, 'executors', None) + if executors: + for _, exec_spec in executors.items(): + if exec_spec.WhichOneof('spec') == 'importer': + if getattr(exec_spec.importer, 'download_to_workspace', + False): + return True + return False + ds_dict = json_format.MessageToDict(ds) + if not ds_dict: + return False + execs_dict = ds_dict.get('executors', {}) + if not isinstance(execs_dict, dict): + return False + for _, exec_spec in execs_dict.items(): + if isinstance(exec_spec, dict) and 'importer' in exec_spec: + importer = exec_spec['importer'] + if isinstance(importer, dict) and importer.get( + 'downloadToWorkspace', False): + return True + return False + + def _has_workspace_config(ps: pipeline_spec_pb2.PlatformSpec) -> bool: + if ps is None: + return False + plat = json_format.MessageToDict(ps) + platforms = plat.get('platforms', {}) + if not isinstance(platforms, dict): + return False + # Check if a platform has a workspace config + for platform_name, platform_config in platforms.items(): + if isinstance(platform_config, dict): + pc = platform_config.get('pipelineConfig') + if isinstance( + pc, dict + ) and 'workspace' in pc and pc['workspace'] is not None: + return True + return False + + if _uses_workspace_download( + pipeline_spec) and not _has_workspace_config(platform_spec): + raise ValueError( + 'dsl.importer(download_to_workspace=True) requires PipelineConfig(workspace=...) on the pipeline. ' + 'Add workspace configuration to your @dsl.pipeline decorator: ' + 'pipeline_config=dsl.PipelineConfig(workspace=dsl.WorkspaceConfig(size=\\\'1Gi\\\'))' + ) + if kubernetes_manifest_format: opts = kubernetes_manifest_options or KubernetesManifestOptions() opts.set_pipeline_spec(pipeline_spec) diff --git a/sdk/python/kfp/dsl/importer_node.py b/sdk/python/kfp/dsl/importer_node.py index 2a3e676daa7..ebdc47ec66d 100644 --- a/sdk/python/kfp/dsl/importer_node.py +++ b/sdk/python/kfp/dsl/importer_node.py @@ -34,6 +34,7 @@ def importer( artifact_class: Type[artifact_types.Artifact], reimport: bool = False, metadata: Optional[Mapping[str, Any]] = None, + download_to_workspace: bool = False, ) -> pipeline_task.PipelineTask: """Imports an existing artifact for use in a downstream component. @@ -42,6 +43,7 @@ def importer( artifact_class: The artifact class being imported. reimport: Whether to reimport the artifact. metadata: Properties of the artifact. + download_to_workspace: If true, download artifact into pipeline workspace. Returns: A task with the artifact accessible via its ``.output`` attribute. 
@@ -127,7 +129,8 @@ def traverse_dict_and_create_metadata_inputs(d: Any) -> Any: artifact_class.schema_title, artifact_class.schema_version), schema_version=artifact_class.schema_version, reimport=reimport, - metadata=metadata_with_placeholders)), + metadata=metadata_with_placeholders, + download_to_workspace=download_to_workspace)), inputs={ URI_KEY: structures.InputSpec(type='String'), **component_inputs diff --git a/sdk/python/kfp/dsl/structures.py b/sdk/python/kfp/dsl/structures.py index 2dba57411e5..94520ecef33 100644 --- a/sdk/python/kfp/dsl/structures.py +++ b/sdk/python/kfp/dsl/structures.py @@ -449,6 +449,7 @@ class ImporterSpec: schema_version: str reimport: bool metadata: Optional[Mapping[str, Any]] = None + download_to_workspace: bool = False @dataclasses.dataclass diff --git a/sdk/python/kfp/dsl/types/artifact_types.py b/sdk/python/kfp/dsl/types/artifact_types.py index 3f9405dc42c..d671f06565f 100644 --- a/sdk/python/kfp/dsl/types/artifact_types.py +++ b/sdk/python/kfp/dsl/types/artifact_types.py @@ -18,6 +18,8 @@ from typing import Dict, List, Optional, Type import warnings +from kfp.dsl.constants import WORKSPACE_MOUNT_PATH + _GCS_LOCAL_MOUNT_PREFIX = '/gcs/' _MINIO_LOCAL_MOUNT_PREFIX = '/minio/' _S3_LOCAL_MOUNT_PREFIX = '/s3/' @@ -94,23 +96,31 @@ def path(self, path: str) -> None: self._set_path(path) def _get_path(self) -> Optional[str]: + local_path = self.uri + if self.custom_path: - return self._get_custom_path() - if self.uri.startswith(RemotePrefix.GCS.value): - return _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS.value - ):] - if self.uri.startswith(RemotePrefix.MINIO.value): - return _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.MINIO - .value):] - if self.uri.startswith(RemotePrefix.S3.value): - return _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3.value - ):] - if self.uri.startswith(RemotePrefix.OCI.value): + local_path = self._get_custom_path() + elif self.uri.startswith(RemotePrefix.GCS.value): + local_path = _GCS_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.GCS + .value):] + elif self.uri.startswith(RemotePrefix.MINIO.value): + local_path = _MINIO_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix. + MINIO.value):] + elif self.uri.startswith(RemotePrefix.S3.value): + local_path = _S3_LOCAL_MOUNT_PREFIX + self.uri[len(RemotePrefix.S3 + .value):] + elif self.uri.startswith(RemotePrefix.OCI.value): escaped_uri = self.uri[len(RemotePrefix.OCI.value):].replace( '/', '_') - return _OCI_LOCAL_MOUNT_PREFIX + escaped_uri - # uri == path for local execution - return self.uri + local_path = _OCI_LOCAL_MOUNT_PREFIX + escaped_uri + + # If the artifact is already present in the pipeline workspace, map to the workspace path. + # This is indicated by backend setting metadata['_kfp_workspace'] = True. + if self.metadata.get('_kfp_workspace') is True: + local_path = os.path.join(WORKSPACE_MOUNT_PATH, ".artifacts", + local_path.lstrip("/")) + + return local_path @property def custom_path(self) -> str: diff --git a/sdk/python/kfp/local/importer_handler.py b/sdk/python/kfp/local/importer_handler.py index 44c6a407f1d..8331e56e173 100644 --- a/sdk/python/kfp/local/importer_handler.py +++ b/sdk/python/kfp/local/importer_handler.py @@ -13,13 +13,19 @@ # limitations under the License. 
"""Code for running a dsl.importer locally.""" import logging +import os +import shutil from typing import Any, Dict, Tuple +from urllib.parse import urlparse +import uuid import warnings from google.protobuf import json_format from kfp import dsl +from kfp.dsl import constants as dsl_constants from kfp.dsl.types import artifact_types from kfp.dsl.types import type_utils +from kfp.local import config from kfp.local import logging_utils from kfp.local import placeholder_utils from kfp.local import status @@ -28,6 +34,43 @@ Outputs = Dict[str, Any] +def _get_workspace_root() -> str: + cfg = config.LocalExecutionConfig.instance + if cfg is None or not cfg.workspace_root: + raise RuntimeError( + 'Workspace not configured. Initialize with workspace_root parameter:\n' + "local.init(runner=local.SubprocessRunner(), workspace_root='/path/to/workspace')" + ) + workspace_root = cfg.workspace_root + if not os.path.isabs(workspace_root): + workspace_root = os.path.abspath(workspace_root) + return workspace_root + + +def _copy_local_artifact_to_workspace(source_path: str, base_dir: str, + component_name: str) -> str: + source_path = os.path.abspath(source_path) + if not os.path.exists(source_path): + raise FileNotFoundError(source_path) + + destination_dir = os.path.join(base_dir, 'importer', component_name, + uuid.uuid4().hex) + os.makedirs(destination_dir, exist_ok=True) + + if os.path.isdir(source_path): + destination_path = os.path.join( + destination_dir, os.path.basename(source_path.rstrip(os.sep))) + if os.path.exists(destination_path): + shutil.rmtree(destination_path) + shutil.copytree(source_path, destination_path) + else: + destination_path = os.path.join(destination_dir, + os.path.basename(source_path)) + shutil.copy2(source_path, destination_path) + + return destination_path + + def run_importer( pipeline_resource_name: str, component_name: str, @@ -90,13 +133,43 @@ def run_importer( ) ArtifactCls = get_artifact_class_from_schema_title( executor_spec.importer.type_schema.schema_title) - outputs = { - 'artifact': ArtifactCls( - name='artifact', - uri=uri, - metadata=metadata, - ) - } + artifact = ArtifactCls( + name='artifact', + uri=uri, + metadata=metadata, + ) + + if executor_spec.importer.download_to_workspace: + parsed_uri = urlparse(uri) + scheme = parsed_uri.scheme + if scheme not in ('', 'file'): + warnings.warn( + "download_to_workspace only supports local file paths when running locally. Skipping workspace copy for '%s'." + % uri) + else: + source_path = parsed_uri.path if scheme == 'file' else uri + try: + workspace_root = _get_workspace_root() + workspace_artifacts_root = os.path.join(workspace_root, + '.artifacts') + os.makedirs(workspace_artifacts_root, exist_ok=True) + cfg = config.LocalExecutionConfig.instance + host_path = _copy_local_artifact_to_workspace( + source_path, workspace_artifacts_root, component_name) + + if isinstance(cfg.runner, config.DockerRunner): + rel_path = os.path.relpath(host_path, workspace_root) + container_path = os.path.join( + dsl_constants.WORKSPACE_MOUNT_PATH, rel_path) + artifact.custom_path = container_path + else: + artifact.custom_path = host_path + except FileNotFoundError: + raise FileNotFoundError( + f"download_to_workspace expected a local file at '{source_path}', but it was not found." 
+ ) + + outputs = {'artifact': artifact} with logging_utils.local_logger_context(): logging.info( f'Task {task_name_for_logs} finished with status {logging_utils.format_status(status.Status.SUCCESS)}' diff --git a/sdk/python/test/compilation/pipeline_compilation_test.py b/sdk/python/test/compilation/pipeline_compilation_test.py index 5262e1c9abd..8c0b2514f05 100644 --- a/sdk/python/test/compilation/pipeline_compilation_test.py +++ b/sdk/python/test/compilation/pipeline_compilation_test.py @@ -94,6 +94,8 @@ my_pipeline as artifact_upload_download_pipeline from test_data.sdk_compiled_pipelines.valid.critical.pipeline_with_env import \ my_pipeline as env_pipeline +from test_data.sdk_compiled_pipelines.valid.critical.pipeline_with_importer_workspace import \ + pipeline_with_importer_workspace from test_data.sdk_compiled_pipelines.valid.critical.pipeline_with_input_status_state import \ status_state_pipeline from test_data.sdk_compiled_pipelines.valid.critical.pipeline_with_placeholders import \ @@ -479,6 +481,13 @@ def __repr__(self) -> str: compiled_file_name='pipeline_with_workspace.yaml', expected_compiled_file_path=f'{_VALID_PIPELINE_FILES}/critical/pipeline_with_workspace.yaml' ), + TestData( + pipeline_name='pipeline-with-importer-workspace', + pipeline_func=pipeline_with_importer_workspace, + pipline_func_args=None, + compiled_file_name='pipeline_with_importer_workspace.yaml', + expected_compiled_file_path=f'{_VALID_PIPELINE_FILES}/critical/pipeline_with_importer_workspace.yaml' + ), TestData( pipeline_name='containerized-two-step-pipeline', pipeline_func=two_step_containerized_pipeline, diff --git a/sdk/python/test/local_execution/local_execution_test.py b/sdk/python/test/local_execution/local_execution_test.py index 0bc882092e0..cee25072ba7 100644 --- a/sdk/python/test/local_execution/local_execution_test.py +++ b/sdk/python/test/local_execution/local_execution_test.py @@ -37,6 +37,8 @@ crust as mixed_parameters_pipeline from test_data.sdk_compiled_pipelines.valid.critical.multiple_parameters_namedtuple import \ crust as namedtuple_pipeline +from test_data.sdk_compiled_pipelines.valid.critical.pipeline_with_importer_workspace import \ + pipeline_with_importer_workspace as importer_workspace_pipeline from test_data.sdk_compiled_pipelines.valid.critical.producer_consumer_param import \ producer_consumer_param_pipeline from test_data.sdk_compiled_pipelines.valid.dict_input import dict_input @@ -173,6 +175,12 @@ def idfn(val): pipeline_func_args=None, expected_output=None, ), + TestData( + name='Importer Workspace', + pipeline_func=importer_workspace_pipeline, + pipeline_func_args=None, + expected_output=None, + ), ] docker_specific_pipeline_funcs = [ @@ -219,7 +227,7 @@ def idfn(val): @pytest.mark.regression class TestDockerRunner: - @pytest.fixture(scope="class", autouse=True) + @pytest.fixture(autouse=True) def setup_and_teardown(self): ws_root = f'{ws_root_base}_docker' pipeline_root = f'{pipeline_root_base}_docker' diff --git a/test_data/compiled-workflows/pipeline_with_importer_workspace.yaml b/test_data/compiled-workflows/pipeline_with_importer_workspace.yaml new file mode 100644 index 00000000000..befe7d58d3c --- /dev/null +++ b/test_data/compiled-workflows/pipeline_with_importer_workspace.yaml @@ -0,0 +1,688 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: pipeline-with-importer-workspace- +spec: + arguments: + parameters: + - name: components-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200 + value: 
'{"executorLabel":"exec-get-uri","inputDefinitions":{"artifacts":{"d":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + - name: implementations-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","get_uri"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + get_uri(d: dsl.Input[dsl.Dataset]) -\u003e str:\n print(f\"Artifact URI: + {d.uri}\")\n return d.uri\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}' + - name: components-comp-importer + value: '{"executorLabel":"exec-importer","inputDefinitions":{"parameters":{"uri":{"parameterType":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}' + - name: implementations-comp-importer + value: '{"artifactUri":{"runtimeParameter":"uri"},"downloadToWorkspace":true,"metadata":{"key":"value"},"typeSchema":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}' + - name: components-comp-importer-2 + value: '{"executorLabel":"exec-importer-2","inputDefinitions":{"parameters":{"uri":{"parameterType":"STRING"}}},"outputDefinitions":{"artifacts":{"artifact":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}' + - name: implementations-comp-importer-2 + value: '{"artifactUri":{"runtimeParameter":"uri"},"downloadToWorkspace":true,"metadata":{"source":"directory"},"reimport":true,"typeSchema":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}' + - name: components-3bcddb027f2989803f573a96a8276aa92dafce532445692e84684a59fbade9d0 + value: '{"executorLabel":"exec-read-dir","inputDefinitions":{"artifacts":{"data":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + - name: implementations-3bcddb027f2989803f573a96a8276aa92dafce532445692e84684a59fbade9d0 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","read_dir"],"command":["sh","-c","\nif + ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + read_dir(data: dsl.Input[dsl.Dataset]) -\u003e str:\n \"\"\"Walk the directory + and return a summary of file names.\"\"\"\n import os\n path = data.path\n\n if + not os.path.exists(path):\n raise FileNotFoundError(f\"Path does not + exist: {path}\")\n\n if os.path.isdir(path):\n names = []\n for + root, _, files in os.walk(path):\n for name in files:\n names.append(os.path.relpath(os.path.join(root, + name), path))\n names.sort()\n result = \",\".join(names) if + names else \"EMPTY_DIRECTORY\"\n print(f\"Found {len(names)} files: + {result}\")\n return result\n\n raise ValueError(f\"Not a directory: + {path}\")\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}' + - name: components-5280d86f7e6e728edad8560318d1b1dbaecae0175dc50cc0101d214bcdbdb157 + value: '{"executorLabel":"exec-train","inputDefinitions":{"artifacts":{"dataset":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}},"outputDefinitions":{"parameters":{"message":{"parameterType":"STRING"},"scalar":{"parameterType":"STRING"}}}}' + - name: implementations-5280d86f7e6e728edad8560318d1b1dbaecae0175dc50cc0101d214bcdbdb157 + value: '{"args":["--executor_input","{{$}}","--function_to_execute","train"],"command":["sh","-c","\nif + ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + train(\n dataset: dsl.Input[dsl.Dataset]\n) -\u003e NamedTuple(''Outputs'', + [\n (''scalar'', str),\n (''message'', str),\n]):\n \"\"\"Dummy Training + step.\"\"\"\n with open(dataset.path, encoding=\"utf-8\") as f:\n data + = f.read()\n print(''Dataset:'', data)\n\n scalar = ''123''\n message + = f''My model trained using data: {data}''\n\n from collections import + namedtuple\n output = namedtuple(''Outputs'', [''scalar'', ''message''])\n return + output(scalar, message)\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}' + - name: components-comp-import-stage + value: '{"dag":{"outputs":{"parameters":{"dir_result":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"read-dir"}},"train_result":{"valueFromParameter":{"outputParameterKey":"scalar","producerSubtask":"train"}}}},"tasks":{"importer":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"componentInputParameter":"file_uri"}}},"taskInfo":{"name":"importer"}},"importer-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer-2"},"inputs":{"parameters":{"uri":{"componentInputParameter":"dir_uri"}}},"taskInfo":{"name":"importer-2"}},"read-dir":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-dir"},"dependentTasks":["importer-2"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer-2"}}}},"taskInfo":{"name":"read-dir"}},"train":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-train"},"dependentTasks":["importer"],"inputs":{"artifacts":{"dataset":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"train"}}}},"inputDefinitions":{"parameters":{"dir_uri":{"parameterType":"STRING"},"file_uri":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"dir_result":{"parameterType":"STRING"},"train_result":{"parameterType":"STRING"}}}}' + - name: components-f12dedf1695f84ba76d85e222901aff7b854f31e4e01e21fb11991b72fa4e98c + value: '{"executorLabel":"exec-write-dir-artifact","outputDefinitions":{"artifacts":{"out_ds":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}' + - name: implementations-f12dedf1695f84ba76d85e222901aff7b854f31e4e01e21fb11991b72fa4e98c + value: '{"args":["--executor_input","{{$}}","--function_to_execute","write_dir_artifact"],"command":["sh","-c","\nif + ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + write_dir_artifact(out_ds: Output[dsl.Dataset]):\n import os\n os.makedirs(out_ds.path, + exist_ok=True)\n with open(os.path.join(out_ds.path, \"part-000.txt\"), + \"w\", encoding=\"utf-8\") as f:\n f.write(\"First file in directory.\\n\")\n with + open(os.path.join(out_ds.path, \"part-001.txt\"), \"w\", encoding=\"utf-8\") + as f:\n f.write(\"Second file in directory.\\n\")\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}' + - name: components-d39dc06dd18c0e79de6863a8ecb0e12cb73b623496a42b59cae4fbcbfcfed4bd + value: '{"executorLabel":"exec-write-file-artifact","outputDefinitions":{"artifacts":{"out_ds":{"artifactType":{"schemaTitle":"system.Dataset","schemaVersion":"0.0.1"}}}}}' + - name: implementations-d39dc06dd18c0e79de6863a8ecb0e12cb73b623496a42b59cae4fbcbfcfed4bd + value: '{"args":["--executor_input","{{$}}","--function_to_execute","write_file_artifact"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + write_file_artifact(out_ds: Output[dsl.Dataset]):\n import os\n os.makedirs(os.path.dirname(out_ds.path), + exist_ok=True)\n with open(out_ds.path, \"w\", encoding=\"utf-8\") as f:\n f.write(\"Hello + from producer file\\n\")\n\n"],"image":"registry.access.redhat.com/ubi9/python-311:latest"}' + - name: components-root + value: 
'{"dag":{"outputs":{"parameters":{"dir_result":{"valueFromParameter":{"outputParameterKey":"dir_result","producerSubtask":"import-stage"}},"train_result":{"valueFromParameter":{"outputParameterKey":"train_result","producerSubtask":"import-stage"}}}},"tasks":{"get-uri":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-get-uri"},"dependentTasks":["write-file-artifact"],"inputs":{"artifacts":{"d":{"taskOutputArtifact":{"outputArtifactKey":"out_ds","producerTask":"write-file-artifact"}}}},"taskInfo":{"name":"get-uri"}},"get-uri-2":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-get-uri-2"},"dependentTasks":["write-dir-artifact"],"inputs":{"artifacts":{"d":{"taskOutputArtifact":{"outputArtifactKey":"out_ds","producerTask":"write-dir-artifact"}}}},"taskInfo":{"name":"get-uri-2"}},"import-stage":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-import-stage"},"dependentTasks":["get-uri","get-uri-2"],"inputs":{"parameters":{"dir_uri":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"get-uri-2"}},"file_uri":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"get-uri"}}}},"taskInfo":{"name":"import-stage"}},"write-dir-artifact":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-dir-artifact"},"taskInfo":{"name":"write-dir-artifact"}},"write-file-artifact":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-file-artifact"},"taskInfo":{"name":"write-file-artifact"}}}},"outputDefinitions":{"parameters":{"dir_result":{"parameterType":"STRING"},"train_result":{"parameterType":"STRING"}}}}' + entrypoint: entrypoint + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --executor_type + - importer + - --task_spec + - '{{inputs.parameters.task}}' + - --component_spec + - '{{inputs.parameters.component}}' + - --importer_spec + - '{{inputs.parameters.importer}}' + - --pipeline_name + - pipeline-with-importer-workspace + - --run_id + - '{{workflow.uid}}' + - --parent_dag_id + - '{{inputs.parameters.parent-dag-id}}' + - --pod_name + - $(KFP_POD_NAME) + - --pod_uid + - $(KFP_POD_UID) + - --mlmd_server_address + - metadata-grpc-service.kubeflow.svc.cluster.local + - --mlmd_server_port + - "8080" + command: + - launcher-v2 + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: ghcr.io/kubeflow/kfp-launcher:latest + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + volumeMounts: + - mountPath: /kfp-workspace + name: kfp-workspace + inputs: + parameters: + - name: task + - name: component + - name: importer + - name: parent-dag-id + metadata: {} + name: system-importer-workspace + outputs: {} + volumes: + - name: kfp-workspace + persistentVolumeClaim: + claimName: '{{workflow.name}}-kfp-workspace' + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - pipeline-with-importer-workspace + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --task_name + - 
'{{inputs.parameters.task-name}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + - --http_proxy + - "" + - --https_proxy + - "" + - --no_proxy + - "" + - --ml_pipeline_server_address + - ml-pipeline.kubeflow + - --ml_pipeline_server_port + - "8887" + - --mlmd_server_address + - metadata-grpc-service.kubeflow.svc.cluster.local + - --mlmd_server_port + - "8080" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver:latest + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: task-name + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - args: + - --copy + - /kfp-launcher/launch + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher:latest + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: task + value: 
'{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer"},"inputs":{"parameters":{"uri":{"componentInputParameter":"file_uri"}}},"taskInfo":{"name":"importer"}}' + - name: component + value: '{{workflow.parameters.components-comp-importer}}' + - name: importer + value: '{{workflow.parameters.implementations-comp-importer}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: importer + template: system-importer-workspace + - arguments: + parameters: + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-importer-2"},"inputs":{"parameters":{"uri":{"componentInputParameter":"dir_uri"}}},"taskInfo":{"name":"importer-2"}}' + - name: component + value: '{{workflow.parameters.components-comp-importer-2}}' + - name: importer + value: '{{workflow.parameters.implementations-comp-importer-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: importer-2 + template: system-importer-workspace + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-3bcddb027f2989803f573a96a8276aa92dafce532445692e84684a59fbade9d0}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-dir"},"dependentTasks":["importer-2"],"inputs":{"artifacts":{"data":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer-2"}}}},"taskInfo":{"name":"read-dir"}}' + - name: container + value: '{{workflow.parameters.implementations-3bcddb027f2989803f573a96a8276aa92dafce532445692e84684a59fbade9d0}}' + - name: task-name + value: read-dir + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: importer-2.Succeeded + name: read-dir-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.read-dir-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.read-dir-driver.outputs.parameters.cached-decision}}' + depends: read-dir-driver.Succeeded + name: read-dir + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-5280d86f7e6e728edad8560318d1b1dbaecae0175dc50cc0101d214bcdbdb157}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-train"},"dependentTasks":["importer"],"inputs":{"artifacts":{"dataset":{"taskOutputArtifact":{"outputArtifactKey":"artifact","producerTask":"importer"}}}},"taskInfo":{"name":"train"}}' + - name: container + value: '{{workflow.parameters.implementations-5280d86f7e6e728edad8560318d1b1dbaecae0175dc50cc0101d214bcdbdb157}}' + - name: task-name + value: train + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: importer.Succeeded + name: train-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.train-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.train-driver.outputs.parameters.cached-decision}}' + depends: train-driver.Succeeded + name: train + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-import-stage + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - pipeline-with-importer-workspace + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" 
+ - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --task_name + - '{{inputs.parameters.task-name}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --http_proxy + - "" + - --https_proxy + - "" + - --no_proxy + - "" + - --ml_pipeline_server_address + - ml-pipeline.kubeflow + - --ml_pipeline_server_port + - "8887" + - --mlmd_server_address + - metadata-grpc-service.kubeflow.svc.cluster.local + - --mlmd_server_port + - "8080" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver:latest + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "" + name: task-name + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-get-uri"},"dependentTasks":["write-file-artifact"],"inputs":{"artifacts":{"d":{"taskOutputArtifact":{"outputArtifactKey":"out_ds","producerTask":"write-file-artifact"}}}},"taskInfo":{"name":"get-uri"}}' + - name: container + value: '{{workflow.parameters.implementations-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200}}' + - name: task-name + value: get-uri + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: write-file-artifact.Succeeded + name: get-uri-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.get-uri-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.get-uri-driver.outputs.parameters.cached-decision}}' + depends: get-uri-driver.Succeeded + name: get-uri + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-get-uri-2"},"dependentTasks":["write-dir-artifact"],"inputs":{"artifacts":{"d":{"taskOutputArtifact":{"outputArtifactKey":"out_ds","producerTask":"write-dir-artifact"}}}},"taskInfo":{"name":"get-uri-2"}}' + - name: container + value: '{{workflow.parameters.implementations-2b628cf1bbf8b078860182989444f41ee4b6d622ca01c0c3a141b24e47528200}}' + - name: task-name + value: get-uri-2 + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + depends: write-dir-artifact.Succeeded + name: get-uri-2-driver + template: system-container-driver + - arguments: + 
parameters: + - name: pod-spec-patch + value: '{{tasks.get-uri-2-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.get-uri-2-driver.outputs.parameters.cached-decision}}' + depends: get-uri-2-driver.Succeeded + name: get-uri-2 + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-import-stage}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-import-stage"},"dependentTasks":["get-uri","get-uri-2"],"inputs":{"parameters":{"dir_uri":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"get-uri-2"}},"file_uri":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"get-uri"}}}},"taskInfo":{"name":"import-stage"}}' + - name: task-name + value: import-stage + depends: get-uri.Succeeded && get-uri-2.Succeeded + name: import-stage-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.import-stage-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.import-stage-driver.outputs.parameters.condition}}' + depends: import-stage-driver.Succeeded + name: import-stage + template: comp-import-stage + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-f12dedf1695f84ba76d85e222901aff7b854f31e4e01e21fb11991b72fa4e98c}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-dir-artifact"},"taskInfo":{"name":"write-dir-artifact"}}' + - name: container + value: '{{workflow.parameters.implementations-f12dedf1695f84ba76d85e222901aff7b854f31e4e01e21fb11991b72fa4e98c}}' + - name: task-name + value: write-dir-artifact + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: write-dir-artifact-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.write-dir-artifact-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.write-dir-artifact-driver.outputs.parameters.cached-decision}}' + depends: write-dir-artifact-driver.Succeeded + name: write-dir-artifact + template: system-container-executor + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-d39dc06dd18c0e79de6863a8ecb0e12cb73b623496a42b59cae4fbcbfcfed4bd}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-file-artifact"},"taskInfo":{"name":"write-file-artifact"}}' + - name: container + value: '{{workflow.parameters.implementations-d39dc06dd18c0e79de6863a8ecb0e12cb73b623496a42b59cae4fbcbfcfed4bd}}' + - name: task-name + value: write-file-artifact + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: write-file-artifact-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.write-file-artifact-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.write-file-artifact-driver.outputs.parameters.cached-decision}}' + depends: write-file-artifact-driver.Succeeded + name: write-file-artifact + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: 
component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} + volumeClaimTemplates: + - metadata: + creationTimestamp: null + name: kfp-workspace + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 1Gi + storageClassName: standard + status: {} +status: + finishedAt: null + startedAt: null diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.py b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.py new file mode 100644 index 00000000000..1ca2b43554f --- /dev/null +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.py @@ -0,0 +1,148 @@ +# Copyright 2025 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A pipeline that tests the importer with download_to_workspace for file and directory artifacts.""" +from kfp import dsl, compiler +from typing import NamedTuple +from kfp.dsl import importer, Output +import os + +@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest') +def train( + dataset: dsl.Input[dsl.Dataset] +) -> NamedTuple('Outputs', [ + ('scalar', str), + ('message', str), +]): + """Dummy Training step.""" + with open(dataset.path, encoding="utf-8") as f: + data = f.read() + print('Dataset:', data) + + scalar = '123' + message = f'My model trained using data: {data}' + + from collections import namedtuple + output = namedtuple('Outputs', ['scalar', 'message']) + return output(scalar, message) + +@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest') +def read_dir(data: dsl.Input[dsl.Dataset]) -> str: + """Walk the directory and return a summary of file names.""" + import os + path = data.path + + if not os.path.exists(path): + raise FileNotFoundError(f"Path does not exist: {path}") + + if os.path.isdir(path): + names = [] + for root, _, files in os.walk(path): + for name in files: + names.append(os.path.relpath(os.path.join(root, name), path)) + names.sort() + result = ",".join(names) if names else "EMPTY_DIRECTORY" + print(f"Found {len(names)} files: {result}") + return result + + raise ValueError(f"Not a directory: {path}") + +@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest') +def write_file_artifact(out_ds: Output[dsl.Dataset]): + import os + os.makedirs(os.path.dirname(out_ds.path), exist_ok=True) + with open(out_ds.path, "w", encoding="utf-8") as f: + f.write("Hello from producer file\n") + +@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest') +def write_dir_artifact(out_ds: Output[dsl.Dataset]): + import os + os.makedirs(out_ds.path, 
exist_ok=True) + with open(os.path.join(out_ds.path, "part-000.txt"), "w", encoding="utf-8") as f: + f.write("First file in directory.\n") + with open(os.path.join(out_ds.path, "part-001.txt"), "w", encoding="utf-8") as f: + f.write("Second file in directory.\n") + +@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest') +def get_uri(d: dsl.Input[dsl.Dataset]) -> str: + print(f"Artifact URI: {d.uri}") + return d.uri + +@dsl.pipeline(name="import-stage") +def import_stage( + file_uri: str, + dir_uri: str, +) -> NamedTuple('ImportOutputs', [('train_result', str), ('dir_result', str)]): + """Nested stage that imports by URI and runs consumers.""" + importer1 = importer( + artifact_uri=file_uri, + artifact_class=dsl.Dataset, + reimport=False, + metadata={ + 'key': 'value', + }, + download_to_workspace=True, + ) + + dir_import = importer( + artifact_uri=dir_uri, + artifact_class=dsl.Dataset, + reimport=True, + download_to_workspace=True, + metadata={ + 'source': 'directory', + }, + ) + + train_task = train(dataset=importer1.output) + dir_task = read_dir(data=dir_import.output) + + ImportOutputs = NamedTuple('ImportOutputs', [('train_result', str), ('dir_result', str)]) + return ImportOutputs(train_task.outputs['scalar'], dir_task.output) + +@dsl.pipeline( + name="pipeline-with-importer-workspace", + description="Importer downloads an artifact into workspace; downstream reads it", + pipeline_config=dsl.PipelineConfig( + workspace=dsl.WorkspaceConfig( + size='1Gi', + kubernetes=dsl.KubernetesWorkspaceConfig( + pvcSpecPatch={'storageClassName': 'standard', 'accessModes': ['ReadWriteOnce']} + ), + ), + ), +) +def pipeline_with_importer_workspace() -> NamedTuple('Outputs', [('train_result', str), ('dir_result', str)]): + """Test pipeline for importer with download_to_workspace feature.""" + + # Produce a file artifact and compute its runtime URI + file_writer = write_file_artifact() + file_uri = get_uri(d=file_writer.outputs["out_ds"]) + + # Produce a directory artifact and compute its runtime URI + dir_writer = write_dir_artifact() + dir_uri = get_uri(d=dir_writer.outputs["out_ds"]) + + # Import and consume inside a nested sub-pipeline + stage = import_stage(file_uri=file_uri.output, dir_uri=dir_uri.output) + + # Return outputs for validation + Outputs = NamedTuple('Outputs', [('train_result', str), ('dir_result', str)]) + return Outputs(stage.outputs['train_result'], stage.outputs['dir_result']) + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=pipeline_with_importer_workspace, + package_path=__file__.replace('.py', '.yaml')) diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.yaml b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.yaml new file mode 100644 index 00000000000..73694a6a854 --- /dev/null +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_importer_workspace.yaml @@ -0,0 +1,489 @@ +# PIPELINE DEFINITION +# Name: pipeline-with-importer-workspace +# Description: Importer downloads an artifact into workspace; downstream reads it +# Outputs: +# dir_result: str +# train_result: str +components: + comp-get-uri: + executorLabel: exec-get-uri + inputDefinitions: + artifacts: + d: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-get-uri-2: + executorLabel: exec-get-uri-2 + inputDefinitions: + artifacts: + d: + artifactType: + schemaTitle: system.Dataset + 
schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-import-stage: + dag: + outputs: + parameters: + dir_result: + valueFromParameter: + outputParameterKey: Output + producerSubtask: read-dir + train_result: + valueFromParameter: + outputParameterKey: scalar + producerSubtask: train + tasks: + importer: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer + inputs: + parameters: + uri: + componentInputParameter: file_uri + taskInfo: + name: importer + importer-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-importer-2 + inputs: + parameters: + uri: + componentInputParameter: dir_uri + taskInfo: + name: importer-2 + read-dir: + cachingOptions: + enableCache: true + componentRef: + name: comp-read-dir + dependentTasks: + - importer-2 + inputs: + artifacts: + data: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer-2 + taskInfo: + name: read-dir + train: + cachingOptions: + enableCache: true + componentRef: + name: comp-train + dependentTasks: + - importer + inputs: + artifacts: + dataset: + taskOutputArtifact: + outputArtifactKey: artifact + producerTask: importer + taskInfo: + name: train + inputDefinitions: + parameters: + dir_uri: + parameterType: STRING + file_uri: + parameterType: STRING + outputDefinitions: + parameters: + dir_result: + parameterType: STRING + train_result: + parameterType: STRING + comp-importer: + executorLabel: exec-importer + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-importer-2: + executorLabel: exec-importer-2 + inputDefinitions: + parameters: + uri: + parameterType: STRING + outputDefinitions: + artifacts: + artifact: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-read-dir: + executorLabel: exec-read-dir + inputDefinitions: + artifacts: + data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + Output: + parameterType: STRING + comp-train: + executorLabel: exec-train + inputDefinitions: + artifacts: + dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + outputDefinitions: + parameters: + message: + parameterType: STRING + scalar: + parameterType: STRING + comp-write-dir-artifact: + executorLabel: exec-write-dir-artifact + outputDefinitions: + artifacts: + out_ds: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-write-file-artifact: + executorLabel: exec-write-file-artifact + outputDefinitions: + artifacts: + out_ds: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-get-uri: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_uri + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_uri(d: dsl.Input[dsl.Dataset]) -> str:\n print(f\"Artifact\ + \ URI: {d.uri}\")\n return d.uri\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest + exec-get-uri-2: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - get_uri + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef get_uri(d: dsl.Input[dsl.Dataset]) -> str:\n print(f\"Artifact\ + \ URI: {d.uri}\")\n return d.uri\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest + exec-importer: + importer: + artifactUri: + runtimeParameter: uri + downloadToWorkspace: true + metadata: + key: value + typeSchema: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + exec-importer-2: + importer: + artifactUri: + runtimeParameter: uri + downloadToWorkspace: true + metadata: + source: directory + reimport: true + typeSchema: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + exec-read-dir: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - read_dir + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef read_dir(data: dsl.Input[dsl.Dataset]) -> str:\n \"\"\"Walk\ + \ the directory and return a summary of file names.\"\"\"\n import os\n\ + \ path = data.path\n\n if not os.path.exists(path):\n raise\ + \ FileNotFoundError(f\"Path does not exist: {path}\")\n\n if os.path.isdir(path):\n\ + \ names = []\n for root, _, files in os.walk(path):\n \ + \ for name in files:\n names.append(os.path.relpath(os.path.join(root,\ + \ name), path))\n names.sort()\n result = \",\".join(names)\ + \ if names else \"EMPTY_DIRECTORY\"\n print(f\"Found {len(names)}\ + \ files: {result}\")\n return result\n\n raise ValueError(f\"\ + Not a directory: {path}\")\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest + exec-train: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train(\n dataset: dsl.Input[dsl.Dataset]\n) -> NamedTuple('Outputs',\ + \ [\n ('scalar', str),\n ('message', str),\n]):\n \"\"\"Dummy Training\ + \ step.\"\"\"\n with open(dataset.path, encoding=\"utf-8\") as f:\n \ + \ data = f.read()\n print('Dataset:', data)\n\n scalar = '123'\n\ + \ message = f'My model trained using data: {data}'\n\n from collections\ + \ import namedtuple\n output = namedtuple('Outputs', ['scalar', 'message'])\n\ + \ return output(scalar, message)\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest + exec-write-dir-artifact: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - write_dir_artifact + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef write_dir_artifact(out_ds: Output[dsl.Dataset]):\n import\ + \ os\n os.makedirs(out_ds.path, exist_ok=True)\n with open(os.path.join(out_ds.path,\ + \ \"part-000.txt\"), \"w\", encoding=\"utf-8\") as f:\n f.write(\"\ + First file in directory.\\n\")\n with open(os.path.join(out_ds.path,\ + \ \"part-001.txt\"), \"w\", encoding=\"utf-8\") as f:\n f.write(\"\ + Second file in directory.\\n\")\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest + exec-write-file-artifact: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - write_file_artifact + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef write_file_artifact(out_ds: Output[dsl.Dataset]):\n import\ + \ os\n os.makedirs(os.path.dirname(out_ds.path), exist_ok=True)\n \ + \ with open(out_ds.path, \"w\", encoding=\"utf-8\") as f:\n f.write(\"\ + Hello from producer file\\n\")\n\n" + image: registry.access.redhat.com/ubi9/python-311:latest +pipelineInfo: + description: Importer downloads an artifact into workspace; downstream reads it + name: pipeline-with-importer-workspace +root: + dag: + outputs: + parameters: + dir_result: + valueFromParameter: + outputParameterKey: dir_result + producerSubtask: import-stage + train_result: + valueFromParameter: + outputParameterKey: train_result + producerSubtask: import-stage + tasks: + get-uri: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-uri + dependentTasks: + - write-file-artifact + inputs: + artifacts: + d: + taskOutputArtifact: + outputArtifactKey: out_ds + producerTask: write-file-artifact + taskInfo: + name: get-uri + get-uri-2: + cachingOptions: + enableCache: true + componentRef: + name: comp-get-uri-2 + dependentTasks: + - write-dir-artifact + inputs: + artifacts: + d: + taskOutputArtifact: + outputArtifactKey: out_ds + producerTask: write-dir-artifact + taskInfo: + name: get-uri-2 + import-stage: + cachingOptions: + enableCache: true + componentRef: + name: comp-import-stage + dependentTasks: + - get-uri + - get-uri-2 + inputs: + parameters: + dir_uri: + taskOutputParameter: + outputParameterKey: Output + producerTask: get-uri-2 + file_uri: + taskOutputParameter: + outputParameterKey: Output + producerTask: get-uri + 
taskInfo: + name: import-stage + write-dir-artifact: + cachingOptions: + enableCache: true + componentRef: + name: comp-write-dir-artifact + taskInfo: + name: write-dir-artifact + write-file-artifact: + cachingOptions: + enableCache: true + componentRef: + name: comp-write-file-artifact + taskInfo: + name: write-file-artifact + outputDefinitions: + parameters: + dir_result: + parameterType: STRING + train_result: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.14.6 +--- +platforms: + kubernetes: + pipelineConfig: + workspace: + kubernetes: + pvcSpecPatch: + accessModes: + - ReadWriteOnce + storageClassName: standard + size: 1Gi
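
The behavior exercised by the new test file and golden YAML above reduces to a small pattern: declare a pipeline-level workspace, import an artifact with download_to_workspace=True, and read it from a downstream component via its local path. Below is a minimal sketch of that pattern, assuming the same SDK surface already used in the test (dsl.importer, dsl.PipelineConfig, dsl.WorkspaceConfig, dsl.KubernetesWorkspaceConfig); the pipeline name, component, and artifact URI here are illustrative and are not part of this change.

# Minimal sketch of the importer workspace-download pattern.
# Assumes the same KFP SDK APIs used in pipeline_with_importer_workspace.py;
# the URI, component, and pipeline names below are hypothetical.
from kfp import dsl, compiler


@dsl.component(base_image='registry.access.redhat.com/ubi9/python-311:latest')
def consume(dataset: dsl.Input[dsl.Dataset]) -> str:
    # The importer has already downloaded the artifact into the pipeline
    # workspace, so dataset.path reads like a local file.
    with open(dataset.path, encoding='utf-8') as f:
        return f.read()


@dsl.pipeline(
    name='importer-workspace-sketch',
    pipeline_config=dsl.PipelineConfig(
        workspace=dsl.WorkspaceConfig(
            size='1Gi',
            kubernetes=dsl.KubernetesWorkspaceConfig(
                pvcSpecPatch={'accessModes': ['ReadWriteOnce']}),
        ),
    ),
)
def importer_workspace_sketch(uri: str = 'gs://example-bucket/data.txt'):
    # Import by URI and download the artifact into the pipeline workspace.
    imported = dsl.importer(
        artifact_uri=uri,
        artifact_class=dsl.Dataset,
        reimport=False,
        download_to_workspace=True,  # maps to ImporterSpec.downloadToWorkspace
    )
    consume(dataset=imported.output)


if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=importer_workspace_sketch,
        package_path='importer_workspace_sketch.yaml')

When such a pipeline is compiled and submitted, the importer's executor spec carries downloadToWorkspace: true and the task runs on the system-importer-workspace template, which mounts the workspace PVC at /kfp-workspace, matching the compiled YAML and Argo workflow shown above.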