相关总结

一、参数接收与文件校验

1. 用户输入获取

从命令行接收日期参数(格式:yyyymmdd

1
2
3
4
dt = input("请输入参数: ")
if not dt:
print("请输入参数(格式:yyyymmdd)")
exit()

2. 参数格式验证

检查输入是否符合 8 位纯数字格式

1
2
3
4
5
6
7
8
def test_value(dt, fpath):
if len(dt) == 8 and dt.isdigit():
fname = f"{dt[0:4]}-{dt[4:6]}-{dt[6:8]}.log"
fnamein = f'{fpath}/{fname}'
return fname, fnamein
else:
print("输入参数格式不对: yyyymmdd")
return

3. 文件存在性检查

验证对应日期的日志文件是否存在

1
2
3
4
5
6
def test_file(file):
if os.path.isfile(file):
return file
else:
print("找不到该文件")
return None

二、日志数据清洗(ETL)

1. 日志文件读取

逐行读取原始日志文件

1
2
3
with open(fnamein, 'r', encoding='utf-8') as f:
for r in f.readlines():
# 处理每一行日志

2. 基础字段提取

从日志行中解析 IP、日期和 URL

1
2
3
4
5
6
result = r.split(' ')
ip = result[0]
date = result[3] # 格式: [20/May/2021:00:00:02
date = datetime.strptime(date, '[%d/%b/%Y:%H:%M:%S')
date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S')
url = result[6]

3. User-Agent 解析

从 User-Agent 字符串中提取操作系统和浏览器信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
result1 = r.split('"')[-2]  # 提取User-Agent字段

# 操作系统检测逻辑
os_markers = [("Windows", "Windows"), ("MacOS", "Mac OS"), ("Linux", "Linux")]
osinfo = "Unknown"
for os_name, marker in os_markers:
if marker in result1:
osinfo = os_name
break

# 浏览器检测逻辑
browser_markers = [("Chrome", "Chrome"), ("Safari", "Safari"), ...]
browser = "Unknown"
for browser_name, marker in browser_markers:
if marker in result1:
browser = browser_name
break

4. 清洗数据写入

将提取的字段写入临时文件,使用制表符分隔

1
2
loginfo = [ip, date, url, osinfo, browser]
f1.write('\t'.join(loginfo) + '\n')

三、数据导入 Hive 数据库

1. Hive 表创建

检查目标表是否存在,不存在则创建分区表

1
2
3
4
5
6
7
8
9
10
11
sql_create = """CREATE TABLE IF NOT EXISTS log (
ip string,
date_l string,
url string,
osinfo string,
browser string
) PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
"""
cursor.execute(sql_create)

2. 数据加载到分区

将清洗后的临时文件数据加载到对应日期分区

1
2
3
sql_load = f"""LOAD DATA LOCAL INPATH '{fnameou}'
INTO TABLE log PARTITION (dt='{dt}')"""
cursor.execute(sql_load)

3. Hive 连接管理

建立与 HiveServer2 的连接并执行 SQL

1
2
3
4
5
6
7
8
9
10
11
conn = hive.connect(
host='192.168.200.100',
port=10000,
username='root',
database='db_hive'
)
cursor = conn.cursor()
# 执行SQL...
cursor.execute(sql)
cursor.close()
conn.close()

etl_log.py 脚本完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from datetime import datetime
import os
from pyhive import hive

# 日志文件目录
fpath = '/root/mydata'

def etl_log(fnamein, fnameou):
# 清洗日志文件,提取关键信息并写入新文件
with open(fnamein, 'r', encoding='utf-8') as f:
with open(fnameou, 'a', encoding='utf-8') as f1:
for r in f.readlines():
# 分割日志行,提取基础字段
result = r.split(' ')
ip = result[0]
date = result[3]
# 转换日期格式
date = datetime.strptime(date, '[%d/%b/%Y:%H:%M:%S')
date = datetime.strftime(date, '%Y-%m-%d %H:%M:%S')
url = result[6]
# 提取User-Agent信息
result1 = r.split('"')[-2]

# 操作系统识别规则
os_markers = [
("Windows", "Windows"),
("MacOS", "Mac OS"),
("Linux", "Linux")
]
# 浏览器识别规则
browser_markers = [
("Chrome", "Chrome"),
("Safari", "Safari"),
("Firefox", "Firefox"),
("Opera", "Opera")
]

# 检测操作系统
osinfo = "Unknown"
for os_name, marker in os_markers:
if marker in result1:
osinfo = os_name
break

# 检测浏览器
browser = "Unknown"
for browser_name, marker in browser_markers:
if marker in result1:
browser = browser_name
break

# 构建清洗后的记录并写入文件
loginfo = [ip, date, url, osinfo, browser]
f1.write('\t'.join(loginfo) + '\n')

def load_log(fnameou, dt):
# 将清洗后的文件加载到Hive数据库
# 创建log表sql语句
sql_create = """create table if not exists log (
ip string,
date_l string,
url string,
osinfo string,
browser string
)
partitioned by (dt string)
row format delimited
fields terminated by '\t'
"""

# 加载数据SQL
sql_load = f"""load data local INPATH '{fnameou}'
into table log partition (dt='{dt}')"""

# 连接Hive并执行SQL
conn = hive.connect(host='192.168.200.100', port=10000, username='root', database='db_hive')
cursor = conn.cursor()
cursor.execute(sql_create)
cursor.execute(sql_load)
cursor.close()
conn.close()

def test_value(dt, fpath):
"""验证输入参数并返回日志文件路径"""
if len(dt) == 8 and dt.isdigit():
# 转换日期格式为标准日志文件名
fname = f"{dt[0:4]}-{dt[4:6]}-{dt[6:8]}.log"
fnamein = f'{fpath}/{fname}'
return fname, fnamein
else:
print("输入参数格式不对: yyyymmdd")
return

def test_file(file):
"""验证文件是否存在"""
if os.path.isfile(file):
return file
else:
print("找不到该文件")

if __name__ == "__main__":
# 获取用户输入的日期参数
dt = input("请输入参数: ")
if not dt:
print("请输入参数(格式:yyyymmdd)")
exit()

# 验证参数并获取文件路径
result = test_value(dt, fpath)
if not result:
exit()
else:
fname, fnamein = result
fnamein = test_file(fnamein)
if not fnamein: # 增加文件检查的退出逻辑
exit()

# 构建输出文件名并执行ETL和加载
fname1 = f"etl_{fname.replace('.log','.txt')}"
fnameou = f'{fpath}/{fname1}'
etl_log(fnamein, fnameou)
load_log(fnameou, dt)

验证结果