Mkdir700's Note

Mkdir700's Note

【入门Airflow】 编写第一个DAG

2021-12-16
【入门Airflow】 编写第一个DAG

前言

我们接触一个新的框架总会有许多新概念,这些概念虽然比较重要,但我想它的功能才是我们初步接触最想体验的,所以我更偏向于在使用的过程中去逐渐了解。

本文将从实际需求出发完成一个DAG的编写,并且我会写些BUG,以达到避坑的目的。

需求

现在我们有一个需求,每天定时请求下方接口,并把数据保存成文本文件。

https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US

分析

一共两个步骤:

  1. 请求接口;
  2. 存储响应内容;

我们可以分别定义两个函数,来完成这两个步骤。

请求接口

def request():
    import requests
    resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
    return resp

保存数据

def save(text: str):
    with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
        f.write(text)

DAG

在上一步,我们分别完成了两个函数,如果将它们组合起来就可以完成需求了。所以,现在需要一个家伙来“组合”它们。

DAG(Directed Acyclic Graph)即,有向无环图,关于它的含义,官方介绍比我清楚的多。

通俗点理解,DAG是一张地图,Airflow将根据这张地图,按照上面的路线抵达终点。

现在,我们就需要完成这张地图的路线绘制。

对于我们的需求,执行的路线就是,先下载再保存 ,所以路线图就是:

Operator

我们知道DAG是用来绘制路线图的,那么我们的任务怎么执行,谁来执行呢?

所以,需要一个家伙来将我们写的代码跑起来,这个家伙就是Operator。

列车有乘务,飞机有空姐,轮船有水手... Operator也是一样,根据不同的任务类型,可以选择不同的Operator。

例如:

我们的需求都是执行Python函数,所以我们可以只使用PythonOperator

不同Operator可以互相依赖,就好像海陆空协同作战。

编写DAG

了解完上面的两个基本概念,我们就可以编写DAG了。

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='download_bing',
    schedule_interval='0 0 * * *', # 定时执行,Cron表达式
    start_date=datetime(2021, 1, 1), # 开始时间
    tags=['test'], # DAG的tag标签,用于分类
) as dag:
    def request():
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp
    
    
    def save(text: str):
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)
    
    # 使用PythonOperator执行Python函数
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )
    
    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save
    )

上方就是我们编写DAG,仔细看,request和save之间没有先后关系(依赖关系)。

Airflow实现了语法糖,使用>><< 定义任务之间的依赖关系,则在本例中的依赖关系可以定义如下:

request_task >> save_task

完整DAG

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='download_bing',
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1),
    tags=['test'],
) as dag:
    def request():
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp
    
    
    def save(text: str):
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)
    
    
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )
    
    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save
    )
    
    request_task >> save_task

激活DAG

“激活”很简单,我们只要放置到/dags目录下,等待Airflow自动读取即可。

[入门Airflow] 使用Docker在本地快速搭建Airflow一文中,我们已映射了/dags /logs /plugins 这三个文件,所以我们将刚编写的DAG放置到/dags目录下,然后打开Web端,等待一会儿刷新即可。

如果DAG有错误,Web端则会包导入错误的异常:

可以输入框,输入test标签快速找到DAG

执行DAG

点击右侧的执行按钮

好吧,不出意外,你会看到下图的一片红色(doge🤣)

查看DAG执行日志

我理解你急于解决这个问题的心情,不妨来看看如何是哪里出了问题。

返回值类型错误

报错信息:

TypeError: Object of type Response is not JSON serializable

这个报错说Response类型不能被JSON序列化。

详细内容:

[2021-12-16, 21:24:37 ] {python.py:152} INFO - Done. Returned value was: <Response [200]>
[2021-12-16, 21:24:37 ] {xcom.py:334} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.

不卖关子,request函数的返回值就是Response,Airflow推荐的返回值是能够JSON序列化的数据。

所以,我们可以返回Response.text即可。

DAG执行多次的问题

上面的报错解决了,但你可以先不用去改代码,我们进入DAG详情页面,选择Calendar选项

What Fuck! 怎么全是红的?

这其实是因为我们配置DAG的原因

with DAG(
    dag_id='download_bing',
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1),
    tags=['test'],
) as dag:
      ...

我们指定了start_date,所以Airflow就从2021-01-01执行到当前时间。

如果想让过去的时间,归为当前一次执行,则应该指定参数catchupFalse,我理解的意思是不捕获错过的时间

with DAG(
    dag_id='download_bing',
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1),
    tags=['test'],
    catchup=False  # 新增
) as dag:
      ...

任务之间的值传递

还有一件事。有没有发现,request返回值,如何交给save呢?

详细的可以了解Xcoms的概念,这里我就简单描述下任务之间值传递的逻辑。

save并不是被动接收来自request的返回值,而是主动去询问。

所以save函数这样写:

def save(**kwargs):
    # 获取任务实例
    ti = kwargs['ti']
    # 拉取任务ID为request_task的返回值
    text = ti.xcom_pull(task_ids="request_task")
    with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
        f.write(text)

通过ti.xcom_pull,我们就可以拿到request_task的返回值。

对于值传递,可以在Xcoms查看详情,我也会在后续的文章中,详细解释Xcoms工作逻辑。

完整DAG代码:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id='download_bing',
    schedule_interval='0 0 * * *',
    start_date=datetime(2021, 1, 1),
    tags=['test'],
    catchup=False,
) as dag:
    def request():
        import requests
        resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
        return resp.text


    def save(**kwargs):
        # 获取任务实例
        ti = kwargs['ti']
        # 拉取任务ID为request_task的返回值
        text = ti.xcom_pull(task_ids="request_task")
        with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
            f.write(text)
    
    
    request_task = PythonOperator(
        task_id="request_task",
        python_callable=request
    )
    
    save_task = PythonOperator(
        task_id="save_task",
        python_callable=save
    )
    
    request_task >> save_task

再次执行DAG

再次执行,可以看到变绿了🤡

执行下方命令,查看Worker容器:

docker exec -it airflow_airflow-worker_1  bash
ls -la

至此,文件已成功保存了。