2026/3/4 16:22:57
Have you run into this too?

- Something breaks in production, the logs pile up, and you are left grepping by hand
- You want stats on "most common exception / module with the most errors / distribution of errors over time"
- You want to send the results to colleagues or your manager, but copy-pasting looks ugly

In this post I'm giving you a small tool you can use in production:

✅ Handles large logs (streaming reads)
✅ Automatically extracts exception blocks (common Java/Python stack traces)
✅ Aggregates top exceptions, occurrence counts, and first/last seen times
✅ Outputs a Markdown report (paste it straight into CSDN/Feishu/DingTalk)
✅ Runs with a single command

1. Goal and result

You run:

```bash
python log_report.py --input app.log --out report.md
```

This generates a `report.md` containing:

- Top N exceptions, sorted by occurrence count
- For each exception: count, first seen time, last seen time, and a sample snippet
- The overall time range of the log and the error density (ERROR lines per minute)

2. Supported log formats (good enough, and easy to extend)

Timestamp at the start of the line, in any one of these common formats:

- `2026-01-09 01:47:12`
- `2026-01-09T01:47:12`

Error block detection:

- Java: a line containing `Exception` / `Error`, followed by multiple `at xxx(...)` lines
- Python: `Traceback (most recent call last):` followed by a multi-line stack

If nothing matches, that is fine too: the tool still counts `ERROR` lines and reports the density.

3. Core idea (in 3 sentences)

- Read the file as a stream, so the log is never loaded into memory all at once
- On an "exception start marker", start collecting the multi-line stack until the block ends
- Fingerprint (hash) each exception block and aggregate count, time range, and a sample

4. The code (complete and runnable)

File name: `log_report.py` (Python 3.9+, no third-party dependencies)

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import hashlib
import re
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional

# --------- Timestamp parsing (extensible) ----------
# The more specific pattern goes first so milliseconds are not cut off.
TS_PATTERNS = [
    # 2026-01-09T01:47:12.123
    re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3})"),
    # 2026-01-09 01:47:12 / 2026-01-09T01:47:12
    re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2})"),
]
TS_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M:%S.%f")


def parse_ts(line: str) -> Optional[datetime]:
    for pat in TS_PATTERNS:
        m = pat.match(line)
        if not m:
            continue
        raw = m.group("ts")
        for fmt in TS_FORMATS:
            try:
                return datetime.strptime(raw, fmt)
            except ValueError:
                pass
    return None


# --------- Exception block detection ----------
# Matches on the class-name suffix, so NullPointerException and
# OutOfMemoryError both count. Case-sensitive: the log level "ERROR" does not.
JAVA_START = re.compile(r"\w*(?:Exception|Error)\b|\bCaused by:")
PY_START = re.compile(r"^Traceback \(most recent call last\):")
LEVEL_ERROR = re.compile(r"\bERROR\b|\bFATAL\b", re.IGNORECASE)


def is_java_exception_start(line: str) -> bool:
    # Typical: xxxException: msg / Caused by: xxx
    return bool(JAVA_START.search(line))


def is_py_exception_start(line: str) -> bool:
    return bool(PY_START.match(line))


@dataclass
class ExceptionAgg:
    count: int = 0
    first_seen: Optional[datetime] = None
    last_seen: Optional[datetime] = None
    sample: str = ""


@dataclass
class Report:
    total_lines: int = 0
    error_lines: int = 0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    per_minute_errors: Dict[str, int] = field(default_factory=dict)
    exceptions: Dict[str, ExceptionAgg] = field(default_factory=dict)


def fingerprint_exception(block: str) -> str:
    """Fingerprint an exception block: strip obviously volatile parts, then hash."""
    # Numbers, durations, ids and similar volatile items are removed (tune for your logs).
    # Hex forms are replaced before plain digits; the other way round, the digit
    # pass would break them up and the hex patterns would never match.
    normalized = re.sub(r"0x[0-9a-fA-F]+", "0xHEX", block)
    normalized = re.sub(r"\b[a-f0-9]{16,}\b", "HEXSTR", normalized)  # long hashes
    normalized = re.sub(r"\d+", "N", normalized)
    h = hashlib.sha1(normalized.encode("utf-8", errors="ignore")).hexdigest()
    return h[:12]


def minute_key(ts: datetime) -> str:
    return ts.strftime("%Y-%m-%d %H:%M")


def update_time_range(rep: Report, ts: Optional[datetime]) -> None:
    if ts is None:
        return
    if rep.start_time is None or ts < rep.start_time:
        rep.start_time = ts
    if rep.end_time is None or ts > rep.end_time:
        rep.end_time = ts


def add_error_minute(rep: Report, ts: Optional[datetime]) -> None:
    if ts is None:
        return
    k = minute_key(ts)
    rep.per_minute_errors[k] = rep.per_minute_errors.get(k, 0) + 1


def commit_exception(rep: Report, ts: Optional[datetime], block: str) -> None:
    fp = fingerprint_exception(block)
    agg = rep.exceptions.get(fp)
    if agg is None:
        agg = ExceptionAgg(first_seen=ts, last_seen=ts, sample=block[:1200])
        rep.exceptions[fp] = agg
    agg.count += 1
    if ts is not None:
        if agg.first_seen is None or ts < agg.first_seen:
            agg.first_seen = ts
        if agg.last_seen is None or ts > agg.last_seen:
            agg.last_seen = ts


def parse_log(path: str) -> Report:
    rep = Report()
    in_exc = False
    exc_lines: List[str] = []
    exc_ts: Optional[datetime] = None
    last_ts: Optional[datetime] = None  # most recent timestamp seen in the file

    def flush_exc() -> None:
        nonlocal in_exc, exc_lines, exc_ts
        if in_exc and exc_lines:
            # Trailing blank lines are stripped so they do not change the fingerprint.
            commit_exception(rep, exc_ts, "\n".join(exc_lines).rstrip())
        in_exc = False
        exc_lines = []
        exc_ts = None

    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        for raw in f:  # streaming: one line at a time
            rep.total_lines += 1
            line = raw.rstrip("\r\n")
            ts = parse_ts(line)
            if ts is not None:
                last_ts = ts
            update_time_range(rep, ts)

            # Count error lines even when they never form an exception block.
            if LEVEL_ERROR.search(line):
                rep.error_lines += 1
                add_error_minute(rep, ts)

            # Exception block state machine.
            if in_exc:
                if ts is None:
                    # No leading timestamp: stack frames, "Caused by:",
                    # "... N more", message lines and blank lines all continue the block.
                    exc_lines.append(line)
                    continue
                # A timestamped line is a new log entry, so the block ends here.
                flush_exc()

            # This line may itself be the start of a new exception.
            if is_py_exception_start(line) or is_java_exception_start(line):
                in_exc = True
                exc_lines = [line]
                # Stack traces usually follow the timestamped line that logged
                # them, so fall back to the last timestamp seen.
                exc_ts = ts if ts is not None else last_ts

    # End of file: don't forget to flush.
    flush_exc()
    return rep


def render_md(rep: Report, top_n: int = 10) -> str:
    lines: List[str] = []
    lines.append("# Log Exception Analysis Report\n")
    lines.append("## Overview\n")
    lines.append(f"- Total lines: **{rep.total_lines}**")
    lines.append(f"- ERROR/FATAL lines: **{rep.error_lines}**")
    if rep.start_time and rep.end_time:
        lines.append(f"- Time range: **{rep.start_time}** ~ **{rep.end_time}**")
    lines.append("")

    # Error density, top minutes
    if rep.per_minute_errors:
        lines.append("## Error density (ERROR lines per minute, Top 10)\n")
        top_minutes = sorted(rep.per_minute_errors.items(), key=lambda x: x[1], reverse=True)[:10]
        lines.append("| Minute | ERROR count |")
        lines.append("|---|---:|")
        for k, v in top_minutes:
            lines.append(f"| {k} | {v} |")
        lines.append("")

    # Top exceptions
    if rep.exceptions:
        lines.append(f"## Aggregated exceptions, Top {top_n}\n")
        items = sorted(rep.exceptions.items(), key=lambda kv: kv[1].count, reverse=True)[:top_n]
        lines.append("| Fingerprint | Count | First seen | Last seen |")
        lines.append("|---|---:|---|---|")
        for fp, agg in items:
            lines.append(f"| `{fp}` | {agg.count} | {agg.first_seen or '-'} | {agg.last_seen or '-'} |")
        lines.append("")

        # Details
        lines.append("## Exception details (sample snippets)\n")
        for fp, agg in items:
            lines.append(f"### `{fp}` ({agg.count} occurrences)")
            lines.append(f"- First seen: {agg.first_seen or '-'}")
            lines.append(f"- Last seen: {agg.last_seen or '-'}\n")
            lines.append("```text")
            lines.append(agg.sample.rstrip())
            lines.append("```\n")
    else:
        lines.append("## Aggregated exceptions\n")
        lines.append(
            "> No typical Java/Python stack trace blocks were recognized "
            "(the log format may differ). You can still use the error density "
            "to locate the busiest time windows.\n"
        )
    return "\n".join(lines)


def main() -> None:
    ap = argparse.ArgumentParser(description="Generate log exception analysis report (Markdown).")
    ap.add_argument("--input", "-i", required=True, help="log file path")
    ap.add_argument("--out", "-o", default="report.md", help="output markdown report path")
    ap.add_argument("--top", "-t", type=int, default=10, help="top N exceptions")
    args = ap.parse_args()

    rep = parse_log(args.input)
    md = render_md(rep, top_n=args.top)
    with open(args.out, "w", encoding="utf-8") as f:
        f.write(md)

    print(f"[OK] Report generated: {args.out}")
    if rep.start_time and rep.end_time:
        print(f"[INFO] Time range: {rep.start_time} ~ {rep.end_time}")
    print(
        f"[INFO] Total lines: {rep.total_lines}, "
        f"ERROR lines: {rep.error_lines}, exceptions: {len(rep.exceptions)}"
    )


if __name__ == "__main__":
    main()
```

5. Quick test run (a minimal example)

Create an `app.log` with this content:

```text
2026-01-09 01:47:12 ERROR c.xxx.Service - boom
java.lang.NullPointerException: x is null
    at c.xxx.Service.run(Service.java:10)
    at c.xxx.App.main(App.java:5)
2026-01-09 01:47:13 INFO ok
2026-01-09 01:47:20 ERROR c.xxx.Service - boom again
java.lang.NullPointerException: x is null
    at c.xxx.Service.run(Service.java:10)
    at c.xxx.App.main(App.java:5)
```

Run:

```bash
python log_report.py -i app.log -o report.md
```

Open `report.md` and you will see the aggregated result: the two stack traces share one fingerprint, with a count of 2.

6. Advice for production use (don't skip this)

✅ 1) Run it on a schedule on the server

This crontab entry runs the tool every two hours, on the hour:

```bash
0 */2 * * * /usr/bin/python3 /opt/tools/log_report.py -i /var/log/app/app.log -o /var/log/app/report.md
```

✅ 2) Combine it with alerting

Send `report.md` to Feishu/DingTalk. In the next post I will write a "report auto-pusher" that only pushes when the error density exceeds a threshold.

✅ 3) Extend it to your own log format

You only need to change two places:

- `TS_PATTERNS`: add your timestamp format
- Add one more exception start matcher

More tools will keep being added to the 《程序员自动化工具箱》 (Programmer's Automation Toolbox) column. If you find this useful, don't forget to follow and subscribe.
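As an illustration of the first extension point (adding a timestamp format to `TS_PATTERNS`), here is a minimal sketch. It pairs each pattern with its own `strptime` format, which is a small variation on the script's layout rather than the author's exact code. The name `TS_RULES`, the slash-separated format and the bracketed format are all assumptions chosen for the example.

```python
import re
from datetime import datetime
from typing import Optional

# (pattern, strptime format) pairs. TS_RULES is a hypothetical name; the
# last two entries are illustrative formats, not ones the article's tool ships with.
TS_RULES = [
    # Formats the article already covers.
    (re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})"), "%Y-%m-%d %H:%M:%S"),
    (re.compile(r"^(?P<ts>\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})"), "%Y-%m-%dT%H:%M:%S"),
    # Assumed addition: slash-separated date, e.g. 2026/01/09 01:47:12
    (re.compile(r"^(?P<ts>\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})"), "%Y/%m/%d %H:%M:%S"),
    # Assumed addition: timestamp wrapped in brackets, e.g. [2026-01-09 01:47:12]
    (re.compile(r"^\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]"), "%Y-%m-%d %H:%M:%S"),
]


def parse_ts(line: str) -> Optional[datetime]:
    """Return the leading timestamp of a log line, or None if there is none."""
    for pattern, fmt in TS_RULES:
        m = pattern.match(line)
        if not m:
            continue
        try:
            return datetime.strptime(m.group("ts"), fmt)
        except ValueError:
            # The digits matched the shape but are not a real date (e.g. month 13).
            continue
    return None


if __name__ == "__main__":
    for sample in (
        "2026/01/09 01:47:12 ERROR boom",
        "[2026-01-09 01:47:12] ERROR boom",
        "    at c.xxx.App.main(App.java:5)",
    ):
        print(repr(sample), "->", parse_ts(sample))
```

Keeping the pattern and its format side by side means a new log format is a one-line change, and lines without a leading timestamp (such as stack frames) still return `None`, which is what the block-collection logic relies on.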