azkaban
>i # Azkaban概论
## 为什么需要工作流调度系统
1、一个完整的数据分析系统通常都是由大量任务单元组成。
Shell脚本程序,Java 程序,MapReduce 程序、Hive 脚本等
2、各任务单元之间存在时间先后及前后依赖关系
3、为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行
## 常见的工作流调度系统
1、简单的任务调度:直接使用Linux的Crontab来定义;。
2、复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如
- Ooize:CDH平台下的一个工作流程调度器,可以借助CDH平台下的工作组件HUE,使用就比较友好,因为HUE提供了可视化的界面,脱离HUE后需要编写大量的xml文件,比较繁琐
- Azkaban:简单易用的工作流程调度器。配置工作流程通过yaml文件进行配置,定时规则在页面上进行配置
- Airflow:Python开发的工作流程调度器,配置流程规则需要使用Python脚本
>w Crontab可以实现简单的任务调度,它不能直接解决工作流程的依赖关系的
> 复杂的调度工具主要分两步:描述工作流程、配置定时
> 区别在于描述工作流程或配置定时规则
## Azkaban与Ooize对比
总体来说,Ooize 相比Azkaban是一个重量级的任务调度系统,功能全面,但配置使用也更复杂,作为老牌的调度器,使用最多。如果可以不在意某些功能的缺失,轻量级调度器Azkaban是很不错的候选对象。
## 基本架构
![image.png](https://cos.easydoc.net/52087651/files/l6eqiev1.png)
Azkaban Web Server:项目管理、用户管理、权限管理、任务的定时和触发、提供工作界面
Azkaban Executor Server:负责具体任务的执行
MySQL:工作流程的配置、定时规则、任务的执行状态的存储
## 模式
- 单机模式:适合测试使用,Web Server、Executor Server是在一台服务器上
- 集群模式 :适合生产环境,Web Server、Executor Server独立部署,是两个独立的进程,集群可以部署多个独立的Executor Server能起到一定的负载均衡和容灾的作用
>i # 安装
## 安装包
>s azkaban-db-3.84.4.tar.gz:azkaban在使用的时候会用到Mysql数据库,所以需要提前建好需要的数据库和表,这里面是所有的建表语句
azkaban-exec-server-3.84.4.tar.gz:Executor Server安装包
azkaban-web-server-3.84.4.tar.gz:Web Server安装包
## 解压
```shell
tar -xvf azkaban-web-server-3.84.4.tar.gz -C /opt/azkaban
tar -xvf azkaban-exec-server-3.84.4.tar.gz -C /opt/azkaban
tar -xvf azkaban-db-3.84.4.tar.gz -C /opt/azkaban
```
## 改名可选
```shell
mv azkaban-web-server-3.84.4 azkaban-web
mv azkaban-exec-server-3.84.4 azkaban-exec
```
## azkaban部署
### 数据库初始化
> 所需库、表、用户的创建
- **正常安装MySQL**
- **启动MySQL**
`mysql -uroot -p`
- 登录MySQL创建Azkaban数据库
`mysql> create database azkaban`
- 创建Azkaban用户并赋予权限
- 设置密码有效4位以上
`mysql> set global validate_password_length=4;`
- 设置密码策略最低级别
`mysql> set global validate_password_policy=0;`
- 创建Azkaban用户,任何主机都可以访问Azkaban,密码自己定
`mysql> CREATE USER 'azkaban'@'%' IDENTIFIED BY 'liulike';`
- 赋予Azkaban用户增删改查权限
`mysql> GRANT SELECT,INSERT,UPDATE,DELETE ON azkaban.* to 'azkaban'@'%' WITH GRANT OPTION;`
- 创建Azkaban表,完成后退出MySQL
`mysql> use azkaban;`
`mysql> source /opt/azkaban/azkaban-db-3.84.4/create-all-sq1-3.84.4.sq1`
`mysql> quit;`
- 更改MySQL包大小;防止Azkaban连接MySQL阻塞
`vi /etc/my.cnf`
在[mysqld]下面加一行`max_allowed_packet=1024M`
[mysqld]
`max_allowed_packet=1024M`
- 重启MySQL
`systemctl restart mysq1de`
### Executor Server的部署与配置
Azkaban Executor Server 处理工作流和作业的实际执行。
- 编辑 azkaban.properties
```shell
vi /opt/azkaban-web/conf/azkaban.properties
```
修改如下标红的属性
<pre>
#...
<span style="color:red">default.timezone.id=Asia/Shanghai</span>
#...
<span style="color:red">azkaban.webserver.url=http://hadoop-01:8081</span>
<span style="color:red">executor.port=12321</span>
#...
database.type=mysql
mysql.port=3306
<span style="color:red">mysql.host=hadoop-01</span>
mysql.database=azkaban
mysql.user=azkaban
<span style="color:red">mysql.password=liulike</span>
mysql.numconnections=100
</pre>
- 同步 azkaban-exec 到所有节点
[root@hadoop-01 azkaban]$ acp -r /opt/azkaban-exec
- <span style="color:red">必须进入到/opt/azkaban-exec 路径</span>,分别在三台机器上,启动 executor server
注意:如果在/opt/azkaban-exec 目录下出现 executor.port 文件,说明
启动成功
```shell
[root@hadoop-01 azkaban-exec]$ bin/start-exec.sh
[root@hadoop-02 azkaban-exec]$ bin/start-exec.sh
[root@hadoop-03 azkaban-exec]$ bin/start-exec.sh
```
- 下面激活 executor,需要
```shell
[root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo
[root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo
[root@hadoop-01 azkaban-exec]$ curl -G "hadoop-01:12321/executor?action=activate" && echo
```
如果三台机器都出现如下提示,则表示激活成功
```shell
{"status":"success"}
```
### Web Server的部署与配置
Azkaban Web Server 处理项目管理,身份验证,计划和执行触发。
- 编辑 azkaban.properties
```shell
[root@hadoop-01 azkaban]$ vi /opt/azkaban-web/conf/azkaban.properties
```
修改如下属性
<pre>
...
<span style="color:red">default.timezone.id=Asia/Shanghai</span>
...
database.type=mysql
mysql.port=3306
<span style="color:red">mysql.host=hadoop-01</span>
mysql.database=azkaban
mysql.user=azkaban
<span style="color:red">mysql.password=liulike</span>
mysql.numconnections=100
...
<span style="color:red">azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus</span>
</pre>
>s 说明:
#StaticRemainingFlowSize:正在排队的任务数;
#CpuStatus:CPU 占用情况
#MinimumFreeMemory:内存占用情况。测试环境,必须将 MinimumFreeMemory 删除或注释掉,否则它会认为集群资源不够,不执行。
- 修改 azkaban-users.xml 文件,添加 root 用户
```shell
[root@hadoop-01 azkaban-web]$ vi /opt/azkaban-web/conf/azkaban-users.xml
```
```shell
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="liulike" roles="admin" username="liulike"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
```
- 必须进入到 hadoop-01 的/opt/azkaban-web 路径,启动 web server
[root@hadoop-01 azkaban-web]$ bin/start-web.sh
- 访问 http://hadoop-01:8081,并用 liulike 用户登陆
>i # 案例实操
## HelloWorld 案例
- 在 windows 环境,新建 azkaban.project 文件,编辑内容如下
`azkaban-flow-version: 2.0`
注意:该文件作用,是采用新的 Flow-API 方式解析 flow 文件。
- 新建 basic.flow 文件,内容如下:
```shell
nodes:
- name: jobA
type: command
config:
command: echo "Hello World"
```
>s 参数说明
(1)Name:job 名称
(2)Type:job 类型。command 表示你要执行作业的方式为命令
(3)Config:job 配置
>w flow文件写法与yaml文件的格式相同:
● 大小写敏感
● 使用缩进表示层级关系
● 不支持Tab键制表符缩进,只使用空格缩进
● 缩进的空格数目不重要,只要相同层级的元素左侧对齐即可,通常开头缩进两个空格:
● 字符后缩进一个空格,如冒号,逗号,短横杆(-)等
● 字符串可以不用引号标注
● “#"表示注释
YAML 支持的数据结构有三种:
● 对象:键值对的集合,又称为映射(mapping)/ 哈希(hashes) / 字典(dictionary)
● 数组:一组按次序排列的值,又称为序列(sequence) / 列表(list)
● 纯量(scalars):单个的、不可再分的值
- 将 azkaban.project、basic.flow 文件压缩到一个 zip 文件,文件名称必须是英文。
- 在 WebServer 新建项目:http://hadoop-01:8081/index
![image.png](https://cos.easydoc.net/52087651/files/l6fwza85.png)
- 给项目名称命名和添加项目描述
![image.png](https://cos.easydoc.net/52087651/files/l6fx0hgi.png)
- first.zip 文件上传、选择上传的文件
![image.png](https://cos.easydoc.net/52087651/files/l6fx1mvp.png)
- 执行任务流
![image.png](https://cos.easydoc.net/52087651/files/l6fxd1up.png)
![image.png](https://cos.easydoc.net/52087651/files/l6fxffhv.png)
![image.png](https://cos.easydoc.net/52087651/files/l6fxgiky.png)
- 在日志中,查看运行结果
## 作业依赖案例
需求:JobA 和 JobB 执行完了,才能执行 JobC
具体步骤:
- 修改 basic.flow 为如下内容
```shell
nodes:
- name: jobC
type: command
# jobC 依赖 JobA 和 JobB
dependsOn:
- jobA
- jobB
config:
command: echo "I’m JobC"
- name: jobA
type: command
config:
command: echo "I’m JobA"
- name: jobB
type: command
config:
command: echo "I’m JobB"
```
>s 参数说明:
dependsOn:作业依赖,后面案例中演示
- 将修改后的 basic.flow 和 azkaban.project 压缩成 second.zip 文件
- 重复 `HelloWorld 案例` 后续步骤。
![image.png](https://cos.easydoc.net/52087651/files/l6k6ajmf.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k69at2.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k6d17v.png)
## 自动失败重试案例
需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 10000ms
具体步骤:
- 编译配置流
```shell
nodes:
# 需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 1s
- name: JobA
type: command
config:
command: sh /not_exists.sh
retries: 3
retry.backoff: 1000
```
>s 参数说明:
retries:重试次数
retry.backoff:重试的时间间隔
- 将修改后的 basic.flow 和 azkaban.project 压缩成 three.zip 文件
- 重复 `HelloWorld 案例` 后续步骤。
- 执行并观察到一次失败+三次重试
![image.png](https://cos.easydoc.net/52087651/files/l6k6gt90.png)
- 也可以点击上图中的 Log,在任务日志中看到,总共执行了 4 次。
![image.png](https://cos.easydoc.net/52087651/files/l6k6i3nm.png)
- 也可以在 Flow 全局配置中添加任务失败重试配置,此时重试配置会应用到所有 Job。
案例如下:
```shell
config:
# 此时配置会在全局执行
retries: 3
retry.backoff: 10000
nodes:
# 需求:如果执行任务失败,需要重试 3 次,重试的时间间隔 1s
- name: JobA
type: command
```
## 手动失败重试案例
需求:JobA——>JobB(依赖于 A)——>JobC——>JobD——>JobE——>JobF。生产环境,任何 Job 都有可能挂掉,可以根据需求执行想要执行的 Job。
具体步骤:
1)编译配置流
```yaml
nodes:
- name: JobA
type: command
config:
command: echo "This is JobA."
- name: JobB
type: command
dependsOn:
- JobA
config:
command: echo "This is JobB."
- name: JobC
type: command
dependsOn:
- JobB
config:
command: echo "This is JobC."
- name: JobD
type: command
dependsOn:
- JobC
config:
command: echo "This is JobD."
- name: JobE
type: command
dependsOn:
- JobD
config:
command: echo "This is JobE."
- name: JobF
type: command
dependsOn:
- JobE
config:
command: echo "This is JobF.
```
2)将修改后的 basic.flow 和 azkaban.project 压缩成 five.zip 文件
3)重复 HelloWorld案例 后续步骤。
Enable 和 Disable 下面都分别有如下参数:
Parents:该作业的上一个任务
Ancestors:该作业前的所有任务
Children:该作业后的一个任务
Descendents:该作业后的所有任务
Enable All:所有的任务
4)可以根据需求选择性执行对应的任务。
![image.png](https://cos.easydoc.net/52087651/files/l6k6p0yr.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k6pfu3.png)
关闭先前已执行的
![image.png](https://cos.easydoc.net/52087651/files/l6k6rrvr.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k6t24i.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k6tayo.png)
# Azkaban 进阶
## JavaProcess 作业类型案例
JavaProcess 类型可以运行一个自定义主类方法,type 类型为 javaprocess,可用的配置为:
Xms:最小堆
Xmx:最大堆
classpath:类路径
java.class:要运行的 Java 对象,其中必须包含 Main 方法
main.args:main 方法的参数
案例:
1)新建一个 azkaban 的 maven 工程
2)创建包名:com.root
3)创建 AzTest 类
```java
package com.liulike;
public class AzTest {
public static void main(String[] args) {
System.out.println("This is for testing!");
}
}
```
4)打包成 jar 包 azkaban_java.jar
5)新建 testJava.flow,内容如下
```yaml
nodes:
- name: test_java
type: javaprocess
config:
Xms: 96M
Xmx: 200M
java.class: com.liulike.AzTest
```
6)将 Jar 包、flow 文件和 project 文件打包成 javatest.zip
7)创建项目——>上传 javatest.zip ——>执行作业——>观察结果
![image.png](https://cos.easydoc.net/52087651/files/l6k6uu6v.png)
## 条件工作流案例
条件工作流功能允许用户自定义执行条件来决定是否运行某些Job。条件可以由当前Job
的父 Job 输出的运行时参数构成,也可以使用预定义宏。在这些条件下,用户可以在确定 Job
执行逻辑时获得更大的灵活性,例如,只要父 Job 之一成功,就可以运行当前 Job。
### 运行时参数案例
1)基本原理
(1)父 Job 将参数写入 JOB_OUTPUT_PROP_FILE 环境变量所指向的文件
(2)子 Job 使用 ${jobName:param}来获取父 Job 输出的参数并定义执行条件
2)支持的条件运算符:
(1)== 等于
(2)!= 不等于
(3)> 大于
(4)>= 大于等于
(5)< 小于
(6)<= 小于等于
(7)&& 与
(8)|| 或
(9)! 非
3)案例:
需求:
JobA 执行一个 shell 脚本。
JobB 执行一个 shell 脚本,但 JobB 不需要每天都执行,而只需要每个周一执行。
(1)新建 JobA.sh
```shell
#!/bin/bash
echo "do JobA"
wk=`date +%w`
echo "{\"wk\":$wk}" > $JOB_OUTPUT_PROP_FILE
```
(2)新建 JobB.sh
```shell
#!/bin/bash
echo "do JobB"
```
(3)新建 condition.flow
```yaml
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
dependsOn:
- JobA
config:
command: sh JobB.sh
condition: ${JobA:wk} == 1
```
(4)将 JobA.sh、JobB.sh、condition.flow 和 azkaban.project 打包成 condition.zip
(5)创建 condition 项目——>上传 condition.zip 文件——>执行作业——>观察结果
(6)按照我们设定的条件,JobB 会根据当日日期决定是否执行。
![image.png](https://cos.easydoc.net/52087651/files/l6k6ylhk.png)
如果正好是星期一:
![image.png](https://cos.easydoc.net/52087651/files/l6k72rjs.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k726va.png)
如果不是星期一:
![image.png](https://cos.easydoc.net/52087651/files/l6k70u1b.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k713x8.png)
### 预定义宏案例
Azkaban 中预置了几个特殊的判断条件,称为预定义宏。
预定义宏会根据所有父 Job 的完成情况进行判断,再决定是否执行。可用的预定义宏如下:
(1)all_success: 表示父 Job 全部成功才执行(默认)
(2)all_done:表示父 Job 全部完成才执行
(3)all_failed:表示父 Job 全部失败才执行
(4)one_success:表示父 Job 至少一个成功才执行
(5)one_failed:表示父 Job 至少一个失败才执行
1)案例
需求:
JobA 执行一个 shell 脚本
JobB 执行一个 shell 脚本
JobC 执行一个 shell 脚本,要求 JobA、JobB 中有一个成功即可执行
(1)新建 JobA.sh
```shell
#!/bin/bash
echo "do JobA"
```
(2)新建 JobC.sh
```shell
#!/bin/bash
echo "do JobC"
```
(3)新建 macro.flow
```yaml
nodes:
- name: JobA
type: command
config:
command: sh JobA.sh
- name: JobB
type: command
config:
command: sh JobB.sh
- name: JobC
type: command
dependsOn:
- JobA
- JobB
config:
command: sh JobC.sh
condition: one_success
```
(4)JobA.sh、JobC.sh、macro.flow、azkaban.project 文件,打包成 macro.zip。
>d 注意:没有 JobB.sh
(5)创建 macro 项目——>上传 macro.zip 文件——>执行作业——>观察结果
![image.png](https://cos.easydoc.net/52087651/files/l6k740n3.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k74eee.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k74ut6.png)
## 定时执行案例
需求:JobA 每间隔 1 分钟执行一次;
具体步骤:
1)Azkaban 可以定时执行工作流。在执行工作流时候,选择左下角 Schedule
![image.png](https://cos.easydoc.net/52087651/files/l6k77v3v.png)
2)右上角注意时区是上海,然后在左面填写具体执行事件,填写的方法和 crontab 配置定时任务规则一致。
![image.png](https://cos.easydoc.net/52087651/files/l6k77ga9.png)
3)观察结果
![image.png](https://cos.easydoc.net/52087651/files/l6k78i9u.png)
![image.png](https://cos.easydoc.net/52087651/files/l6k7cjcy.png)
4)删除定时调度
点击 remove Schedule 即可删除当前任务的调度规则。
![image.png](https://cos.easydoc.net/52087651/files/l6k7cywl.png)
## 邮件报警案例
### 注册邮箱
1)申请注册一个 126 邮箱
![image.png](https://cos.easydoc.net/52087651/files/l6k89dz8.png)
2)点击设置——>POP3/SMTP/IMAP
![image.png](https://cos.easydoc.net/52087651/files/l6k8alg7.png)
3)开启 SMTP 服务
![image.png](https://cos.easydoc.net/52087651/files/l6k41vtt.png)
4)一定要记住授权码
![image.png](https://cos.easydoc.net/52087651/files/l6k40mr6.png)
### 默认邮件报警案例
Azkaban 默认支持通过邮件对失败的任务进行报警,配置方法如下:
1 ) 在azkaban-web节点hadoop-01上,编辑 /opt/azkaban-web/conf/azkaban.properties,修改如下内容
```shell
[root@hadoop-01 azkaban-web]$ vi /opt/azkaban-web/conf/azkaban.properties
```
添加如下内容:
#这里设置邮件发送服务器,需要 申请邮箱,切开通 stmp 服务,以下只是例子
```shell
mail.sender=root@126.com
mail.host=smtp.126.com
mail.user=root@126.com
mail.password=用邮箱的授权码
```
2)保存并重启 web-server
```shell
[root@hadoop-01 azkaban-web]$ bin/shutdown-web.sh
[root@hadoop-01 azkaban-web]$ bin/start-web.sh
```
3)编辑 basic.flow
```yaml
nodes:
- name: jobA
type: command
config:
command: echo "This is an email test."
```
4)将 azkaban.project 和 basic.flow 压缩成 email.zip
5)创建工程——>上传文件——>执行作业——>查看结果
![image.png](https://cos.easydoc.net/52087651/files/l6k7jw56.png)
## 电话报警案例
### 第三方告警平台集成
有时任务执行失败后邮件报警接收不及时,因此可能需要其他报警方式,比如电话报警。
如有类似需求,可与第三方告警平台进行集成,例如睿象云。
1)进入睿象云官网注册账号并登录
官网地址:[https://www.aiops.com/](https://www.aiops.com/)
![image.png](https://cos.easydoc.net/52087651/files/l6k8v5w6.png)
2)集成告警平台,使用 Email 集成
![image.png](https://cos.easydoc.net/52087651/files/l6k8y9z3.png)
3)获取邮箱地址,后边需将报警信息发送至该邮箱
![image.png](38)
4)配置分派策略
![image.png](https://cos.easydoc.net/52087651/files/l6k94pjz.png)
5)配置通知策略
![image.png](https://cos.easydoc.net/52087651/files/l6k97mtv.png)
### 测试
执行上一个邮件通知的案例,将通知对象改为刚刚集成第三方平台时获取的邮箱。
## Azkaban 多 Executor 模式注意事项
Azkaban 多 Executor 模式是指,在集群中多个节点部署 Executor。在这种模式下,
Azkaban web Server 会根据策略,选取其中一个 Executor 去执行任务。
为确保所选的 Executor 能够准确的执行任务,我们须在以下两种方案任选其一,推荐使
用方案二。
方案一:指定特定的 Executor(hadoop-01)去执行任务。
1)在 MySQL 中 azkaban 数据库 executors 表中,查询 hadoop-01 上的 Executor 的 id。
2)在执行工作流程时加入 useExecutor 属性,如下
方案二:在 Executor 所在所有节点部署任务所需脚本和应用。
# 参考资料
## Azkaban 完整配置
见官网文档:https://azkaban.readthedocs.io/en/latest/configuration.html
## 可能遇见的错误
```shell
08-08-2022 10:38:29 CST jobA INFO - Cannot request memory (Xms 0 kb, Xmx 0 kb) from system for job jobA, sleep for 60 secs and retry, attempt 1 of 720
```
解决:
在azkaban的根目录下的plugins/jobtypes/目录下的commonprivate.properties文件内末尾加上`memCheck.enabled=false`重启即可