前记
tail是一个常用的Linux命令, 它可以打印文件的后面n行数据, 也能实时输出文件的追加数据.
tail的实现很简单,但是要实现一个完善的tail却需要考虑很多细节,如果要注重性能,则需要引入一些其他的机制.
一开始只是为了单纯的实现Linux的tail
的基本功能,后面随着需要对日志文件的高性能读取则需要的Linux的inotify
机制去完善.
相关源码见:https://github.com/so1n/example/tree/master/example_python/example_python/tail
1.第一版–从文件尾部读取实时数据
主要思路是: 打开文件, 把指针移动到文件最后, 然后有数据则输出数据, 无数据则休眠一段时间.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import time import sys
from typing import Callable, NoReturn
class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval
def __call__(self): with open(self.file_name) as f: f.seek(0, 2) while True: line: str = f.readline() if line: self.output(line) else: time.sleep(self.interval)
if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
|
之后只要做如下调用即可:
2.第二版–实现tail -f
tail -f
默认先读取最后10行数据,再从文件尾部读取实时数据.如果对于小文件,可以先读取所有文件内容,并输出最后10行, 但是读取全文再获取最后10行的性能不高, 而从后滚10行的边界条件也很复杂, 先看先读取全文再获取最后10行的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import time import sys
from typing import Callable, NoReturn
class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval
def __call__(self): with open(self.file_name) as f: self.read_last_line(f) while True: line: str = f.readline() if line: self.output(line) else: time.sleep(self.interval)
def read_last_line(self, f): last_lines = f.readlines()[-10:] for line in last_lines: self.output(line)
if __name__ == '__main__': filename: str = sys.argv[0] Tail(filename)()
|
可以看到实现很简单, 相比第一版只多了个read_last_line的函数
, 接下来就要解决性能的问题了, 当文件很大的时候, 这个逻辑是不行的, 特别是有些日志文件经常有几个G大, 如果全读出来内存就爆了. 而在Linux系统中, 没有一个接口可以指定指针跳到倒数10行, 只能使用如下方法来模拟输出倒数10行:
- 首先游标跳转到最新的字符, 保存当前游标, 然后预估一行数据的字符长度, 最好偏多, 这里我按1024字符长度为一行来处理
- 然后利用seek的方法,跳转到seek(-1024 * 10, 2)的字符, 这就是我们预估的倒数10行内的内容
- 接着对内容进行判断, 如果跳转的字符长度小于 10 * 1024, 则证明整个文件没有10行, 则采用原来的
read_last_line
方法.
- 如果跳转到字符长度等于1024 * 10, 则利用换行符计算已取字符长度共有多少行,如果行数大于10,那只输出最后10行,如果只读了4行,则继续读6*1024,直到读满10行为止
通过以上步奏, 就把倒数10行的数据计算好了可以打印出来, 可以进入追加数据了, 但是这时候文件内容可能发生改变了, 我们的游标也发生改变了, 这时候要把游标跳回到刚才保存的游标,防止漏打或者重复打印数据.
分析完毕后, 就可以开始重构read_last_line
函数了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| import time import sys
from typing import Callable, List, NoReturn
class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line
def __call__(self, n: int = 10): with open(self.file_name) as f: self.read_last_line(f, n) while True: line: str = f.readline() if line: self.output(line) else: time.sleep(self.interval)
def read_last_line(self, file, n): read_len: int = self.len_line * n
file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: last_line_list: List[str] = read_str.split('\n')[-n:] break else: if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n
for line in last_line_list: self.output(line + '\n') file.seek(now_tell)
if __name__ == '__main__': import argparse
parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
|
3.第三版–优雅的读取输出日志文件
可以发现实时读取那块的逻辑性能还是很差, 如果每秒读一次文件,实时性就太慢了,把间隔改小了,则处理器占用太多. 性能最好的情况是如果能得知文件更新再进行打印文件, 那性能就能得到保障了.庆幸的是,在Linux中inotify
提供了这样的功能.
此外,日志文件有一个特点就是会进行logrotate,如果日志被logrotate了,那我们就需要重新打开文件,并进一步读取数据, 这种情况也可以利用到inotify
, 当inotify
获取到文件重新打开的事件时,我们就重新打开文件,再读取.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
| import os import sys
from typing import Callable, List, NoReturn
import pyinotify
multi_event = pyinotify.IN_MODIFY | pyinotify.IN_MOVE_SELF
class InotifyEventHandler(pyinotify.ProcessEvent): """ 执行inotify event的封装 """ f: 'open()' filename: str path: str wm: 'pyinotify.WatchManager' output: Callable
def my_init(self, **kargs): """pyinotify.ProcessEvent要求不能直接继承__init__, 而是要重写my_init, 我们重写这一段并进行初始化"""
filename: str = kargs.pop('filename') if not os.path.exists(filename): raise RuntimeError('Not Found filename') if '/' not in filename: filename = os.getcwd() + '/' + filename index = filename.rfind('/') if index == len(filename) - 1 or index == -1: raise RuntimeError('Not a legal path')
self.f = None self.filename = filename self.output: Callable = kargs.pop('output') self.wm = kargs.pop('wm') self.path = filename[:index] self.wm.add_watch(self.path, multi_event)
def read_line(self): """统一的输出方法""" for line in self.f.readlines(): self.output(line)
def process_IN_MODIFY(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生变化, 进行文件读取""" if event.pathname == self.filename: self.read_line()
def process_IN_MOVE_SELF(self, event): """必须为process_事件名称,event表示事件对象, 这里表示监控到文件发生重新打开, 进行文件读取""" if event.pathname == self.filename: self.f.close() self.f = open(self.filename) self.read_line()
def __enter__(self) -> 'InotifyEventHandler': self.f = open(self.filename) return self
def __exit__(self, exc_type, exc_val, exc_tb): self.f.close()
class Tail(object): def __init__( self, file_name: str, output: Callable[[str], NoReturn] = sys.stdout.write, interval: int = 1, len_line: int = 1024 ): self.file_name: str = file_name self.output: Callable[[str], NoReturn] = output self.interval: int = interval self.len_line: int = len_line
wm = pyinotify.WatchManager() inotify_event_handler = InotifyEventHandler( **dict(filename=file_name, wm=wm, output=output) ) wm.add_watch('/tmp', multi_event) self.notifier = pyinotify.Notifier(wm, inotify_event_handler) self.inotify_event_handle: 'InotifyEventHandler' = inotify_event_handler
def __call__(self, n: int = 10): """通过inotify的with管理打开文件""" with self.inotify_event_handle as i: self.read_last_line(i.f, n) self.notifier.loop()
def read_last_line(self, file, n): read_len: int = self.len_line * n
file.seek(0, 2) now_tell: int = file.tell() while True: if read_len > file.tell(): file.seek(0) last_line_list: List[str] = file.read().split('\n')[-n:] now_tell: int = file.tell() break file.seek(-read_len, 2) read_str: str = file.read(read_len) cnt: int = read_str.count('\n') if cnt >= n: last_line_list: List[str] = read_str.split('\n')[-n:] break else: if cnt == 0: line_per: int = read_len else: line_per: int = int(read_len / cnt) read_len = line_per * n
for line in last_line_list: self.output(line + '\n') file.seek(now_tell)
if __name__ == '__main__': import argparse
parser = argparse.ArgumentParser() parser.add_argument("-f", "--filename") parser.add_argument("-n", "--num", default=10) args, unknown = parser.parse_known_args() if not args.filename: raise RuntimeError('filename args error') Tail(args.filename)(int(args.num))
|
可以看到, 从原本的open打开文件改为用inotify打开文件(这时候会调用my_init方法进行初始化), 打开后还是运行我们打开原来n行的代码, 然后就交给inotify运行. 在inotify运行之前, 我们把重新打开文件方法和打印文件方法都挂载在inotifiy对应的事件里, 之后inotify运行时, 会根据对应的事件执行对应的方法.