If you are using airflow in your production environment then you must have come across a requirement to trigger an email if any Dag fails so that you will be notified and fix the issue in the flow. We will go step by step to set up the email configuration when DAG fails and in the later section, we will discuss the way to trigger email on the success of DAG.
Step 1: Get the Sender’s Email
Before sending an email, we need the sender’s email account credentials. Airflow will use this to trigger the email to the recipient.
Use Gmail to send an email:
- Create a new Gmail account or use the existing one.
- Enable 2-factor authentication in that Gmail account from settings.
- Open this link to generate the app password and that password we will use in the airflow. In this way, we can skip using an actual password.
Use Mailgun or Twilio SendGrid: Create your account on any of these websites and get the credentials. You can go for the domain verification if you want to send an email from your own domain. Follow this link to go for the domain verification approach and start sending emails.
Step 2: Setup SMTP configuration inside airflow.
We are going to discuss two ways to setup SMTP configuration in airflow. You can use anyone based on your requirements.
The first way is to set the configuration inside airflow.cfg file. This file exists in the home directory of airflow. So open your airflow.cfg file and edit the [smtp] section like below.
[smtp] smtp_host = smtp.gmail.com smtp_starttls = True smtp_ssl = False smtp_user = YOUR_EMAIL_ADDRESS smtp_password = 16_DIGIT_APP_PASSWORD smtp_port = 587 smtp_mail_from = YOUR_EMAIL_ADDRESS
If you are using Gmail then use the app password that you created in first step for smtp_password and smtp_user will be the gmail email ID that you create in first step.
In the case of mailgun or SendGrid smtp_host will be different. This information you can get from your account and put them in your airflow.cfg file as described in the above section.
The second way to set the SMTP configuration is to set them via environment variables if you don’t want to edit the airflow.cfg file. Set the below environment variables in your deployment environment.
AIRFLOW__SMTP__SMTP_HOST=smtp.gmail.com AIRFLOW__SMTP__SMTP_PORT=587 [email protected] AIRFLOW__SMTP__SMTP_PASSWORD=yourpassword [email protected]
Airflow will use these environment variables for SMTP configuration. Keep in mind that airflow variable set using environment variable has more precedence than set via airflow.cfg file.
Step 3: Trigger email from DAG on failure of DAG.
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from datetime import datetime, timedelta from airflow.utils.email import send_email from airflow.utils.dates import days_ago from airflow.models import Variable python_file_to_exec = "python /usr/local/airflow/python-scripts/example.py" # DAG for airflow task dag_email_recipient = "[email protected]" default_args = { 'email': dag_email_recipient, 'email_on_failure': True, } dag = DAG( dag_id='process_incoming_files', default_args=default_args, start_date=days_ago(2), schedule_interval='@hourly' ) file_processor = BashOperator( task_id='process_incoming_files', bash_command=python_file_to_exec, dag=dag ) file_processor
In a nutshell, to make a DAG trigger email on failure we have to make 2 changes in the Dag file. Make sure email_on_failure is set to True and second is email is set with a valid email address.
Step 4: Trigger email on success of Dag
Airflow doesn’t provide a way to trigger email on success like failure. So there are two workarounds for this. The first one is to use the email operator task to send an email and attach this node at the end of Dag so that it will execute only after the successful execution of all predecessor nodes. The second method is to use the success_callback and write your own method to send the email.
We are going to discuss the first method where we will use the email operator task.
Use the below DAG to send the email on success as well failure.
from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.email_operator import EmailOperator from airflow.operators.dagrun_operator import TriggerDagRunOperator from datetime import datetime, timedelta from airflow.utils.email import send_email from airflow.utils.dates import days_ago from airflow.models import Variable python_file_to_exec = "python /usr/local/airflow/python-scripts/example.py" # DAG for airflow task dag_email_recipient = "[email protected]" default_args = { 'email': dag_email_recipient, 'email_on_failure': True, } dag = DAG( dag_id='process_incoming_files', default_args=default_args, start_date=days_ago(2), schedule_interval='@hourly' ) file_processor = BashOperator( task_id='process_incoming_files', bash_command=python_file_to_exec, dag=dag ) success_email_body = f""" Hi, <br><br> process_incoming_files DAG has been executed successfully at {datetime.now()}. """ send_mail = EmailOperator( task_id="send_mail", to=dag_email_recipient, subject='Airflow Success: process_incoming_files', html_content=success_email_body, dag=dag) file_processor >> send_mail
This is the working example on airflow 1.10.9 version. If you face any issue in the execution of above let us know by your comments.
Happy Coding!
2 Pingbacks
- How to set a variable in airflow admin UI and use it in DAG - MY BELIEFS
- How to set a variable in airflow admin UI and use it in DAG - My Beliefs
20th May 2021 at 10:52 am
python_file_to_exec = “python /usr/local/airflow/python-scripts/example.py”
Hi Piyush,
Can you please explain the above code , what that does.
I have to build the email notification in Airflow .
If you can help me out will be really appreciate.
Thanks
Chetan