11 changes: 6 additions & 5 deletions dev/breeze/tests/test_selective_checks.py
@@ -1852,7 +1852,7 @@ def test_expected_output_push(
"selected-providers-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
"cncf.kubernetes common.compat common.sql "
"facebook google hashicorp http microsoft.azure microsoft.mssql mysql "
"openlineage oracle postgres presto salesforce samba sftp ssh trino",
"openlineage oracle postgres presto salesforce samba sftp ssh standard trino",
"all-python-versions": f"['{DEFAULT_PYTHON_MAJOR_MINOR_VERSION}']",
"all-python-versions-list-as-string": DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
"ci-image-build": "true",
@@ -1864,7 +1864,7 @@ def test_expected_output_push(
"docs-list-as-string": "apache-airflow helm-chart amazon apache.beam apache.cassandra "
"apache.kafka cncf.kubernetes common.compat common.sql facebook google hashicorp http microsoft.azure "
"microsoft.mssql mysql openlineage oracle postgres "
"presto salesforce samba sftp ssh trino",
"presto salesforce samba sftp ssh standard trino",
"skip-prek-hooks": ALL_SKIPPED_COMMITS_IF_NO_UI,
"run-kubernetes-tests": "true",
"upgrade-to-newer-dependencies": "false",
@@ -1874,12 +1874,13 @@ def test_expected_output_push(
"providers-test-types-list-as-strings-in-json": json.dumps(
[
{
"description": "amazon...google",
"description": "amazon...standard",
"test_types": "Providers[amazon] Providers[apache.beam,apache.cassandra,"
"apache.kafka,cncf.kubernetes,common.compat,common.sql,facebook,"
"hashicorp,http,microsoft.azure,microsoft.mssql,mysql,"
"openlineage,oracle,postgres,presto,salesforce,samba,sftp,ssh,trino] "
"Providers[google]",
"Providers[google] "
"Providers[standard]",
}
]
),
@@ -2122,7 +2123,7 @@ def test_upgrade_to_newer_dependencies(
"docs-list-as-string": "amazon apache.beam apache.cassandra apache.kafka "
"cncf.kubernetes common.compat common.sql facebook google hashicorp http "
"microsoft.azure microsoft.mssql mysql openlineage oracle "
"postgres presto salesforce samba sftp ssh trino",
"postgres presto salesforce samba sftp ssh standard trino",
},
id="Google provider docs changed",
),
20 changes: 20 additions & 0 deletions providers/google/docs/operators/cloud/cloud_composer.rst
@@ -209,3 +209,23 @@ You can trigger a DAG in another Composer environment, use:
    :dedent: 4
    :start-after: [START howto_operator_trigger_dag_run]
    :end-before: [END howto_operator_trigger_dag_run]

Wait for a different DAG, task group, or task to complete
----------------------------------------------------------

To wait for a different DAG, task group, or task to complete in a specific Composer environment, use:
:class:`~airflow.providers.google.cloud.sensors.cloud_composer.CloudComposerExternalTaskSensor`

.. exampleinclude:: /../../google/tests/system/google/cloud/composer/example_cloud_composer.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_external_task]
    :end-before: [END howto_sensor_external_task]

or you can use the same sensor in deferrable mode:

.. exampleinclude:: /../../google/tests/system/google/cloud/composer/example_cloud_composer.py
    :language: python
    :dedent: 4
    :start-after: [START howto_sensor_external_task_deferrable_mode]
    :end-before: [END howto_sensor_external_task_deferrable_mode]
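
For orientation, here is a minimal sketch of how such a sensor might be wired into a DAG file. It is not taken from this PR: the constructor arguments shown (project_id, region, environment_id, composer_external_dag_id) are assumptions inferred from the hook methods added below, so consult the system test referenced above for the authoritative usage.

# A minimal sketch, not from this PR: the constructor argument names below
# are assumed, based on the hook methods added in this change.
from __future__ import annotations

import datetime

from airflow import DAG
from airflow.providers.google.cloud.sensors.cloud_composer import (
    CloudComposerExternalTaskSensor,
)

with DAG(
    dag_id="example_wait_for_external_dag",
    start_date=datetime.datetime(2024, 1, 1),
    schedule=None,
) as dag:
    # Block until the target DAG in the other Composer environment finishes.
    wait_for_external_dag = CloudComposerExternalTaskSensor(
        task_id="wait_for_external_dag",
        project_id="my-project",                # assumed parameter name
        region="us-central1",                   # assumed parameter name
        environment_id="my-composer-env",       # assumed parameter name
        composer_external_dag_id="target_dag",  # assumed parameter name
    )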
4 changes: 4 additions & 0 deletions providers/google/pyproject.toml
@@ -204,6 +204,9 @@ dependencies = [
"http" = [
"apache-airflow-providers-http"
]
"standard" = [
"apache-airflow-providers-standard"
]

[dependency-groups]
dev = [
@@ -228,6 +231,7 @@ dev = [
"apache-airflow-providers-salesforce",
"apache-airflow-providers-sftp",
"apache-airflow-providers-ssh",
"apache-airflow-providers-standard",
"apache-airflow-providers-trino",
# Additional devel dependencies (do not remove this line and add extra development dependencies)
"apache-airflow-providers-apache-kafka",
@@ -22,7 +22,7 @@
import time
from collections.abc import MutableSequence, Sequence
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin
from urllib.parse import urlencode, urljoin

from aiohttp import ClientSession
from google.api_core.client_options import ClientOptions
@@ -505,6 +505,42 @@ def get_dag_runs(

        return response.json()

    def get_task_instances(
        self,
        composer_airflow_uri: str,
        composer_dag_id: str,
        query_parameters: dict | None = None,
        timeout: float | None = None,
    ) -> dict:
        """
        Get the list of task instances for the provided DAG.

        :param composer_airflow_uri: The URI of the Apache Airflow web UI hosted within the Composer environment.
        :param composer_dag_id: The ID of the DAG.
        :param query_parameters: Query parameters for this request.
        :param timeout: The timeout for this request.
        """
        # Encode optional filters (e.g. {"state": "success"}) into the request query string.
        query_string = f"?{urlencode(query_parameters)}" if query_parameters else ""

        response = self.make_composer_airflow_api_request(
            method="GET",
            airflow_uri=composer_airflow_uri,
            path=f"/api/v1/dags/{composer_dag_id}/dagRuns/~/taskInstances{query_string}",
            timeout=timeout,
        )

        if response.status_code != 200:
            self.log.error(
                "Failed to get task instances for dag_id=%s from %s (status=%s): %s",
                composer_dag_id,
                composer_airflow_uri,
                response.status_code,
                response.text,
            )
            response.raise_for_status()

        return response.json()
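
As a hedged usage sketch (not part of this PR): assuming the synchronous hook class is CloudComposerHook and that the proxied endpoint accepts the standard Airflow REST API taskInstances filters, the new method could be called like this; the URI, DAG ID, and filter values are placeholders.

# Hedged sketch: CloudComposerHook and the filter keys are assumptions,
# not confirmed by this PR; URI and DAG ID are placeholders.
from airflow.providers.google.cloud.hooks.cloud_composer import CloudComposerHook

hook = CloudComposerHook(gcp_conn_id="google_cloud_default")
body = hook.get_task_instances(
    composer_airflow_uri="https://example.composer.googleusercontent.com",  # placeholder
    composer_dag_id="target_dag",
    # urlencode() turns this dict into "?state=success&limit=25".
    query_parameters={"state": "success", "limit": 25},
    timeout=60.0,
)
# The Airflow REST API returns {"task_instances": [...], "total_entries": N}.
for ti in body.get("task_instances", []):
    print(ti["task_id"], ti["state"])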


class CloudComposerAsyncHook(GoogleBaseAsyncHook):
"""Hook for Google Cloud Composer async APIs."""
@@ -849,3 +885,39 @@ async def get_dag_runs(
            raise AirflowException(response_body["title"])

        return response_body

    async def get_task_instances(
        self,
        composer_airflow_uri: str,
        composer_dag_id: str,
        query_parameters: dict | None = None,
        timeout: float | None = None,
    ) -> dict:
        """
        Get the list of task instances for the provided DAG.

        :param composer_airflow_uri: The URI of the Apache Airflow web UI hosted within the Composer environment.
        :param composer_dag_id: The ID of the DAG.
        :param query_parameters: Query parameters for this request.
        :param timeout: The timeout for this request.
        """
        # Encode optional filters (e.g. {"state": "success"}) into the request query string.
        query_string = f"?{urlencode(query_parameters)}" if query_parameters else ""

        response_body, response_status_code = await self.make_composer_airflow_api_request(
            method="GET",
            airflow_uri=composer_airflow_uri,
            path=f"/api/v1/dags/{composer_dag_id}/dagRuns/~/taskInstances{query_string}",
            timeout=timeout,
        )

        if response_status_code != 200:
            self.log.error(
                "Failed to get task instances for dag_id=%s from %s (status=%s): %s",
                composer_dag_id,
                composer_airflow_uri,
                response_status_code,
                response_body["title"],
            )
            raise AirflowException(response_body["title"])

        return response_body
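
The async variant would be awaited, e.g. from a deferrable trigger. Again a sketch: CloudComposerAsyncHook is taken from the class shown above, while the connection ID, URI, DAG ID, and filter are placeholders.

# Hedged sketch of the async variant; connection ID and filters are placeholders.
import asyncio

from airflow.providers.google.cloud.hooks.cloud_composer import CloudComposerAsyncHook


async def count_successful_tis() -> int:
    hook = CloudComposerAsyncHook(gcp_conn_id="google_cloud_default")
    body = await hook.get_task_instances(
        composer_airflow_uri="https://example.composer.googleusercontent.com",  # placeholder
        composer_dag_id="target_dag",
        query_parameters={"state": "success"},  # placeholder filter
    )
    return len(body.get("task_instances", []))


print(asyncio.run(count_successful_tis()))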