前记 Gunicorn
是一个基于Python
实现的动态Web服务器,它通过Pre-Worker模型来实现并发,本身带有多种工作模式,基本上可以与所有基于Python
的Web框架集成,并为他们带来一个多功能又稳定的服务器托管核心。
从学习Python
Web的第一天就开始接触了Gunicorn
,那时候还不知道他具体的作用是什么, 只知道在项目中使用他运行之后可以变得十分的稳定,高性能,从未研究它是如何实现的。随着使用时间的增长,越来越想知道它的运行原理是什么,特别是它性能为何会高,跟类Unix有什么特殊结合,Pre-Worker
模型是如何通信的,如何设计一个比较好的Pre-Worker
模型服务器? 于是,本文就是阅读源码之后的产物。
1.简单了解 通过Gunicorn代码仓库 把代码拉到本地,使用Tree
命令可以简单的看到Gunicorn
的代码结构,可以看出文件数量并不是很多,以下是Gunicorn
的代码结构以及他们每个文件夹或者每个文件的说明:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 ├── app ├── http ├── instrument ├── workers ├── __init__.py ├── __main__.py ├── arbiter.py ├── config.py ├── debug.py ├── errors.py ├── glogging.py ├── pidfile.py ├── reloader.py ├── sock.py ├── systemd.py └── util.py
2.开始入手 了解完代码结构后接着从官方文档的示例开始入手:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 $ pip install gunicorn $ cat myapp.py def app(environ, start_response): data = b"Hello, World!\n" start_response("200 OK" , [ ("Content-Type" , "text/plain" ), ("Content-Length" , str(len(data))) ]) return iter([data]) $ gunicorn -w 4 myapp:app [2014-09-10 10:22:28 +0000] [30869] [INFO] Listening at: http://127.0.0.1:8000 (30869) [2014-09-10 10:22:28 +0000] [30869] [INFO] Using worker: sync [2014-09-10 10:22:28 +0000] [30874] [INFO] Booting worker with pid: 30874 [2014-09-10 10:22:28 +0000] [30875] [INFO] Booting worker with pid: 30875 [2014-09-10 10:22:28 +0000] [30876] [INFO] Booting worker with pid: 30876 [2014-09-10 10:22:28 +0000] [30877] [INFO] Booting worker with pid: 30877
这个示例演示了如何通过命令行来使Gunicorn
来运行一个最小的WSGI
Web应用,如果熟悉Python
的打包规则的话,可以知道这个命令中的gunicorn
实际上是在setup.py
文件中定义好的, 重新打开仓库,找到setup.py
:
1 2 3 4 5 6 7 8 9 from setuptools import setup, find_packages setup( entry_points=""" [console_scripts] gunicorn=gunicorn.app.wsgiapp:run """ , )
这里移除了其它部分的代码,只保留了相关的代码,在这段代码中,指定了gunicorn
的命令是命令gunicorn.app.wsgiapp:run
的别名,所以这段代码实际上执行了gunicorn.app.wsgiapp
文件中的run
函数:
1 2 3 def run (): from gunicorn.app.wsgiapp import WSGIApplication WSGIApplication("%(prog)s [OPTIONS] [APP_MODULE]" ).run()
而run
函数的运行逻辑也是很简单, 它是直接实例化一个承于gunicorn.app.base
文件的BaseApplication
的WSGIApplication
类, 这个类在实例化时会执行它的do_load_config
方法,也就是在这时候会初始化实例的cfg
变量,并调用cfg.parse
,其中这个实例是config.Config
的实例化,而parse
方法是用来解析用户传入的参数并供后续的Arbiter
以及Worker
使用。
3.加载配置 一般项目中的加载配置是没有什么可以说的, 但是Gunicorn
比较特殊, 在config
文件中除了Config
这个类和一些校验方法外,还存在大量类似于:
1 2 3 4 5 6 7 8 9 10 11 12 13 class WorkerConnections (Setting ): name = "worker_connections" section = "Worker Processes" cli = ["--worker-connections" ] meta = "INT" validator = validate_pos_int type = int default = 1000 desc = """\ The maximum number of simultaneous clients. This setting only affects the Eventlet and Gevent worker types. """
的类, 这些类就是Gunicorn
支持的参数或配置值以及它对应的类型,校验规则,默认值和帮助文档等,这些类都按照一定的规范进行编写,同时又继承于Setting
这个类,而Setting
这个类又继承了SettingMeta
,而SettingMeta
的作用就是在程序运行的时候,把所有继承于Setting
的类加入到config
文件的local
变量中,供Gunicorn
来使用。
Gunicorn
在启动时通过config
模块把用户传入的命令进行初始化, 再通过环境变量来初始化配置,此时的配置会覆盖掉用户传入命令初始化时的配置,接着在判断用户是否有指定配置文件,如果有就加载配置文件的配置,再覆盖已经存在的配置。 也就是说,配置的优先级是配置文件最高,然后环境变量次之,用户通过命令行传入的配置最后会被配置文件的配置所覆盖, 但是读取配置文件路径的优先级是命令行最高,然后才是环境变量。
4.Gunicorn的核心–Arbiter.run Application
实例化完成后,会调用BaseApplication.run
方法, 该方法如下:
1 2 3 4 5 6 7 def run (self ): try : Arbiter(self).run() except RuntimeError as e: print("\nError: %s\n" % e, file=sys.stderr) sys.stderr.flush() sys.exit(1 )
该方法会执行gunicorn.arbiter
文件的Arbiter
类,这个类是gunicorn
的核心类,负责启动和管理所有运行的worker,而Arbiter
的run
方法则是核心中的核心,负责着整个服务的运行管理,同时又跟Arbiter
的其它方法有交互,所以可以通过分析run
方法进行分析,进而纵观整个Arbiter
类, run
的方法如下(具体说明见注释,其它具体分析见小章节):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 def run (self ): "Main master loop." self.start() util._setproctitle("master [%s]" % self.proc_name) try : self.manage_workers() while True : self.maybe_promote_master() sig = self.SIG_QUEUE.pop(0 ) if self.SIG_QUEUE else None if sig is None : self.sleep() self.murder_workers() self.manage_workers() continue if sig not in self.SIG_NAMES: self.log.info("Ignoring unknown signal: %s" , sig) continue signame = self.SIG_NAMES.get(sig) handler = getattr (self, "handle_%s" % signame, None ) if not handler: self.log.error("Unhandled signal: %s" , signame) continue self.log.info("Handling signal: %s" , signame) handler() self.wakeup() except (StopIteration, KeyboardInterrupt): self.halt() except HaltServer as inst: self.halt(reason=inst.reason, exit_status=inst.exit_status) except SystemExit: raise except Exception: self.log.info("Unhandled exception in main loop" , exc_info=True ) self.stop(False ) if self.pidfile is not None : self.pidfile.unlink() sys.exit(-1 )
4.1初始化–Arbiter.start Arbiter.start
方法的第一步是进行初始化,首先是判断该进程是否是另一个进程启动的,这里是Gunicorn
的Upgrading to a new binary on the fly 功能,将在后面统一说明,然后他通过init_signals
方法初始化信号的回调, init_signals
的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def init_signals (self ): """\ Initialize master signal handling. Most of the signals are queued. Child signals only wake up the master. """ for p in self.PIPE: os.close(p) self.PIPE = pair = os.pipe() for p in pair: util.set_non_blocking(p) util.close_on_exec(p) self.log.close_on_exec() for s in self.SIGNALS: signal.signal(s, self.signal) signal.signal(signal.SIGCHLD, self.handle_chld)def signal (self, sig, frame ): if len (self.SIG_QUEUE) < 5 : self.SIG_QUEUE.append(sig) self.wakeup()
这个方法做了两件事:
1.首先是初始化PIPE
, Gunicorn
的Master
是一个一直在循环的单进程,每次循环会sleep
一秒防止空转,通过PIPE
可以使Master
进程从sleep
阶段提前唤醒。PIPE
在初始化时会先通过util.set_non_blocking
方法来设置不阻塞来防止PIPE
收到信号时,阻塞到Master
进程的主流程。
然后通过util.close_on_exec
来关闭子进程无用的描述符,之所以要这样处理是因为Gunicorn
采用了Pre-Worker
的模型,在运行的时候Master
进程会通过fork
的方法来创建worker
进程,fork
出来的子进程是通过写时复制来获得父进程的数据的,当子进程在exec
阶段时就会创建一份新的资源引用,此时保存原来文件描述符的变量当然也不存在了,也就无法关闭无用的文件描述符了, 这意味着对于某个文件描述符多了一个引用,而Linux
的文件描述符是等到都没有引用的时候才会删掉, 所以我们在使用的时候都会期望在fork
阶段后,exec
阶段之前直接关掉无用的物件描述符,这时候就可以用到Linux
的close-on-exec
功能。
在初始化文件描述符后, 通过直接对文件描述符设置FD_CLOEXEC
标记, 这样在fork
阶段后exec
阶段前,Linux
会自动关掉无用的文件描述符, 在Gunicorn
中,会对所有文件描述符都调用util.close_on_exec
。
2.初始化真正的信号监听, 注册了Gunicortn
所有会用到的信号回调,该回调会把信息注册到一个名为SIG_QUEUE
的队列中,同时会执行wakeup
来唤醒worker
的循环(将在run中的sleep介绍这个机制是怎么实现的)
在完成了信号监听的注册后, 会开始创建sock
,Gunicorn
为各种类型的sock
做了统一的封装, 它会判断用户配置要监听的IP
端口来挑选一个合适的sock
,然后进行初始化,其中最重要的初始化方法就是BaseCocket
的set_options
方法, 它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class BaseSocket (object ): def __init__ (self, address, conf, log, fd=None ): ... self.sock = self.set_options(sock, bound=bound) def set_options (self, sock, bound=False ): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 ) if (self.conf.reuse_port and hasattr (socket, 'SO_REUSEPORT' )): try : sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1 ) except socket.error as err: if err.errno not in (errno.ENOPROTOOPT, errno.EINVAL): raise if not bound: self.bind(sock) sock.setblocking(0 ) if hasattr (sock, "set_inheritable" ): sock.set_inheritable(True ) sock.listen(self.conf.backlog) return sock def bind (self, sock ): sock.bind(self.cfg_addr)
在Gunicorn
中,它创建的所有Scoket
都是继承于gunicorn.sock.BaseSocket
,所以Gunicorn
在创建scoket
后会调用set_options
方法,这个方法默认会设置SO_REUSEADDR
标记,然后再依赖配置设置SO_REUSEPORT
标记, 设置SO_REUSEPORT
标记可以解决部分惊群问题,同时也能解决不同进程收到请求的负载均衡问题,但是会带来响应请求的延迟,所以Gunicorn
将这个配置设置为可选项(关于这几个参数以及惊群问题会另开文章说明)。 接着Gunicorn
会调用sock.bind
且设置scoket
是不阻塞的,再进行监听,并返回给Master
。
Gunicorn
之所以这样做是因为Gunicorn
是Pre-Worker
模型的,在这个模型中, 所有的scoket
都是由Master
进程创建并监听,然后在通过fork
子进程的时候把scoket
传递给子进程,然后子进程可以通过该scoket
可以进行accept
获取到对应的请求。
4.2.管理woeker数量–manage_workers 在Gunicorn
的Arbiter
中,通过manage_workers
方法对Worker
进行管理, 当前运行的Worker
数量不满足与用户指定的数量时,会通过spawn_workers
来调用spawn_worker
创建Worker
,spawn_workers
是批量创建一批Worker
,而spawn_worker
是每次调用创建一个Worker
,他们的代码十分简单, 具体说明见注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 def spawn_worker (self ): self.worker_age += 1 worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS, self.app, self.timeout / 2.0 , self.cfg, self.log) self.cfg.pre_fork(self, worker) pid = os.fork() if pid != 0 : worker.pid = pid self.WORKERS[pid] = worker return pid for sibling in self.WORKERS.values(): sibling.tmp.close() worker.pid = os.getpid() try : util._setproctitle("worker [%s]" % self.proc_name) self.log.info("Booting worker with pid: %s" , worker.pid) self.cfg.post_fork(self, worker) worker.init_process() sys.exit(0 ) except SystemExit: raise except AppImportError as e: self.log.debug("Exception while loading the application" , exc_info=True ) print("%s" % e, file=sys.stderr) sys.stderr.flush() sys.exit(self.APP_LOAD_ERROR) except Exception: self.log.exception("Exception in worker process" ) if not worker.booted: sys.exit(self.WORKER_BOOT_ERROR) sys.exit(-1 ) finally : self.log.info("Worker exiting (pid: %s)" , worker.pid) try : worker.tmp.close() self.cfg.worker_exit(self, worker) except Exception: self.log.warning("Exception during worker exit:\n%s" , traceback.format_exc())def spawn_workers (self ): for _ in range (self.num_workers - len (self.WORKERS)): self.spawn_worker() time.sleep(0.1 * random.random())
在这里面中有两个注意点,第一个点是在spawn_worker
中,会先初始化worker
,接着在fork
出子进程运行代码部分会先执行worker.init_process
最后调用sys.exit(0)
退出,所以我们最好不要在worker.__init__
里面初始化数据,因为这部分是在Master
进程中执行的。我们应该在worker.init_process
中执行初始化,并在初始化代码执行后调用super().init_process()
,如下:
1 2 3 def init_process (self ): super ().init_process()
这样做是因为BaseWorker
会在自己的init_process
方法中执行一些通用的初始化功能,然后再调用BaseWorker.run
来运行Worker
。
4.3.核心循环与信号处理 在经过manage_workers
后,Worker
都已经以子进程的形式在运行了,但是Master
进程还需要处理一些家务事,比如管理Worker
进程是否存活以及用户在运行时指派给Master
的一些工作等,而这些将通过信号来进行交互。
这个阶段的Master
以一个循环不断的跑着, 如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 while True : self.maybe_promote_master() sig = self.SIG_QUEUE.pop(0 ) if self.SIG_QUEUE else None if sig is None : self.sleep() self.murder_workers() self.manage_workers() continue if sig not in self.SIG_NAMES: self.log.info("Ignoring unknown signal: %s" , sig) continue signame = self.SIG_NAMES.get(sig) handler = getattr (self, "handle_%s" % signame, None ) if not handler: self.log.error("Unhandled signal: %s" , signame) continue self.log.info("Handling signal: %s" , signame) handler() self.wakeup()
它的运行逻辑很简单,首先是判断当前进程是不是真的主进程,如果是将晋升为主进程,然后就是获取信号,如果当前信号队列有信号存在就取最前的一个并执行对应的信号对用以及调用wakeup
方法,防止下次循环还在睡眠;如果获取不到信号,就先休眠1秒,然后清除超时的Worker
最后执行manage_worker
并进入下个循环。
在Gunicorn
中,它确保每个循环只执行一次操作,确保该循环要不就执行信号回调,要不就执行Worker
管理,同时它还确保执行信号回调的优先级是最高的,但是Gunicorn
对传入的信号的数量也有限制,通过4.1.初始化--Arbiter.start
的源码:
1 2 3 4 def signal (self, sig, frame ): if len (self.SIG_QUEUE) < 5 : self.SIG_QUEUE.append(sig) self.wakeup()
中可以知道,Gunicorn
只允许最多有5个信号在队列中,防止同一时刻执行太多了信号处理,同时可以看到在正常接收信号后会执行wakeup
函数:
1 2 3 4 5 6 def wakeup (self ): try : os.write(self.PIPE[1 ], b'.' ) except IOError as e: if e.errno not in [errno.EAGAIN, errno.EINTR]: raise
这个函数十分简单,就是往管道PIPE
写入了一个字节,这样就能快速唤醒Gunicorn
继续运行循环,不会停留在sleep
阶段。 单看wakeup
函数是无法理解它为啥能唤醒Gunicorn
的主循环的, 需要结合Arbiter.sleep
源码来了解Gunicorn
为何这样设计。
通常情况下,我们都会使用Python
的标准函数time.sleep
来实现进程睡眠,但是该函数是阻塞的,意味着该函数运行期间是没办法通过其它方法来进行打断,所以Gunicorn
采用了事件循环的思路实现Arbiter.sleep
,该通过使用select.select
来读取PIPE
,同时设置超时为1秒,sleep
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def sleep (self ): try : ready = select.select([self.PIPE[0 ]], [], [], 1.0 ) if not ready[0 ]: return while os.read(self.PIPE[0 ], 1 ): pass except (select.error, OSError) as e: error_number = getattr (e, 'errno' , e.args[0 ]) if error_number not in [errno.EAGAIN, errno.EINTR]: raise except KeyboardInterrupt: sys.exit()
在这段逻辑中,会把PIPE
的读文件描述符传给select.select
中,这样select.select
会等待PIPE
的事件再返回,同时它的最大等待时间为1秒,之后这段运行逻辑会停留在这里,但并不会阻塞当前进程,如果这时候进程有收到信号,进程还是可以正常接收信号, 并通过wakeup
函数往PIPE
写入一个字节,接着select.select
就能通过PIPE
捕获到事件并返回,这时候上面停留的代码逻辑就会继续执行,等于sleep
函数可以提前结束等待,Gunicorn
的主循环能继续转了。
这一段主要是依赖于事件循环相关来防止主进程被阻塞,可以通过搜索事件循环
,epoll
来了解更多相关的。 如果想知道如何在阻塞代码中实现一个协程,可以通过文章–初识Python协程的实现 了解
5.无感切换实例 在分析Arbiter
,有几处都是先忽略跳过不分析,这部分的功能我把他称为无感切换新实例,这里的无感是指与Gunicorn
绑定的scoket交互的应用程序,如Nginx
或者客户端等。 这个功能可以使Gunicorn
的实例在运行的时候,重新开一个新的Gunicorn
实例来运行我们指定的代码,这时候读取的代码和配置都是最新的,与我们当前正在运行的旧实例不一样,但是他们都能针对同一批socket处理请求(可用于滚动发布以及灰度发布)。
为了方便阐述,我把一个Master
进程与它fork出来的Worker
进程统称为一个实例。
实际上官方把这个功能称为:Upgrading to a new binary on the fly ,这个功能是大多数Pre-Worker
模型的服务器都会支持的,不过这个功能需要多块不同生命周期的代码来结合才可以完成,所以单独拎出来分析。
根据文档:
First, replace the old binary with a new one, then send a USR2 signal to the current master process. It executes a new binary whose PID file is postfixed with .2 (e.g. /var/run/gunicorn.pid.2), which in turn starts a new master process and new worker processes
可以知道,Master
进程在收到USR2
的信号后,会创建一个新的实例,这部分的源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def handle_usr2 (self ): self.reexec()def reexec (self ): if self.reexec_pid != 0 : self.log.warning("USR2 signal ignored. Child exists." ) return if self.master_pid != 0 : self.log.warning("USR2 signal ignored. Parent exists." ) return master_pid = os.getpid() self.reexec_pid = os.fork() if self.reexec_pid != 0 : return self.cfg.pre_exec(self) environ = self.cfg.env_orig.copy() environ['GUNICORN_PID' ] = str (master_pid) if self.systemd: environ['LISTEN_PID' ] = str (os.getpid()) environ['LISTEN_FDS' ] = str (len (self.LISTENERS)) else : environ['GUNICORN_FD' ] = ',' .join( str (l.fileno()) for l in self.LISTENERS) os.chdir(self.START_CTX['cwd' ]) os.execvpe(self.START_CTX[0 ], self.START_CTX['args' ], environ)
按官方文档的示例,当Gunicorn
执行完这段逻辑后,就有一个新的Gunicorn
实例开始运行了,由于这个实例的环境变量中存在一个GUNICORN_PID
的变量, 所以在运行时会有一些不一样, 比如在Arbiter.start
进行初始化时会设置不一样的属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 def start (self ): if 'GUNICORN_PID' in os.environ: self.master_pid = int (os.environ.get('GUNICORN_PID' )) self.proc_name = self.proc_name + ".2" self.master_name = "Master.2" self.pid = os.getpid() if self.cfg.pidfile is not None : pidname = self.cfg.pidfile if self.master_pid != 0 : pidname += ".2" self.pidfile = Pidfile(pidname) self.pidfile.create(self.pid)
同时,在初始化scoket时,还会沿用环境变量中名为GUNICORN_FD
的值,这个值是创建这个实例的Master
进程用到的文件描述符,新创建的实例通过复用相同的文件描述符,使其它也能读取到相同端口的网络请求数据:
1 2 3 4 5 6 7 8 9 10 def start (self ): ... elif self.master_pid: fds = [] for fd in os.environ.pop('GUNICORN_FD' ).split(',' ): fds.append(int (fd)) self.LISTENERS = sock.create_sockets(self.cfg, self.log, fds)
在新创建的实例运行后, 机器上的Gunicorn
进程列表将变为这样子:
1 2 3 4 5 6 7 8 9 10 PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND20844 benoitc 20 0 54808 11m 3352 S 0.0 0.1 0 :00.36 gunicorn: master [test:app]20849 benoitc 20 0 54808 9.9 m 1500 S 0.0 0.1 0 :00.02 gunicorn: worker [test:app]20850 benoitc 20 0 54808 9.9 m 1500 S 0.0 0.1 0 :00.01 gunicorn: worker [test:app]20851 benoitc 20 0 54808 9.9 m 1500 S 0.0 0.1 0 :00.01 gunicorn: worker [test:app]20854 benoitc 20 0 55748 12m 3348 S 0.0 0.2 0 :00.35 gunicorn: master [test:app]20859 benoitc 20 0 55748 11m 1500 S 0.0 0.1 0 :00.01 gunicorn: worker [test:app]20860 benoitc 20 0 55748 11m 1500 S 0.0 0.1 0 :00.00 gunicorn: worker [test:app]20861 benoitc 20 0 55748 11m 1500 S 0.0 0.1 0 :00.01 gunicorn: worker [test:app]
从这个进程列表可以发现目前有两个Master
进程,他们分别有3个Worker
子进程,这时候两个实例是一起运行的,如果指定的代码文件没有进行修改,指定的配置也没变,那么这两个实例的逻辑可以认为是等效的。
当用户判断新的实例能正常处理请求后, 可以发送信号TERM
给旧实例的Master
,让它开始优雅的关闭Worker
并退出,然后新创建的实例的Master
进程会在核心循环中发现创建自己的父进程已经退出了, 就让自己晋升为真正名义上的Master
进程,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 def maybe_promote_master (self ): if self.master_pid == 0 : return if self.master_pid != os.getppid(): self.log.info("Master has been promoted." ) self.master_name = "Master" self.master_pid = 0 self.proc_name = self.cfg.proc_name del os.environ['GUNICORN_PID' ] if self.pidfile is not None : self.pidfile.rename(self.cfg.pidfile) util._setproctitle("master [%s]" % self.proc_name)
通过源码可以发现,无论新实例有没有通过may_be_promote_master
晋升为名义上的Master
进程, 总体上的逻辑跟原先的Master
进程是一样的,只不过是没办法通过接受USR2
信号来创建新的实例。
通过这种方式可以无感的升级应用代码,结合其它的信号,在升级失败时也能关闭新创建的实例,切回到旧实例,具体可以通过Upgrading to a new binary on the fly 了解。
6.Worker与Master的交互 分析完了Arbiter
后,整个Gunicorn
的核心还剩下Worker
尚未分析,Gunicorn
中带了多种Worker
,比如用在gevent
场景的Worker.ggevent.GeventWorker
,用在Tornado
的Worker.gtornado.TornadoWorker
。这些Worker
除了一些与Arbiter
交互的方法外, 还有一些方法用来通过读取scoket
的数据并转化为WSGI
协议发给挂在后面的WSGI
应用,这意味着Gunicorn
不仅用于WSGI
场景,还可以通过自己编写Worker
来对接其它的场景,比如Uvicorn.Worker
就是对接ASGI
应用等。
由于我在分析Gunicorn
时,我是抱着使用Gunicorn
来托管我的TCP服务的,所以我是着重了解Worker
与Arbiter
的交互,对于自带的其它Worker
,则不多做说明(通过了解WSGI
协议也能了解它们的执行逻辑)。
Worker
与Arbiter
的交互可以简化为如下图:
Gunicorn源码分析-Worker与Arbiter交互
图中蓝色和绿色线条代表两种类型的信号交互,黑色代表其它的Master
进程与Worker
进程的交互。
6.1.基于信号的交互 在图中见到了已经在4.2.管理worker数量--manager-workers
说过的manage_workers
,它除了在Arbiter
中会调用manage_workers
进行Worker
初始化,之后会在接收到用户发起信号的时候调用manage_worker
来进行增减。这类型信号更改Worker
数量有两种, 一种是修改配置的Worker
数量, 然后通过信号HUP
重载配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 def handle_hup (self ): self.log.info("Hang up: %s" , self.master_name) self.reload()def reload (self ): ... for _ in range (self.cfg.workers): self.spawn_worker() self.manage_workers()
另外一种就是通过信号TTIN
和TTOU
来更改数量的加减:
1 2 3 4 5 6 7 8 9 def handle_ttin (self ): self.num_workers += 1 self.manage_workers()def handle_ttou (self ): if self.num_workers <= 1 : return self.num_workers -= 1 self.manage_workers()
此外,Master
除了接收用户的信号外,还接收自己创建的Worker
进程的信号,当Worker
进程退出时,会发送信号CHLD
给Master
进程,Master
进程会调用reap_worker
来回收对应Worker
的进程资源, reap_worker
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 def reap_workers (self ): try : while True : wpid, status = os.waitpid(-1 , os.WNOHANG) if not wpid: break if self.reexec_pid == wpid: self.reexec_pid = 0 else : exitcode = status >> 8 if exitcode == self.WORKER_BOOT_ERROR: reason = "Worker failed to boot." raise HaltServer(reason, self.WORKER_BOOT_ERROR) if exitcode == self.APP_LOAD_ERROR: reason = "App failed to load." raise HaltServer(reason, self.APP_LOAD_ERROR) if os.WIFSIGNALED(status): self.log.warning( "Worker with pid %s was terminated due to signal %s" , wpid, os.WTERMSIG(status) ) worker = self.WORKERS.pop(wpid, None ) if not worker: continue worker.tmp.close() self.cfg.child_exit(self, worker) except OSError as e: if e.errno != errno.ECHILD: raise
这段代码实际上是为了解决一个类Unix系统等待终止子进程的问题, 该问题是如果子进程在终止过程中,子进程完全消失了,就没有给父进程留下任何可以来了解子进程的东西,父进程没办法清理与子进程相关的数据,所以类Unix系统有这样一个设计:如果子进程在父进程之前结束会先发送信号给父进程,然后内核会把子进程设置为一个特殊的状态,处于这个状态的进程叫做僵尸进程,这类进程只保留最小的概要信息并等待分进程来查询自己的信息,只要父进程获取了子进程的信息后,子进程就会消失,否则会一直保持僵死状态(zombie)。其中父进程获取子进程的信息有多种方式,在Gunicorn
是采用waitpid
来获取子进程的信息,而reap_workers
的主要责任就是通过waitpid
获取子进程信息并做出响应。
在Gunicorn
中,采用waitpid
来获取僵死进程的状态和信息,但是直接使用waitpid
会暂时停止目前进程的执行,直到有信号来到或者有子进程结束,所以使用了WHOHANG
标记,该标记表示如果没有任何已经结束的子进程也马上返回结果,不等待。Gunicorn
通过使用该标记以及循环的方式来解决可能同时出现多个进程变为僵死状态的问题。
此外Gunicorn
在waitpid
中传了一个参数值-1
,这个参数的名为pid
,输入不同的值有不同的意义:
pid>0时,只等待进程ID等于pid的子进程,不管其它已经有多少子进程运行结束退出了,只要指定的子进程还没有结束,waitpid就会一直等下去。
pid=-1时,等待任何一个子进程退出,没有任何限制,此时waitpid和wait的作用一模一样。
pid=0时,等待同一个进程组中的任何子进程,如果子进程已经加入了别的进程组,waitpid不会处理它。
pid<-1时,等待一个指定进程组中的任何子进程,这个进程组的ID等于pid的绝对值。
Gunicorn
在通过waitpid
获取到的返回信息中第一个pid代表退出进程的pid, 如果为空就代表没有子进程退出,应该直接退出逻辑返回到循环中,第二个status它包含了一些子进程的附加信息,该参数的高8位记录进程调用exit退出的状态,低8位记录进程接收到的信号,如果是正常退出,高8位数为退出状态,低8位数为0,如果是非正常退出,高8位数为0,低8位数为信号id,所以Gunicorn
会通过status >> 8
来获取低8位的数据,且当它不为0时就判断是否是自己定义的特殊信号,如果是则按照信号进行抛异常。
6.2.Master进程主动检测 上面说到Master
进程虽然可以收到子进程退出时发出的CHLD
信号,但是并不是所有子进程退出时都能发出CHLD
信号,所以Master
进程还需要做到主动检测,Gunicorn
在主动检查中用到了一个比较奇特的方法–临时文件的最后修改时间,该方法是通过Worker
进程每隔一段时间更新临时文件的最后修改时间,Master
进程每隔一段时间就去检测最后修改时间是否在一段范围内, 如果不合法就剔除这个Worker
进程。
这种方式挺让人困惑的,同时容易引起性能问题,具体见How do I avoid Gunicorn excessively blocking in os.fchmod? ,目前官方表示可能会进行改进, 见Gunicorn
中的一条issue
这个检测是思路通过WorkerTmp
类来实现,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class WorkerTmp (object ): def __init__ (self, cfg ): ... self.spinner = 0 def notify (self ): self.spinner = (self.spinner + 1 ) % 2 os.fchmod(self._tmp.fileno(), self.spinner) def last_update (self ): return os.fstat(self._tmp.fileno()).st_ctime
它会在Worker
初始化时实例化为Worker.tmp
属性,对于Worker
进程,必须在self.timeout / 2
的时间间隔调用tmp.notify
来更新修改文件的更新时间;对于Master
则通过tmp.last_update
来获取临时文件的最后修改时间,以此判断Worker
是否还存活,这部分就是示例图中的murder_workers
,它的源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def murder_workers (self ): if not self.timeout: return workers = list (self.WORKERS.items()) for (pid, worker) in workers: try : if time.time() - worker.tmp.last_update() <= self.timeout: continue except (OSError, ValueError): continue if not worker.aborted: self.log.critical("WORKER TIMEOUT (pid:%s)" , pid) worker.aborted = True self.kill_worker(pid, signal.SIGABRT) else : self.kill_worker(pid, signal.SIGKILL)
7.总结 至此,Gunicorn
的主核心逻辑源码分析已经分析完毕了,可以发现Gunicorn
就是一个大管家,会把获取网络请求的功能下放给下面工作的Worker
,自己只负责一些Worker
的管理等功能。 同时可以发现Gunicorn
并不只是WSGI
服务,通过自定义Worker
,它也可以挂载TCP之类的应用。