1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122
| from datetime import datetime import os from pyhive import hive
# Base directory: the script entry point reads the raw daily log from here
# and writes the cleaned ETL output file back into the same directory.
fpath = '/root/mydata'
# (label, marker) pairs, checked in order; the first marker found in the
# user-agent string wins.  Chrome is listed before Safari because the source
# order checks it first and a Chrome user agent also contains "Safari".
_OS_MARKERS = (
    ("Windows", "Windows"),
    ("MacOS", "Mac OS"),
    ("Linux", "Linux"),
)
_BROWSER_MARKERS = (
    ("Chrome", "Chrome"),
    ("Safari", "Safari"),
    ("Firefox", "Firefox"),
    ("Opera", "Opera"),
)


def _first_label(markers, text):
    """Return the label of the first marker contained in *text*, else "Unknown"."""
    for label, marker in markers:
        if marker in text:
            return label
    return "Unknown"


def _parse_log_line(line):
    """Turn one raw access-log line into its five output fields, or None if unparsable."""
    fields = line.split(' ')
    quoted = line.split('"')
    # Needs at least 7 space-separated fields (ip ... timestamp ... url) and
    # one double quote so that the last quoted section can be taken.
    if len(fields) < 7 or len(quoted) < 2:
        return None
    try:
        stamp = datetime.strptime(fields[3], '[%d/%b/%Y:%H:%M:%S')
    except ValueError:
        return None
    agent = quoted[-2]  # text between the last pair of double quotes
    return [
        fields[0],
        stamp.strftime('%Y-%m-%d %H:%M:%S'),
        fields[6],
        _first_label(_OS_MARKERS, agent),
        _first_label(_BROWSER_MARKERS, agent),
    ]


def etl_log(fnamein, fnameou):
    """Clean an access log into a tab-separated file.

    Each input line is split on spaces; field 0 is taken as the IP, field 3
    as a ``[dd/Mon/yyyy:HH:MM:SS`` timestamp and field 6 as the URL.  The text
    between the last pair of double quotes is scanned for OS and browser
    markers.  One line ``ip<TAB>date<TAB>url<TAB>os<TAB>browser`` is appended
    to the output file per parsable input line.

    Lines that are blank, too short, or carry an unparsable timestamp are
    skipped instead of aborting the whole run with half-written output.

    Args:
        fnamein: Path of the UTF-8 encoded raw log file to read.
        fnameou: Path of the output file; opened in append mode, so existing
            content is kept.

    Raises:
        OSError: If either file cannot be opened.
    """
    with open(fnamein, 'r', encoding='utf-8') as src, \
            open(fnameou, 'a', encoding='utf-8') as dst:
        # Iterate the file object directly so large logs are streamed rather
        # than loaded into memory in one go.
        for line in src:
            record = _parse_log_line(line)
            if record is None:
                continue
            dst.write('\t'.join(record) + '\n')
def load_log(fnameou, dt):
    """Load a cleaned log file into the partitioned Hive table ``log``.

    Creates the table (tab-delimited, partitioned by ``dt``) if it does not
    exist yet, then issues ``load data local inpath`` for the given file into
    the partition ``dt=<dt>``.

    Args:
        fnameou: Path of the tab-separated file produced by the ETL step.
        dt: Partition value for the ``dt`` column.

    NOTE: both arguments are interpolated directly into the SQL text, so
    callers must only pass trusted, already-validated values.
    """
    sql_create = """create table if not exists log (
        ip string,
        date_l string,
        url string,
        osinfo string,
        browser string
    )
    partitioned by (dt string)
    row format delimited fields terminated by '\t'
    """
    sql_load = f"""load data local INPATH '{fnameou}' into table log partition (dt='{dt}')"""

    conn = hive.connect(host='192.168.200.100', port=10000,
                        username='root', database='db_hive')
    # try/finally so the cursor and the connection are released even when one
    # of the statements fails on the server side.
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(sql_create)
            cursor.execute(sql_load)
        finally:
            cursor.close()
    finally:
        conn.close()
def test_value(dt, fpath): """验证输入参数并返回日志文件路径""" if len(dt) == 8 and dt.isdigit(): fname = f"{dt[0:4]}-{dt[4:6]}-{dt[6:8]}.log" fnamein = f'{fpath}/{fname}' return fname, fnamein else: print("输入参数格式不对: yyyymmdd") return
def test_file(file): """验证文件是否存在""" if os.path.isfile(file): return file else: print("找不到该文件")
if __name__ == "__main__":
    # Script entry point: ask for a yyyymmdd date, locate that day's raw log
    # under fpath, clean it into etl_<yyyy-mm-dd>.txt and load it into Hive.
    #
    # Failures end the script with `raise SystemExit(1)`: the bare `exit()`
    # helper is injected by the `site` module for interactive use and is not
    # guaranteed to exist, and a non-zero status lets schedulers and shell
    # pipelines see that the run did not succeed.

    # Strip surrounding whitespace so a stray space or newline typed after
    # the date does not fail the eight-digit check.
    dt = input("请输入参数: ").strip()
    if not dt:
        print("请输入参数(格式:yyyymmdd)")
        raise SystemExit(1)

    result = test_value(dt, fpath)
    if not result:
        # test_value has already printed the format error.
        raise SystemExit(1)
    fname, fnamein = result

    if not test_file(fnamein):
        # test_file has already printed the "file not found" message.
        raise SystemExit(1)

    # Output file sits next to the input: yyyy-mm-dd.log -> etl_yyyy-mm-dd.txt
    fname1 = f"etl_{fname.replace('.log','.txt')}"
    fnameou = f'{fpath}/{fname1}'

    etl_log(fnamein, fnameou)
    load_log(fnameou, dt)
|