三次握手四次挥手
半连接池: 限制的是同一时刻的请求数,而非连接数
这是三次握手
syn_sent是客户端发送请求时的状态
listen是服务端一开始的接听状态
syn_rcvd是服务端收到请求后的状态
established是客户端建立连接后的状态(客户端到服务端这端的管道建立)
eatablished是服务端建立连接后的状态(服务端到客户端这端的管道建立)
seq = x 请求的时候附带的序列号(暗号)
ack = x+1 是回复请求, 并把刚刚拿到的序列号+1
四次挥手
C/S B/S
client<—基于网络通信—>server
browser<—基于网络通信—>server
server端(服务端)必须满足的条件:
1 | 1、稳定运行(网络、硬件、操作系统、服务端应用软件),对外一直提供服务 |
什么是互联网
两大要素:
1 | 1、底层的物理连接介质,是为通信铺好道路的 |
自定义协议(后面将会有自定义报头解决tcp协议的粘包现象)
任何一种通信协议都必须包含两部分:
1 | 1 报头:必须是固定长度(如果不固定长度,会有粘包现象) |
标识地址的方式
1 | ip+mac就能标识全世界范围内独一无二的一台计算机 |
为何建立连接要三次而断开连接却需要四次
1 | 三次握手是为了建立连接,建立连接时并没有数据产生 |
为何tcp协议是可靠协议,而udp协议是不可靠协议
1 | tcp调用的操作系统,操作系统发出数据,接受到对方传来的确认信息时才会清空数据 |
为何tcp协议会有粘包问题?
因为tcp想优化效率,里面有个叫nagle算法.这个算法规定了tcp协议在传输数据的时候会将数据较小,传输间隔较短的多条数据合并成一条发送
而tcp是通过操作系统来发送数据的,操作系统想什么时候发就什么时候发,应用层管不到操作系统, tcp把数据交给操作系统是告诉了操作系统一件事,让操作系统把数据较小,传输间隔较短的多条数据合并成一条发送.就造成了粘包现象模块补充:strcuct模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import struct
import json
header_dic={
'filename':'a.txt',
'total_size':11112313123212222222222222222222222222222222222222222222222222222221111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111131222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222223,
'hash':'asdf123123x123213x'
}
header_json=json.dumps(header_dic) #将字典序列化成字符串
header_bytes=header_json.encode('utf-8') #转换成bytes
obj=struct.pack('i',len(header_bytes)) #用i模式固定长度(固定的长度为4
print(obj,len(obj))
res=struct.unpack('i',obj) # 用i模式解开obj
print(res)
输出结果如下:
507
b'\xfb\x01\x00\x00' 4
(507,)
模拟ssh远程执行命令
客户端
1
2
3
4
5
6
7
8
9
10
11from socket import *
import struct
import json
phone = socket(AF_INET, SOCK_STREAM)
phone.connect(('127.0.0.1', 8080))
while True:
cmd = input(">>>>").strip()
if not cmd:
continue
phone.send(cmd.encode('utf-8'))
1 | dahler_len = struct.unpack('i', phone.recv(4))[0] # 先接收报头长度 |
服务端
1 | import socket |
进程
1、什么是进程
进程指的就是一个正在运行的程序,或者说是程序的运行过程,即进程是一个抽象的概念
进程是起源于操作系统的,是操作系统最核心的概念,操作系统所有其他的概念都是围绕进程展开的
其中就有了多道技术的来由
用进程就是为了实现并发操作系统(现代操作系统):
操作系统是位于计算机硬件于软件之间的控制程序
作用:
1、将硬件的复杂操作封装成简单的接口,给用户或者应用程序使用
2、将多个应用程序对硬件的竞争变的有序进程
一个正在运行的程序,或者说是一个程序的运行过程串行、并发、并行
串行:一个任务完完整运行完毕,才执行下一个
并发:多个任务看起来是同时运行的,单核就可以实现并发
并行:多个任务是真正意义上的同时运行,只有多核才能实现并行多道技术
背景:想要再单核下实现并发(单核同一时刻只能执行一个任务(每起一个进程就会产生一把GIL全局解释器锁))
并发实现的本质就:切换+保存状态
多道技术:
1、空间上的复用=》多个任务共用一个内存条,但占用内存是彼此隔离的,而且是物理层面隔离的
2、时间上的复用=》多个任务共用同一个cpu
切换:
1、遇到io切换
2、一个任务占用cpu时间过长,或者有另外一个优先级更高的任务抢走的cpu开启进程的两种方式
方式一:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16from multiprocessing import Process
def task(x):
print('%s is running' %x)
time.sleep(3)
print('%s is done' %x)
if __name__ == '__main__':
# Process(target=task,kwargs={'x':'子进程'})
p=Process(target=task,args=('子进程',)) # 如果args=(),括号内只有一个参数,一定记住加逗号
p.start() # 只是在操作系统发送一个开启子进程的信号
print('主')
# 导入from multiprocessing import Process
# 相当于在windows系统中调用了CreateProcess接口
# CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。
# p.start() # 只是在操作系统发送一个开启子进程的信号方式二:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17from multiprocessing import Process
import time
class Myprocess(Process):
def __init__(self,x):
super().__init__()
self.name=x
def run(self):
print('%s is running' %self.name)
time.sleep(3)
print('%s is done' %self.name)
if __name__ == '__main__':
p=Myprocess('子进程1')
p.start() #p.run()
print('主')进程间的内存空间是彼此隔离的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15from multiprocessing import Process
import time
x = 100
def task():
global x
x = 0
print('done')
if __name__ == '__main__':
p = Process(target=task)
p.start()
time.sleep(500) # 让父进程在原地等待,等了500s后,才执行下一行代码
print(x)进程的方法与属性:
join:
让父进程在原地等待,等到子进程运行完毕后(会触发wait功能,将子进程回收掉),才执行下一行代码terminate:
终止进程,应用程序给操作系统发送信号,让操作系统把这个子程序干掉 ,至于多久能干死,在于操作系统什么时候执行这个指令is_alive:
查看子进程是否存在,存在返回True,否则返回Falseos.getpid:
导入os模块,查看自己的门牌号os.getppid:
导入os模块,查看父的门牌号current_process().name:
导入from multiprocessing import Process,current_process
查看子进程的名字
主进程等子进程是因为主进程要给子进程收尸
进程必须等待其内部所有线程都运行完毕才结束孤儿进程:
在父进程被干掉的情况下会编程孤儿进程,无害,会被孤儿院((linux的孤儿院)init)回收
僵尸进程:父进程没死,子进程死了,这时候的子进程就是僵尸进程
正常情况下无害(会调用wait()方法进行回收操作), 父进程无限循环,且不被回收的情况下会无限制的生成子进程从而占用大量的操作系统资源
当操作系统被大量僵尸进程占满内存后,操作系统就无法在启动其他的程序实例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from multiprocessing import Process,current_process
import time
def task():
print('子进程[%s]运行。。。。' %current_process().name)
time.sleep(2)
if __name__ == '__main__':
p1=Process(target=task,name='子进程1')
p1.start()
# print(p1.is_alive())
# p1.join()
# print(p1.is_alive())
p1.terminate() # 终止进程,应用程序给操作系统发送信号,让操作系统把这个子程序干掉
# 至于多久能干死,在于操作系统什么时候执行这个指令
time.sleep(1)
print(p1.is_alive()) # 查看子进程是否存在,有返回值. True则存在,False则不存在
print('主')守护进程
1、守护进程
守护进程其实就是一个“子进程”
守护进程会伴随主进程的代码运行完毕后而死掉
进程:
当父进程需要将一个任务并发出去执行,需要将该任务放到一个子进程里
守护:
当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该
将该子进程设置为守护进程,会在父进程代码结束后死掉
实例:
from multiprocessing import Process
import time,os
1 | def task(name): |
互斥锁
互斥锁:可以将要执行任务的部分代码(只涉及到修改共享数据的代码)变成串行
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43from multiprocessing import Process,Lock # Lock 互斥锁模块
import json
import os
import time
import random
def check():
time.sleep(1) # 模拟网路延迟
with open('db.txt','rt',encoding='utf-8') as f:
dic=json.load(f)
print('%s 查看到剩余票数 [%s]' %(os.getpid(),dic['count']))
def get():
with open('db.txt','rt',encoding='utf-8') as f:
dic=json.load(f)
time.sleep(2)
if dic['count'] > 0:
# 有票
dic['count']-=1
time.sleep(random.randint(1,3))
with open('db.txt','wt',encoding='utf-8') as f:
json.dump(dic,f)
print('%s 购票成功' %os.getpid())
else:
print('%s 没有余票' %os.getpid())
def task(mutex):
# 查票
check()
#购票
mutex.acquire() # 互斥锁不能连续的acquire,必须是release以后才能重新acquire
get()
mutex.release() # 关闭互斥锁
# with mutex: # 开启与关闭互斥锁一种简单的写法
# get()
if __name__ == '__main__':
mutex=Lock()
for i in range(10):
p=Process(target=task,args=(mutex,))
p.start()
IPC机制:进程间通信,有两种实现方式
1、pipe:管道(前面已经说过)
2、queue:pipe+锁
1 | from multiprocessing import Queue |
了解点
q=Queue(3) #先进先出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37# q.put('first',block=True,timeout=3)
# q.put({'k':'sencond'},block=True,timeout=3)
# q.put(['third',],block=True,timeout=3)
# print('===>')
# # q.put(4,block=True,timeout=3)
#
#
# print(q.get(block=True,timeout=3))
# print(q.get(block=True,timeout=3))
# print(q.get(block=True,timeout=3))
# print(q.get(block=True,timeout=3))
# q=Queue(3) #先进先出
# q.put('first',block=False,)
# q.put({'k':'sencond'},block=False,)
# q.put(['third',],block=False,)
# print('===>')
# # q.put(4,block=False,) # 队列满了直接抛出异常,不会阻塞
#
# print(q.get(block=False))
# print(q.get(block=False))
# print(q.get(block=False))
# print('get over')
# print(q.get(block=False))
#
q=Queue(3) #先进先出
q.put_nowait('first') #q.put('first',block=False,)
q.put_nowait(2)
q.put_nowait(3)
# q.put_nowait(4) # 队列满了不会等待,直接抛出异常
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait()) # 队列里没有数据了不会等待,直接抛出异常生产者与消费者模型
1 什么是生产者消费者模型
生产者:比喻的是程序中负责产生数据的任务
消费者:比喻的是程序中负责处理数据的任务1
生产者->共享的介质(队列)<-消费者
2 为何用
实现了生产者与消费者的解耦和,生产者可以不停地生产,消费者也可以不停地消费
从而平衡了生产者的生产能力与消费者消费能力,提升了程序整体运行的效率1
2
3什么时候用?
当我们的程序中存在明显的两类任务,一类负责产生数据,另外一类负责处理数据
此时就应该考虑使用生产者消费者模型来提升程序的效率实例:
from multiprocessing import Queue,Process
import time
import os
import randomdef producer(q):
for i in range(10):
res=’包子%s’ %i
time.sleep(random.randint(1,3))往队列里丢
1
2
3q.put(res)
print('\033[45m%s 生产了 %s[0m' %(os.getpid(),res))
q.put(None)def consumer(q):
while True:
#从队列里取走
res=q.get()
if res is None:break
time.sleep(random.randint(1,3))
print(‘\033[46m%s 吃了 %s[0m’ %(os.getpid(),res))if name == ‘main‘:
q=Queue()
2、为何要有GIL
因为Cpython解释器自带垃圾回收机制不是线程安全的,也就是说如果没有GIL的情况下,在给一个值
赋值的情况下如果被垃圾回收机制回收了,那就会出现错误3 有两种并发解决方案:
多进程:计算密集型
多线程:IO密集型(因为我们以后用的都是基于网络通信的套接字,而基于网络通信就存在大量IO,于是我们用的最多的都是多线程的方式)IO密集型实例:
from multiprocessing import Process
from threading import Thread
import os,time
def work1():
time.sleep(5)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34def work2():
time.sleep(5)
def work3():
time.sleep(5)
def work4():
time.sleep(5)
if __name__ == '__main__':
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
#p1=Process(target=work1) # 多进程
#p2=Process(target=work2)
#p3=Process(target=work3)
#p4=Process(target=work4)
p1=Thread(target=work1) # 多线程
p2=Thread(target=work2)
p3=Thread(target=work3)
p4=Thread(target=work4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop=time.time()
print('run time is %s' %(stop-start)) #run time is 5.162574291229248
# run time is 5.002141714096069计算密集型实例:
from multiprocessing import Process
from threading import Thread
import os,time
def work1():
res=0
for i in range(100000000):
res*=i1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40def work2():
res=0
for i in range(100000000):
res*=i
def work3():
res=0
for i in range(100000000):
res*=i
def work4():
res=0
for i in range(100000000):
res*=i
if __name__ == '__main__':
l=[]
# print(os.cpu_count()) #本机为4核
start=time.time()
# p1=Process(target=work1) # 开启多进程
# p2=Process(target=work2)
# p3=Process(target=work3)
# p4=Process(target=work4)
p1=Thread(target=work1) # 开启多线程
p2=Thread(target=work2)
p3=Thread(target=work3)
p4=Thread(target=work4)
p1.start()
p2.start()
p3.start()
p4.start()
p1.join()
p2.join()
p3.join()
p4.join()
stop=time.time()
print('run time is %s' %(stop-start)) #6.484470367431641(多线程的运行时间)
# run time is 17.391708850860596 # 多进程的运行时间定时器: 多久时间后运行
实例如下:
1
2
3
4
5
6
7
8
9
10from threading import Timer,current_thread
def task(x):
print('%s run....' %x)
print(current_thread().name)
if __name__ == '__main__':
t=Timer(3,task,args=(10,)) # 第一个参数指定的是多久以后执行,第二个参数是执行的函数
# 第三个是函数需要传参的参数(括号里必须是元组)
t.start()
print('主')线程queque:import queue
队列:先进先出
q=queue.Queue(3)
q.put(1)
q.put(2)
q.put(3)1
2
3print(q.get())
print(q.get())
print(q.get())堆栈:先进后出
q=queue.LifoQueue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())优先级队列:优先级高先出来,数字越小,优先级越高
q=queue.PriorityQueue()
q.put((3,’data1’))
q.put((-10,’data2’))
q.put((11,’data3’))1
2
3
4
5
6
7print(q.get())
print(q.get())
print(q.get())
输出结果如下:
(-10, 'data2')
(3, 'data1')
(11, 'data3')进程池与线程池
池的功能是限制启动的进程数或线程数,
什么时候应该限制???
当并发的任务数远远超过了计算机的承受能力时,即无法一次性开启过多的进程数或线程数时
就应该用池的概念将开启的进程数或线程数限制在计算机可承受的范围内同步:提交完任务后(用’串行’的方式运行)就在原地等待,直到任务运行完毕后拿到任务的返回值,再继续运行下一行代码
实例如下:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27def task(n):
print('%s run...' %os.getpid())
time.sleep(10)
return n**2
def parse(res):
print('...')
if __name__ == '__main__':
pool=ProcessPoolExecutor(4)
# pool.submit(task,1)
# pool.submit(task,2)
# pool.submit(task,3)
# pool.submit(task,4)
l=[]
for i in range(1,5):
future=pool.submit(task,i)
l.append(future)
# print(future)
# print(future.result())
pool.shutdown(wait=True) #shutdown关闭进程池的入口
for future in l:
# print(future.result())
parse(future.result())
print('主')异步:提交完任务(绑定一个回调函数)后根本就不在原地等待,直接运行下一行代码,等到任务有返回值后会自动触发回调函数
用多进程实现异步:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22def task(n):
print('%s run...' %os.getpid())
time.sleep(5)
return n**2
def parse(future):
time.sleep(1)
res=future.result()
print('%s 处理了 %s' %(os.getpid(),res))
if __name__ == '__main__':
pool=ProcessPoolExecutor(4)
start=time.time()
for i in range(1,5):
future=pool.submit(task,i)
future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,
# 并且将future当作参数传给parse,
# 由主进程来回调函数,运行时间为 9.163417339324951
pool.shutdown(wait=True)
stop=time.time()
print('主',os.getpid(),(stop - start))用多线程实现异步:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread
import os
import time
import random1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20def task(n):
print('%s run...' %current_thread().name)
time.sleep(5)
return n**2
def parse(future):
time.sleep(1)
res=future.result()
print('%s 处理了 %s' %(current_thread().name,res))
if __name__ == '__main__':
pool=ThreadPoolExecutor(4)
start=time.time()
for i in range(1,5):
future=pool.submit(task,i)
future.add_done_callback(parse) # parse会在futrue有返回值时立刻触发,
# 并且将future当作参数传给parse
pool.shutdown(wait=True) # 哪个线程有时间了哪个线程就取回调函数,运行时间为 6.003463268280029
stop=time.time()
print('主',current_thread().name,(stop - start))协程:只有遇到io才切换到其他任务的协程才能提升单线程的执行效率
1、协程只有遇到io才切换到其他任务的协程才能提升
单线程实现并发
在应用程序里控制多个任务的切换+保存状态(协程只有在IO在单线程下切换到另外一个任务才能提升效率)
优点:
应用程序级别切换的速度要远远高于操作系统的切换
缺点:
多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地
该线程内的其他的任务都不能执行了1
2
3一旦引入协程,就需要检测单线程下所有的IO行为,
实现遇到IO就切换,少一个都不行,因为一旦一个任务阻塞了,整个线程就阻塞了,
其他的任务即便是可以计算,但是也无法运行了2、协程序的目的:
想要在单线程下实现并发
并发指的是多个任务看起来是同时运行的
并发=切换+保存状态
实例: 协程 服务端: pip3 install gevent
from gevent import spawn,monkey;monkey.patch_all() # 相当于给下面的代码都打上标记,就都能识别
from socket import *
from threading import Thread1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23def talk(conn):
while True:
try:
data=conn.recv(1024)
if len(data) == 0:break
conn.send(data.upper())
except ConnectionResetError:
break
conn.close()
def server(ip,port,backlog=5):
server = socket(AF_INET, SOCK_STREAM)
server.bind((ip, port))
server.listen(backlog)
print('starting...')
while True:
conn, addr = server.accept()
spawn(talk, conn,) # 起一个协程,只要这个协程不死掉,进程也不会结束
if __name__ == '__main__':
g=spawn(server,'127.0.0.1',8080) # 起一个进程
g.join() # 只要这个协程不死掉,进程也不会结束客户端:
from threading import Thread,current_thread
from socket import *
import os1
2
3
4
5
6
7
8
9
10
11
12
13
14def task():
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg='%s say hello' %current_thread().name
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8'))
if __name__ == '__main__':
for i in range(500):
t=Thread(target=task)
t.start()IO模型
网络IO:
recvfrom:
wait data:
conn.recv(1024)等待客户端产生数据——》客户端OS–》网络–》服务端操作系统缓存
copy data:由本地操作系统缓存中的数据拷贝到应用程序的内存中send:
也要经历copy data这个过程,从应用程序拷贝到本地操作系统中,在由操作系统
调用网卡基于网络传输给服务端的网卡,传输到服务端的操作系统,对方的操作系统在
从本地操作系统缓存中拷贝数据到应用程序的内存中阻塞IO模型: 网络IO就是阻塞IO
非阻塞IO模型:
用户进程不断的主动询问操作系统数据准备好了没有
服务端:
from socket import *
import time缺陷在于如果这个列表里有1亿个连接,那么就会让客户明显的感觉到等待
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28# 对CPU无效的占有率够高
server = socket(AF_INET, SOCK_STREAM) # 绑定协议(tcp)
server.bind(('127.0.0.1',8080)) # 绑定IP 和 port
server.listen(5) # 半连接池: 限制的是请求数而非连接数
server.setblocking(False) # 让操作系统给一个回复信息
conn_l=[]
while True:
try:
print('总连接数[%s]' % len(conn_l))
conn,addr=server.accept()
conn_l.append(conn) # 把每个建立的连接放到列表里面
except BlockingIOError: # 如果没有数据产生就会报错,所有我们需要用捕捉异常让它不因为报错而停止运行
del_l=[]
for conn in conn_l: # 从conn_l列表里遍历出来通讯
try:
data=conn.recv(1024)
if len(data) == 0:
del_l.append(conn) # 把遍历出来断了连接的连接加入到del_l列表里面
continue
conn.send(data.upper())
except BlockingIOError: # 需要捕捉异常
pass
except ConnectionResetError: # 把遍历出来断了连接的连接加入到del_l列表里面
del_l.append(conn)
for conn in del_l: # 遍历del_l列表,把断开的连接一个一个的删除掉
conn_l.remove(conn)客户端:
from socket import *
import os1
2
3
4
5
6
7
8client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg='%s say hello' %os.getpid()
client.send(msg.encode('utf-8'))
data=client.recv(1024)
print(data.decode('utf-8'))
多路复用IO模型
当用户进程调用了select,那么整个进程会被block,而同时,操作系统会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从操作系统拷贝到用户进程。
服务端:
1 | from socket import * |
客户端:
1 | from socket import * |
异步IO
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从操作系统的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,操作系统会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,操作系统会给用户进程发送一个signal,告诉它read操作完成了
IO分两阶段:
1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段