服务器

cdh服务器
主机: 192.168.8.67
端口: 22
用户: root
密码: zhiyun
当前服务器平台版本:CDH6.3
http://192.168.8.67:7180/cmf/login 用户密码都是 admin

1
2
3
4
5
6
7
8
9
10
# 创建个人目录
mkdir -p /zhiyun/lijinquan
cd /zhiyun/lijinquan
# 创建5个功能目录
# data - 存放数据文件
# jobs - 存放datax的配置文件
# sql - sql脚本
# shell - shell脚本
# python - python脚本
mkdir data jobs sql shell python

数据库
主机:192.168.8.8
数据库:jd
密码:123456

配置文件编写

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
{
"job": { // DataX的核心任务配置,一个job代表一次数据同步任务
"setting": { // 任务全局设置(性能控制、错误处理等)
"speed": { // 速度控制配置(并行度/吞吐量限制)
"channel": 3 // 并行通道数:设置为3表示同时启动3个线程并行读取和写入数据,用于控制同步速度(数值越大吞吐量越高,需根据源库和目标存储性能调整)
},
"errorLimit": { // 错误容忍度配置(允许的最大错误量)
"record": 0, // 允许的错误记录数上限:0表示不允许任何单条记录同步失败
"percentage": 0.02 // 允许的错误率上限:0.02即2%,表示当错误记录数占总记录数的比例≤2%时任务可继续执行(实际生效规则为"记录数上限"和"错误率"取更宽松的条件,此处0条记录严格于2%,故实际以record=0为准)
}
},
"content": [ // 同步任务的具体内容(数据源和目标存储的配置),可包含多个同步子任务(数组形式)
{
"reader": { // 数据读取插件配置(源端)
"name": "mysqlreader", // 读取插件名称:使用mysqlreader从MySQL数据库读取数据
"parameter": { // mysqlreader的具体参数配置
"username": "jd", // MySQL登录用户名
"password": "123456", // MySQL登录密码(生产环境建议加密存储,避免明文)
"column": [ // 需要读取的列名列表
"*" // "*"表示读取表中所有列(注意:需与writer的column配置的字段数量和顺序一致,否则会导致字段不匹配)
],
"connection": [ // MySQL连接信息(可配置多个库表,数组形式)
{
"table": [ // 要读取的表名列表
"hwz_nmpa" // 目标表:从该表读取数据
],
"jdbcUrl": [ // MySQL的JDBC连接地址(可配置多个,用于分库分表场景)
"jdbc:mysql://192.168.8.8:3306/jd" // 连接地址格式:jdbc:mysql://IP:端口/数据库名(此处数据库为jd)
]
}
]
}
},
"writer": { // 数据写入插件配置(目标端)
"name": "hdfswriter", // 写入插件名称:使用hdfswriter将数据写入HDFS
"parameter": { // hdfswriter的具体参数配置
"defaultFS": "hdfs://hdp:8020", // HDFS的NameNode地址:格式为hdfs://主机名/服务名:端口(此处hdp为主机名,8020为HDFS默认端口)
"fileType": "orc", // 输出文件格式:ORC格式(列式存储,压缩率高,适合大数据场景)
"path": "/zhiyun/huangwenzhe/tmp/nmpa", // HDFS上的存储路径:数据将写入该目录下
"fileName": "nmpa.data", // 输出文件名前缀:实际生成的文件名为"nmpa.data_随机后缀"(因并行写入可能生成多个文件)
"column": [ // 写入HDFS的字段定义(需与reader读取的列一一对应,名称和类型需匹配)
// 字段名,类型
{"name": "id", "type": "int"},
{"name": "index_id", "type": "string"},
{"name": "title", "type": "string"},
{"name": "category", "type": "string"},
{"name": "post_date", "type": "string"},
{"name": "content", "type": "string"},
{"name": "attachment_count", "type": "int"},
{"name": "attachment_path", "type": "string"},
{"name": "link", "type": "string"},
{"name": "create_time", "type": "string"},
{"name": "update_time", "type": "string"}
],
"writeMode": "truncate", // 写入模式:truncate表示先删除目标路径下的所有文件,再写入新数据(覆盖写);可选值还有append(追加)、nonConflict(不冲突则写)
"fieldDelimiter": "\t" // 字段分隔符:因fileType为ORC(列式存储,内部不依赖分隔符),此参数实际生效有限,主要用于兼容文本格式(如txt/csv)的场景
}
}
}
]
}
}

抽取数据

1
2
# 注意抽取前需要提前创建HDFS目录
hadoop fs -mkdir -p /zhiyun/huangwenzhe/tmp/nmpa

Browsing HDFS处验证

1
2
# 开始抽取
python /opt/datax/bin/datax.py /zhiyun/huangwenzhe/jobs/nmpa.json

Browsing HDFS处验证

Hive建表

1
2
3
# 终端进入hive
hive
# 执行以下sql语句(正确执行应显示ok)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- 建立ODS数据库
create database if not exists ods_huangwenzhe location "/zhiyun/huangwenzhe/ods";

-- 建立NMPA分区表
-- ODS的表应该是外部表
-- 内部表删除表的时候删除数据,外部表删除表不删除数据
create external table if not exists ods_huangwenzhe.nmpa(
id int,
index_id string,
title string,
category string,
post_date string,
content string,
attachment_count string,
attachment_path string,
link string,
create_time string,
update_time string
) partitioned by (dt string)
row format delimited fields terminated by "/t"
lines terminated by "\n"
stored as orc
location "/zhiyun/huangwenzhe/ods/nmpa";

查看表

加载数据到当月分区

1
load data inpath "/zhiyun/huangwenzhe/tmp/nmpa/*" overwrite into table ods_huangwenzhe.nmpa partition(dt="2025-07");

验证数据

1
2
select count(1) from ods_huangwenzhe.nmpa;
select * from ods_huangwenzhe.nmpa limit 1;

自动化调度

需求:每天凌晨4点自动执行这个流程
调度平台:海豚调度平台
优势:告警机制(任务出错了需要通知);自动重试
用户名/密码:admin / dolphinscheduler123

项目管理-创建项目-点击进入项目-工作流定义-创建工作流

加载数据的shell脚本实现动态按月分区

1
2
3
4
current_month=$(date +"%Y-%m") # 先定义变量

# 使用hive -e执行,注意变量引用要用$current_month,且外层用双引号
hive -e "load data inpath '/zhiyun/laojiaen/tmp/yaozh/*' overwrite into table ods_laojiaen.yaozh partition(dt='$current_month');"

成功运行