Skip to content

Commit 125f213

Browse files
committed
Fall back to Argo Workflow archives for log collection
Signed-off-by: mprahl <[email protected]>
1 parent 36c8437 commit 125f213

File tree

2 files changed

+250
-35
lines changed

2 files changed

+250
-35
lines changed

.github/actions/deploy/action.yml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,45 @@ runs:
117117
fi
118118
echo "ARGS=$ARGS" >> "$GITHUB_OUTPUT"
119119
120+
# Best-effort installation of the Argo Workflows CLI. The step is marked
# continue-on-error, so a failed download does not fail the job.
- name: Download Argo CLI
  id: download-argo-cli
  shell: bash
  continue-on-error: true
  env:
    ARGO_VERSION_INPUT: ${{ inputs.argo_version }}
  run: |
    set -euo pipefail

    # Nothing to do when the runner image already ships the CLI.
    if command -v argo >/dev/null 2>&1; then
      echo "Argo CLI already available on PATH."
      exit 0
    fi

    # Version precedence: explicit action input, then the version pinned in the repository.
    ARGO_VERSION=""
    if [[ -n "${ARGO_VERSION_INPUT}" ]]; then
      ARGO_VERSION="${ARGO_VERSION_INPUT}"
    elif [[ -f "third_party/argo/VERSION" ]]; then
      ARGO_VERSION="$(head -n 1 third_party/argo/VERSION | tr -d '[:space:]')"
    fi

    if [[ -z "${ARGO_VERSION}" ]]; then
      echo "Unable to determine Argo Workflows version." >&2
      exit 1
    fi

    # Release tags are prefixed with "v"; accept both "3.x.y" and "v3.x.y".
    if [[ "${ARGO_VERSION}" != v* ]]; then
      ARGO_VERSION="v${ARGO_VERSION}"
    fi

    # Pick the release asset matching the runner instead of assuming linux/amd64.
    case "$(uname -s)" in
      Linux) ARGO_OS="linux" ;;
      Darwin) ARGO_OS="darwin" ;;
      *)
        echo "Unsupported operating system for Argo CLI download: $(uname -s)" >&2
        exit 1
        ;;
    esac
    case "$(uname -m)" in
      x86_64 | amd64) ARGO_ARCH="amd64" ;;
      aarch64 | arm64) ARGO_ARCH="arm64" ;;
      *)
        echo "Unsupported architecture for Argo CLI download: $(uname -m)" >&2
        exit 1
        ;;
    esac

    DOWNLOAD_URL="https://github.com/argoproj/argo-workflows/releases/download/${ARGO_VERSION}/argo-${ARGO_OS}-${ARGO_ARCH}.gz"
    # The directory is intentionally not removed: it is appended to PATH for later steps.
    TMP_DIR="$(mktemp -d)"
    echo "Attempting to download Argo CLI ${ARGO_VERSION} from ${DOWNLOAD_URL}"
    # Retry transient network failures before giving up.
    curl -sSfL --retry 3 --retry-delay 2 "${DOWNLOAD_URL}" -o "${TMP_DIR}/argo.gz"
    gunzip -c "${TMP_DIR}/argo.gz" > "${TMP_DIR}/argo"
    chmod +x "${TMP_DIR}/argo"
    echo "${TMP_DIR}" >> "$GITHUB_PATH"
    echo "Downloaded Argo CLI to ${TMP_DIR}/argo"
158+
120159
- name: Deploy KFP
121160
id: deploy-kfp
122161
if: ${{ steps.configure.outcome == 'success' }}

backend/test/end2end/utils/e2e_utils.go

Lines changed: 211 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,39 @@
22
package utils
33

44
import (
5+
"context"
56
"fmt"
67
"maps"
8+
"os/exec"
79
"sort"
10+
"strings"
11+
"sync"
812
"time"
913

1014
runparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
1115
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
1216
apiserver "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
17+
"github.com/kubeflow/pipelines/backend/src/common/util"
1318
"github.com/kubeflow/pipelines/backend/test/config"
1419
"github.com/kubeflow/pipelines/backend/test/logger"
1520
"github.com/kubeflow/pipelines/backend/test/testutil"
1621
apitests "github.com/kubeflow/pipelines/backend/test/v2/api"
1722

1823
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
24+
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
1925
"github.com/onsi/ginkgo/v2"
2026
"github.com/onsi/gomega"
2127
v1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2229
"k8s.io/client-go/kubernetes"
2330
)
2431

32+
var (
33+
workflowClient versioned.Interface
34+
workflowClientOnce sync.Once
35+
workflowClientErr error
36+
)
37+
2538
// CreatePipelineRun - Create a pipeline run
2639
func CreatePipelineRun(runClient *apiserver.RunClient, testContext *apitests.TestContext, pipelineID *string, pipelineVersionID *string, experimentID *string, inputParams map[string]interface{}) *run_model.V2beta1Run {
2740
runName := fmt.Sprintf("E2e Test Run-%v", testContext.TestStartTimeUTC)
@@ -90,63 +103,226 @@ func ValidateComponentStatuses(runClient *apiserver.RunClient, k8Client *kuberne
90103
gomega.Expect(len(actualTaskDetails)).To(gomega.BeNumerically(">=", len(expectedTaskDetails)), "Number of created DAG tasks should be >= number of expected tasks")
91104
}
92105
}
93-
94106
}
95107

96108
// CapturePodLogsForUnsuccessfulTasks - Capture pod logs of a failed component
97109
func CapturePodLogsForUnsuccessfulTasks(k8Client *kubernetes.Clientset, testContext *apitests.TestContext, taskDetails []*run_model.V2beta1PipelineTaskDetail) {
98110
failedTasks := make(map[string]string)
111+
archivedLogCache := make(map[string]string)
112+
namespace := testutil.GetNamespace()
99113
sort.Slice(taskDetails, func(i, j int) bool {
100114
return time.Time(taskDetails[i].EndTime).After(time.Time(taskDetails[j].EndTime)) // Sort Tasks by End Time in descending order
101115
})
102116
for _, task := range taskDetails {
103-
if task.State != nil {
104-
switch *task.State {
105-
case run_model.V2beta1RuntimeStateSUCCEEDED:
106-
{
107-
logger.Log("SUCCEEDED - Task %s for run %s has finished successfully", task.DisplayName, task.RunID)
108-
}
109-
case run_model.V2beta1RuntimeStateRUNNING:
110-
{
111-
logger.Log("RUNNING - Task %s for Run %s is running", task.DisplayName, task.RunID)
117+
if task.State == nil {
118+
continue
119+
}
112120

121+
switch *task.State {
122+
case run_model.V2beta1RuntimeStateSUCCEEDED:
123+
logger.Log("SUCCEEDED - Task %s for run %s has finished successfully", task.DisplayName, task.RunID)
124+
case run_model.V2beta1RuntimeStateRUNNING:
125+
logger.Log("RUNNING - Task %s for Run %s is running", task.DisplayName, task.RunID)
126+
case run_model.V2beta1RuntimeStateSKIPPED:
127+
logger.Log("SKIPPED - Task %s for Run %s skipped", task.DisplayName, task.RunID)
128+
case run_model.V2beta1RuntimeStateCANCELED:
129+
logger.Log("CANCELED - Task %s for Run %s canceled", task.DisplayName, task.RunID)
130+
case run_model.V2beta1RuntimeStateFAILED:
131+
logger.Log("%s - Task %s for Run %s did not complete successfully", *task.State, task.DisplayName, task.RunID)
132+
133+
podNames := map[string]struct{}{}
134+
if task.PodName != "" {
135+
podNames[task.PodName] = struct{}{}
136+
}
137+
for _, childTask := range task.ChildTasks {
138+
if childTask.PodName != "" {
139+
podNames[childTask.PodName] = struct{}{}
113140
}
114-
case run_model.V2beta1RuntimeStateSKIPPED:
115-
{
116-
logger.Log("SKIPPED - Task %s for Run %s skipped", task.DisplayName, task.RunID)
117-
}
118-
case run_model.V2beta1RuntimeStateCANCELED:
119-
{
120-
logger.Log("CANCELED - Task %s for Run %s canceled", task.DisplayName, task.RunID)
141+
}
142+
143+
if len(podNames) == 0 {
144+
logger.Log("Task %s for Run %s did not report any pod names", task.DisplayName, task.RunID)
145+
failedTasks[task.DisplayName] = string(*task.State)
146+
continue
147+
}
148+
149+
var combinedLog strings.Builder
150+
for podName := range podNames {
151+
logger.Log("Collecting logs for task %s pod %s", task.DisplayName, podName)
152+
combinedLog.WriteString(fmt.Sprintf("===== Pod: %s =====\n", podName))
153+
154+
podLog := testutil.ReadPodLogs(k8Client, namespace, podName, nil, &testContext.TestStartTimeUTC, config.PodLogLimit)
155+
missingPod := false
156+
meaningful, missingPod := hasMeaningfulLogs(podLog)
157+
switch {
158+
case meaningful:
159+
combinedLog.WriteString("----- Live Logs (kubectl) -----\n")
160+
combinedLog.WriteString(podLog)
161+
if !strings.HasSuffix(podLog, "\n") {
162+
combinedLog.WriteString("\n")
163+
}
164+
case strings.TrimSpace(podLog) != "":
165+
combinedLog.WriteString(podLog)
166+
if !strings.HasSuffix(podLog, "\n") {
167+
combinedLog.WriteString("\n")
168+
}
169+
if missingPod {
170+
combinedLog.WriteString("Pod logs unavailable; pod not found. Falling back to archived logs.\n")
171+
} else {
172+
combinedLog.WriteString("Live logs unavailable via kubectl logs.\n")
173+
}
174+
default:
175+
if missingPod {
176+
combinedLog.WriteString("Pod logs unavailable; pod not found. Falling back to archived logs.\n")
177+
} else {
178+
combinedLog.WriteString("Live logs unavailable via kubectl logs.\n")
179+
}
121180
}
122-
case run_model.V2beta1RuntimeStateFAILED:
123-
{
124-
logger.Log("%s - Task %s for Run %s did not complete successfully", *task.State, task.DisplayName, task.RunID)
125-
for _, childTask := range task.ChildTasks {
126-
podName := childTask.PodName
127-
if podName != "" {
128-
logger.Log("Capturing pod logs for task %s, with pod name %s", task.DisplayName, podName)
129-
podLog := testutil.ReadPodLogs(k8Client, *config.Namespace, podName, nil, &testContext.TestStartTimeUTC, config.PodLogLimit)
130-
logger.Log("Pod logs captured for task %s in pod %s", task.DisplayName, podName)
131-
logger.Log("Attaching pod logs to the report")
132-
ginkgo.AddReportEntry(fmt.Sprintf("Failing '%s' Component Log", task.DisplayName), podLog)
133-
logger.Log("Attached pod logs to the report")
181+
182+
if missingPod {
183+
archivedLog, err := getArchivedLogWithCache(archivedLogCache, k8Client, namespace, task.RunID, podName)
184+
if err != nil {
185+
logger.Log("Failed to retrieve archived logs for pod %s: %v", podName, err)
186+
combinedLog.WriteString(fmt.Sprintf("Failed to retrieve archived logs via Argo Workflows: %v\n", err))
187+
} else if strings.TrimSpace(archivedLog) != "" {
188+
combinedLog.WriteString("----- Archived Logs (Argo) -----\n")
189+
combinedLog.WriteString(archivedLog)
190+
if !strings.HasSuffix(archivedLog, "\n") {
191+
combinedLog.WriteString("\n")
134192
}
193+
} else {
194+
combinedLog.WriteString("Archived logs were empty.\n")
135195
}
136-
failedTasks[task.DisplayName] = string(*task.State)
137-
}
138-
default:
139-
{
140-
logger.Log("UNKNOWN state - Task %s for Run %s has an UNKNOWN state", task.DisplayName, task.RunID)
141196
}
197+
198+
combinedLog.WriteString("\n")
199+
}
200+
201+
entryContent := combinedLog.String()
202+
if strings.TrimSpace(entryContent) == "" {
203+
entryContent = fmt.Sprintf("No logs were available for failed task %s", task.DisplayName)
142204
}
205+
206+
logger.Log("Attaching logs to report for task %s", task.DisplayName)
207+
ginkgo.AddReportEntry(fmt.Sprintf("Failing '%s' Component Log", task.DisplayName), entryContent)
208+
logger.Log("Attached logs to the report for task %s", task.DisplayName)
209+
210+
failedTasks[task.DisplayName] = string(*task.State)
211+
default:
212+
logger.Log("UNKNOWN state - Task %s for Run %s has an UNKNOWN state", task.DisplayName, task.RunID)
143213
}
144214
}
145215
if len(failedTasks) > 0 {
146216
logger.Log("Found failed tasks: %v", maps.Keys(failedTasks))
147217
}
148218
}
149219

220+
func getArchivedLogWithCache(cache map[string]string, k8Client *kubernetes.Clientset, namespace, runID, podName string) (string, error) {
221+
cacheKey := fmt.Sprintf("%s::%s", runID, podName)
222+
if val, ok := cache[cacheKey]; ok {
223+
return val, nil
224+
}
225+
226+
logContent, err := retrieveArchivedLogs(k8Client, namespace, runID, podName)
227+
if err != nil {
228+
return "", err
229+
}
230+
231+
cache[cacheKey] = logContent
232+
return logContent, nil
233+
}
234+
235+
func retrieveArchivedLogs(k8Client *kubernetes.Clientset, namespace, runID, podName string) (string, error) {
236+
workflowName, err := resolveWorkflowNameForRun(k8Client, namespace, runID)
237+
if err != nil {
238+
return "", err
239+
}
240+
241+
cliPath, err := ensureArgoCLI()
242+
if err != nil {
243+
return "", err
244+
}
245+
246+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
247+
defer cancel()
248+
249+
cmd := exec.CommandContext(ctx, cliPath, "logs", workflowName, podName, "-n", namespace, "--no-color")
250+
output, err := cmd.CombinedOutput()
251+
if err != nil {
252+
if ctx.Err() == context.DeadlineExceeded {
253+
return "", fmt.Errorf("argo logs command timed out after 30s\n%s", string(output))
254+
}
255+
return "", fmt.Errorf("argo logs command failed: %w\n%s", err, string(output))
256+
}
257+
258+
return string(output), nil
259+
}
260+
261+
// hasMeaningfulLogs classifies text returned by a pod-log read.
//
// The first result reports whether the text looks like real log output: it is
// false for blank text and for text containing one of the known placeholder
// phrases. The second result reports whether the text contains "not found",
// which callers treat as a sign that the pod no longer exists. All matching is
// case-insensitive; blank text yields (false, false).
func hasMeaningfulLogs(logText string) (bool, bool) {
	normalized := strings.ToLower(strings.TrimSpace(logText))
	if len(normalized) == 0 {
		return false, false
	}

	podMissing := strings.Contains(normalized, "not found")

	// Phrases that indicate the text is a placeholder rather than pod output.
	placeholders := [...]string{
		"no pod logs available",
		"could not find pod containing container",
		"failed to stream pod logs",
	}
	for _, phrase := range placeholders {
		if strings.Contains(normalized, phrase) {
			return false, podMissing
		}
	}

	return true, podMissing
}
277+
278+
// ensureArgoCLI locates the "argo" executable on PATH and returns its path,
// or the lookup error when it cannot be found.
func ensureArgoCLI() (string, error) {
	argoPath, lookupErr := exec.LookPath("argo")
	return argoPath, lookupErr
}
281+
282+
func resolveWorkflowNameForRun(k8Client *kubernetes.Clientset, namespace, runID string) (string, error) {
283+
if runID == "" {
284+
return "", fmt.Errorf("run ID is empty")
285+
}
286+
287+
wfClient, err := getWorkflowClient()
288+
if err != nil {
289+
return "", err
290+
}
291+
292+
labelSelector := fmt.Sprintf("%s=%s", util.LabelKeyWorkflowRunId, runID)
293+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
294+
defer cancel()
295+
296+
workflows, err := wfClient.ArgoprojV1alpha1().Workflows(namespace).List(ctx, metav1.ListOptions{
297+
LabelSelector: labelSelector,
298+
})
299+
if err != nil {
300+
if ctx.Err() == context.DeadlineExceeded {
301+
return "", fmt.Errorf("listing workflows timed out after 10s for run ID %s", runID)
302+
}
303+
return "", fmt.Errorf("failed to list workflows: %w", err)
304+
}
305+
306+
if len(workflows.Items) == 0 {
307+
return "", fmt.Errorf("no workflow found in namespace %s with run ID %s", namespace, runID)
308+
}
309+
310+
return workflows.Items[0].Name, nil
311+
}
312+
313+
func getWorkflowClient() (versioned.Interface, error) {
314+
workflowClientOnce.Do(func() {
315+
restConfig, err := util.GetKubernetesConfig()
316+
if err != nil {
317+
workflowClientErr = fmt.Errorf("failed to create kubernetes config: %w", err)
318+
return
319+
}
320+
321+
workflowClient, workflowClientErr = versioned.NewForConfig(restConfig)
322+
})
323+
return workflowClient, workflowClientErr
324+
}
325+
150326
type TaskDetails struct {
151327
TaskName string
152328
Task v1alpha1.DAGTask
@@ -156,7 +332,7 @@ type TaskDetails struct {
156332

157333
// GetTasksFromWorkflow - Get tasks from a compiled workflow
158334
func GetTasksFromWorkflow(workflow *v1alpha1.Workflow) []TaskDetails {
159-
var containers = make(map[string]*v1.Container)
335+
containers := make(map[string]*v1.Container)
160336
var tasks []TaskDetails
161337
for _, template := range workflow.Spec.Templates {
162338
if template.Container != nil {

0 commit comments

Comments
 (0)