Skip to content

Commit 8eee5ee

Browse files
github-actions[bot]sunank200
authored andcommitted
[v3-1-test] Fix operator extra links not appearing on failed tasks (#58227) (#58508)
Co-authored-by: Ankit Chaurasia <[email protected]>
1 parent b03cd4d commit 8eee5ee

File tree

2 files changed

+100
-4
lines changed

2 files changed

+100
-4
lines changed

task-sdk/src/airflow/sdk/execution_time/task_runner.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1395,9 +1395,17 @@ def finalize(
13951395
task = ti.task
13961396
# Pushing xcom for each operator extra links defined on the operator only.
13971397
for oe in task.operator_extra_links:
1398-
link, xcom_key = oe.get_link(operator=task, ti_key=ti), oe.xcom_key # type: ignore[arg-type]
1399-
log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
1400-
_xcom_push_to_db(ti, key=xcom_key, value=link)
1398+
try:
1399+
link, xcom_key = oe.get_link(operator=task, ti_key=ti), oe.xcom_key # type: ignore[arg-type]
1400+
log.debug("Setting xcom for operator extra link", link=link, xcom_key=xcom_key)
1401+
_xcom_push_to_db(ti, key=xcom_key, value=link)
1402+
except Exception:
1403+
log.exception(
1404+
"Failed to push an xcom for task operator extra link",
1405+
link_name=oe.name,
1406+
xcom_key=oe.xcom_key,
1407+
ti=ti,
1408+
)
14011409

14021410
if getattr(ti.task, "overwrite_rtif_after_execution", False):
14031411
log.debug("Overwriting Rendered template fields.")

task-sdk/tests/task_sdk/execution_time/test_task_runner.py

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from pathlib import Path
2828
from typing import TYPE_CHECKING
2929
from unittest import mock
30-
from unittest.mock import patch
30+
from unittest.mock import call, patch
3131

3232
import pandas as pd
3333
import pytest
@@ -48,6 +48,7 @@
4848
from airflow.sdk import (
4949
DAG,
5050
BaseOperator,
51+
BaseOperatorLink,
5152
Connection,
5253
dag as dag_decorator,
5354
get_current_context,
@@ -1723,6 +1724,93 @@ def execute(self, context):
17231724
map_index=runtime_ti.map_index,
17241725
)
17251726

1727+
def test_task_failed_with_operator_extra_links(
1728+
self, create_runtime_ti, mock_supervisor_comms, time_machine
1729+
):
1730+
"""Test that operator extra links are pushed to xcoms even when task fails."""
1731+
instant = timezone.datetime(2024, 12, 3, 10, 0)
1732+
time_machine.move_to(instant, tick=False)
1733+
1734+
class DummyTestOperator(BaseOperator):
1735+
operator_extra_links = (AirflowLink(),)
1736+
1737+
def execute(self, context):
1738+
raise ValueError("Task failed intentionally")
1739+
1740+
task = DummyTestOperator(task_id="task_with_operator_extra_links")
1741+
runtime_ti = create_runtime_ti(task=task)
1742+
context = runtime_ti.get_template_context()
1743+
runtime_ti.start_date = instant
1744+
runtime_ti.end_date = instant
1745+
1746+
state, _, error = run(runtime_ti, context=context, log=mock.MagicMock())
1747+
assert state == TaskInstanceState.FAILED
1748+
assert error is not None
1749+
1750+
with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
1751+
finalize(
1752+
runtime_ti,
1753+
log=mock.MagicMock(),
1754+
state=TaskInstanceState.FAILED,
1755+
context=context,
1756+
error=error,
1757+
)
1758+
assert mock_xcom_set.mock_calls == [
1759+
call(
1760+
key="_link_AirflowLink",
1761+
value="https://airflow.apache.org",
1762+
dag_id=runtime_ti.dag_id,
1763+
task_id=runtime_ti.task_id,
1764+
run_id=runtime_ti.run_id,
1765+
map_index=runtime_ti.map_index,
1766+
)
1767+
]
1768+
1769+
def test_operator_extra_links_exception_handling(
1770+
self, create_runtime_ti, mock_supervisor_comms, time_machine
1771+
):
1772+
"""Test that exceptions in get_link() don't prevent other links from being pushed."""
1773+
instant = timezone.datetime(2024, 12, 3, 10, 0)
1774+
time_machine.move_to(instant, tick=False)
1775+
1776+
class FailingLink(BaseOperatorLink):
1777+
"""A link that raises an exception when get_link is called."""
1778+
1779+
name = "failing_link"
1780+
1781+
def get_link(self, operator, *, ti_key):
1782+
raise ValueError("Link generation failed")
1783+
1784+
class DummyTestOperator(BaseOperator):
1785+
operator_extra_links = (FailingLink(), AirflowLink())
1786+
1787+
def execute(self, context):
1788+
pass
1789+
1790+
task = DummyTestOperator(task_id="task_with_multiple_links")
1791+
runtime_ti = create_runtime_ti(task=task)
1792+
context = runtime_ti.get_template_context()
1793+
runtime_ti.start_date = instant
1794+
runtime_ti.end_date = instant
1795+
1796+
with mock.patch.object(XCom, "_set_xcom_in_db") as mock_xcom_set:
1797+
finalize(
1798+
runtime_ti,
1799+
log=mock.MagicMock(),
1800+
state=TaskInstanceState.SUCCESS,
1801+
context=context,
1802+
)
1803+
assert mock_xcom_set.mock_calls == [
1804+
call(
1805+
key="_link_AirflowLink",
1806+
value="https://airflow.apache.org",
1807+
dag_id=runtime_ti.dag_id,
1808+
task_id=runtime_ti.task_id,
1809+
run_id=runtime_ti.run_id,
1810+
map_index=runtime_ti.map_index,
1811+
)
1812+
]
1813+
17261814
@pytest.mark.parametrize(
17271815
["cmd", "rendered_cmd"],
17281816
[

0 commit comments

Comments
 (0)