An Airflow DAG that schedules fetching stock data from Yahoo! Finance, saves it to CSV files, loads the CSV files into a database location, and queries them.
- Download the daily price data for the stock symbols `AAPL` and `TSLA` (a download sketch follows this list)
- The workflow should be scheduled to run at 6 PM on every weekday (Mon-Fri)
- Save the datasets into CSV files and load them into HDFS
- Run a custom query on the downloaded datasets
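For reference, the per-symbol download can be done with `yfinance` and `pandas`. The snippet below is a minimal sketch, not the exact code in `extract.py`; the output directory `/tmp/stock_data` and the helper name are assumptions for illustration.

```python
import os

import yfinance as yf

# Hypothetical output directory; the real DAG uses its own temporary location.
OUT_DIR = "/tmp/stock_data"


def download_daily_prices(symbol: str) -> str:
    """Download the latest daily price bar for one symbol and save it as CSV."""
    # period="1d" with interval="1d" fetches the most recent daily bar.
    df = yf.download(symbol, period="1d", interval="1d")
    path = os.path.join(OUT_DIR, f"{symbol}.csv")
    df.to_csv(path)
    return path


if __name__ == "__main__":
    os.makedirs(OUT_DIR, exist_ok=True)
    for symbol in ("AAPL", "TSLA"):
        print(download_daily_prices(symbol))
```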
- Airflow is installed
- Package `yfinance` is installed (`pip install yfinance`)
- Package `pandas` is installed (`pip install pandas`)
```bash
$ docker-compose build && docker-compose up
```

- Use `docker-compose` to wrap the Airflow setup inside one container
- For simplicity, we implement all tasks and helper functions inside one Python file, `extract.py`, inside the `dags` folder (a skeleton is sketched below)
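A minimal skeleton of `extract.py` might look like the following. The task ids match the table below; the operator choices, paths, and callables are assumptions for illustration only.

```python
from datetime import datetime

import pandas as pd
import yfinance as yf
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

TMP_DIR = "/tmp/stock_data"    # assumed temporary location
DB_DIR = "/usr/local/db_data"  # assumed database location


def extract(symbol):
    """Download the latest daily price bar for one symbol and write it as CSV."""
    df = yf.download(symbol, period="1d", interval="1d")
    df.to_csv(f"{TMP_DIR}/{symbol}.csv")


def query(symbol):
    """Illustrative custom query: print the most recent closing price."""
    df = pd.read_csv(f"{DB_DIR}/{symbol}.csv")
    print(symbol, df["Close"].iloc[-1])


dag = DAG(
    "stock_data",
    start_date=datetime(2021, 1, 1),
    schedule_interval="0 18 * * 1-5",  # 6 PM on every weekday (Mon-Fri)
    catchup=False,                     # do not backfill past runs
)

init_dir = BashOperator(task_id="init_dir", bash_command=f"mkdir -p {TMP_DIR}", dag=dag)
extract_stock1 = PythonOperator(task_id="extract_stock1", python_callable=extract, op_args=["AAPL"], dag=dag)
extract_stock2 = PythonOperator(task_id="extract_stock2", python_callable=extract, op_args=["TSLA"], dag=dag)
# A plain `mv` stands in for the load step; an HDFS setup would use `hdfs dfs -put` instead.
move_data1 = BashOperator(task_id="move_data1", bash_command=f"mv {TMP_DIR}/AAPL.csv {DB_DIR}/", dag=dag)
move_data2 = BashOperator(task_id="move_data2", bash_command=f"mv {TMP_DIR}/TSLA.csv {DB_DIR}/", dag=dag)
query_data = PythonOperator(task_id="query_data", python_callable=query, op_args=["AAPL"], dag=dag)
```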
| Step | Task id(s) | Description |
|---|---|---|
| 0 | init_dir | Set up the temporary location for daily data, using a BashOperator |
| 1 | extract_stock1, extract_stock2 | Extract stock data from Yahoo! Finance and write it as CSV files to the temporary location |
| 2 | move_data1, move_data2 | Move the CSV files from the temporary location to the database location |
| 3 | query_data | Query the data in the database location |
The task dependencies are illustrated in the figure below.
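Assuming the figure shows the fan-out/fan-in shape implied by the numbered steps, the dependencies would be wired in `extract.py` with Airflow's bit-shift syntax (task names as in the skeleton above):

```python
# Step 0 fans out to the two extracts; each extract feeds its move task;
# the query runs only after both moves have finished.
init_dir >> [extract_stock1, extract_stock2]
extract_stock1 >> move_data1
extract_stock2 >> move_data2
[move_data1, move_data2] >> query_data
```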
The Celery Executor requires a full RDBMS such as MySQL or PostgreSQL rather than SQLite, so the first step is to install the RDBMS and the related connector. This project uses MySQL as the back-end DBMS.
A practical way to run Airflow with MySQL and Celery is to wrap them inside a Docker container:
```dockerfile
RUN pip install apache-airflow[mysql,celery]==1.10.12 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-1.10.12/constraints-3.7.txt"
RUN pip install mysql-connector-python
```
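As a quick smoke test that the connector and the MySQL backend are wired up, one can open a connection from inside the container. The host name `airflow-backend` matches the connection string used below; the credentials here are placeholders.

```python
import mysql.connector

# Placeholder credentials; use the same values as in sql_alchemy_conn.
conn = mysql.connector.connect(
    host="airflow-backend",
    user="airflow",
    password="airflow",
    database="airflow",
)
print(conn.is_connected())  # True if the MySQL backend is reachable
conn.close()
```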
Besides that, we need to modify the `airflow.cfg` file to explicitly use the Celery Executor.
One trick when using MySQL with Airflow inside a Docker container is to call `airflow initdb` twice.

- The first call generates all default config files, including the `airflow.cfg` file.
- Then we can edit the `airflow.cfg` file to explicitly use the Celery Executor:

```ini
sql_alchemy_conn = mysql://{user}:{password}@airflow-backend/{db}
executor = CeleryExecutor
```

- Finally, we call `airflow initdb` a second time to apply the corrected config.
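Since `airflow.cfg` is an INI file, the edit between the two `initdb` calls can be scripted. The helper below is a sketch in Python (a `sed` one-liner in the container entrypoint works just as well); the config path and connection values are assumptions.

```python
import configparser

AIRFLOW_CFG = "/usr/local/airflow/airflow.cfg"  # assumed install path

# Read the config generated by the first `airflow initdb` ...
config = configparser.ConfigParser()
config.read(AIRFLOW_CFG)

# ... switch the executor and point SQLAlchemy at the MySQL backend ...
config["core"]["executor"] = "CeleryExecutor"
config["core"]["sql_alchemy_conn"] = "mysql://airflow:airflow@airflow-backend/airflow"

# ... and write it back before running `airflow initdb` the second time.
with open(AIRFLOW_CFG, "w") as f:
    config.write(f)
```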

