Airflow在Kubernetes中的使用(第一部分):一种不同的操作器

2018.06.28

Airflow在Kubernetes中的使用(第一部分):一种不同的操作器

作者: Daniel Imberman (Bloomberg LP)

介绍

作为Bloomberg [继续致力于开发Kubernetes生态系统]的一部分(https://www.techatbloomberg.com/blog/bloomberg-awarded-first-cncf-end-user-award-contributions-kubernetes/),我们很高兴能够宣布Kubernetes Airflow Operator的发布; [Apache Airflow](https://airflow.apache.org/)的机制,一种流行的工作流程编排框架,使用Kubernetes API可以在本机启动任意的Kubernetes Pod。

什么是Airflow?

Apache Airflow是DevOps“Configuration As Code”理念的一种实现。 Airflow允许用户使用简单的Python对象DAG(有向无环图)启动多步骤流水线。 您可以在易于阅读的UI中定义依赖关系,以编程方式构建复杂的工作流,并监视调度的作业。

为什么在Kubernetes上使用Airflow?

自成立以来,Airflow的最大优势在于其灵活性。 Airflow提供广泛的服务集成,包括Spark和HBase,以及各种云提供商的服务。 Airflow还通过其插件框架提供轻松的可扩展性。但是,该项目的一个限制是Airflow用户仅限于执行时Airflow站点上存在的框架和客户端。单个组织可以拥有各种Airflow工作流程,范围从数据科学流到应用程序部署。用例中的这种差异会在依赖关系管理中产生问题,因为两个团队可能会在其工作流程使用截然不同的库。

为了解决这个问题,我们使Kubernetes允许用户启动任意Kubernetes pod和配置。 Airflow用户现在可以在其运行时环境,资源和机密上拥有全部权限,基本上将Airflow转变为“您想要的任何工作”工作流程协调器。

Kubernetes运营商

在进一步讨论之前,我们应该澄清Airflow中的[Operator](https://airflow.apache.org/concepts.html#operators)是一个任务定义。 当用户创建DAG时,他们将使用像“SparkSubmitOperator”或“PythonOperator”这样的operator分别提交/监视Spark作业或Python函数。 Airflow附带了Apache Spark,BigQuery,Hive和EMR等框架的内置运算符。 它还提供了一个插件入口点,允许DevOps工程师开发自己的连接器。

Airflow用户一直在寻找更易于管理部署和ETL流的方法。 在增加监控的同时,任何解耦流程的机会都可以减少未来的停机等问题。 以下是Airflow Kubernetes Operator提供的好处:

  • 提高部署灵活性:

Airflow的插件API一直为希望在其DAG中测试新功能的工程师提供了重要的福利。 不利的一面是,每当开发人员想要创建一个新的operator时,他们就必须开发一个全新的插件。 现在,任何可以在Docker容器中运行的任务都可以通过完全相同的运算符访问,而无需维护额外的Airflow代码。

  • 配置和依赖的灵活性:

对于在静态Airflow工作程序中运行的operator,依赖关系管理可能变得非常困难。 如果开发人员想要运行一个需要[SciPy](https://www.scipy.org) 的任务和另一个需要[NumPy](http://www.numpy.org) 的任务,开发人员必须维护所有Airflow节点中的依赖关系或将任务卸载到其他计算机(如果外部计算机以未跟踪的方式更改,则可能导致错误)。 自定义Docker镜像允许用户确保任务环境,配置和依赖关系完全是幂等的。

  • 使用kubernetes Secret以增加安全性:

处理敏感数据是任何开发工程师的核心职责。 Airflow用户总有机会在严格条款的基础上隔离任何API密钥,数据库密码和登录凭据。 使用Kubernetes运算符,用户可以利用Kubernetes Vault技术存储所有敏感数据。 这意味着Airflow工作人员将永远无法访问此信息,并且可以容易地请求仅使用他们需要的密码信息构建pod。

#架构

Kubernetes Operator使用[Kubernetes Python客户端](https://github.com/kubernetes-client/Python)生成由APIServer处理的请求(1)。 然后,Kubernetes将使用您定义的需求启动您的pod(2)。映像文件中将加载环境变量,Secret和依赖项,执行单个命令。 一旦启动作业,operator只需要监视跟踪日志的状况(3)。 用户可以选择将日志本地收集到调度程序或当前位于其Kubernetes集群中的任何分布式日志记录服务。

#使用Kubernetes Operator

##一个基本的例子

以下DAG可能是我们可以编写的最简单的示例,以显示Kubernetes Operator的工作原理。 这个DAG在Kubernetes上创建了两个pod:一个带有Python的Linux发行版和一个没有它的基本Ubuntu发行版。 Python pod将正确运行Python请求,而没有Python的那个将向用户报告失败。 如果Operator正常工作,则应该完成“passing-task”pod,而“falling-task”pod则向Airflow网络服务器返回失败。

from airflow import DAG

from datetime import datetime, timedelta

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

from airflow.operators.dummy_operator import DummyOperator


default_args = {

    'owner': 'airflow',

    'depends_on_past': False,

    'start_date': datetime.utcnow(),

    'email': ['airflow@example.com'],

    'email_on_failure': False,

    'email_on_retry': False,

    'retries': 1,

    'retry_delay': timedelta(minutes=5)

}



dag = DAG(

    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)
passing = KubernetesPodOperator(namespace='default',

                          image="Python:3.6",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          labels={"foo": "bar"},

                          name="passing-test",

                          task_id="passing-task",

                          get_logs=True,

                          dag=dag

                          )

 failing = KubernetesPodOperator(namespace='default',

                          image="ubuntu:1604",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          labels={"foo": "bar"},

                          name="fail",

                          task_id="failing-task",

                          get_logs=True,

                          dag=dag

                          )

passing.set_upstream(start)

failing.set_upstream(start)

##但这与我的工作流程有什么关系?

虽然这个例子只使用基本映像,但Docker的神奇之处在于,这个相同的DAG可以用于您想要的任何图像/命令配对。 以下是推荐的CI / CD管道,用于在Airflow DAG上运行生产就绪代码。

1:github中的PR

使用Travis或Jenkins运行单元和集成测试,请您的朋友PR您的代码,并合并到主分支以触发自动CI构建。

2:CI / CD构建Jenkins - > Docker Image

[在Jenkins构建中生成Docker镜像和缓冲版本](https://getintodevops.com/blog/building-your-first-Docker-image-with-jenkins-2-guide-for-developers)。

3:Airflow启动任务

最后,更新您的DAG以反映新版本,您应该准备好了!

production_task = KubernetesPodOperator(namespace='default',

                          # image="my-production-job:release-1.0.1", <-- old release

                          image="my-production-job:release-1.0.2",

                          cmds=["Python","-c"],

                          arguments=["print('hello world')"],

                          name="fail",

                          task_id="failing-task",

                          get_logs=True,

                          dag=dag

                          )

#启动测试部署

由于Kubernetes运营商尚未发布,我们尚未发布官方helm 图表或operator(但两者目前都在进行中)。 但是,我们在下面列出了基本部署的说明,并且正在积极寻找测试人员来尝试这一新功能。 要试用此系统,请按以下步骤操作:

##步骤1:将kubeconfig设置为指向kubernetes集群

##步骤2:clone Airflow 仓库:

运行git clone https:// github.com / apache / incubator-airflow.git来clone官方Airflow仓库。

##步骤3:运行

为了运行这个基本Deployment,我们正在选择我们目前用于Kubernetes Executor的集成测试脚本(将在本系列的下一篇文章中对此进行解释)。 要启动此部署,请运行以下三个命令:


sed -ie "s/KubernetesExecutor/LocalExecutor/g" scripts/ci/kubernetes/kube/configmaps.yaml

./scripts/ci/kubernetes/Docker/build.sh

./scripts/ci/kubernetes/kube/deploy.sh

在我们继续之前,让我们讨论这些命令正在做什么:

sed -ie“s / KubernetesExecutor / LocalExecutor / g”scripts / ci / kubernetes / kube / configmaps.yaml

Kubernetes Executor是另一种Airflow功能,允许动态分配任务已解决幂等pod的问题。我们将其切换到LocalExecutor的原因只是一次引入一个功能。如果您想尝试Kubernetes Executor,欢迎您跳过此步骤,但我们将在以后的文章中详细介绍。

./scripts/ci/kubernetes/Docker/build.sh

此脚本将对Airflow主分支代码进行打包,以根据Airflow的发行文件构建Docker容器

./scripts/ci/kubernetes/kube/deploy.sh

最后,我们在您的群集上创建完整的Airflow部署。这包括Airflow配置,postgres后端,webserver +调度程序以及之间的所有必要服务。需要注意的一点是,提供的角色绑定是集群管理员,因此如果您没有该集群的权限级别,可以在scripts / ci / kubernetes / kube / airflow.yaml中进行修改。

##步骤4:登录您的网络服务器

现在您的Airflow实例正在运行,让我们来看看UI!用户界面位于Airflow pod的8080端口,因此只需运行即可


WEB=$(kubectl get pods -o go-template --template '{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}' | grep "airflow" | head -1)

kubectl port-forward $WEB 8080:8080

现在,Airflow UI将存在于http://localhost:8080上。 要登录,只需输入airflow /airflow,您就可以完全访问Airflow Web UI。

##步骤5:上传测试文档

要修改/添加自己的DAG,可以使用kubectl cp将本地文件上传到Airflow调度程序的DAG文件夹中。 然后,Airflow将读取新的DAG并自动将其上传到其系统。 以下命令将任何本地文件上载到正确的目录中:

kubectl cp /:/root/airflow/dags -c scheduler

##步骤6:使用它!

#那么我什么时候可以使用它?

虽然此功能仍处于早期阶段,但我们希望在未来几个月内发布该功能以进行广泛发布。

#参与其中

此功能只是将Apache Airflow集成到Kubernetes中的多项主要工作的开始。 Kubernetes Operator已合并到[Airflow的1.10发布分支](https://github.com/apache/incubator-airflow/tree/v1-10-test)(实验模式中的执行模块),以及完整的k8s本地调度程序称为Kubernetes Executor(即将发布文章)。这些功能仍处于早期采用者/贡献者可能对这些功能的未来产生巨大影响的阶段。

对于有兴趣加入这些工作的人,我建议按照以下步骤:

*加入airflow-dev邮件列表dev@airflow.apache.org。

*在[Apache Airflow JIRA]中提出问题(https://issues.apache.org/jira/projects/AIRFLOW/issues/)

*周三上午10点太平洋标准时间加入我们的SIG-BigData会议。

*在kubernetes.slack.com上的#sig-big-data找到我们。

特别感谢Apache Airflow和Kubernetes社区,特别是Grant Nicholas,Ben Goldberg,Anirudh Ramanathan,Fokko Dreisprong和Bolke de Bruin,感谢您对这些功能的巨大帮助以及我们未来的努力。