1. 进程
资源分配的基本单位
进程是正在运行的程序的实例
1.1. fork()
在 Unix/Linux 系统中fork
函数被用于创建进程。这个函数很特殊,对于普通的函数,调用一次,返回一次,但是调用 fork
一次,它返回两次。事实上,fork
函数创建了新的进程,我们把它称为子进程,子进程几乎是当前进程(即父进程)的一个拷贝:它会复制父进程的代码段,堆栈段和数据段。
对于父进程,fork
函数返回了子进程的进程号 pid。对于子进程,fork
函数则返回 0
,这也是 fork
函数返回两次的原因,根据返回值,我们可以判断进程是父进程还是子进程。
import os
pid = os.fork()
if pid < 0:
print 'Fail to create process'
elif pid == 0:
print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据
1.2. 多进程
python提供multiprocessing模块实现多进程,需要注意的是multiprocessing 在 Windows 和 Linux 平台的不一致性:一样的代码在 Windows 和 Linux 下运行的结果可能不同。因为 Windows 的进程模型和 Linux 不一样,Windows 下没有 fork
创建进程对象
from multiprocessing import Process
p = Process(参数)
参数名 | 说明 |
---|---|
target | 执行的目标任务名,应该是一个函数名(函数名后不要加括号) |
name | 进程名,一般不用设置 |
group | 进程组,目前只能使用None |
执行带参数的任务
参数名 | 说明 |
---|---|
args | 以元组的方式给执行任务传参,参数是按顺序进行对应的 |
kwargs | 以字典的方式给执行任务传参,参数是按键名进行对应的 |
启动进程
p.start()
例
import multiprocessing
import time
def sing(num):
for i in range(num):
print("singing")
time.sleep(0.5)
def draw(num):
for i in range(num):
print("drawing")
time.sleep(0.5)
if __name__ == '__main__':
process1 = multiprocessing.Process(target=sing,args=(3,)) #使用元组进行传参
process2 = multiprocessing.Process(target=draw,kwargs={"num":3})
#使用字典进行传参
process1.start()
process2.start()
# sing(3)
# draw(3)
获取当前进程id
os.getpid()
获取当前父进程id
os.getppid()
1.3. 守护主进程
如果希望主进程结束的同时结束所有子进程,就需要设置守护主进程
子进程.daemon = True #在启动进程前设置
例
import multiprocessing
def sing():
for i in range(10):
print("singing")
if __name__ == '__main__':
process1 = multiprocessing.Process(target=sing)
process1.daemon = True #设置主进程守护
process1.start()
print("主进程结束")
1.4. 进程阻塞
join 方法,该方法用于阻塞子进程以外的所有进程(这里指父进程),当子进程执行完毕后,父进程才会继续执行,它通常用于进程间的同步
1.5. 结束进程
不管任务是否完成,立即停止工作进程
进程对象.terminate()
但是通过执行系统命令ps查看停止后的进程, 会发现, 直接调用terminate方法停止的进程变成了一个僵尸进程(defunct), 只能等待主程序退出, 这个僵尸进程才会消失.
通过在terminate后添加一次调用join方法等待进程真正结束, 就能避免出现僵尸进程:
进程对象.terminate()
进程对象.join()
1.6. 进程池
import os, time
from multiprocessing import Pool
def foo(x):
print('Run task %s (pid:%s)...' % (x, os.getpid()))
time.sleep(2)
print('Task %s result is: %s' % (x, x * x))
if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4) # 设置进程数
for i in range(5):
p.apply_async(foo, args=(i,)) # 设置每个进程要执行的函数和参数
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕
1.7. 进程间通信
由于每个进程都有各自的内存空间,数据栈等,所以只能使用进程间通讯(Inter-Process Communication, IPC),而不能直接共享信息
Python 的 multiprocessing 模块封装了底层的实现机制,让我们可以通过Queue很容易地实现进程间的通信
import time
from multiprocessing import Process, Queue
# 向队列中写入数据
def write_task(q):
try:
n = 1
while n < 5:
print("write, %d" % n)
q.put(n)
time.sleep(1)
n += 1
except BaseException:
print("write_task error")
finally:
print("write_task end")
# 从队列读取数据
def read_task(q):
try:
n = 1
while n < 5:
print("read, %d" % q.get())
time.sleep(1)
n += 1
except BaseException:
print("read_task error")
finally:
print("read_task end")
if __name__ == "__main__":
q = Queue() # 父进程创建Queue,并传给各个子进程
pw = Process(target=write_task, args=(q,))
pr = Process(target=read_task, args=(q,))
pw.start() # 启动子进程 pw,写入
pr.start() # 启动子进程 pr,读取
pw.join() # 等待 pw 结束
pr.join() # 等待 pr 结束
print("DONE")
2. 线程
执行程序的最小单元
进程是线程的容器
一个进程中最少有一个线程负责执行程序
它与同属一个进程的其他线程共享进程所拥有的全部资源
2.1. 多线程
在 Python 中,进行多线程编程的模块有两个:thread 和 threading。其中,thread 是低级模块,threading 是高级模块,对 thread 进行了封装,一般来说,我们只需使用 threading 这个模块
创建线程对象
import threading
t = threading.Thread(target=任务名)
参数 | 说明 |
---|---|
target | 执行的目标任务名,一般是函数名 |
name | 线程名 |
group | 线程组,目前只能使用None |
线程执行带参数的任务
参数 | 说明 |
---|---|
args | 以元组的方式给执行任务传参,参数是按顺序进行对应的 |
kwargs | 以字典的方式给执行任务传参,参数是按键名进行对应的 |
启动线程
线程对象.start()
例
import threading
import time
def sing():
for i in range(5):
print('sing')
time.sleep(1)
def dance():
for i in range(5):
print('dance')
time.sleep(1)
if __name__ == '__main__':
# sing()
# dance()
sing_thread = threading.Thread(target=sing)
dance_thread = threading.Thread(target=dance)
sing_thread.start()
dance_thread.start()
2.2. 守护线程
如果希望主线程结束的同时结束所有子线程,就需要设置守护主线程
线程对象 = threading.Thread(target= ,daemon=True)
或
线程对象.setDaemon(True)
例
import threading
import time
def sing():
for i in range(5):
print('sing')
time.sleep(1)
if __name__ == '__main__':
# sing()
# dance()
sing_thread = threading.Thread(target=sing)
sing_thread.setDaemon(True)
sing_thread.start()
time.sleep(1)
print("主线程结束")
2.3. 线程阻塞
使用join方法,让主线程等待子线程执行
import threading
import time
def test():
print(threading.current_thread())
time.sleep(5)
if __name__ == '__main__':
thread_list = []
for i in range(5):
th = threading.Thread(target=test)
th.start()
thread_list.append(th)
for th in thread_list:
th.join()
print('主线程结束')
如果代码写为:
if __name__ == '__main__':
thread_list = []
for i in range(5):
th = threading.Thread(target=test)
th.start()
th.join()
print('主线程结束')
当代码第一次运行到join()
时,主线程就卡住了,后面的循环根本没有执行。此时当前只有 thread_1
执行过.start()
方法,所以此时只有 thread_1
在运行。这个线程需要执行5秒钟。等5秒过后,thread_1
结束,于是主线程才会运行for的第二次循环,第二个线程才会开始运行。所以这个例子里面,三个线程串行运行
为什么会有 join 这个功能呢?我们设想这样一个场景。你的爬虫使用10个线程爬取100个 URL,主线程需要等到所有URL 都已经爬取完成以后,再来分析数据。此时就可以通过 join 先把主线程卡住,等到10个子线程全部运行结束了,再用主线程进行后面的操作
2.4. 锁
由于同一个进程之间的线程是内存共享的,所以当多个线程对同一个变量进行修改的时候,就会得到意想不到的结果
由于线程是交替运行的,线程在执行时可能中断,就会导致其他线程读到一个脏值
为了保证计算的准确性,我们就需要给操作加上锁
。当某个线程开始执行这个操作时,由于该线程获得了锁,因此其他线程不能同时执行该操作,只能等待,直到锁被释放,这样就可以避免修改的冲突。创建一个锁可以通过 threading.Lock()
来实现
from threading import Thread, current_thread, Lock
num = 0
lock = Lock()
def calc():
global num
print('thread %s is running...' % current_thread().name)
for i in range(10000):
lock.acquire() # 获取锁
num += 1
lock.release() # 释放锁
print('thread %s ended.' % current_thread().name)
if __name__ == '__main__':
print('thread %s is running...' % current_thread().name)
threads = []
for i in range(5):
threads.append(Thread(target=calc))
threads[i].start()
for i in range(5):
threads[i].join()
print('global num: %d' % num)
print('thread %s ended.' % current_thread().name)
2.5. GIL 锁
GIL
全称是 Global Interpreter Lock,译为全局解释锁
GIL
锁的存在导致 Python 不能有效地使用多线程实现多核任务,因为在同一时间,只能有一个线程在运行
2.6. 信号量
2.7. 事件
2.8. 销毁子线程
要做到主线程不结束,但是要强行结束子线程
方法一:使用ctypes强行杀掉线程
import threading
import time
import inspect
import ctypes
def _async_raise(tid, exctype):
"""raises the exception, performs cleanup if needed"""
tid = ctypes.c_long(tid)
if not inspect.isclass(exctype):
exctype = type(exctype)
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
if res == 0:
raise ValueError("invalid thread id")
elif res != 1:
# """if it returns a number greater than one, you're in trouble,
# and you should call it again with exc=NULL to revert the effect"""
ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
raise SystemError("PyThreadState_SetAsyncExc failed")
def stop_thread(thread):
_async_raise(thread.ident, SystemExit)
def test_thread():
i = 0
while True:
print(i+1)
time.sleep(2)
if __name__ == "__main__":
t = threading.Thread(target=test_thread)
t.start()
stop_thread(t)
方法二:使用全局变量控制退出条件
import threading
import time
# 全局变量,用于控制子线程退出
stop_flag = False
# 子线程的执行函数
def child_thread():
print("子线程开始执行")
while not stop_flag:
print("子线程正在运行")
time.sleep(1)
print("子线程退出")
# 创建子线程并启动
thread = threading.Thread(target=child_thread)
thread.start()
# 在主线程中等待一段时间
time.sleep(5)
# 修改全局变量,通知子线程退出
stop_flag = True
# 等待子线程结束
thread.join()
print("主线程结束")
3. ThreadLocal
为了避免多个线程同时修改全局变量,我们就需要对全局变量的修改加锁。
除了对全局变量的修改进行加锁,你可能也想到了可以使用线程自己的局部变量,因为局部变量只有线程自己能看见,对同一进程的其他线程是不可访问的
threading.local()
使用 ThreadLocal 对象来线程绑定自己独有的数据
4. 协程
协程允许有多个入口对程序进行中断、继续执行等操作。
协程的切换由用户自己管理和调度
通过创建协程将异步编程同步化
第三方库 gevent 对协程提供了强大的支持。另外,Python3.5 提供了 async/await 语法来实现对协程的支持。
相比多线程,协程的一大特点就是它在一个线程内执行,既避免了多线程之间切换带来的开销,也避免了对共享资源的访问冲突
5. concurrent.futures模块
异步执行可以由 ThreadPoolExecutor 使用线程或由 ProcessPoolExecutor 使用单独的进程来实现。 两者都是实现抽像类 Executor 定义的接口
5.1. Executor 对象
抽象类Executor提供异步执行调用方法。要通过它的子类调用,而不是直接调用
submit(fn, args, **kwargs)
调度可调用对象 fn
,并返回一个代表该可调用对象的执行的 Future 对象
建议使用 with 语句来搭配执行,否则得手动实现资源的释放
map(fn, *iterables, timeout=None, chunksize=1)
返回一个 map()迭代器,这个迭代器中的回调执行返回的结果是有序的
5.2. ThreadPoolExecutor
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
使用最多 max_workers
个线程的线程池来异步执行调用
thread_name_prefix
参数允许用户控制由线程池创建的线程名称
def get_page(url):
page.get(url)
for li in page.eles('tag:li'):
for img in li.eles('tag:img'):
img_page = img.attr('src')
print(img_page)
page.download(img_page, './img')
def getPic():
start_overall = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
for pageNum in range(1, 5):
if pageNum == 1:
url = 'https://pic.netbian.com/4kdongman/index.html'
else:
url = 'https://pic.netbian.com/4kdongman/index_' + str(pageNum) +'.html'
executor.submit(get_page, url)
end_overall = time.time()
print(f"总耗时 {end_overall - start_overall:.2f} 秒")
5.3. ProcessPoolExecutor
ProcessPoolExecutor 会使用 multiprocessing 模块,这允许它绕过全局解释器锁
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
使用最多具有 max_workers
个进程的进程池,在 Windows 上,max_workers
必须小于等于 61
,否则将引发ValueError
mp_context
参数允许用户控制由进程池创建给工作者进程的开始方法
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()