Getting started with Apache Airflow
In this post, I am going to discuss Apache Airflow, a workflow management system developed by Airbnb.
Earlier I had discussed writing basic ETL pipelines in Bonobo. Bonobo is cool for writing ETL pipelines, but the world is not all about writing ETL pipelines to automate things. There are other use cases in which you have to perform tasks in a certain order, either once or periodically. For instance:
- Monitoring cron jobs
- Transferring data from one place to another
- Automating your DevOps operations
- Periodically fetching data from websites and updating the database for your awesome price comparison system
- Data processing for recommendation-based systems
- Machine Learning pipelines
Possibilities are endless.
Before we move on to implementing Airflow in our systems, let's discuss what Airflow actually is and its terminology.
What is Airflow?
From the Website:
Airflow is a platform to programmatically author, schedule and monitor workflows.
Use Airflow to author workflows as directed acyclic graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
Basically, it helps to automate scripts in order to perform tasks. Airflow is Python-based, but you can execute a program irrespective of the language. For instance, the first stage of your workflow might have to execute a C++ based program to perform image analysis and then a Python-based program to transfer that data to S3. Possibilities are endless.
What is a DAG?
From Wikipedia
In mathematics and computer science, a directed acyclic graph (DAG /ˈdæɡ/) is a finite directed graph with no directed cycles. That is, it consists of finitely many vertices and edges, with each edge directed from one vertex to another, such that there is no way to start at any vertex v and follow a consistently-directed sequence of edges that eventually loops back to v again. Equivalently, a DAG is a directed graph that has a topological ordering, a sequence of the vertices such that every edge is directed from earlier to later in the sequence.
Let me try to explain it in simple words: you can only be a son of your father but not vice versa. OK, it's lame or weird, but I could not find a better example to explain a directed cycle.
In Airflow, all workflows are DAGs. A DAG consists of operators. An operator defines an individual task that needs to be performed. There are different types of operators available (as given on the Airflow website):
- BashOperator - executes a bash command
- PythonOperator - calls an arbitrary Python function
- EmailOperator - sends an email
- SimpleHttpOperator - sends an HTTP request
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
- Sensor - waits for a certain time, file, database row, S3 key, etc.
You can also come up with a custom operator as per your need, as sketched below.
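For illustration, here is a minimal sketch of what a custom operator can look like in Airflow 1.x; the GreetOperator name and its name parameter are my own inventions, not part of Airflow.

# Minimal custom operator sketch (class name and parameter are illustrative)
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class GreetOperator(BaseOperator):

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(GreetOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # execute() is what Airflow calls when the task instance runs
        print('Hello, {}!'.format(self.name))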
Installation and Setup
Airflow is Python-based. The best way to install it is via the pip tool.
pip install apache-airflow
To verify whether it got installed, run the command: airflow version
and it should print something like:
[2018-09-22 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor ____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/ v1.10.0
You will need to install mysqlclient as well to incorporate MySQL in your workflows. It is optional, though.
pip install mysqlclient
Before you start anything, create a folder and set it as AIRFLOW_HOME. In my case it is airflow_home. Once it is created, you will call the export command to set it in the path.
export AIRFLOW_HOME="$(pwd)/airflow_home"
Make sure you are one folder above airflow_home before running the export command. Within airflow_home you will create another folder to keep the DAGs. Call it dags. If you set load_examples=False, it will not load the default examples on the Web interface.
Now you have to call airflow initdb within the airflow_home folder. Once it's done, it creates airflow.cfg and unittests.cfg. airflow.db is an SQLite file to store all configuration related to running workflows. airflow.cfg is to keep all the initial settings to keep things running.
In this file, you can see the sql_alchemy_conn parameter with the value ../airflow_home/airflow.db. You can use MySQL if you want. For now, just stick with the basic settings.
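For reference, a minimal sketch of the relevant part of airflow.cfg might look like the following; the paths are placeholders, and the MySQL line is only an assumption of what a swapped-in connection string could look like.

[core]
# where your DAG files live
dags_folder = /path/to/airflow_home/dags

# metadata database; SQLite by default, a MySQL URI would also work here
sql_alchemy_conn = sqlite:////path/to/airflow_home/airflow.db
# sql_alchemy_conn = mysql://user:password@localhost:3306/airflow

# skip the bundled example DAGs in the Web UI
load_examples = False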
So far so good. Now, without wasting any time, let's start the web server.
airflow webserver
When it starts, it shows a screen like:
[2018-09-20 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor
/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: DeprecationWarning: The celeryd_concurrency option in [celery] has been renamed to worker_concurrency - the old setting has been used, but please update your config.
  default=conf.get('celery', 'worker_concurrency')),
____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/  v1.10.0
[2018-09-19 14:21:42,340] {__init__.py:57} INFO - Using executor SequentialExecutor
____________ _____________ ____ |__( )_________ __/__ /________ __ ____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / _/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2018-09-19 14:21:43,119] [48995] {models.py:167} INFO - Filling up the DagBag from /Development/airflow_home/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Now when you visit 0.0.0.0:8080 it shows a screen like:
You can see a bunch of entries here. These are the examples shipped with the Airflow installation. You can turn them off by visiting the airflow.cfg file and setting load_examples to False.
DAG Runs tells how many times a certain DAG has been executed. Recent Tasks tells which task out of the many tasks within a DAG is currently running and what its status is. The Schedule is similar to the one you would have used when scheduling a cron job, therefore I won't emphasize it at the moment. The Schedule is responsible for the time at which a certain DAG should be triggered.
Here is a screenshot of a DAG I created earlier and executed. You can see rectangular boxes representing tasks. You can also see different colored boxes on the top right of the greyed box, named: success, running, failed, etc. These are legends. In the picture above, all boxes have a green border; nevertheless, if you are unsure, hover your mouse over the success legend and you will see a screen like below:
You might have noticed the background/fill color of these boxes, which is green and red. At the top left of the greyed box, you can see why they are in such colors; these background colors represent the different types of operators being used in this DAG. In this case, we are using BashOperator and PythonOperator.
Basic Example
We will work on a basic example to see how it works. I will be explaining the example. In the dags folder which was created earlier in airflow_home/, we will create our first sample DAG. So, I am going to create a file with the name my_simple_dag.py.
The very first thing you are going to do after the imports is to write the routines that will serve as tasks for the operators. We will be using a mixture of BashOperator and PythonOperator.
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def greet():
    print('Writing in file')
    with open('path/to/file/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'


def respond():
    return 'Greet Responded Again'
These are two simple routines which are doing nothing but returning text. I will tell you later why I am writing something in a text file. The next thing I am going to do is to define default_args and create a DAG instance.
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 9, 24, 10, 0, 0),
    'concurrency': 1,
    'retries': 0
}
Here you set a bunch of parameters in the default_args dict variable.
start_date tells since when this DAG should start executing the workflow. This start_date could belong to the past. In my case, it is 22 September at 11 AM UTC. This date is already in the past for me because it's 11:15 AM UTC now. You can always change this parameter via the airflow.cfg file and set your own local timezone. For now, UTC is fine for me. In case you are still curious what time is being used by Airflow, check the top right of the Airflow Web UI; you should see something like given below. You can use this as a reference to schedule your tasks.
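As a side note, the timezone setting itself lives in airflow.cfg (Airflow 1.10 added timezone support); a minimal sketch of that entry, with the default value:

[core]
# scheduling timezone; the default is utc, any IANA name (e.g. Europe/Amsterdam) works
default_timezone = utc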
The retries parameter retries to run the DAG X number of times in case it does not execute successfully. The concurrency parameter helps to dictate the number of processes that need to be used for running multiple DAGs. For instance, your DAG has to run 4 past instances, also termed as Backfill, with an interval of 10 minutes (I will cover this complex topic shortly), and you have set concurrency to 2; then 2 DAG runs will execute their tasks at a time. If you have already implemented multiprocessing in your Python, then you should feel at home here.
with DAG('my_simple_dag',
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         ) as dag:

    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')

    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)

    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')

    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)

opr_hello >> opr_greet >> opr_sleep >> opr_respond
Now, using a context manager, we are defining a DAG with its properties. The first parameter is the ID of the DAG; in our case it is my_simple_dag. The second parameter we have already discussed. The third parameter is something that needs to be discussed along with the start_date that was mentioned in default_args.
Within that context manager, you are assigning operators along with task IDs. In our case, these operators are labeled as opr_hello, opr_greet, opr_sleep, and opr_respond. These names then appear in the rectangular boxes discussed above.
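At this point you can already try a single task in isolation with the airflow test command, which runs one task for a given execution date without recording its state in the database; the date below is only an example.

airflow test my_simple_dag say_Hi 2018-09-24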
Before I move further, I had better discuss DAG Runs and the scheduler and what role they play in the entire workflow.
What is Airflow Scheduler?
The Airflow Scheduler is a monitoring process that runs all the time and triggers task execution based on schedule_interval and execution_date.
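As a quick reference (my own summary, not from the original post), schedule_interval accepts a cron expression, a preset string, or a datetime.timedelta:

# Illustrative schedule_interval values
import datetime as dt

schedule_interval = '*/10 * * * *'         # cron syntax: every 10 minutes
schedule_interval = '@daily'               # preset: once a day at midnight
schedule_interval = dt.timedelta(hours=1)  # every hour
schedule_interval = None                   # never scheduled; trigger manually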
What is DagRun?
A DagRun is the instance of a DAG that will run at a given time. When it runs, all the tasks inside it will be executed.
Above is a diagram which might help you figure out what a DagRun is :-)
Assume the start_date is September 24, 2018 12:00:00 PM UTC and you have started the DAG at 12:30:00 PM UTC with a schedule_interval of */10 * * * * (after every 10 minutes). By using the same default_args params discussed above, the following will be the entries of the DAG that will run instantly, one by one in our case, since concurrency is 1:
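To make the arithmetic concrete, here is a rough sketch (my own, not from the original post) that prints which runs get backfilled and when each one is triggered:

# Sketch of the backfill arithmetic for the hypothetical timeline above
import datetime as dt

start_date = dt.datetime(2018, 9, 24, 12, 0, 0)          # start_date from default_args
scheduler_started = dt.datetime(2018, 9, 24, 12, 30, 0)  # when the scheduler came up
interval = dt.timedelta(minutes=10)                       # */10 * * * *

execution_date = start_date
while execution_date + interval <= scheduler_started:
    print('execution_date', execution_date, '-> triggered at', execution_date + interval)
    execution_date += interval
# prints runs for 12:00, 12:10 and 12:20; the next run (12:30) fires at 12:40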
Why is it happening? Well, you are responsible for it. Airflow gives you the facility to run past DAGs. The process of running past DAGs is called Backfill. The process of Backfill actually lets Airflow preserve the state of all DAGs since their inception. The feature has been given for scenarios where you are running a DAG which queries some DB or an API like Google Analytics to fetch previous data and make it part of the workflow. Even if there is no past data, Airflow will run it anyway to keep the state of the entire workflow intact.
Once the past DAGs are run, the next one (the one you actually intend to run) will run at 12:40:00 PM UTC. Do remember that whatever schedule you set, the DAG runs AFTER that time; in our case, if it has to run after every 10 minutes, it will run once 10 minutes have passed.
Let's play with it. I turn my_simple_dag on and then start the scheduler.
airflow scheduler
As soon as you run it, you will see the DAG screen like this:
Some of the tasks are queued. If you click on the DAG ID, my_simple_dag, you will see a screen like below:
Observe the timestamp in the Run Id column. Do you see the pattern? The first one executed at 10:00, then 10:10, then 10:20. It then stops. Let me clarify again that the DAG runs once the time duration, which is 10 minutes, has passed. The scheduler started at 10:30 AM, so it filled in the 3 passed runs with a gap of 10 minutes between them.
The DAG run for 10:30:00 AM UTC was actually done at 10:40:00 AM UTC. The latest DagRun record will always be one interval behind the current time. In our case, the machine time was 10:40:00 AM UTC.
If you hover on one of the circles, you can see the timestamp in front of Run:, which tells the time it was executed. You can see that these green circles have a time difference of 10 minutes. The Tree View is a bit complicated but gives a complete picture of your entire workflow. In our case, it was run 4 times and all tasks ran successfully, shown in dark green.
You can avoid Backfilling in two ways: you set a start_date in the future, or you set catchup=False in the DAG instance. For instance, you can do something like below:
with DAG('my_simple_dag',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
By setting catchup=False, it then does not matter whether your start_date belongs to the past or not. It will be executing from the current time onwards. By setting end_date, you can make a DAG stop running by itself, as sketched below.
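A minimal sketch of that, assuming you pass end_date through default_args (it can also go on the DAG itself); the dates are placeholders:

import datetime as dt

default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 9, 24, 10, 0, 0),
    'end_date': dt.datetime(2018, 10, 24, 10, 0, 0),  # no new runs scheduled after this
    'retries': 0,
}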
opr_hello >> opr_greet >> opr_sleep >> opr_respond
The line you are seeing above tells the relationship between operators and hence constructs the entire workflow. The bitshift operator here is telling the relationship between operators. Here opr_hello runs first and then the rest. The flow executes from left to right. In pictorial form, it looks like below:
opr_hello >> opr_greet >> opr_sleep << opr_respond
If you change the direction of the last operator, the flow will look like below:
The respond task will execute in parallel, and the sleep task will execute in both cases.
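For completeness, the two bitshift operators and the explicit methods are interchangeable ways of declaring the same dependency; a small sketch (my own):

# Three equivalent ways to say "opr_greet runs after opr_hello"
opr_hello >> opr_greet
opr_greet << opr_hello
opr_hello.set_downstream(opr_greet)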
Conclusion
In this post, I discussed how you can introduce a comprehensive workflow system to schedule and automate your workflows. In part 2, I will come up with a real-world example to show how Airflow can be used. I wanted to cover it in this post, but it already got lengthy enough, and explaining the DagRun concept was necessary, as it took me quite some time to figure it out.
As always, the code for this post is available on Github.
This post was originally published here.
If you like this post, then you should subscribe to my newsletter.
Source: https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b