Commit 59d7f90

Keep base AirflowException only in sdk

1 parent cd6290f · commit 59d7f90

4 files changed: 13 additions and 73 deletions

airflow-core/src/airflow/exceptions.py

Lines changed: 8 additions & 70 deletions
@@ -21,27 +21,13 @@

 from __future__ import annotations

-from datetime import datetime
 from http import HTTPStatus
 from typing import TYPE_CHECKING, NamedTuple

+from airflow.sdk.exceptions import AirflowException, AirflowNotFoundException
+
 if TYPE_CHECKING:
     from airflow.models import DagRun
-    from airflow.utils.state import DagRunState
-
-
-class AirflowException(Exception):
-    """
-    Base class for all Airflow's errors.
-
-    Each custom exception should be derived from this class.
-    """
-
-    status_code = HTTPStatus.INTERNAL_SERVER_ERROR
-
-    def serialize(self):
-        cls = self.__class__
-        return f"{cls.__module__}.{cls.__name__}", (str(self),), {}


 class AirflowDagCycleException(AirflowException):
@@ -54,12 +40,6 @@ class AirflowBadRequest(AirflowException):
     status_code = HTTPStatus.BAD_REQUEST


-class AirflowNotFoundException(AirflowException):
-    """Raise when the requested object/resource is not available in the system."""
-
-    status_code = HTTPStatus.NOT_FOUND
-
-
 class AirflowConfigException(AirflowException):
     """Raise when there is configuration problem."""

@@ -84,13 +64,6 @@ class InvalidStatsNameException(AirflowException):
8464
"""Raise when name of the stats is invalid."""
8565

8666

87-
# Important to inherit BaseException instead of AirflowException->Exception, since this Exception is used
88-
# to explicitly interrupt ongoing task. Code that does normal error-handling should not treat
89-
# such interrupt as an error that can be handled normally. (Compare with KeyboardInterrupt)
90-
class AirflowTaskTimeout(BaseException):
91-
"""Raise when the task execution times-out."""
92-
93-
9467
class AirflowOptionalProviderFeatureException(AirflowException):
9568
"""Raise by providers when imports are missing for optional provider features."""
9669
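The comment removed in this hunk records a real design constraint: a timeout used to interrupt a running task must not be swallowed by ordinary except-Exception blocks, which is why AirflowTaskTimeout derives from BaseException, like KeyboardInterrupt. A minimal sketch of that behaviour, using stand-in names rather than the real Airflow class:

```python
# Stand-in for AirflowTaskTimeout: deriving from BaseException keeps it out of
# the `except Exception` branch that normal task code uses for error handling.
class TaskTimeoutLike(BaseException):
    """Illustrative only; not the real airflow.sdk.exceptions.AirflowTaskTimeout."""


def run_task():
    try:
        raise TaskTimeoutLike("timed out")
    except Exception:
        # Ordinary error handling: never reached, because TaskTimeoutLike
        # is not a subclass of Exception.
        return "handled as a normal error"


try:
    run_task()
except BaseException as exc:  # the interrupt propagates to the caller
    print(f"interrupt propagated: {exc!r}")
```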

@@ -180,15 +153,15 @@ class ParamValidationError(AirflowException):
     """Raise when DAG params is invalid."""


-class TaskNotFound(AirflowNotFoundException):
+class TaskNotFound(Exception):
     """Raise when a Task is not available in the system."""


-class TaskInstanceNotFound(AirflowNotFoundException):
+class TaskInstanceNotFound(Exception):
     """Raise when a task instance is not available in the system."""


-class PoolNotFound(AirflowNotFoundException):
+class PoolNotFound(Exception):
     """Raise when a Pool is not available in the system."""


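One behavioural consequence of this hunk worth noting: TaskNotFound, TaskInstanceNotFound, and PoolNotFound now subclass plain Exception, so handlers written against AirflowNotFoundException (or AirflowException) will no longer catch them. A small illustration with stand-in classes, not the real ones:

```python
# Stand-ins mirroring the before/after hierarchy in this hunk; not the real classes.
class AirflowNotFoundExceptionLike(Exception):
    """Plays the role of airflow.sdk.exceptions.AirflowNotFoundException."""


class TaskNotFoundBefore(AirflowNotFoundExceptionLike):
    """Old shape: still an AirflowNotFoundException subclass."""


class TaskNotFoundAfter(Exception):
    """New shape: a plain Exception, as in this commit."""


for exc_cls in (TaskNotFoundBefore, TaskNotFoundAfter):
    try:
        raise exc_cls("missing task")
    except AirflowNotFoundExceptionLike:
        print(f"{exc_cls.__name__}: caught by the not-found handler")
    except Exception:
        print(f"{exc_cls.__name__}: falls through to the generic handler")
```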

@@ -246,44 +219,6 @@ class VariableNotUnique(AirflowException):
     """Raise when multiple values are found for the same variable name."""


-# TODO: workout this to correct place https://github.com/apache/airflow/issues/44353
-class DagRunTriggerException(AirflowException):
-    """
-    Signal by an operator to trigger a specific Dag Run of a dag.
-
-    Special exception raised to signal that the operator it was raised from wishes to trigger
-    a specific Dag Run of a dag. This is used in the ``TriggerDagRunOperator``.
-    """
-
-    def __init__(
-        self,
-        *,
-        trigger_dag_id: str,
-        dag_run_id: str,
-        conf: dict | None,
-        logical_date: datetime | None,
-        reset_dag_run: bool,
-        skip_when_already_exists: bool,
-        wait_for_completion: bool,
-        allowed_states: list[str | DagRunState],
-        failed_states: list[str | DagRunState],
-        poke_interval: int,
-        deferrable: bool,
-    ):
-        super().__init__()
-        self.trigger_dag_id = trigger_dag_id
-        self.dag_run_id = dag_run_id
-        self.conf = conf
-        self.logical_date = logical_date
-        self.reset_dag_run = reset_dag_run
-        self.skip_when_already_exists = skip_when_already_exists
-        self.wait_for_completion = wait_for_completion
-        self.allowed_states = allowed_states
-        self.failed_states = failed_states
-        self.poke_interval = poke_interval
-        self.deferrable = deferrable
-
-
 # The try/except handling is needed after we moved all k8s classes to cncf.kubernetes provider
 # These two exceptions are used internally by Kubernetes Executor but also by PodGenerator, so we need
 # to leave them here in case older version of cncf.kubernetes provider is used to run KubernetesPodOperator
@@ -350,6 +285,9 @@ class UnknownExecutorException(ValueError):
     "AirflowSkipException": "airflow.sdk.exceptions.AirflowSkipException",
     "AirflowFailException": "airflow.sdk.exceptions.AirflowFailException",
     "AirflowSensorTimeout": "airflow.sdk.exceptions.AirflowSensorTimeout",
+    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
+    "DagRunTriggerException": "airflow.sdk.exceptions.DagRunTriggerException",
+    "AirflowNotFoundException": "airflow.sdk.exceptions.AirflowNotFoundException",
 }


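The mapping extended in the hunk above points legacy names in airflow.exceptions at their new homes in airflow.sdk.exceptions. The commit does not show how the mapping is consumed; one common way to honour such a table is a PEP 562 module-level __getattr__ that resolves the target lazily. The sketch below only illustrates that pattern and is not Airflow's actual implementation:

```python
# Hypothetical module-level lazy re-export, in the spirit of the name -> path
# mapping shown above. The table and helper names here are illustrative.
import importlib
import warnings

_LAZY_REDIRECTS = {
    "AirflowTaskTimeout": "airflow.sdk.exceptions.AirflowTaskTimeout",
    "AirflowNotFoundException": "airflow.sdk.exceptions.AirflowNotFoundException",
}


def __getattr__(name: str):
    try:
        target = _LAZY_REDIRECTS[name]
    except KeyError:
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}") from None
    module_path, _, attr = target.rpartition(".")
    warnings.warn(
        f"{name} now lives at {target}; import it from there instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    return getattr(importlib.import_module(module_path), attr)
```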

providers/standard/src/airflow/providers/standard/version_compat.py

Lines changed: 4 additions & 0 deletions
@@ -41,7 +41,9 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
 if AIRFLOW_V_3_1_PLUS:
     from airflow.sdk import BaseHook, BaseOperator, timezone
     from airflow.sdk.definitions.context import context_merge
+    from airflow.sdk.exceptions import AirflowSensorTimeout, TaskDeferred
 else:
+    from airflow.exceptions import AirflowSensorTimeout, TaskDeferred  # type: ignore[no-redef, attr-defined]
     from airflow.hooks.base import BaseHook  # type: ignore[attr-defined,no-redef]
     from airflow.models.baseoperator import BaseOperator  # type: ignore[no-redef]
     from airflow.utils import timezone  # type: ignore[attr-defined,no-redef]
@@ -64,4 +66,6 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]:
     "PokeReturnValue",
     "context_merge",
     "timezone",
+    "AirflowSensorTimeout",
+    "TaskDeferred",
 ]
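With AirflowSensorTimeout and TaskDeferred exported from version_compat, provider code can use one import line that works against both Airflow 3.1+ (SDK exceptions) and older cores, which is what the test change below switches to. A hedged sketch of a test written that way; the test body is a hypothetical placeholder, not a real Airflow test:

```python
# Illustrative provider test: importing the signal exceptions through
# version_compat keeps a single import line working on both pre- and
# post-3.1 Airflow installations.
import pytest

from airflow.providers.standard.version_compat import AirflowSensorTimeout


def test_sensor_timeout_surfaces():
    def poke_until_timeout():
        # Stand-in for a sensor that never succeeds and hits its timeout.
        raise AirflowSensorTimeout("sensor ran out of time")

    with pytest.raises(AirflowSensorTimeout):
        poke_until_timeout()
```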

providers/standard/tests/unit/standard/sensors/test_filesystem.py

Lines changed: 1 addition & 2 deletions
@@ -29,11 +29,10 @@
 except ImportError:
     from airflow.utils.timezone import datetime  # type: ignore[no-redef]

-from airflow.exceptions import AirflowSensorTimeout, TaskDeferred
 from airflow.models.dag import DAG
 from airflow.providers.standard.sensors.filesystem import FileSensor
 from airflow.providers.standard.triggers.file import FileTrigger
-from airflow.sdk.exceptions import AirflowSensorTimeout, TaskDeferred
+from airflow.providers.standard.version_compat import AirflowSensorTimeout, TaskDeferred

 pytestmark = pytest.mark.db_test

task-sdk/src/airflow/sdk/exceptions.py

Lines changed: 0 additions & 1 deletion
@@ -21,7 +21,6 @@
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Any

-from airflow.exceptions import AirflowException
 from airflow.sdk import TriggerRule

 if TYPE_CHECKING:
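Together with the airflow-core change above, this removal inverts the dependency: the SDK now defines AirflowException itself and airflow.exceptions re-exports it, so both import paths should resolve to the same class. A quick sanity check, assuming an environment that already contains these changes:

```python
# Sanity check of the re-export relationship this commit sets up. Assumes an
# Airflow installation that includes these changes; adjust if the layout differs.
from airflow.exceptions import AirflowException as CoreAirflowException
from airflow.sdk.exceptions import AirflowException as SdkAirflowException

# Both import paths should now resolve to the single class defined in the SDK.
assert CoreAirflowException is SdkAirflowException

# Existing handlers keep working no matter which path they imported from.
try:
    raise SdkAirflowException("boom")
except CoreAirflowException as err:
    print(f"caught via the core import path: {err}")
```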
