Skip to content

Commit ac96487

Browse files
authored
Add executor.running_dags gauge to expose count of running DAGs (#12368) (#52815)
* feature: introducing a counter for number of DAGs in running state * wrap emit executor.running_dags gauge as a function * add test for _emit_running_dags_metric * add executor.running_dags to metrics.rst * use Sqlalchemy 2.0 style * check for metrics configuration before emitting running dags metric * Enable metrics configuration for running dags in DagFileProcessorManager test * Add configuration for multi-team support in test_bundles_with_team * add go-sdk/bin/.gitignore back * Update .gitignore to include all files except itself * Refactor running DAGs metric emission: move to SchedulerJobRunner and update tests * Add running DAGs metric and remove unused metrics check * Add dagrun_metrics_interval to config and update running DAGs metric to use 'scheduler' namespace * Remove unused import of DagRunState in test_emit_running_dags_metric * Fix Low dep tests:core * Rename running DAGs metric to 'scheduler.dagruns.running' for consistency and update related test assertions
1 parent 1f66c7f commit ac96487

File tree

4 files changed

+43
-0
lines changed

4 files changed

+43
-0
lines changed

airflow-core/docs/administration-and-deployment/logging-monitoring/metrics.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ Name Description
236236
``scheduler.tasks.executable`` Number of tasks that are ready for execution (set to queued)
237237
with respect to pool limits, Dag concurrency, executor state,
238238
and priority.
239+
``scheduler.dagruns.running`` Number of DagRuns currently in the ``RUNNING`` state
239240
``executor.open_slots.<executor_class_name>`` Number of open slots on a specific executor. Only emitted when multiple executors are configured.
240241
``executor.open_slots`` Number of open slots on executor
241242
``executor.queued_tasks.<executor_class_name>`` Number of queued tasks on a specific executor. Only emitted when multiple executors are configured.

airflow-core/src/airflow/config_templates/config.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,14 @@ scheduler:
20882088
type: float
20892089
example: ~
20902090
default: "30.0"
2091+
dagrun_metrics_interval:
2092+
description: |
2093+
How often (in seconds) the scheduler emits metrics on running DAG runs
2094+
to StatsD (if statsd_on is enabled)
2095+
version_added: 3.1.0
2096+
type: float
2097+
example: ~
2098+
default: "30.0"
20912099
scheduler_health_check_threshold:
20922100
description: |
20932101
If the last scheduler heartbeat happened more than ``[scheduler] scheduler_health_check_threshold``

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1339,6 +1339,11 @@ def _run_scheduler_loop(self) -> None:
13391339
self._emit_running_ti_metrics,
13401340
)
13411341

1342+
timers.call_regular_interval(
1343+
conf.getfloat("scheduler", "dagrun_metrics_interval", fallback=30.0),
1344+
self._emit_running_dags_metric,
1345+
)
1346+
13421347
timers.call_regular_interval(
13431348
conf.getfloat("scheduler", "task_instance_heartbeat_timeout_detection_interval", fallback=10.0),
13441349
self._find_and_purge_task_instances_without_heartbeats,
@@ -2276,6 +2281,12 @@ def _emit_running_ti_metrics(self, session: Session = NEW_SESSION) -> None:
22762281

22772282
self.previous_ti_running_metrics = ti_running_metrics
22782283

2284+
@provide_session
def _emit_running_dags_metric(self, session: Session = NEW_SESSION) -> None:
    """
    Emit the ``scheduler.dagruns.running`` gauge.

    Counts every ``DagRun`` row whose state is ``DagRunState.RUNNING`` and
    reports that total through ``Stats.gauge``. Each running DagRun is counted
    individually, so a DAG with several running runs contributes more than once.

    :param session: SQLAlchemy session used to execute the count query.
    """
    stmt = select(func.count()).select_from(DagRun).where(DagRun.state == DagRunState.RUNNING)
    # ``Session.scalar`` may return None when no row comes back; report 0
    # instead of handing None to the gauge.
    running_dagruns = session.scalar(stmt) or 0
    Stats.gauge("scheduler.dagruns.running", running_dagruns)
2289+
22792290
@provide_session
22802291
def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None:
22812292
from airflow.models.pool import Pool

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7165,6 +7165,29 @@ def test_process_expired_deadlines_no_deadlines_found(self, mock_handle_miss, se
71657165
# The handler should not be called, but no exceptions should be raised either.
71667166
mock_handle_miss.assert_not_called()
71677167

7168+
def test_emit_running_dags_metric(self, dag_maker, monkeypatch):
    """Check that ``_emit_running_dags_metric`` gauges the number of running DagRuns."""
    with dag_maker("metric_dag"):
        pass

    # Two RUNNING runs of the same DAG, with logical dates one hour apart.
    for run_id, offset in (("run_1", timedelta(0)), ("run_2", timedelta(hours=1))):
        dag_maker.create_dagrun(
            run_id=run_id,
            state=DagRunState.RUNNING,
            logical_date=timezone.utcnow() + offset,
        )

    gauge_calls: list[tuple[str, int]] = []

    def _capture_gauge(metric: str, value: int, *_, **__):
        # Record only the metric name and value; any extra arguments are ignored.
        gauge_calls.append((metric, value))

    monkeypatch.setattr("airflow.jobs.scheduler_job_runner.Stats.gauge", _capture_gauge, raising=True)

    with conf_vars({("metrics", "statsd_on"): "True"}):
        self.job_runner = SchedulerJobRunner(Job())
        self.job_runner._emit_running_dags_metric()

    # Both runs are counted even though they belong to one DAG.
    assert gauge_calls == [("scheduler.dagruns.running", 2)]
7190+
71687191

71697192
@pytest.mark.need_serialized_dag
71707193
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):

0 commit comments

Comments
 (0)