使用python实现tail

本文总阅读量

前记

tail是一个常用的Linux命令, 它可以打印文件的后面n行数据, 也能实时输出文件的追加数据.
tail的实现很简单,但是要实现一个完善的tail却需要考虑很多细节,如果要注重性能,则需要引入一些其他的机制.

一开始只是为了单纯的实现Linux的tail的基本功能,后面随着需要对日志文件的高性能读取则需要的Linux的inotify机制去完善.
相关源码见:https://github.com/so1n/example/tree/master/example_python/example_python/tail

1.第一版–从文件尾部读取实时数据

主要思路是: 打开文件, 把指针移动到文件最后, 然后有数据则输出数据, 无数据则休眠一段时间.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import time
import sys

from typing import Callable, NoReturn


class Tail(object):
def __init__(
self,
file_name: str,
output: Callable[[str], NoReturn] = sys.stdout.write,
interval: int = 1
):
self.file_name: str = file_name
self.output: Callable[[str], NoReturn] = output
self.interval: int = interval

def __call__(self):
with open(self.file_name) as f:
f.seek(0, 2) # 从文件结尾处开始seek
while True:
line: str = f.readline()
if line:
self.output(line) # 使用print都会每次都打印新的一行
else:
time.sleep(self.interval)


if __name__ == '__main__':
filename: str = sys.argv[0]
Tail(filename)()

之后只要做如下调用即可:

1
python xxx.py filename 

2.第二版–实现tail -f

tail -f默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time
import sys

from typing import Callable, NoReturn


class Tail(object):
def __init__(
self,
file_name: str,
output: Callable[[str], NoReturn] = sys.stdout.write,
interval: int = 1
):
self.file_name: str = file_name
self.output: Callable[[str], NoReturn] = output
self.interval: int = interval

def __call__(self):
with open(self.file_name) as f:
self.read_last_line(f)
while True:
line: str = f.readline()
if line:
self.output(line) # 使用print都会每次都打印新的一行
else:
time.sleep(self.interval)

def read_last_line(self, f):
last_lines = f.readlines()[-10:]
for line in last_lines:
self.output(line)

if __name__ == '__main__':
filename: str = sys.argv[0]
Tail(filename)()

可以看到实现很简单, 相比第一版只多了个read_last_line的函数, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:

  • 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理
  • 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容
  • 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的read_last_line方法.
  • 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止

通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.

分析完毕后, 就可以开始重构read_last_line函数了.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import time
import sys

from typing import Callable, List, NoReturn


class Tail(object):
def __init__(
self,
file_name: str,
output: Callable[[str], NoReturn] = sys.stdout.write,
interval: int = 1,
len_line: int = 1024
):
self.file_name: str = file_name
self.output: Callable[[str], NoReturn] = output
self.interval: int = interval
self.len_line: int = len_line

def __call__(self, n: int = 10):
with open(self.file_name) as f:
self.read_last_line(f, n)
while True:
line: str = f.readline()
if line:
self.output(line) # 使用print都会每次都打印新的一行
else:
time.sleep(self.interval)

def read_last_line(self, file, n):
read_len: int = self.len_line * n

# 跳转游标到最后
file.seek(0, 2)
# 获取当前结尾的游标位置
now_tell: int = file.tell()
while True:
if read_len > file.tell():
# 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
file.seek(0) # 由于read方法是按照游标进行打印, 所以要重置游标
last_line_list: List[str] = file.read().split('\n')[-n:]
# 重新获取游标位置
now_tell: int = file.tell()
break
# 跳转到我们预估的字符位置
file.seek(-read_len, 2)
read_str: str = file.read(read_len)
cnt: int = read_str.count('\n')
if cnt >= n:
# 如果获取的行数大于要求的行数,则获取前n行的行数
last_line_list: List[str] = read_str.split('\n')[-n:]
break
else:
# 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
if cnt == 0:
line_per: int = read_len
else:
line_per: int = int(read_len / cnt)
read_len = line_per * n

for line in last_line_list:
self.output(line + '\n')
# 重置游标,确保接下来打印的数据不重复
file.seek(now_tell)


if __name__ == '__main__':
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-f", "--filename")
parser.add_argument("-n", "--num", default=10)
args, unknown = parser.parse_known_args()
if not args.filename:
raise RuntimeError('filename args error')
Tail(args.filename)(int(args.num))

3.第三版–优雅的读取输出日志文件

可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中inotify提供了这样的功能.
此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到inotify, 当inotify获取到文件重新打开的事件时,我们就重新打开文件,再读取.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import os
import sys

from typing import Callable, List, NoReturn

import pyinotify

multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF # 监控多个事件


class InotifyEventHandler(pyinotify.ProcessEvent): # 定制化事件处理类,注意继承
"""
执行inotify event的封装
"""
f: 'open()'
filename: str
path: str
wm: 'pyinotify.WatchManager'
output: Callable

def my_init(self, **kargs):
"""pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""

# 获取文件
filename: str = kargs.pop('filename')
if not os.path.exists(filename):
raise RuntimeError('Not Found filename')
if '/' not in filename:
filename = os.getcwd() + '/' + filename
index = filename.rfind('/')
if index == len(filename) - 1 or index == -1:
raise RuntimeError('Not a legal path')

self.f = None
self.filename = filename
self.output: Callable = kargs.pop('output')
self.wm = kargs.pop('wm')
# 只监控路径,这样就能知道文件是否移动
self.path = filename[:index]
self.wm.add_watch(self.path, multi_event)

def read_line(self):
"""统一的输出方法"""
for line in self.f.readlines():
self.output(line)

def process_IN_MODIFY(self, event):
"""必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取"""
if event.pathname == self.filename:
self.read_line()

def process_IN_MOVE_SELF(self, event):
"""必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取"""
if event.pathname == self.filename:
# 检测到文件被移动重新打开文件
self.f.close()
self.f = open(self.filename)
self.read_line()

def __enter__(self) -> 'InotifyEventHandler':
self.f = open(self.filename)
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.f.close()


class Tail(object):
def __init__(
self,
file_name: str,
output: Callable[[str], NoReturn] = sys.stdout.write,
interval: int = 1,
len_line: int = 1024
):
self.file_name: str = file_name
self.output: Callable[[str], NoReturn] = output
self.interval: int = interval
self.len_line: int = len_line

wm = pyinotify.WatchManager() # 创建WatchManager对象
inotify_event_handler = InotifyEventHandler(
**dict(filename=file_name, wm=wm, output=output)
) # 实例化我们定制化后的事件处理类, 采用**dict传参数
wm.add_watch('/tmp', multi_event) # 添加监控的目录,及事件
self.notifier = pyinotify.Notifier(wm, inotify_event_handler) # 在notifier实例化时传入,notifier会自动执行
self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler

def __call__(self, n: int = 10):
"""通过inotify的with管理打开文件"""
with self.inotify_event_handle as i:
# 先读取指定的行数
self.read_last_line(i.f, n)
# 启用inotify的监听
self.notifier.loop()

def read_last_line(self, file, n):
read_len: int = self.len_line * n

# 获取当前结尾的游标位置
file.seek(0, 2)
now_tell: int = file.tell()
while True:
if read_len > file.tell():
# 如果跳转的字符长度大于原来文件长度,那就把所有文件内容打印出来
file.seek(0)
last_line_list: List[str] = file.read().split('\n')[-n:]
# 重新获取游标位置
now_tell: int = file.tell()
break
file.seek(-read_len, 2)
read_str: str = file.read(read_len)
cnt: int = read_str.count('\n')
if cnt >= n:
# 如果获取的行数大于要求的行数,则获取前n行的行数
last_line_list: List[str] = read_str.split('\n')[-n:]
break
else:
# 如果获取的行数小于要求的行数,则预估需要获取的行数,继续获取
if cnt == 0:
line_per: int = read_len
else:
line_per: int = int(read_len / cnt)
read_len = line_per * n

for line in last_line_list:
self.output(line + '\n')
# 重置游标,确保接下来打印的数据不重复
file.seek(now_tell)


if __name__ == '__main__':
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-f", "--filename")
parser.add_argument("-n", "--num", default=10)
args, unknown = parser.parse_known_args()
if not args.filename:
raise RuntimeError('filename args error')
Tail(args.filename)(int(args.num))

可以看到, 从原本的open打开文件改为用inotify打开文件(这时候会调用my_init方法进行初始化), 打开后还是运行我们打开原来n行的代码, 然后就交给inotify运行. 在inotify运行之前, 我们把重新打开文件方法和打印文件方法都挂载在inotifiy对应的事件里, 之后inotify运行时, 会根据对应的事件执行对应的方法.

查看评论