Captcha 07 可无缝切换的日志分析
girlfriend内置了read_text
插件来提供纯文本的日志分析功能。read_text
插件着重解决了以下几个问题:
- 如何灵活的应对日志中变化无穷的文本格式,比如以单行为记录、以多行为记录、单行记录和多行记录混杂在一起。
- 从文本记录到对象的映射以及分析结果的包装处理。
- 对文件的周期性分析中,分析位置的记录。
- 对于日志分析,很多日志文件通常会按照时间或者大小进行切割,分析程序要考虑如何无缝的去跟踪这些切割。
基本使用
假设我们有一个使用log4j打印出的日志需要分析,这个日志记录了接口的慢查询,响应时间超过了100ms的调用都会被记录了下来,每行是一条记录:
2016-03-27 22:08:14,897 [http-nio-8080-exec-4] INFO perf - com.shicaigj.api.v1.order.OrderController.order|246
其中com.shicaigj.api.v1.order.OrderController.order
是调用的方法,分割线后的数字表示调用这个接口使用了246ms。
我们需要把上面的日志提炼成表格:
Record Time | API | Response Time(ms) |
---|---|---|
2016-03-27 22:08:14 | com.shicaigj.api.v1.order.OrderController.order | 246 |
这个工作流需要read_text
和print_table
两个插件,工作流的代码如下:
from girlfriend.workflow.gfworkflow import Job
from girlfriend.plugin.text import TextR
from girlfriend.data.table import TableWrapper, Title
def record_handler(line):
"""将记录时间、接口名称和响应时间从一行文本记录中提取出来
"""
parts = line.split(" ")
record_time = parts[0] + " " + parts[1].split(",")[0]
api, response_time = parts[-1].split("|")
return record_time, api, response_time
def workflow(options):
work_units = (
# read_text
Job(
name="read_text",
plugin="read_text",
args=[
TextR(
filepath="perf.log",
record_matcher="line",
record_handler=record_handler,
result_wrapper=TableWrapper(
name="perf table",
titles=(
Title("record_time", "Record Time"),
Title("api", "API"),
Title("response_time", "Response Time")
)
),
)
]
),
# print_table
Job(
name="print_table",
plugin="print_table",
args="$read_text.result"
),
)
return work_units
这里包含了read_text
插件的最基本使用,它接受一组TextR对象作为参数,每个TextR对象代表一个要分析的日志文件,也就是说,我们可以使用read_text
一次性来分析多个日志文件。TextR对象的构造函数参数:
- filepath: 要分析的日志文件路径
- record_matcher: 记录匹配器,"line"的意思是指每行作为一条记录,这也是默认的记录匹配器。
- record_handler: 记录处理器,起到映射对象的作用,通过该回调函数,可以将表示记录的文本行映射成对象,上面的例子是简单返回了元组。
- result_wrapper: 结果包装器,record_handler产生的对象最终会被收集到一个列表中,通过result_wrapper指定的函数来对这个列表进行处理。这里选择包装成表格。
文件指针
我们经常会使用crontab周期性的对日志进行分析,因为日志往往都是动态增长的,所以这种情形下都需要记录上次分析到的位置。在read_text
插件中只需要一个参数就可以完成这项工作:
TextR(
filepath="test.log",
record_matcher="line",
pointer="/tmp/test_pointer"
)
对!就是pointer参数,read_text
会使用这个指定的文件来记录文件指针。指定这个参数后就什么都不用管了,每次脚本运行都会检查这个文件,如果文件不存在或为空,那么会从头分析日志,否则会先seek上次分析到的位置再继续。
无缝切换
使用crontab还有一个问题,就是当日志按大小或者按时间进行切分时,如果分析脚本没有对应的感应措施,那么可能会造成分析数据丢失的情况。read_text
对这种情况也进行了处理,通过记录分析文件的inode值,来感知目标文件是否发生了变化,开发者唯一需要做的是定义一个回调函数change_file_logic
来告知girlfriend日志的切分规则。
假设我们分析的日志文件名为event.log,每天0点进行切分,切分后会生成一个新的event.log文件,同时昨天的日志文件将会被重命名为event.log.yyyy-mm-dd的格式,我们无法确保gf对昨天的日志已经分析完毕,当gf感知到日志文件有变化时,我们还需要检查昨日的分析是否还有残留:
import datetime
def change_file_logic(ctx):
# 获取昨日的日志名
yesterday = datetime.date.today() - datetime.timedelta(days=1)
return "event.log.%s" % yesterday.strftime("%Y-%m-%d")
...
TextR(
filepath="event.log",
record_matcher="line",
record_handler=record_handler,
pointer="test_pointer",
change_file_logic=change_file_logic
)
经过这样的配置,text_reader
在读取event.log的时候会先检查test_pointer中记录的inode值与event.log的inode值是否一致,如果不一致,那么会执行change_file_logic函数,获取被重命名的旧日志名称,text_reader
内部维护了一个分析队列,接下来会将这两个日志文件名称加入到分析队列,先分析旧的,再分析新的,等分析完新的,会更新text_pointer中的inode值。
这样就保证了不会漏分析日志数据。
自定义记录匹配
日志分析并不总是一行一条记录这样一帆风顺,很多时候会遇到一些复杂的格式,比如对于错误日志的分析:
2016-03-28 16:17:58,244 - girlfriend - INFO - 工作流开始执行,起始点为 'read_text'
2016-03-28 16:17:58,244 - girlfriend - INFO - 开始执行工作单元 read_text [job]
2016-03-28 16:17:58,245 - girlfriend - ERROR - 系统错误,工作流被迫中止
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/girlfriend-0.0.12-py2.7.egg/girlfriend/workflow/gfworkflow.py", line 576, in execute
last_result = current_unit.execute(ctx)
...
因为要包含堆栈,其中的错误记录是多行的。read_text
插件提供了record_filter参数对记录进行过滤,我们可以使用这个参数来忽略堆栈信息,比如:
import re
from girlfriend.workflow.gfworkflow import Job
from girlfriend.plugin.text import TextR
# 只处理以时间开头的行
RECORD_PATTERN = re.compile(r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")
def workflow(options):
work_units = (
# read_text
Job(
name="read_text",
plugin="read_text",
args=[
TextR(
filepath="error.log",
record_matcher="line",
record_filter=lambda line: RECORD_PATTERN.search(line),
)
]
),
Job(
name="print_lines",
caller=print_lines
),
)
return work_units
def print_lines(ctx):
for line in ctx["read_text.result"][0]:
print line
运行后可以看到,打印后的结果不包含堆栈。但是,当我们确实需要收集这些堆栈信息的时候,该怎么做?
这个时候就需要去自定义记录匹配器了。顾名思义,记录匹配器的作用是描述日志中什么样的格式算是一条可以操作的记录,对于ERROR级别,ERROR行和底下的多行堆栈共同构成了一条记录,而对于其他Level,一行文本就是一条记录。
因此我们定义如下记录匹配器:
def logger_record_matcher(ctx):
if "ERROR" in ctx.current_line:
tb_lines = ctx.prepare_read(lambda line: re.search(r"^\d{4}-\d{2}-\d{2}"))
ctx.add()
ctx.add(tb_lines)
ctx.end()
elif "INFO" in ctx.current_line:
ctx.add()
ctx.end()
def record_handler(line_buffer):
# 此时record_handler接受的参数是一个列表,可能包含了一个字符串,也可能包含了其它对象。
...
...
TextR(
filepath="error.log",
record_matcher=logger_record_matcher,
record_handler=record_handler
)
...
record_matcher函数接受了一个TextRecordContext对象作为参数,这个对象描述了一次记录匹配的上下文,首先通过ctx.current_line属性获取当前文件指针所在的行,然后判断该行的类型,ERROR记录还是INFO记录。
如果是INFO类型记录,则直接调用ctx.add方法,当该方法不接受任何参数时,会把当前的行,即ctx.current_line,加入到一个record_line_buffer的列表对象中,这将表明该行作为记录的一部分,然后再调用ctx.end方法,该方法表示已经完成了一条记录的收集,同时会记录文件指针,record_line_buffer会传递给record_handler函数进行映射处理后清空,再进行接下来的分析处理。
如果是ERROR类型记录,则会调用一个prepare_read方法,该方法用于试探性的“预读”操作,直到遇到指定特征的行为止。这里会将所有的堆栈行读到tb_lines列表中,预读操作会开启一个新的文件句柄,不会记录任何文件指针,也不会影响ctx.current_line的值,只有当调用end方法的时候才会记录文件指针。这里调用了两次add方法,第一次是将当前包含ERROR的行加入到buffer,第二次是将收集到的堆栈行加入到line_buffer,传递给record_handler的参数将会是[error_line, [traceback_lines...]]的形式
其它说明
可以使用read_text
来分析比内存大的文件,read_text
不会将整个文件加载到内存进行分析,而是根据文件指针进行seek之后按行进行读取,并且每条记录在处理完毕后会释放,但是分析处理后的结果不能超过内存大小,因为结果是保存到上下文对象当中的。如果结果超出内存大小,那么需要考虑其它方式,比如将分析结果保存到数据库等等。