Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,13 @@
)

# [END howto_operator_trigger_dagrun]

# [START howto_operator_trigger_dagrun_with_note]

trigger_with_note = TriggerDagRunOperator(
task_id="trigger_with_note",
trigger_dag_id="example_trigger_target_dag",
note="Triggered with a note!",
)

# [END howto_operator_trigger_dagrun_with_note]
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ def __init__(
fail_when_dag_is_paused: bool = False,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
openlineage_inject_parent_info: bool = True,
note: str | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -190,6 +191,7 @@ def __init__(
self.failed_states = [DagRunState(s) for s in failed_states]
else:
self.failed_states = [DagRunState.FAILED]
self.note = note
self.skip_when_already_exists = skip_when_already_exists
self.fail_when_dag_is_paused = fail_when_dag_is_paused
self.openlineage_inject_parent_info = openlineage_inject_parent_info
Expand Down Expand Up @@ -264,6 +266,9 @@ def execute(self, context: Context):
)

def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
if self.note:
self.log.info("Triggered DAG with note: %s", self.note)

from airflow.providers.common.compat.sdk import DagRunTriggerException

raise DagRunTriggerException(
Expand All @@ -281,6 +286,9 @@ def _trigger_dag_af_3(self, context, run_id, parsed_logical_date):
)

def _trigger_dag_af_2(self, context, run_id, parsed_logical_date):
if self.note:
self.log.info("Triggered DAG with note: %s", self.note)
Comment on lines +289 to +290
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can ever reach that? Having a note passed down in AF2, can we?


try:
dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,27 @@ def test_trigger_dagrun_with_logical_date(self):

assert exc_info.value.logical_date == timezone.datetime(2021, 1, 2, 3, 4, 5)

@pytest.mark.skipif(not AIRFLOW_V_3_0_PLUS, reason="Implementation is different for Airflow 2 & 3")
def test_trigger_dagrun_operator_logs_note_af3(self, mocker):
"""
Ensure that for Airflow 3.x the operator logs the note
before raising DagRunTriggerException.
"""
mock_log = mocker.patch(
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator.log"
)

operator = TriggerDagRunOperator(
task_id="test_trigger",
trigger_dag_id="target_dag",
note="Test note",
)

with pytest.raises(DagRunTriggerException):
operator.execute(context={"ti": mocker.Mock()})

mock_log.info.assert_called_once_with("Triggered DAG with note: %s", "Test note")

def test_trigger_dagrun_operator_templated_invalid_conf(self, dag_maker):
"""Test passing a conf that is not JSON Serializable raise error."""
with dag_maker(
Expand Down Expand Up @@ -566,6 +587,27 @@ def test_trigger_dagrun(self, dag_maker, mock_supervisor_comms):
assert dagrun.run_type == DagRunType.MANUAL
assert dagrun.run_id == DagRun.generate_run_id(DagRunType.MANUAL, dagrun.logical_date)

def test_trigger_dagrun_operator_logs_note(self, dag_maker, mocker):
"""
Ensure that for Airflow 2.x the operator logs the note when provided.
This test can be removed once providers drop support for Airflow 2.
"""
mock_log = mocker.patch(
"airflow.providers.standard.operators.trigger_dagrun.TriggerDagRunOperator.log"
)

with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True):
task = TriggerDagRunOperator(
task_id="test_task",
trigger_dag_id=TRIGGERED_DAG_ID,
note="Test note for AF2",
)

dag_maker.create_dagrun()
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

mock_log.info.assert_called_once_with("Triggered DAG with note: %s", "Test note for AF2")

def test_explicitly_provided_trigger_run_id_is_saved_as_attr(self, dag_maker, session):
with dag_maker(TEST_DAG_ID, default_args={"start_date": DEFAULT_DATE}, serialized=True):
task = TriggerDagRunOperator(
Expand Down
Loading