DataX

Project: https://github.com/alibaba/DataX
Official documentation: https://github.com/alibaba/DataX/blob/master/introduction.md

DataX is Alibaba's open-source offline synchronization tool for heterogeneous data sources. It aims to provide stable and efficient data synchronization between all kinds of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP.

As a synchronization framework, DataX abstracts every source into a Reader plugin that reads data from the source and a Writer plugin that writes data to the target, so in principle the framework can synchronize between arbitrary data source types. The plugin system also forms an ecosystem: as soon as a new data source is plugged in, it becomes interoperable with every data source already supported.

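As a minimal sketch of this Reader/Writer model (the interpreter and install paths are assumptions carried over from the scripts later in these notes), the following job copies a few generated records from the built-in streamreader to the streamwriter, which simply prints them:

#!/bin/bash
# Sketch only: write a minimal streamreader -> streamwriter DataX job and run it.
cat > /tmp/stream2stream.json <<'EOF'
{
  "job": {
    "setting": { "speed": { "channel": 1 } },
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 5,
            "column": [
              { "type": "string", "value": "hello" },
              { "type": "long", "value": "42" }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": { "encoding": "UTF-8", "print": true }
        }
      }
    ]
  }
}
EOF
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /tmp/stream2stream.json
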
XXL-JOB

A distributed task scheduling platform whose core design goals are rapid development, ease of learning, light weight, and easy extensibility.
Scheduler console: http://192.168.200.100:8080/xxl-job-admin/
Start/stop script: vim /opt/module/apache-hive-3.1.2-bin/bin/xxl

#!/bin/bash
act=$1

start(){
    echo "starting xxl-job"
    # The admin console runs on hadoop100 only; executors run on all three nodes.
    ssh root@hadoop100 "cd /opt/module/xxljob; nohup java -jar xxl-job-admin-2.3.0.jar > xxl-job.log 2>&1 &"
    ssh root@hadoop100 "cd /opt/module/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
    ssh root@hadoop101 "cd /opt/module/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
    ssh root@hadoop102 "cd /opt/module/xxljob; nohup java -jar xxl-job-executor-sample-springboot-2.3.0.jar > xxl-job-executor.log 2>&1 &"
}

stop(){
    echo "stopping xxl-job"
    # Kill the admin console and executor processes by name on each node.
    ssh root@hadoop100 "ps -aux | grep xxl-job-admin | grep -v grep | awk '{print \$2}' | xargs kill -9"
    ssh root@hadoop100 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
    ssh root@hadoop101 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
    ssh root@hadoop102 "ps -aux | grep xxl-job-executor-sample | grep -v grep | awk '{print \$2}' | xargs kill -9"
}

status(){
    echo "=============== hadoop102 ================="
    ssh root@hadoop102 "ps -aux | grep xxl-job-executor-sample | grep -v grep"
    echo "=============== hadoop101 ================="
    ssh root@hadoop101 "ps -aux | grep xxl-job-executor-sample | grep -v grep"
    echo "=============== hadoop100 ================="
    ssh root@hadoop100 "ps -aux | grep xxl-job | grep -v grep"
}

case $act in
start)
    start
    status
    ;;
stop)
    stop
    status
    ;;
restart)
    stop
    start
    status
    ;;
status)
    status
    ;;
esac
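
Assuming the script above is saved as xxl, made executable, and its directory is on PATH, it is driven like this:

chmod +x /opt/module/apache-hive-3.1.2-bin/bin/xxl
xxl start      # start the admin console on hadoop100 and the executors on all nodes
xxl status     # list the running xxl-job processes on each node
xxl restart    # stop everything, start it again, then show status
xxl stop       # kill all xxl-job processes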

Exporting a Hive table to MySQL

test_read_hdfs.json

Synchronizes data from HDFS into MySQL:

  1. Read data from the HDFS path /user/hive/warehouse/db_hive.db/sqoop_emp
  2. Parse each text line on the tab delimiter, extracting 8 fields
  3. Transfer the data to MySQL over 3 concurrent channels
  4. Insert the rows into the test.emp table in MySQL
{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/db_hive.db/sqoop_emp",
"defaultFS": "hdfs://hadoop100:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
},
{
"index": 5,
"type": "string"
},
{
"index": 6,
"type": "string"
},
{
"index": 7,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "test",
"password": "test",
"column": [
"empno",
"ename",
"job",
"mgr",
"hiredate",
"sal",
"comm",
"deptno"
],
"session": [
"set session sql_mode='ANSI'"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8",
"table": [
"emp"
]
}
]
}
}
}
]
}
}
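
To run this job, pass the JSON file to datax.py in the same way the later scripts do; the /root/datax/json location is an assumption carried over from those scripts:

# Run the HDFS -> MySQL export; DataX prints a task summary (records read/written, errors) at the end.
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /root/datax/json/test_read_hdfs.json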

exp_log.sh

  1. Generate the job file: from the date-partition argument $1, dynamically generate the DataX job file exp_log.json, which reads the given date partition from HDFS and writes it to a MySQL table.

  2. Create the target table: create the log table in MySQL (if it does not already exist) with five VARCHAR columns for the log data.

  3. Run the synchronization: invoke DataX to export the text data on HDFS (/user/hive/warehouse/db_hive.db/log/dt=$part) into the MySQL log table according to the column mapping.

#!/bin/bash
part=$1
# Generate the JSON job file for DataX to run
echo '{
"job": {
"setting": {
"speed": {
"channel": 3
}
},
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"path": "/user/hive/warehouse/db_hive.db/log/dt='$part'",
"defaultFS": "hdfs://hadoop100:8020",
"column": [
{
"index": 0,
"type": "string"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"index": 3,
"type": "string"
},
{
"index": 4,
"type": "string"
}
],
"fileType": "text",
"encoding": "UTF-8",
"fieldDelimiter": "\t"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "test",
"password": "test",
"column": [
"ip",
"date_l",
"url",
"osinfo",
"bowser"
],
"session": [
"set session sql_mode='"'ANSI'"'"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8",
"table": [
"log"
]
}
]
}
}
}
]
}
}' > /root/datax/json/exp_log.json

# Create the target table in MySQL if it does not exist
mysql -utest -ptest --database=test -e \
'create table if not exists log (
ip varchar(500),
date_l varchar(500),
url varchar(500),
osinfo varchar(500),
browser varchar(500)
)'

# Run DataX to perform the export
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /root/datax/json/exp_log.json
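
A manual invocation passes the target date partition as the only argument; the date value below is only illustrative:

# Export the dt=2022-06-01 partition of db_hive.log into MySQL test.log
/root/datax/shell/exp_log.sh 2022-06-01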

XXL-JOB scheduler: creating a new task

GLUE IDE

#!/bin/bash
echo "xxl-job: hello shell"

echo "脚本位置:$0"
echo "任务参数:$1"
echo "分片序号 = $2"
echo "分片总数 = $3"
/root/datax/shell/exp_log.sh $1
echo "Good bye!"
exit 0

Run each task manually once.

Then check the row count of the log table in the MySQL test database.
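
A quick check, using the same credentials as exp_log.sh:

# Count the rows DataX wrote into test.log
mysql -utest -ptest --database=test -e 'select count(*) from log;'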

Importing and exporting a dynamically partitioned table

imp_log.json

{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test",
"column": [
"*"
],
"splitPk": "db_id",
"connection": [
{
"querySql": [
"select * from log where osinfo = 'Windows';"
],
"jdbcUrl": [
"jjdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop100:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/db_hive.db/log_tmp",
"fileName": "log_p",
"column": [
{
"name": "ip",
"type": "string"
},
{
"name": "date_l",
"type": "string"
},
{
"name": "url",
"type": "string"
},
{
"name": "osinfo",
"type": "string"
},
{
"name": "browser",
"type": "string"
}
],
"writeMode": "truncate",
"fieldDelimiter": "\t",
"compress": "NONE"
}
}
}
]
}
}

Create the log_tmp table in the db_hive database

create table log_tmp(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
row format delimited
fields terminated by '\t'
lines terminated by '\n'
stored as orc

Run the job:
/usr/local/bin/python3.9 /opt/module/datax/bin/datax.py /root/datax/json/imp_log.json
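
After the job finishes, the staging table can be verified from Beeline (the connection string and credentials follow the ones used in imp_log.sh below):

# Count the rows DataX wrote into db_hive.log_tmp
beeline -u jdbc:hive2://hadoop100:10000/db_hive -n root -p 545456 -e "select count(*) from log_tmp;"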

imp_log.sh

  1. Generate the JSON job file: create the DataX job file imp_log.json, which reads from MySQL and writes ORC files to HDFS.
  2. Create the Hive tables: connect to Hive with Beeline and create two ORC tables:
    • log_tmp: a staging table that holds the raw data imported from MySQL
    • log_p: a partitioned table, partitioned by operating system type (os_tp)
  3. Extract and load: invoke DataX to import the MySQL log table into the Hive log_tmp table.
  4. Dynamic-partition insert: set the Hive dynamic-partition parameters and distribute the rows of log_tmp into the partitions of log_p according to the value of the osinfo column.
#!/bin/bash
# Generate the JSON job file
echo '
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "test",
"password": "test",
"column": [
"*"
],
//"splitPk": "db_id",
"connection": [
{
"table": [
"log;"
],
"jdbcUrl": [
"jdbc:mysql://hadoop100:3306/test?useSSL=False&useUnicode=true&characterEncoding=utf-8"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://hadoop100:8020",
"fileType": "orc",
"path": "/user/hive/warehouse/db_hive.db/log_tmp",
"fileName": "log_tmp",
"column": [
{
"name": "ip",
"type": "string"
},
{
"name": "date_l",
"type": "string"
},
{
"name": "url",
"type": "string"
},
{
"name": "osinfo",
"type": "string"
},
{
"name": "browser",
"type": "string"
}
],
"writeMode": "truncate",
"fieldDelimiter": "\t",
"compress": "NONE"
}
}
}
]
}
}' > /root/datax/json/imp_log.json

# Create the staging table and the partitioned table in Hive
beeline -u jdbc:hive2://hadoop100:10000/db_hive -n root -p 545456 -e \
"CREATE TABLE if not exists log_tmp(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
row format delimited
fields terminated by '\t'
lines TERMINATED by '\n'
STORED AS orc;
CREATE TABLE if not exists log_p(
ip string,
date_l string,
url string,
osinfo string,
browser string
)
partitioned by (os_tp string)
row format delimited
fields terminated by '\t'
lines TERMINATED by '\n'
STORED AS orc "
echo '******************* tables created ***********************'
#导入数据
/bin/python3 /opt/module/datax/bin/datax.py /root/datax/json/imp_log.json
echo '***************** data import finished ************************'
# Dynamic-partition insert
beeline -u jdbc:hive2://hadoop100:10000/db_hive -n root -p 545456 -e \
"
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=200;
set hive.exec.max.dynamic.partitions.pernode=50;
set hive.exec.max.created.files=1000;
set hive.error.on.empty.partition=false;
set hive.support.quoted.identifiers=none;
insert into log_p partition (os_tp) select l.*,osinfo from log_tmp l;
"
echo '------------------ done ------------------'
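
After the script completes, the result can be inspected with the same Beeline connection; a sketch:

# List the partitions created by the dynamic-partition insert and count the rows in each
beeline -u jdbc:hive2://hadoop100:10000/db_hive -n root -p 545456 -e "
show partitions log_p;
select os_tp, count(*) from log_p group by os_tp;
"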