Python gevent Basics

Contents

[TOC]

What is gevent?

gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

Core concepts

Greenlet state

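The main pattern used in gevent is the Greenlet, a lightweight coroutine provided to Python as a C extension module.
All Greenlets run inside the operating-system process of the main program, but they are scheduled cooperatively. This differs from libraries such as multiprocessing or threading, which provide truly parallel constructs: they spin up processes and threads that are scheduled by the operating system.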
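
A minimal sketch of what cooperative scheduling looks like in practice, assuming gevent is installed. The function name chatter and the log list are made up for this illustration. Each greenlet runs until it yields explicitly with gevent.sleep(0), and only then does the other one get a turn.

```python
import gevent

log = []

def chatter(name, steps):
    for i in range(steps):
        log.append('%s%d' % (name, i))
        # Yield control voluntarily so that other greenlets can run.
        gevent.sleep(0)

gevent.joinall([
    gevent.spawn(chatter, 'a', 3),
    gevent.spawn(chatter, 'b', 3),
])

print(log)
```

If the gevent.sleep(0) call is removed, nothing ever yields, so the first greenlet finishes all of its steps before the second one starts.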

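A greenlet exposes several flags that let you monitor its internal state:

  • started: Boolean, indicates whether the Greenlet has been started
  • ready(): Boolean, indicates whether the Greenlet has stopped
  • successful(): Boolean, indicates whether the Greenlet has stopped without raising an exception
  • value: the value returned by the function
  • exception: the uncaught exception instance raised inside the function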

For example:

    import gevent

    def win():
        return 'You win!'

    def fail():
        raise Exception('You fail at failing.')

    winner = gevent.spawn(win)
    loser = gevent.spawn(fail)

    print(winner.started) # True
    print(loser.started)  # True

    # Exceptions raised in the Greenlet, stay inside the Greenlet.
    try:
        gevent.joinall([winner, loser])
    except Exception as e:
        print('This will never be reached')

    print(winner.value) # 'You win!'
    print(loser.value)  # None

    print(winner.ready()) # True
    print(loser.ready())  # True

    print(winner.successful()) # True
    print(loser.successful())  # False

    # The exception raised in fail will not propagate outside the
    # greenlet. A stack trace will be printed to stderr but it
    # will not unwind the stack of the parent.

    print(loser.exception)

    # It is possible though to raise the exception again outside
    # raise loser.exception
    # or with
    # loser.get()

Output:

    True
    True
    You win!
    None
    True
    True
    True
    False
    You fail at failing.
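
The closing comment in the example mentions re-raising the exception outside the greenlet. A small sketch of that, assuming gevent is installed; the ValueError and its message are chosen only for illustration. join() returns normally, the exception object is available on the greenlet, and get() raises it again in the caller.

```python
import gevent

def fail():
    raise ValueError('boom')

loser = gevent.spawn(fail)
loser.join()  # does not raise; the error stays inside the greenlet

# The exception object is stored on the greenlet.
stored = loser.exception

# get() raises the stored exception again in the calling greenlet.
try:
    loser.get()
    caught = None
except ValueError as exc:
    caught = exc

print(type(stored).__name__, str(caught))
```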

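Program shutdown

When the main program receives a SIGQUIT signal, Greenlets that fail to yield can unexpectedly hang the program's execution.
This results in so-called zombie processes, which have to be killed from outside the Python interpreter.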

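A common pattern for handling this is to listen for SIGQUIT in the main program and call gevent.shutdown before the program exits.
For example:

    import gevent
    import signal

    def run_forever():
        gevent.sleep(1000)

    if __name__ == '__main__':
        # Note: this targets an older gevent API. Recent releases no longer
        # provide gevent.shutdown and register handlers with
        # gevent.signal_handler instead.
        gevent.signal(signal.SIGQUIT, gevent.shutdown)
        thread = gevent.spawn(run_forever)
        thread.join()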

Timeouts

A timeout is a constraint on the running time of a block of code or of a Greenlet.

    import gevent
    from gevent import Timeout

    seconds = 10

    timeout = Timeout(seconds)
    timeout.start()

    def wait():
        gevent.sleep(10)

    try:
        gevent.spawn(wait).join()
    except Timeout:
        print('Could not complete')
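
Two other common ways to apply a timeout, sketched under the assumption that gevent is installed: Timeout used as a context manager around a block, and the timeout argument of join. The function slow and the 0.05-second limit are illustrative values, not taken from the example above.

```python
import gevent
from gevent import Timeout

def slow():
    gevent.sleep(1)
    return 'finished'

# Form 1: a Timeout used as a context manager around a block of code.
try:
    with Timeout(0.05):
        outcome = slow()
except Timeout:
    outcome = 'timed out'

# Form 2: a timeout on join. Only the wait is bounded; the greenlet
# itself keeps running until it finishes or is killed.
g = gevent.spawn(slow)
g.join(timeout=0.05)
still_running = not g.ready()
g.kill()

print(outcome, still_running)
```

The second form bounds how long the caller waits rather than how long the greenlet runs, which is why the sketch kills the greenlet explicitly afterwards.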

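Monkey patching

gevent can patch most of the blocking system calls in the standard library, including those in the socket, ssl, threading, and select modules, so that they behave cooperatively instead.

    from gevent import monkey
    monkey.patch_all()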
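
To see what patching actually does, here is a small sketch that patches only the socket module and then inspects the result. It assumes gevent is installed; patch_socket is used instead of patch_all purely to keep the effect narrow and easy to observe.

```python
from gevent import monkey

# Patch as early as possible, before other modules grab the originals.
monkey.patch_socket()

import socket
import gevent.socket

# After patching, the standard-library name points at gevent's class.
same_class = socket.socket is gevent.socket.socket
patched = monkey.is_module_patched('socket')

print(same_class, patched)
```

Code that imports socket after this point uses the cooperative implementation without any changes of its own.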

Data structures

Events

An event is a form of asynchronous communication between Greenlets.

  • event.set() fires the event; event.wait() waits for it.

    import gevent
    from gevent.event import Event

    '''
    Illustrates the use of events
    '''

    evt = Event()

    def setter():
        '''After 3 seconds, wake all threads waiting on the value of evt'''
        print('A: Hey wait for me, I have to do something')
        gevent.sleep(3)
        print("Ok, I'm done")
        evt.set()

    def waiter():
        '''After 3 seconds the wait call will unblock'''
        print("I'll wait for you")
        evt.wait()  # blocking
        print("It's about time")

    def main():
        gevent.joinall([
            gevent.spawn(setter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter),
            gevent.spawn(waiter)
        ])

    if __name__ == '__main__':
        main()

  • AsyncResult is an extension of the event object: set(value) fires the event with a value, and get() waits for the event and returns that value.

    import gevent
    from gevent.event import AsyncResult

    a = AsyncResult()

    def setter():
        """
        After 3 seconds set the result of a.
        """
        gevent.sleep(3)
        a.set('Hello!')

    def waiter():
        """
        After 3 seconds the get call will unblock after the setter
        puts a value into the AsyncResult.
        """
        print(a.get())

    gevent.joinall([
        gevent.spawn(setter),
        gevent.spawn(waiter),
    ])

Queues

    import gevent
    from gevent.queue import Queue

    queue1 = Queue()

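A queue is an ordered collection of data with the following methods:

  • Blocking: put(value) and get() block when the queue is full or empty, respectively
  • Non-blocking: put_nowait(value) and get_nowait() never block; if the operation cannot complete, they raise gevent.queue.Full or gevent.queue.Empty, respectively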
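
The non-blocking variants can be exercised without spawning any greenlets. A short sketch, assuming gevent is installed; the queue size of 1 and the events list are chosen only to make both exceptions easy to trigger and record.

```python
from gevent.queue import Queue, Empty, Full

q = Queue(maxsize=1)
events = []

q.put_nowait('a')            # fits: the queue now holds one item
try:
    q.put_nowait('b')        # no room left, so this cannot complete
except Full:
    events.append('full')

events.append(q.get_nowait())  # takes 'a' back out
try:
    q.get_nowait()           # nothing left to take
except Empty:
    events.append('empty')

print(events)
```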

Once a value has been taken from the queue by one Greenlet, no other Greenlet will receive it. For example:

    import gevent
    from gevent.queue import Queue

    tasks = Queue()

    def worker(n):
        while not tasks.empty():
            task = tasks.get()
            print('Worker %s got task %s' % (n, task))
            gevent.sleep(0)

        print('Quitting time!')

    def boss():
        for i in range(1, 25):
            tasks.put_nowait(i)

    gevent.spawn(boss).join()

    gevent.joinall([
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'nancy'),
    ])

Output:

    Worker steve got task 1
    Worker john got task 2
    Worker nancy got task 3
    Worker steve got task 4
    Worker nancy got task 5
    Worker john got task 6
    Worker steve got task 7
    Worker john got task 8
    Worker nancy got task 9
    Worker steve got task 10
    Worker nancy got task 11
    Worker john got task 12
    Worker steve got task 13
    Worker john got task 14
    Worker nancy got task 15
    Worker steve got task 16
    Worker nancy got task 17
    Worker john got task 18
    Worker steve got task 19
    Worker john got task 20
    Worker nancy got task 21
    Worker steve got task 22
    Worker nancy got task 23
    Worker john got task 24
    Quitting time!
    Quitting time!
    Quitting time!

In the next example the boss runs concurrently with several workers, and the queue is limited to at most 3 elements. This limit means that put blocks until the queue has free space. Conversely, get blocks while the queue is empty. get also accepts a timeout argument: if no element becomes available within that time, it raises gevent.queue.Empty.

    import gevent
    from gevent.queue import Queue, Empty

    tasks = Queue(maxsize=3)

    def worker(n):
        try:
            while True:
                task = tasks.get(timeout=1) # decrements queue size by 1
                print('Worker %s got task %s' % (n, task))
                gevent.sleep(0)
        except Empty:
            print('Quitting time!')

    def boss():
        """
        Boss will wait to hand out work until an individual worker is
        free since the maxsize of the task queue is 3.
        """

        for i in range(1, 10):
            tasks.put(i)
        print('Assigned all work in iteration 1')

        for i in range(10, 20):
            tasks.put(i)
        print('Assigned all work in iteration 2')

    gevent.joinall([
        gevent.spawn(boss),
        gevent.spawn(worker, 'steve'),
        gevent.spawn(worker, 'john'),
        gevent.spawn(worker, 'bob'),
    ])

Output:

    Worker steve got task 1
    Worker john got task 2
    Worker bob got task 3
    Worker steve got task 4
    Worker bob got task 5
    Worker john got task 6
    Assigned all work in iteration 1
    Worker steve got task 7
    Worker john got task 8
    Worker bob got task 9
    Worker steve got task 10
    Worker bob got task 11
    Worker john got task 12
    Worker steve got task 13
    Worker john got task 14
    Worker bob got task 15
    Worker steve got task 16
    Worker bob got task 17
    Worker john got task 18
    Assigned all work in iteration 2
    Worker steve got task 19
    Quitting time!
    Quitting time!
    Quitting time!

Group/Pool

A group is a collection of running greenlets that are managed and scheduled together as a unit. It also doubles as a parallel dispatcher that mirrors Python's multiprocessing library.

  • Group methods add() and join():

    import gevent
    from gevent.pool import Group

    def talk(msg):
        for i in range(3):
            print(msg)

    g1 = gevent.spawn(talk, 'bar')
    g2 = gevent.spawn(talk, 'foo')

    group = Group()
    group.add(g1)
    group.add(g2)
    group.join()

Output:

    bar
    bar
    bar
    foo
    foo
    foo

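This example is deliberately simple: it spawns two talk greenlets and adds both to the group. group.join() then waits for all of them to finish, and each greenlet is removed from the group as it completes. Nothing is returned, so there is little else to it.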

  • Group().map(func, iterable) collects the return values of the spawned greenlets. The second argument is an iterable; each of its items is passed to func in turn.

    from gevent import getcurrent
    from gevent.pool import Group

    group = Group()

    def hello_from(n):
        print('Size of group %s' % len(group))
        print('Hello from Greenlet %s' % id(getcurrent()))
        return n

    x = group.map(hello_from, range(3))
    print(type(x))
    print(x)

  • Group().imap(): unlike map, imap returns an iterator over the return values.

    import gevent
    from gevent.pool import Group

    def intensive(n):
        gevent.sleep(3 - n)
        return 'task', n

    print('Ordered')

    ogroup = Group()
    x = ogroup.imap(intensive, range(3))
    print(x)

    for x in ogroup.imap(intensive, range(3)):
        print(x)

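imap also accepts a maxsize argument, which makes the iterator process its items in batches.

    import gevent
    from gevent.pool import Group

    def intensive(n):
        gevent.sleep(2)
        return 'task', n

    print('Ordered')

    ogroup = Group()
    x = ogroup.imap(intensive, range(20), maxsize=3)
    print(x)

    # Consume the iterator so the batching becomes visible.
    for result in x:
        print(result)

Run this way, concurrency is held to about 3 tasks at a time, with one batch completing every 2 seconds. Without maxsize, all results would become available after 2 seconds.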

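  • Group().imap_unordered(): results are returned in the order in which they finish.

    import gevent
    from gevent.pool import Group

    def intensive(n):
        gevent.sleep(3 - n)
        return 'task', n

    igroup = Group()
    for i in igroup.imap_unordered(intensive, range(3)):
        print(i)

Running it produces:

    ('task', 2)
    ('task', 1)
    ('task', 0)

Whichever task finishes first is returned first. With imap, you would instead wait 3 seconds for result 0, after which results 1 and 2 follow in order.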
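
The difference between the two iterators can be checked side by side. This is a minimal sketch assuming gevent is installed; the function work and the short sleep durations are illustrative and chosen so the script finishes quickly.

```python
import gevent
from gevent.pool import Group

def work(n):
    # Task 0 sleeps longest, task 2 shortest.
    gevent.sleep(0.05 * (3 - n))
    return n

group = Group()

# imap yields results in input order, waiting for slow items first.
ordered = list(group.imap(work, range(3)))

# imap_unordered yields each result as soon as it is ready.
unordered = list(group.imap_unordered(work, range(3)))

print(ordered)
print(unordered)
```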

  • Pool: Group is the parent class of Pool. A pool lets you cap how many greenlets may run in it at the same time.

    from gevent.pool import Pool

    class SocketPool(object):

        def __init__(self):
            # No explicit start is needed; greenlets start when spawned.
            self.pool = Pool(1000)

        def listen(self, socket):
            while True:
                socket.recv()

        def add_handler(self, socket):
            if self.pool.full():
                raise Exception("At maximum pool size")
            else:
                self.pool.spawn(self.listen, socket)

        def shutdown(self):
            self.pool.kill()
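
The concurrency cap of a Pool can be observed directly by counting how many tasks are inside the function at once. A small sketch, assuming gevent is installed; the stats dictionary, the task function, and the pool size of 2 are all made up for this illustration.

```python
import gevent
from gevent.pool import Pool

stats = {'running': 0, 'peak': 0}

def task(n):
    stats['running'] += 1
    stats['peak'] = max(stats['peak'], stats['running'])
    gevent.sleep(0.01)  # yield so other greenlets could overlap
    stats['running'] -= 1
    return n * 2

pool = Pool(2)  # at most two greenlets run at the same time
results = pool.map(task, range(6))

print(results, stats['peak'])
```

With a plain Group in place of Pool(2), nothing limits the number of greenlets, so all six tasks would overlap.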

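Semaphores and locks

Semaphores

A semaphore is a low-level synchronization primitive that lets greenlets coordinate with one another and limits concurrent access to a mutually exclusive resource.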

  • A semaphore has two methods, acquire and release. The difference between the number of times the semaphore has been acquired and released is called the bound of the semaphore.
  • Semaphores support the with context manager.
    If the bound of a semaphore drops to 0, acquire blocks until another greenlet releases it.

    from gevent import sleep
    from gevent.pool import Pool
    # gevent.coros was renamed to gevent.lock in gevent 1.0.
    from gevent.lock import BoundedSemaphore

    sem = BoundedSemaphore(2)

    def worker1(n):
        sem.acquire()
        print('Worker %i acquired semaphore' % n)
        sleep(0)
        sem.release()
        print('Worker %i released semaphore' % n)

    def worker2(n):
        with sem:
            print('Worker %i acquired semaphore' % n)
            sleep(0)
        print('Worker %i released semaphore' % n)

    pool = Pool()
    pool.map(worker1, range(0, 2))
    pool.map(worker2, range(3, 6))

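Output:

    Worker 0 acquired semaphore
    Worker 1 acquired semaphore
    Worker 0 released semaphore
    Worker 1 released semaphore
    Worker 3 acquired semaphore
    Worker 4 acquired semaphore
    Worker 3 released semaphore
    Worker 4 released semaphore
    Worker 5 acquired semaphore
    Worker 5 released semaphore

As the output shows, with a bound of 2 only two Greenlets can hold the semaphore at the same time.

A semaphore with a bound of 1 is also called a lock. It gives a single greenlet exclusive access. Semaphores and locks are commonly used to ensure that a resource is used by only one part of the program at a time.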
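
A sketch of the lock case, assuming gevent 1.0 or later, where BoundedSemaphore lives in gevent.lock. The inside dictionary and the critical function are hypothetical names used only to count how many greenlets are in the protected section at once.

```python
import gevent
from gevent.lock import BoundedSemaphore

lock = BoundedSemaphore(1)  # a bound of 1 behaves as a lock
inside = {'now': 0, 'peak': 0}

def critical(n):
    with lock:
        inside['now'] += 1
        inside['peak'] = max(inside['peak'], inside['now'])
        # Yield while holding the lock; the others must keep waiting.
        gevent.sleep(0)
        inside['now'] -= 1

gevent.joinall([gevent.spawn(critical, i) for i in range(5)])

print(inside['peak'])
```

Even though every greenlet yields in the middle of the protected section, the peak count stays at one because the others are blocked in acquire.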

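Thread-local data

gevent also lets you define data that is local to the greenlet context.
How it works: internally it is implemented as a global lookup that addresses a private namespace keyed by the greenlet's getcurrent() value.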

    import gevent
    from gevent.local import local

    stash = local()

    def f1():
        stash.x = 1
        print(stash.x)

    def f2():
        stash.y = 2
        print(stash.y)

        try:
            stash.x
        except AttributeError:
            print("x is not local to f2")

    g1 = gevent.spawn(f1)
    g2 = gevent.spawn(f2)

    gevent.joinall([g1, g2])

Output:

    1
    2
    x is not local to f2
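
The lookup principle described at the start of this section can be imitated with a plain dictionary keyed by getcurrent(). This is a toy sketch, not how gevent.local is actually written; the names _namespaces, set_local, and get_local are hypothetical, and unlike a real implementation it never cleans up entries for finished greenlets.

```python
import gevent
from gevent import getcurrent

# Hypothetical stand-in for gevent.local: one private dict per greenlet.
_namespaces = {}

def set_local(name, value):
    _namespaces.setdefault(getcurrent(), {})[name] = value

def get_local(name, default=None):
    return _namespaces.get(getcurrent(), {}).get(name, default)

seen = {}

def worker(tag, value):
    set_local('x', value)
    gevent.sleep(0)  # let the other greenlet run and set its own x
    seen[tag] = get_local('x')

gevent.joinall([
    gevent.spawn(worker, 'first', 1),
    gevent.spawn(worker, 'second', 2),
])

print(seen)
```

Each greenlet reads back its own value of x even though both use the same name, because the current greenlet selects which namespace is consulted.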

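This is reminiscent of how the request object works in Flask, a widely used Python web framework. Each request corresponds to one HTTP call, and throughout that call we can read many parameters from the request object without affecting any other request. That is exactly what thread-local data is for.

gevent.local can also be subclassed to build your own class whose attributes are accessible only from the current greenlet, for example: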

    import gevent
    from gevent.local import local

    class MyLocal(local):
        __slots__ = ('number', 'x')
        # number = 2
        initialized = False

        def __init__(self, **kw):
            if self.initialized:
                raise SystemError('__init__ called too many times')
            self.initialized = True
            self.__dict__.update(kw)

        def squared(self):
            return self.number ** 2

    stash = MyLocal()

    def f1():
        stash.x = 1
        stash.number = 3
        print(stash.x)
        print(stash.number)

    def f2():
        stash.y = 2
        print(stash.y)

        try:
            print(stash.x)
            print(stash.number)
        except AttributeError:
            print("x is not local to f2")

    g1 = gevent.spawn(f1)
    g2 = gevent.spawn(f2)

    gevent.joinall([g1, g2])

Here MyLocal inherits from gevent's local. The role of __slots__ deserves attention. In an ordinary class, __slots__ restricts instances to the listed attributes, and assigning anything else raises AttributeError. In a subclass of local, however, attributes declared in __slots__ are shared across all greenlets: they become globally readable and are no longer greenlet-local. Keep this in mind when declaring slots.

Actors

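The main idea of the actor model: each Actor has an inbox in which it can receive messages from other Actors. The main loop inside an Actor iterates over the messages it receives and acts on each one according to its intended behavior.

gevent has no native Actor type, but we can define a very simple one using a Queue inside a subclassed Greenlet: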

    import gevent
    from gevent.queue import Queue

    class Actor(gevent.Greenlet):

        def __init__(self):
            self.inbox = Queue()
            gevent.Greenlet.__init__(self)

        def receive(self, message):
            """
            Define in your subclass.
            """
            raise NotImplementedError()

        def _run(self):
            self.running = True

            while self.running:
                message = self.inbox.get()
                self.receive(message)

    class Pinger(Actor):
        def receive(self, message):
            print(message)
            pong.inbox.put('ping')
            gevent.sleep(0)

    class Ponger(Actor):
        def receive(self, message):
            print(message)
            ping.inbox.put('pong')
            gevent.sleep(0)

    ping = Pinger()
    pong = Ponger()

    ping.start()
    pong.start()

    ping.inbox.put('start')
    gevent.joinall([ping, pong])
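
The ping/pong actors above never stop on their own. As a complement, here is a sketch of an actor that terminates, assuming gevent is installed. The Summer class and the use of None as a stop message are conventions invented for this illustration and are not part of gevent.

```python
import gevent
from gevent.queue import Queue

class Summer(gevent.Greenlet):
    """Hypothetical actor that adds up the numbers sent to its inbox."""

    def __init__(self):
        gevent.Greenlet.__init__(self)
        self.inbox = Queue()
        self.total = 0

    def _run(self):
        while True:
            message = self.inbox.get()  # blocks until a message arrives
            if message is None:         # None is our stop signal
                return self.total
            self.total += message

actor = Summer()
actor.start()

for n in (1, 2, 3):
    actor.inbox.put(n)
actor.inbox.put(None)

actor.join()
print(actor.value)
```

The value returned from _run becomes the greenlet's value attribute, so the caller can read the result after join() just as with any other greenlet.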
