Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -868,11 +868,14 @@ message PipelineDeploymentConfig {
map<string, ValueOrRuntimeParameter> custom_properties = 4
[deprecated = true];

// Whether to import the artifact regardless of whether it has been imported before.
bool reimport = 5;

// Properties of the Artifact.
google.protobuf.Struct metadata = 6;

// Whether to import the artifact regardless of whether it has been imported before.
bool reimport = 5;
// If true, download artifact into the pipeline workspace.
bool download_to_workspace = 7;
}

// ResolverSpec resolves artifacts from historical metadata and returns them
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/compiler/argocompiler/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (c *workflowCompiler) task(name string, task *pipelinespec.PipelineTaskSpec
// it's impossible to add a when condition based on driver outputs.
return nil, fmt.Errorf("triggerPolicy.condition on importer task is not supported")
}
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID)
importer, err := c.importerTask(name, task, taskSpecJson, inputs.parentDagID, e.Importer.GetDownloadToWorkspace())
if err != nil {
return nil, err
}
Expand Down
41 changes: 32 additions & 9 deletions backend/src/v2/compiler/argocompiler/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c *workflowCompiler) Importer(name string, componentSpec *pipelinespec.Com
return c.saveComponentImpl(name, importer)
}

func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string) (*wfapi.DAGTask, error) {
func (c *workflowCompiler) importerTask(name string, task *pipelinespec.PipelineTaskSpec, taskJSON string, parentDagID string, downloadToWorkspace bool) (*wfapi.DAGTask, error) {
componentPlaceholder, err := c.useComponentSpec(task.GetComponentRef().GetName())
if err != nil {
return nil, err
Expand All @@ -45,7 +45,7 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}
return &wfapi.DAGTask{
Name: name,
Template: c.addImporterTemplate(),
Template: c.addImporterTemplate(downloadToWorkspace),
Arguments: wfapi.Arguments{Parameters: []wfapi.Parameter{{
Name: paramTask,
Value: wfapi.AnyStringPtr(taskJSON),
Expand All @@ -62,8 +62,11 @@ func (c *workflowCompiler) importerTask(name string, task *pipelinespec.Pipeline
}, nil
}

func (c *workflowCompiler) addImporterTemplate() string {
func (c *workflowCompiler) addImporterTemplate(downloadToWorkspace bool) string {
name := "system-importer"
if downloadToWorkspace {
name += "-workspace"
}
if _, alreadyExists := c.templates[name]; alreadyExists {
return name
}
Expand Down Expand Up @@ -104,6 +107,24 @@ func (c *workflowCompiler) addImporterTemplate() string {
if value, ok := os.LookupEnv(PublishLogsEnvVar); ok {
args = append(args, "--publish_logs", value)
}

var volumeMounts []k8score.VolumeMount
var volumes []k8score.Volume
if downloadToWorkspace {
volumeMounts = append(volumeMounts, k8score.VolumeMount{
Name: workspaceVolumeName,
MountPath: component.WorkspaceMountPath,
})
volumes = append(volumes, k8score.Volume{
Name: workspaceVolumeName,
VolumeSource: k8score.VolumeSource{
PersistentVolumeClaim: &k8score.PersistentVolumeClaimVolumeSource{
ClaimName: fmt.Sprintf("{{workflow.name}}-%s", workspaceVolumeName),
},
},
})
}

importerTemplate := &wfapi.Template{
Name: name,
Inputs: wfapi.Inputs{
Expand All @@ -115,13 +136,15 @@ func (c *workflowCompiler) addImporterTemplate() string {
},
},
Container: &k8score.Container{
Image: c.launcherImage,
Command: c.launcherCommand,
Args: args,
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
Env: commonEnvs,
Resources: driverResources,
Image: c.launcherImage,
Command: c.launcherCommand,
Args: args,
EnvFrom: []k8score.EnvFromSource{metadataEnvFrom},
Env: commonEnvs,
Resources: driverResources,
VolumeMounts: volumeMounts,
},
Volumes: volumes,
}

// If TLS is enabled (apiserver or metadata), add the custom CA bundle to the importer template.
Expand Down
60 changes: 57 additions & 3 deletions backend/src/v2/component/importer_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/kubeflow/pipelines/backend/src/common/util"

"github.com/kubeflow/pipelines/backend/src/v2/config"
"github.com/kubeflow/pipelines/backend/src/v2/objectstore"

pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
Expand Down Expand Up @@ -125,12 +126,18 @@ func (l *ImportLauncher) Execute(ctx context.Context) (err error) {
if err != nil {
return err
}
// Determine execution type
executionType := metadata.ExecutionType(metadata.ImporterExecutionTypeName)
if l.importer.GetDownloadToWorkspace() {
executionType = metadata.ExecutionType(metadata.ImporterWorkspaceExecutionTypeName)
}

ecfg := &metadata.ExecutionConfig{
TaskName: l.task.GetTaskInfo().GetName(),
PodName: l.launcherV2Options.PodName,
PodUID: l.launcherV2Options.PodUID,
Namespace: l.launcherV2Options.Namespace,
ExecutionType: metadata.ImporterExecutionTypeName,
ExecutionType: executionType,
ParentDagID: l.importerLauncherOptions.ParentDagID,
}
createdExecution, err := l.metadataClient.CreateExecution(ctx, pipeline, ecfg)
Expand Down Expand Up @@ -253,15 +260,17 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
}

if strings.HasPrefix(artifactUri, "oci://") {
// OCI artifacts are not supported when workspace is used
if l.importer.GetDownloadToWorkspace() {
return nil, fmt.Errorf("importer workspace download does not support OCI registries")
}
artifactType, err := metadata.SchemaToArtifactType(schema)
if err != nil {
return nil, fmt.Errorf("converting schema to artifact type failed: %w", err)
}

if *artifactType.Name != "system.Model" {
return nil, fmt.Errorf("the %s artifact type does not support OCI registries", *artifactType.Name)
}

return artifact, nil
}

Expand All @@ -283,9 +292,54 @@ func (l *ImportLauncher) ImportSpecToMLMDArtifact(ctx context.Context) (artifact
}
storeSessionInfoStr := string(storeSessionInfoJSON)
artifact.CustomProperties["store_session_info"] = metadata.StringValue(storeSessionInfoStr)

// Download the artifact into the workspace
if l.importer.GetDownloadToWorkspace() {
bucketConfig, err := l.resolveBucketConfigForURI(ctx, artifactUri)
if err != nil {
return nil, err
}
localPath, err := LocalWorkspacePathForURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to get local path for uri %q: %w", artifactUri, err)
}
blobKey, err := bucketConfig.KeyFromURI(artifactUri)
if err != nil {
return nil, fmt.Errorf("failed to derive blob key from uri %q while downloading artifact into workspace: %w", artifactUri, err)
}
bucket, err := objectstore.OpenBucket(ctx, l.k8sClient, l.launcherV2Options.Namespace, bucketConfig)
if err != nil {
return nil, fmt.Errorf("failed to open bucket for uri %q: %w", artifactUri, err)
}
defer bucket.Close()
glog.Infof("Downloading artifact %q (blob key %q) to workspace path %q", artifactUri, blobKey, localPath)
if err := objectstore.DownloadBlob(ctx, bucket, localPath, blobKey); err != nil {
return nil, fmt.Errorf("failed to download artifact to workspace: %w", err)
}
}
return artifact, nil
}

// resolveBucketConfigForURI builds the object store configuration for the given
// artifact URI. After parsing the URI, it makes a best-effort attempt to load the
// launcher config from the ConfigMap in the launcher's namespace and to attach the
// store session info that config resolves for the URI.
//
// Only a URI parsing failure is returned as an error. Failures while loading the
// launcher config or resolving session info are logged as warnings, and the parsed
// configuration is returned without session info.
func (l *ImportLauncher) resolveBucketConfigForURI(ctx context.Context, uri string) (*objectstore.Config, error) {
	bucketConfig, parseErr := objectstore.ParseBucketConfigForArtifactURI(uri)
	if parseErr != nil {
		return nil, fmt.Errorf("failed to parse bucket config while resolving uri %q: %w", uri, parseErr)
	}

	launcherCfg, loadErr := config.FromConfigMap(ctx, l.k8sClient, l.launcherV2Options.Namespace)
	if loadErr != nil {
		// Best effort: continue without session info.
		glog.Warningf("failed to load launcher config while resolving bucket config: %v", loadErr)
		return bucketConfig, nil
	}
	if launcherCfg == nil {
		return bucketConfig, nil
	}

	sessionInfo, sessErr := launcherCfg.GetStoreSessionInfo(uri)
	if sessErr != nil {
		// Best effort: continue without session info.
		glog.Warningf("failed to resolve store session info for %q: %v", uri, sessErr)
		return bucketConfig, nil
	}

	bucketConfig.SessionInfo = &sessionInfo
	return bucketConfig, nil
}

func (l *ImportLauncher) getOutPutArtifactName() (string, error) {
outPutNames := make([]string, 0, len(l.component.GetOutputDefinitions().GetArtifacts()))
for name := range l.component.GetOutputDefinitions().GetArtifacts() {
Expand Down
35 changes: 35 additions & 0 deletions backend/src/v2/component/launcher_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,12 @@ func downloadArtifacts(ctx context.Context, executorInput *pipelinespec.Executor
for _, artifact := range artifactList.Artifacts {
// Iterating through the artifact list allows for collected artifacts to be properly consumed.
inputArtifact := artifact
// Skip downloading if the artifact is flagged as already present in the workspace
if inputArtifact.GetMetadata() != nil {
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
continue
}
}
localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
glog.Warningf("Input Artifact %q does not have a recognized storage URI %q. Skipping downloading to local path.", name, inputArtifact.Uri)
Expand Down Expand Up @@ -858,6 +864,20 @@ func getPlaceholders(executorInput *pipelinespec.ExecutorInput) (placeholders ma
key := fmt.Sprintf(`{{$.inputs.artifacts['%s'].uri}}`, name)
placeholders[key] = inputArtifact.Uri

// If the artifact is marked as already in the workspace, map to the workspace path
// with the same shape as LocalPathForURI, but rebased under the workspace mount.
if inputArtifact.GetMetadata() != nil {
if v, ok := inputArtifact.GetMetadata().GetFields()["_kfp_workspace"]; ok && v.GetBoolValue() {
localPath, lerr := LocalWorkspacePathForURI(inputArtifact.Uri)
if lerr != nil {
return nil, fmt.Errorf("failed to get local workspace path for input artifact %q: %w", name, lerr)
}
key = fmt.Sprintf(`{{$.inputs.artifacts['%s'].path}}`, name)
placeholders[key] = localPath
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update needsWorkspaceMount so that if strings.HasPrefix(strVal.StringValue, component.WorkspaceMountPath) { is if strings.Contains(strVal.StringValue, component.WorkspaceMountPath) {?

This is because the placeholder replacement logic is much less strict and we need needsWorkspaceMount to match.

continue
}
}

localPath, err := LocalPathForURI(inputArtifact.Uri)
if err != nil {
// Input Artifact does not have a recognized storage URI
Expand Down Expand Up @@ -1010,6 +1030,21 @@ func retrieveArtifactPath(artifact *pipelinespec.RuntimeArtifact) (string, error
}
}

// LocalWorkspacePathForURI maps an artifact URI to its location inside the
// pipeline workspace. The result is the path LocalPathForURI produces for the
// same URI, re-rooted beneath WorkspaceMountPath/.artifacts.
//
// An error is returned for URIs that start with "oci://" and for any URI that
// LocalPathForURI rejects.
func LocalWorkspacePathForURI(uri string) (string, error) {
	const ociScheme = "oci://"
	if strings.HasPrefix(uri, ociScheme) {
		return "", fmt.Errorf("failed to generate workspace path for URI %s: OCI not supported for workspace artifacts", uri)
	}

	base, err := LocalPathForURI(uri)
	if err != nil {
		return "", err
	}

	// Drop the leading '/' so the path nests under the workspace artifacts directory.
	relative := strings.TrimPrefix(base, "/")
	artifactsRoot := filepath.Join(WorkspaceMountPath, ".artifacts")
	return filepath.Join(artifactsRoot, relative), nil
}

func prepareOutputFolders(executorInput *pipelinespec.ExecutorInput) error {
for name, parameter := range executorInput.GetOutputs().GetParameters() {
dir := filepath.Dir(parameter.OutputFile)
Expand Down
24 changes: 24 additions & 0 deletions backend/src/v2/component/launcher_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"io"
"os"
"path/filepath"
"testing"

"github.com/kubeflow/pipelines/backend/src/v2/cacheutils"
Expand Down Expand Up @@ -176,6 +177,29 @@ func Test_executeV2_publishLogs(t *testing.T) {
}
}

func Test_getPlaceholders_WorkspaceArtifactPath(t *testing.T) {
execIn := &pipelinespec.ExecutorInput{
Inputs: &pipelinespec.ExecutorInput_Inputs{
Artifacts: map[string]*pipelinespec.ArtifactList{
"data": {
Artifacts: []*pipelinespec.RuntimeArtifact{
{Uri: "minio://mlpipeline/sample/sample.txt", Metadata: &structpb.Struct{Fields: map[string]*structpb.Value{"_kfp_workspace": structpb.NewBoolValue(true)}}},
},
},
},
},
}
ph, err := getPlaceholders(execIn)
if err != nil {
t.Fatalf("getPlaceholders error: %v", err)
}
actual := ph["{{$.inputs.artifacts['data'].path}}"]
expected := filepath.Join(WorkspaceMountPath, ".artifacts", "minio", "mlpipeline", "sample", "sample.txt")
if actual != expected {
t.Fatalf("placeholder path mismatch: actual=%q expected=%q", actual, expected)
}
}

func Test_executorInput_compileCmdAndArgs(t *testing.T) {
executorInputJSON := `{
"inputs": {
Expand Down
2 changes: 1 addition & 1 deletion backend/src/v2/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ func needsWorkspaceMount(executorInput *pipelinespec.ExecutorInput) bool {
return true
}

if strings.HasPrefix(strVal.StringValue, component.WorkspaceMountPath) {
if strings.Contains(strVal.StringValue, component.WorkspaceMountPath) {
return true
}
}
Expand Down
Loading
Loading