Skip to main content

python实现——银行家算法、死锁检测与解除、动态内存分配

写了一周吧,代码传到github上了 mycourse/操作系统 at main · seldomlee/mycourse (github.com)

# 上面播放器的代码如下(仅网易云外链链接,其他播放器请自行百度)
<iframe frameborder="no" border="0" marginwidth="0" marginheight="0" width="330" height="86" src="//music.163.com/outchain/player?type=2&id=1495828667&auto=0&height=66"></iframe>
# width(宽度) ; height(高度)
# type = 歌曲(1) | 歌单(2) | 电台(3)
# id = 歌曲ID号
# auto = 自动播放(1) | 手动播放(0)

死锁

银行家算法和死锁的检测与解除实际是同一部分,都在死锁那一章的内容里,这里就放一起写了

这里简单过一下死锁:

  1. 死锁,简单来说就是多个进程竞争资源导致的僵局(互相等待),若无外力作用,这些进程都将无法向前推进。

    举个例子:

    我和舍友a、b都躺在床上: 舍友a说我起床了他才会起床 舍友b说舍友a起床他才会起床 我说舍友b起床我才会起床 这样就形成了一个互相等待的僵局,结果就是:除非有外力作用,否则大家都不会起床

  2. 死锁的产生原因:

    • 竞争系统资源 (只有对不可剥夺资源的竞争 才可能产生死锁,对可剥夺资源的竞争是不会引起死锁的)
      • 竞争不可抢占资源: 系统中的不可抢占资源不足以满足多个进程运行的需要,使得进程在运行过程中因为抢夺资源陷入死锁
      • 竞争可消耗资源: p1产生信息m1发送给p2,并从p3接收信息m3 p2产生信息m2发送给p3,并从p1接收信息m1 p3产生信息m3发送给p1,并从p2接收信息m2 假如他们都是先发送消息,再接收,就不会产生死锁; 而如果他们先执行接收操作,再执行发送操作;就会永远阻塞在接收操作上等待一条永远不会发送的消息
    • 进程推进顺序不当: 进程在运行过程中,请求或释放的顺序不当也可能产生死锁
  3. 死锁产生的必要条件:(产生死锁必须同时满足以下四个条件才会产生)

    • 互斥条件:在一段时间内某资源仅为一个进程所占有。此时若有其他进程请求该资源,则请求进程只能等待。
    • 不剥夺/不可抢占条件:进程所获得的资源在未使用完毕之前,不能被其他进程强行夺走(即只能由该进程自己释放)
    • 请求和保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
    • 循坏等待/环路条件:存在循环等待链,其中,每个进程都在等待链中等待下一个进程所持有的资源,造成这组进程处于永远等待状态。
  4. 死锁的处理策略:

    • 预防死锁:

      ​ 设置某些限制条件,破坏产生死锁的四个必要条件中的一个或几个,以防止发生死锁。 ​ 要注意的是互斥条件是非共享设备所必须的,不能破坏,还应当加以保证,因此主要是破坏产生死锁后的三个条件

    • 避免死锁:

      ​ 在资源的动态分配过程中,用某种方法防止系统进入不安全状态,从而避免死锁。

    • 死锁的检测及解除:

      ​ 通过系统的检测机构及时 地检测出死锁的发生,然后釆取某种措施解除死锁。

避免死锁-银行家算法

银行家算法是最著名的死锁避免算法。

当一个进程申请使用资源的时候,银行家算法通过先 试探 分配给该进程资源,然后通过安全性算法判断分配后的系统是否处于安全状态,若不安全则试探分配作废,让该进程继续等待。

简单来说就是给进程分配资源前判断此次分配是否会让系统处于不安全状态,不会就分配,会则等待。

具体内容和数据结构分析可以看这里:死锁预防和死锁避免_C语言中文网

这里直接贴代码了

#-- coding:UTF-8 --
# author:Na0h

import numpy as np

# 打印表格
def printTable(available, max, allocation, need):
n = allocation.shape[0]
print("=="*30)
print("[*]进程\tMax\t\tAllo\tNeed")
for i in range(n):
print(f"[*]P{i}\t{max[i]}\t{allocation[i]}\t{need[i]}")
print("[*]当前剩余资源:", available)

# 安全性算法
def safe(work,need,allocation):
n = need.shape[0]
finish = np.array([0] * n, dtype=int)

a = []
j = 0

while not(finish.all()):
# flag用于标记是否找到满足条件的进程
flag = 0
for i in range(n):
if not finish[i] and (need[i] <= work).all():
a.insert(j, i)
j += 1
flag = 1

work += allocation[i]
finish[i] = 1
break

if not flag:
return 0

print("==" * 30)
print("[*]存在安全序列:", end='')
for i in a:
if i == len(a) - 1:
print(f"P{i}")
else:
print(f"P{i}", end='->')
print("[*]当前不存在死锁")
return 1


def main():
# 资源总数m
m = int(input("[+]资源种类数目 m: "))

# 可用资源向量(资源池)——(available)
temp = input("[+]输入资源池资源(用空格隔开):").split()
available = np.array(temp, dtype=int)

# 进程数n
n = int(input("[+]进程数 n: "))

# 最大需求资源——(max)
max = np.zeros([n, m], dtype=int)
i = 0
while i < n:
temp = input(f"[+]输入进程 p{i} 最大需求资源(max): ").split()
max[i] = np.array(temp, dtype=int)
if(available < max[i]).any():
print("[*]错误输入,请重试")
i -= 1
i += 1


# 分配给该进程的资源——(allocation)
allocation = np.zeros([n, m], dtype=int)
i = 0
while i < n:
temp = input("[+]输入进程 P{} 的已分配资源(allocation):".format(i)).split()
allocation[i] = np.array(temp, dtype=int)
if(max[i] < allocation[i]).any():
print("[*]错误输入,请重试")
i -= 1
i += 1


# 计算需求资源数量
need = max - allocation

# 计算出剩余资源
for i in allocation:
available -= i

printTable(available, max, allocation, need)

# 银行家算法部分:循环获取资源请求,然后对资源请求进行判断,判断该请求的合法性以及安全性
while (need != 0).any():
process_id, req = input("[+]请求资源 (eg:P1, 1 0 1): ").split(',')
process_id = int(process_id[1:])
req = np.array(req.split(), dtype=int)
if (req > max[process_id]).any():
print("[*]错误输入,请重试")


# 安全性判断
else:
available -= req
allocation[process_id] += req
need[process_id] -= req
if safe(available.copy(), need, allocation):
printTable(available, max, allocation, need)
continue
else:
print("[*]不安全,分配失败")
available += req
allocation[process_id] -= req
need[process_id] += req

if __name__ == '__main__':
main()

死锁的检测和解除

死锁预防和避免算法,都是在为进程分配资源时施加限制条件或进行检测,若系统为进程分配资源时不釆取任何措施,则应该提供死锁检测和解除的手段。

检测可以参考这里死锁的检测和解除_C语言中文网

死锁的解除主要有这几种方法:

  1. 资源剥夺法:挂起某些死锁进程,并抢占它的资源,将这些资源分配给其他的死锁进程。但应防止被挂起的进程长时间得不到资源,而处于资源匮乏的状态。
  2. 撤销进程法:强制撤销部分、甚至全部死锁进程并剥夺这些进程的资源。撤销的原则可以按进程优先级和撤销进程代价的高低进行。
  3. 进程回退法:让一(多)个进程回退到足以回避死锁的地步,进程回退时自愿释放资源而不是被剥夺。要求系统保持进程的历史信息,设置还原点。

这里俺用的是资源剥夺法

#-- coding:UTF-8 --
# author:Na0h

import numpy as np

# 打印表格
def printTable(available, max, allocation, need):
n = allocation.shape[0]
print("=="*30)
print("[*]进程\tMax\t\tAllo\tNeed")
for i in range(n):
print(f"[*]P{i}\t{max[i]}\t{allocation[i]}\t{need[i]}")
print("[*]当前剩余资源:", available)

# 安全性算法
def safe(work,need,allocation):
n = need.shape[0]
finish = np.array([0] * n, dtype=int)

a = []
j = 0

while not(finish.all()):
# flag用于标记是否找到满足条件的进程
flag = 0
for i in range(n):
if not finish[i] and (need[i] <= work).all():
a.insert(j, i)
j += 1
flag = 1

work += allocation[i]
finish[i] = 1
break

if not flag:
return 0

print("==" * 30)
print("[*]存在安全序列:", end='')
for i in a:
if i == len(a) - 1:
print(f"P{i}")
else:
print(f"P{i}", end='->')
print("[*]当前不存在死锁")
return 1


# 利用资源抢占的方式解除死锁
def comproc(allocation, available, need, max):
n = allocation.shape[0]
m = allocation[0].shape[0]

flag = 0

for i in range(n):
for j in range(m):
process_id, req = i, allocation[i]

if need[i][j] > available[j]:
# 安全性判断
available += req
allocation[process_id] -= req
need[process_id] = max[process_id]-allocation[process_id]

flag = 1
if safe(available.copy(), need, allocation):
print(f"[*]可行,撤销P{process_id}资源以解除死锁")
print("==" * 30)
print("[*]进程\tMax\tAllo\tNeed")
for k in range(n):
print(f"[*]P{k}\t{max[k]}\t{allocation[k]}\t{need[k]}")
print("[*]当前剩余资源:", available)

return available, allocation, need
else:
comproc(allocation, available, need, max)

elif not allocation.any():
if flag:
available -= req
allocation[process_id] += req
need[process_id] = max[process_id]-allocation[process_id]
print("[*]无法解除")
return available, allocation, need


def main():
# 资源总数m
m = int(input("[+]资源种类数目 m: "))

# 可用资源向量(资源池)——(available)
temp = input("[+]输入资源池资源(用空格隔开):").split()
available = np.array(temp, dtype=int)

# 进程数n
n = int(input("[+]进程数 n: "))

# 最大需求资源——(max)
max = np.zeros([n, m], dtype=int)
i = 0
while i < n:
temp = input(f"[+]输入进程 p{i} 最大需求资源(max): ").split()
max[i] = np.array(temp, dtype=int)
if(available < max[i]).any():
print("[*]错误输入,请重试")
i -= 1
i += 1

# 分配给该进程的资源——(allocation)
allocation = np.zeros([n, m], dtype=int)
i = 0
while i < n:
temp = input("[+]输入进程 P{} 的已分配资源(allocation):".format(i)).split()
allocation[i] = np.array(temp, dtype=int)
if(max[i] < allocation[i]).any():
print("[*]错误输入,请重试")
i -= 1
i += 1



# 计算需求资源数量
need = max - allocation

# 计算出剩余资源
for i in allocation:
available -= i

printTable(available, max, allocation, need)

while (need != 0).any():
# 进行选择
print("==" * 30)
print("[*]1.请求资源\r\n[*]2.死锁检测\r\n[*]3.输出当前资源状态")
chose = input("[+]你选择:")

if chose == '1':
process_id, req = input("[+]请求资源 (eg:P1, 1 0 1): ").split(',')
process_id = int(process_id[1:])
req = np.array(req.split(), dtype=int)
if (req > max[process_id]).any():
print("[*]错误输入,请重试")

else: # 安全性判断
available -= req
allocation[process_id] += req
need[process_id] -= req
if safe(available.copy(), need, allocation):
printTable(available, max, allocation, need)
continue
else:
print("[*]不安全,分配失败")
available += req
allocation[process_id] -= req
need[process_id] += req

elif chose == '2':
if safe(available.copy(), need, allocation):
continue
else:
print("[*]存在死锁")
print("==" * 30)
print("[*]是否要抢占资源以解除死锁?\r\n[*]1.是\r\n[*]2.输入其他则返回上一级")
chose = input("[+]你选择:")
if chose == '1':
available, allocation, need = comproc(allocation.copy(), available.copy(), need.copy(), max)
else:
continue

elif chose == '3':
printTable(available, max, allocation, need)

else:
continue


if __name__ == '__main__':
main()

动态内存分配

动态分区分配,又称可变分区分配,指的是根据进程的实际需要,动态地为之分配内存空间,以提高内存空间利用率,降低碎片的大小。

动态分区分配算法

首次适应算法(First Fit-FF)

空闲分区以地址递增的次序链接。分配内存时顺序查找,找到大小满足要求的第一个空闲分区就进行分配。

最佳适应算法(Best Fit-BF)

空闲分区按容量递增形成分区链,找到第一个能满足要求的空闲分区就进行分配,即优先使用更小的分区以保留更多大分区。

最坏适应算法(Worst Fit-WF)

空闲分区以容量递减的次序链接,找到第一个能满足要求的空闲分区(也就是最大的分区)就进行分配,即优先使用更大的分区,以防产生太小的不可用的碎片分区。

数据结构定义

定义一个类来表示分区:

class Memory(object):
def __init__(self, start, end, length, state=1, ID=0):
self.Id = ID # 分区号
self.start = start # 分区起始地址
self.end = end # 分区结束地址
self.length = length # 分区大小
self.state = state # 为1:表示内存未分配

最佳最坏可以利用排序算法实现,这里俺就用冒泡了

#-- coding:UTF-8 --
# author:Na0h

class Memory(object):
def __init__(self, start, end, length, state=1, ID=0 ): # 类的初始化方法,当创建这个类的实例时就会调用该方法
self.Id = ID # 分区号 (ID为0是未分配,其余为任务编号
self.start = start # 分区起始地址
self.end = end # 分区结束地址
self.length = length # 分区大小
self.state = state # 为1:表示内存未分配


def printmemory(list): # 输出内存分配情况
print("[*]分区号\t起始地址\t结束地址\t分区大小\t分配状态")
for i in range(len(list)):
if list[i].state == 1:
st = "空闲"
else:
st = "已分配"
print(f"[*]{list[i].Id}\t\t{list[i].start}\t\t{list[i].end}\t\t{list[i].length}\t\t{st}")


def up_bsort(list): # 正序冒泡排序
for i in range(len(list)):
for j in range(i + 1, len(list)):
if list[i].length < list[j].length:
list[i], list[j] = list[j], list[i]
return list


def low_bsort(list): # 降序冒泡排序
for i in range(len(list)):
for j in range(i + 1, len(list)):
if list[i].length > list[j].length:
list[i], list[j] = list[j], list[i]
return list


def freem(id, list): # 回收空间
for i in range(len(list)):
if list[i].Id == id:
list[i].state = 1
list[i].Id = 0
t = i
break
# 向前合并
if t - 1 > 0:
if list[t-1].state == 1:
a = Memory(list[t-1].start, list[t-1].end, list[t-1].length + list[t].length, 1, 0)
del list[t-1]
del list[t-1]
list.insert(t-1,a)
t = t-1
if t + 1 < len(list):
if list[t+1].state == 1:
a = Memory(list[t].start, list[t + 1].end, list[t].length + list[t+1].length, 1, 0)
del list[t]
del list[t]
list.insert(t, a)
printmemory(list)


# 动态分区分配算法

def FF(id, length, list): # 首次适应算法
for i in list:
if i.Id == id: # 检测是否已存在该作业号
print(f"[*]已存在作业{i.Id}")
return
for i in range(len(list)):
if list[i].state == 1 and list[i].length > length: # 该分区未被分配,且未分配内存大小大于请求大小
m1 = Memory(list[i].start, list[i].start + length - 1, length, state=0, ID=id) # 被分配的分区
m2 = Memory(list[i].start + length, list[i].end, list[i].length - length, 1, 0) # 分配完毕后的空闲分区
del list[i]
list.insert(i, m2)
list.insert(i, m1)
printmemory(list)
return
if list[i].state == 1 and list[i].length == length:
list[i].state = 0
printmemory(list)
return
print("[*]空间不足")

def BF(id, length, list): # 最佳适应算法
for i in list:
if i.Id == id: # 检测是否已存在该作业号
print(f"[*]已存在作业{i.Id}")
return

# 找满足要求的空闲分区
m = up_bsort(list.copy())
f1, f2 = -1, -1
for i in range(len(m)):
if m[i].state == 1 and m[i].length > length:
f1 = m[i].start
elif m[i].state == 1 and m[i].length == length:
f2 = m[i].start
if f1 == -1 and f2 == -1:
print("[*]空间不足")
return

for i in range(len(list)):
if list[i].start == f1:
m1 = Memory(list[i].start, list[i].start + length - 1, length, state=0, ID=id) # 被分配的分区
m2 = Memory(list[i].start + length, list[i].end, list[i].length - length, 1, 0) # 分配完毕后的空闲分区
del list[i]
list.insert(i, m2)
list.insert(i, m1)
printmemory(list)
return
elif list[i].start == f2:
list[i].state = 0
printmemory(list)
return

def WF(id, length, list): # 最坏适应算法
for i in list:
if i.Id == id: # 检测是否已存在该作业号
print(f"[*]已存在作业{i.Id}")
return

# 找满足要求的空闲分区
m = low_bsort(list.copy())
f1, f2 = -1, -1
for i in range(len(m)):
if m[i].state == 1 and m[i].length > length:
f1 = m[i].start
elif m[i].state == 1 and m[i].length == length:
f2 = m[i].start
if f1 == -1 and f2 == -1:
print("[*]空间不足")
return

for i in range(len(list)):
if list[i].start == f1:
m1 = Memory(list[i].start, list[i].start + length - 1, length, state=0, ID=id) # 被分配的分区
m2 = Memory(list[i].start + length, list[i].end, list[i].length - length, 1, 0) # 分配完毕后的空闲分区
del list[i]
list.insert(i, m2)
list.insert(i, m1)
printmemory(list)
return
elif list[i].start == f2:
list[i].state = 0
printmemory(list)
return

def main():
size = int(input("[+]输入内存大小: "))
a = Memory(0, size - 1, size, state=1, ID=0)
b = []
b.append(a)
while True:
print("[*]1.初始化内存空间\r\n[*]2.分配空间\r\n[*]3.回收\r\n[*]4.查看当前内存分配情况\r\n")
chose = input("[+]你选择: ")
if chose == '1':
print("[+]输入内存大小: ")
size = int(input())
a = Memory(0, size - 1, size, state=1, ID=0)
b = []
b.append(a)
elif chose == '2':
print("[*]1.首次适应算法:FF\r\n[*]2.最佳适应算法:BF\r\n[*]3.最坏适应算法:WF\r\n[*]其他则返回上一界面")
x = input("[+]请输入分配执行的算法: ")
x = float(x)
flag = 'y'
while (flag == 'y'):
if x == 1:
work_size = input('[+]请输入作业id和大小: ').split()
FF(work_size[0], int(work_size[1]), b)
flag = input('[+]是否继续y/n: ')
elif x == 2:
work_size = input('[+]请输入作业id和大小: ').split()
BF(work_size[0], int(work_size[1]), b)
flag = input('[+]是否继续y/n: ')
elif x == 3:
work_size = input('[+]请输入作业id和大小: ').split()
WF(work_size[0], int(work_size[1]), b)
flag = input('[+]是否继续y/n: ')
else:
break
elif chose == '3':
id_delete = input('[+]请输入删除作业id: ')
freem(id_delete, b)
elif chose == '4':
printmemory(b)

if __name__ == '__main__':
main()