Python异步

Posted by Wh0ami-hy on April 25, 2022

1. 进程

资源分配的基本单位

进程是正在运行的程序的实例

1.1. fork()

在 Unix/Linux 系统中fork 函数被用于创建进程。这个函数很特殊,对于普通的函数,调用一次,返回一次,但是调用 fork 一次,它返回两次。事实上,fork 函数创建了新的进程,我们把它称为子进程,子进程几乎是当前进程(即父进程)的一个拷贝:它会复制父进程的代码段,堆栈段和数据段。

对于父进程,fork 函数返回了子进程的进程号 pid。对于子进程,fork 函数则返回 0,这也是 fork 函数返回两次的原因,根据返回值,我们可以判断进程是父进程还是子进程。

import os
pid = os.fork()
if pid < 0:
    print 'Fail to create process'
elif pid == 0:
    print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
    print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)

虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据

1.2. 多进程

python提供multiprocessing模块实现多进程,需要注意的是multiprocessing 在 Windows 和 Linux 平台的不一致性:一样的代码在 Windows 和 Linux 下运行的结果可能不同。因为 Windows 的进程模型和 Linux 不一样,Windows 下没有 fork

创建进程对象

from multiprocessing import Process
p = Process(参数)
参数名 说明
target 执行的目标任务名,应该是一个函数名(函数名后不要加括号)
name 进程名,一般不用设置
group 进程组,目前只能使用None

执行带参数的任务

参数名 说明
args 以元组的方式给执行任务传参,参数是按顺序进行对应的
kwargs 以字典的方式给执行任务传参,参数是按键名进行对应的

启动进程

p.start()

import multiprocessing
import time

def sing(num):
    for i in range(num):
        print("singing")
        time.sleep(0.5)

def draw(num):
    for i in range(num):
        print("drawing")
        time.sleep(0.5)

if __name__ == '__main__':
    process1 = multiprocessing.Process(target=sing,args=(3,))	#使用元组进行传参
    
    process2 = multiprocessing.Process(target=draw,kwargs={"num":3})
    #使用字典进行传参

    process1.start()
    process2.start()

    # sing(3)
    # draw(3)

获取当前进程id

os.getpid()

获取当前父进程id

os.getppid()

1.3. 守护主进程

如果希望主进程结束的同时结束所有子进程,就需要设置守护主进程

子进程.daemon = True  #在启动进程前设置

import multiprocessing

def sing():

    for i in range(10):
        print("singing")

if __name__ == '__main__':
    process1 = multiprocessing.Process(target=sing)

    process1.daemon = True      #设置主进程守护

    process1.start()

    print("主进程结束")

1.4. 进程阻塞

join 方法,该方法用于阻塞子进程以外的所有进程(这里指父进程),当子进程执行完毕后,父进程才会继续执行,它通常用于进程间的同步

1.5. 结束进程

不管任务是否完成,立即停止工作进程

进程对象.terminate()

但是通过执行系统命令ps查看停止后的进程, 会发现, 直接调用terminate方法停止的进程变成了一个僵尸进程(defunct), 只能等待主程序退出, 这个僵尸进程才会消失.

通过在terminate后添加一次调用join方法等待进程真正结束, 就能避免出现僵尸进程:

进程对象.terminate()
进程对象.join()

1.6. 进程池

import os, time  
from multiprocessing import Pool  
def foo(x):  
    print('Run task %s (pid:%s)...' % (x, os.getpid()))  
    time.sleep(2)  
    print('Task %s result is: %s' % (x, x * x))  
  
  
if __name__ == '__main__':  
    print('Parent process %s.' % os.getpid())  
    p = Pool(4)         # 设置进程数  
    for i in range(5):  
        p.apply_async(foo, args=(i,))    # 设置每个进程要执行的函数和参数  
    print('Waiting for all subprocesses done...')  
    p.close()  
    p.join()  
    print('All subprocesses done.')

Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕

1.7. 进程间通信

由于每个进程都有各自的内存空间,数据栈等,所以只能使用进程间通讯(Inter-Process Communication, IPC),而不能直接共享信息

Python 的 multiprocessing 模块封装了底层的实现机制,让我们可以通过Queue很容易地实现进程间的通信

import time  
from multiprocessing import Process, Queue  
# 向队列中写入数据  
def write_task(q):  
    try:  
        n = 1  
        while n < 5:  
            print("write, %d" % n)  
            q.put(n)  
            time.sleep(1)  
            n += 1  
    except BaseException:  
        print("write_task error")  
    finally:  
        print("write_task end")  
  
  
# 从队列读取数据  
def read_task(q):  
    try:  
        n = 1  
        while n < 5:  
            print("read, %d" % q.get())  
            time.sleep(1)  
            n += 1  
    except BaseException:  
        print("read_task error")  
    finally:  
        print("read_task end")  
  
  
if __name__ == "__main__":  
    q = Queue()  # 父进程创建Queue,并传给各个子进程  
    pw = Process(target=write_task, args=(q,))  
    pr = Process(target=read_task, args=(q,))  
    pw.start()   # 启动子进程 pw,写入  
    pr.start()   # 启动子进程 pr,读取  
    pw.join()    # 等待 pw 结束  
    pr.join()    # 等待 pr 结束  
    print("DONE")

2. 线程

执行程序的最小单元

进程是线程的容器

一个进程中最少有一个线程负责执行程序

它与同属一个进程的其他线程共享进程所拥有的全部资源

2.1. 多线程

在 Python 中,进行多线程编程的模块有两个:thread 和 threading。其中,thread 是低级模块,threading 是高级模块,对 thread 进行了封装,一般来说,我们只需使用 threading 这个模块

创建线程对象

import threading
t = threading.Thread(target=任务名)
参数 说明
target 执行的目标任务名,一般是函数名
name 线程名
group 线程组,目前只能使用None

线程执行带参数的任务

参数 说明
args 以元组的方式给执行任务传参,参数是按顺序进行对应的
kwargs 以字典的方式给执行任务传参,参数是按键名进行对应的

启动线程

线程对象.start()

import threading
import time
def sing():
    for i in range(5):
        print('sing')
        time.sleep(1)

def dance():
    for i in range(5):
        print('dance')
        time.sleep(1)

if __name__ == '__main__':
    # sing()
    # dance()
    sing_thread = threading.Thread(target=sing)
    dance_thread = threading.Thread(target=dance)
    
    sing_thread.start()
    dance_thread.start()

2.2. 守护线程

如果希望主线程结束的同时结束所有子线程,就需要设置守护主线程

线程对象 = threading.Thread(target= ,daemon=True)

线程对象.setDaemon(True)

import threading
import time
def sing():
    for i in range(5):
        print('sing')
        time.sleep(1)

if __name__ == '__main__':
    # sing()
    # dance()
    sing_thread = threading.Thread(target=sing)

    sing_thread.setDaemon(True)
    sing_thread.start()

    time.sleep(1)
    print("主线程结束")

2.3. 线程阻塞

使用join方法,让主线程等待子线程执行

import threading  
import time  
  
def test():  
  
    print(threading.current_thread())  
    time.sleep(5)  
  
if __name__ == '__main__':  
    thread_list = []  
    for i in range(5):  
        th = threading.Thread(target=test)  
        th.start()  
        thread_list.append(th)  
    for th in thread_list:  
        th.join()  
    print('主线程结束')

如果代码写为:

if __name__ == '__main__':  
    thread_list = []  
    for i in range(5):  
        th = threading.Thread(target=test)  
        th.start()  
        th.join()  
    print('主线程结束')

当代码第一次运行到join()时,主线程就卡住了,后面的循环根本没有执行。此时当前只有 thread_1执行过.start()方法,所以此时只有 thread_1在运行。这个线程需要执行5秒钟。等5秒过后,thread_1结束,于是主线程才会运行for的第二次循环,第二个线程才会开始运行。所以这个例子里面,三个线程串行运行

为什么会有 join 这个功能呢?我们设想这样一个场景。你的爬虫使用10个线程爬取100个 URL,主线程需要等到所有URL 都已经爬取完成以后,再来分析数据。此时就可以通过 join 先把主线程卡住,等到10个子线程全部运行结束了,再用主线程进行后面的操作

2.4. 锁

由于同一个进程之间的线程是内存共享的,所以当多个线程对同一个变量进行修改的时候,就会得到意想不到的结果

由于线程是交替运行的,线程在执行时可能中断,就会导致其他线程读到一个脏值

为了保证计算的准确性,我们就需要给操作加上。当某个线程开始执行这个操作时,由于该线程获得了锁,因此其他线程不能同时执行该操作,只能等待,直到锁被释放,这样就可以避免修改的冲突。创建一个锁可以通过 threading.Lock() 来实现

from threading import Thread, current_thread, Lock  
num = 0  
lock = Lock()  
def calc():  
    global num  
    print('thread %s is running...' % current_thread().name)  
    for i in range(10000):  
        lock.acquire()    # 获取锁  
        num += 1  
        lock.release()    # 释放锁  
    print('thread %s ended.' % current_thread().name)  
if __name__ == '__main__':  
    print('thread %s is running...' % current_thread().name)  
    threads = []  
    for i in range(5):  
        threads.append(Thread(target=calc))  
        threads[i].start()  
    for i in range(5):  
        threads[i].join()  
    print('global num: %d' % num)  
    print('thread %s ended.' % current_thread().name)

2.5. GIL 锁

GIL 全称是 Global Interpreter Lock,译为全局解释锁

GIL 锁的存在导致 Python 不能有效地使用多线程实现多核任务,因为在同一时间,只能有一个线程在运行

2.6. 信号量

2.7. 事件

2.8. 销毁子线程

要做到主线程不结束,但是要强行结束子线程

方法一:使用ctypes强行杀掉线程

import threading
import time
import inspect
import ctypes


def _async_raise(tid, exctype):
    """raises the exception, performs cleanup if needed"""
    tid = ctypes.c_long(tid)
    if not inspect.isclass(exctype):
        exctype = type(exctype)
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
    if res == 0:
        raise ValueError("invalid thread id")
    elif res != 1:
        # """if it returns a number greater than one, you're in trouble,
        # and you should call it again with exc=NULL to revert the effect"""
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
        raise SystemError("PyThreadState_SetAsyncExc failed")


def stop_thread(thread):
    _async_raise(thread.ident, SystemExit)


def test_thread():
    i = 0
    while True:
        print(i+1)
        time.sleep(2)


if __name__ == "__main__":
    t = threading.Thread(target=test_thread)
    t.start()
    stop_thread(t)

方法二:使用全局变量控制退出条件

import threading
import time

# 全局变量,用于控制子线程退出
stop_flag = False

# 子线程的执行函数
def child_thread():
    print("子线程开始执行")
    while not stop_flag:
        print("子线程正在运行")
        time.sleep(1)

    print("子线程退出")

# 创建子线程并启动
thread = threading.Thread(target=child_thread)
thread.start()

# 在主线程中等待一段时间
time.sleep(5)

# 修改全局变量,通知子线程退出
stop_flag = True

# 等待子线程结束
thread.join()

print("主线程结束")

3. ThreadLocal

为了避免多个线程同时修改全局变量,我们就需要对全局变量的修改加锁。

除了对全局变量的修改进行加锁,你可能也想到了可以使用线程自己的局部变量,因为局部变量只有线程自己能看见,对同一进程的其他线程是不可访问的

threading.local()使用 ThreadLocal 对象来线程绑定自己独有的数据

4. 协程

协程允许有多个入口对程序进行中断、继续执行等操作

协程的切换由用户自己管理和调度

通过创建协程将异步编程同步化

第三方库 gevent 对协程提供了强大的支持。另外,Python3.5 提供了 async/await 语法来实现对协程的支持。

相比多线程,协程的一大特点就是它在一个线程内执行,既避免了多线程之间切换带来的开销,也避免了对共享资源的访问冲突

5. concurrent.futures模块

异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口

5.1. Executor 对象

抽象类Executor提供异步执行调用方法。要通过它的子类调用,而不是直接调用

submit(fn, args, **kwargs)

调度可调用对象 fn,并返回一个代表该可调用对象的执行的 Future 对象

建议使用 with 语句来搭配执行,否则得手动实现资源的释放

map(fn, *iterables, timeout=None, chunksize=1)

返回一个 map()迭代器,这个迭代器中的回调执行返回的结果是有序的

5.2. ThreadPoolExecutor

class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

使用最多 max_workers 个线程的线程池来异步执行调用

thread_name_prefix参数允许用户控制由线程池创建的线程名称

def get_page(url):  
    page.get(url)  
    for li in page.eles('tag:li'):  
        for img in li.eles('tag:img'):  
            img_page = img.attr('src')  
            print(img_page)  
            page.download(img_page, './img')  
def getPic():  
    start_overall = time.time()  
    with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:  
        for pageNum in range(1, 5):  
            if pageNum == 1:  
                url = 'https://pic.netbian.com/4kdongman/index.html'  
            else:  
                url = 'https://pic.netbian.com/4kdongman/index_' + str(pageNum) +'.html'  
            executor.submit(get_page, url)  
    end_overall = time.time()  
    print(f"总耗时 {end_overall - start_overall:.2f} 秒")

5.3. ProcessPoolExecutor

ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过全局解释器锁

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

使用最多具有 max_workers 个进程的进程池,在 Windows 上,max_workers 必须小于等于 61,否则将引发ValueError

mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法

import concurrent.futures

import math

PRIMES = [

    112272535095293,

    112582705942171,

    112272535095293,

    115280095190773,

    115797848077099,

    1099726899285419]

def is_prime(n):

    if n < 2:

        return False

    if n == 2:

        return True

    if n % 2 == 0:

        return False

    sqrt_n = int(math.floor(math.sqrt(n)))

    for i in range(3, sqrt_n + 1, 2):

        if n % i == 0:

            return False

    return True

def main():

    with concurrent.futures.ProcessPoolExecutor() as executor:

        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):

            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':

    main()

本站总访问量