MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

如何预防操作系统中的死锁

2021-05-312.1k 阅读

死锁的定义与产生条件

死锁的定义

在操作系统中,死锁指的是两个或多个进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,这些进程都将无法推进。例如,进程 A 持有资源 R1 并请求资源 R2,而进程 B 持有资源 R2 并请求资源 R1,此时 A 和 B 相互等待对方释放资源,从而陷入死锁状态。这种情况就如同两条相向而行的船在狭窄河道相遇,双方都不愿后退,导致谁都无法继续前行。

死锁产生的四个必要条件

  1. 互斥条件:资源在某一时刻只能被一个进程所使用。例如打印机,同一时间只能有一个进程向其发送打印任务,其他进程若要使用必须等待。这是资源的固有属性,很多资源本身就具有这种特性,如物理设备等。
  2. 占有并等待条件:进程已经占有了至少一个资源,但又提出了新的资源请求,而该资源被其他进程占有,此时请求进程被阻塞,但对自己已获得的资源保持不放。比如一个进程已经获取了内存空间,在运行过程中又请求 CPU 时间片,若 CPU 正被其他进程占用,此进程就会等待,同时不会释放已占有的内存。
  3. 不可剥夺条件:进程已获得的资源,在未使用完之前,不能被其他进程强行剥夺,只能由获得该资源的进程自己释放。例如,一个进程获得了一个锁资源用于保护共享数据,在它完成对共享数据的操作并释放锁之前,其他进程无法强行拿走这个锁。
  4. 循环等待条件:存在一个进程 - 资源的循环链,链中的每一个进程都在等待下一个进程所占有的资源。假设有进程 P1、P2、P3,P1 等待 P2 占有的资源 R1,P2 等待 P3 占有的资源 R2,P3 等待 P1 占有的资源 R3,这样就形成了一个循环等待的死锁环。

死锁预防的方法

破坏互斥条件

在某些情况下,可以通过允许资源共享来破坏互斥条件。然而,并非所有资源都能实现共享,像打印机这类设备,由于其物理特性决定了同一时刻只能处理一个打印任务,无法通过共享来避免死锁。但对于一些数据资源,例如可重入代码,多个进程可以同时访问,不会产生数据不一致的问题,从而避免了因互斥访问导致的死锁。

例如,在一个多线程的程序中,如果有一段纯计算的代码,不涉及对共享数据的修改,那么这段代码就是可重入的。多个线程可以同时调用这段代码,而不会因为互斥访问的问题陷入死锁。以下是一个简单的 C 语言示例:

#include <stdio.h>
#include <pthread.h>

// 可重入函数
void* compute(void* arg) {
    int i;
    for (i = 0; i < 1000000; i++) {
        // 这里只是进行简单计算,不涉及共享数据
        int result = i * i;
    }
    return NULL;
}

int main() {
    pthread_t thread1, thread2;
    // 创建两个线程,同时调用可重入函数
    pthread_create(&thread1, NULL, compute, NULL);
    pthread_create(&thread2, NULL, compute, NULL);

    // 等待线程结束
    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    return 0;
}

在这个例子中,compute 函数是可重入的,多个线程可以同时执行它,不会因为资源的互斥访问而产生死锁。

破坏占有并等待条件

  1. 静态分配策略:进程在运行前一次性申请它所需要的所有资源。只有当系统能够满足进程的全部资源需求时,才将这些资源分配给该进程,进程才开始执行。这样就不会出现进程在运行过程中因请求新资源而等待,同时又占有已分配资源的情况。例如,一个多媒体处理程序,它在运行前需要申请 CPU 时间片、内存空间、显卡资源等。如果系统能够一次性满足它对这些资源的需求,就将资源分配给它,程序开始运行。在运行过程中,它不会再请求新的资源,也就避免了占有并等待条件导致的死锁。

下面是一个简单的模拟静态分配资源的代码示例(使用 C 语言和自定义的资源分配结构体):

#include <stdio.h>
#include <stdbool.h>

#define MAX_PROCESSES 3
#define MAX_RESOURCES 3

// 资源结构体
typedef struct {
    int available[MAX_RESOURCES];
    int allocated[MAX_PROCESSES][MAX_RESOURCES];
    int maximum[MAX_PROCESSES][MAX_RESOURCES];
} ResourceSystem;

// 检查是否可以分配资源
bool canAllocate(ResourceSystem* rs, int process, int request[]) {
    int i;
    for (i = 0; i < MAX_RESOURCES; i++) {
        if (request[i] > rs->available[i] || request[i] > rs->maximum[process][i] - rs->allocated[process][i]) {
            return false;
        }
    }
    return true;
}

// 分配资源
void allocateResources(ResourceSystem* rs, int process, int request[]) {
    int i;
    for (i = 0; i < MAX_RESOURCES; i++) {
        rs->available[i] -= request[i];
        rs->allocated[process][i] += request[i];
    }
}

// 释放资源
void releaseResources(ResourceSystem* rs, int process, int request[]) {
    int i;
    for (i = 0; i < MAX_RESOURCES; i++) {
        rs->available[i] += request[i];
        rs->allocated[process][i] -= request[i];
    }
}

int main() {
    ResourceSystem rs;
    // 初始化可用资源
    rs.available[0] = 3;
    rs.available[1] = 3;
    rs.available[2] = 2;

    // 初始化每个进程的最大需求
    rs.maximum[0][0] = 7;
    rs.maximum[0][1] = 5;
    rs.maximum[0][2] = 3;
    rs.maximum[1][0] = 3;
    rs.maximum[1][1] = 2;
    rs.maximum[1][2] = 2;
    rs.maximum[2][0] = 9;
    rs.maximum[2][1] = 0;
    rs.maximum[2][2] = 2;

    // 初始化已分配资源
    rs.allocated[0][0] = 0;
    rs.allocated[0][1] = 1;
    rs.allocated[0][2] = 0;
    rs.allocated[1][0] = 2;
    rs.allocated[1][1] = 0;
    rs.allocated[1][2] = 0;
    rs.allocated[2][0] = 3;
    rs.allocated[2][1] = 0;
    rs.allocated[2][2] = 2;

    // 假设进程 0 的请求
    int request0[] = {0, 1, 0};
    if (canAllocate(&rs, 0, request0)) {
        allocateResources(&rs, 0, request0);
        printf("Resources allocated to process 0\n");
    } else {
        printf("Resources cannot be allocated to process 0\n");
    }

    // 假设进程 1 的请求
    int request1[] = {2, 0, 2};
    if (canAllocate(&rs, 1, request1)) {
        allocateResources(&rs, 1, request1);
        printf("Resources allocated to process 1\n");
    } else {
        printf("Resources cannot be allocated to process 1\n");
    }

    // 假设进程 2 的请求
    int request2[] = {2, 2, 2};
    if (canAllocate(&rs, 2, request2)) {
        allocateResources(&rs, 2, request2);
        printf("Resources allocated to process 2\n");
    } else {
        printf("Resources cannot be allocated to process 2\n");
    }

    // 释放进程 0 的资源
    releaseResources(&rs, 0, request0);
    printf("Resources released by process 0\n");

    return 0;
}
  1. 逐步分配但提前声明:进程在开始执行前,声明它在整个运行过程中需要的所有资源。系统根据进程的声明,在进程运行过程中逐步分配资源,但前提是系统能够保证满足进程的所有资源需求。例如,一个数据库应用程序,在启动时声明它可能需要连接池中的多个数据库连接、一定的磁盘空间用于缓存数据等资源。系统根据其声明,在程序运行过程中逐步分配这些资源,并且在分配过程中确保不会出现死锁。

破坏不可剥夺条件

  1. 高优先级剥夺:当一个进程占有了某些资源,而另一个优先级更高的进程请求这些资源时,系统可以剥夺低优先级进程所占有的资源,分配给高优先级进程。例如,在一个实时操作系统中,实时任务通常具有较高的优先级。如果一个后台任务占有了 CPU 资源,而此时一个紧急的实时任务需要执行,系统可以剥夺后台任务的 CPU 资源,分配给实时任务,从而避免因不可剥夺条件导致的死锁。

以下是一个简单的模拟高优先级剥夺资源的代码示例(使用 Python 和简单的进程优先级表示):

class Process:
    def __init__(self, pid, priority, resources_needed):
        self.pid = pid
        self.priority = priority
        self.resources_needed = resources_needed
        self.resources_allocated = [0] * len(resources_needed)

    def request_resources(self, available_resources):
        can_allocate = True
        for i in range(len(self.resources_needed)):
            if self.resources_needed[i] > available_resources[i]:
                can_allocate = False
                break
        if can_allocate:
            for i in range(len(self.resources_needed)):
                available_resources[i] -= self.resources_needed[i]
                self.resources_allocated[i] += self.resources_needed[i]
            print(f"Process {self.pid} allocated resources.")
        else:
            print(f"Process {self.pid} cannot allocate resources.")

    def release_resources(self, available_resources):
        for i in range(len(self.resources_allocated)):
            available_resources[i] += self.resources_allocated[i]
            self.resources_allocated[i] = 0
        print(f"Process {self.pid} released resources.")


# 初始化可用资源
available_resources = [3, 3, 2]

# 创建进程
process1 = Process(1, 1, [2, 0, 2])
process2 = Process(2, 2, [0, 1, 0])

# 进程 1 请求资源
process1.request_resources(available_resources)

# 进程 2 请求资源,由于优先级高,尝试剥夺进程 1 的资源
if process2.priority > process1.priority:
    print(f"Process {process2.pid} has higher priority, trying to deprive resources from process {process1.pid}")
    process1.release_resources(available_resources)
    process2.request_resources(available_resources)
else:
    process2.request_resources(available_resources)


  1. 超时剥夺:为进程占有资源设置一个时间限制。如果一个进程占有资源的时间超过了设定的阈值,系统自动剥夺该进程所占有的资源。例如,一个进程申请了一个文件锁,在一定时间内没有完成对文件的操作,系统可以剥夺该文件锁,释放给其他等待的进程,以防止因长期占有资源而导致死锁。

破坏循环等待条件

  1. 资源分配图算法(RAG):资源分配图是一个有向图,它描述了系统中进程和资源之间的关系。图中的节点分为进程节点和资源节点,边表示进程对资源的请求或资源对进程的分配。通过检测资源分配图中是否存在环来判断是否存在死锁。如果存在环,就意味着存在循环等待条件,可能导致死锁。例如,假设有进程 P1、P2,资源 R1、R2,P1 持有 R1 并请求 R2,P2 持有 R2 并请求 R1,在资源分配图中就会形成一个环。

以下是一个简单的使用深度优先搜索(DFS)检测资源分配图中是否存在环的代码示例(使用 Python 和邻接表表示图):

from collections import defaultdict


class Graph:
    def __init__(self, vertices):
        self.graph = defaultdict(list)
        self.V = vertices

    def addEdge(self, u, v):
        self.graph[u].append(v)

    def isCyclicUtil(self, v, visited, recStack):
        visited[v] = True
        recStack[v] = True

        for neighbour in self.graph[v]:
            if not visited[neighbour]:
                if self.isCyclicUtil(neighbour, visited, recStack):
                    return True
            elif recStack[neighbour]:
                return True

        recStack[v] = False
        return False

    def isCyclic(self):
        visited = [False] * self.V
        recStack = [False] * self.V
        for node in range(self.V):
            if not visited[node]:
                if self.isCyclicUtil(node, visited, recStack):
                    return True
        return False


# 假设进程和资源总数为 4
g = Graph(4)
g.addEdge(0, 1)
g.addEdge(1, 2)
g.addEdge(2, 0)
g.addEdge(2, 3)
g.addEdge(3, 3)

if g.isCyclic():
    print("Graph contains cycle, potential deadlock.")
else:
    print("Graph does not contain cycle, no deadlock.")


  1. 资源分配顺序法:为系统中的资源分配一个线性顺序,例如按照资源的编号或者重要性等。进程请求资源时,必须按照这个线性顺序依次请求。这样就避免了循环等待的情况。比如,系统中有资源 R1、R2、R3,规定进程必须先请求 R1,再请求 R2,最后请求 R3。如果一个进程需要 R2 和 R3,它必须先请求 R1(即使它可能并不需要 R1 的实际功能),然后请求 R2,最后请求 R3。这样就不会出现进程 A 等待进程 B 释放 R2 同时进程 B 等待进程 A 释放 R1 的循环等待情况。

死锁避免

银行家算法

  1. 算法原理:银行家算法是一种最具有代表性的死锁避免算法。其原理基于系统资源分配的安全性检查。把操作系统比作银行家,操作系统管理的资源如同银行家管理的资金,进程向操作系统请求资源如同用户向银行家贷款。银行家在给用户贷款时,要考虑这笔贷款是否会导致系统处于不安全状态,如果会则拒绝贷款。同样,操作系统在分配资源给进程时,要检查分配后系统是否仍处于安全状态,如果安全则分配,否则拒绝。

例如,假设有三个进程 P1、P2、P3,系统中有三种资源 R1、R2、R3,初始时可用资源分别为 3、3、2。进程 P1 最大需求为 7、5、3,已分配 0、1、0;进程 P2 最大需求为 3、2、2,已分配 2、0、0;进程 P3 最大需求为 9、0、2,已分配 3、0、2。银行家算法会首先检查每个进程剩余的需求(最大需求减去已分配资源),然后尝试找到一个安全序列,即按照这个序列分配资源,每个进程都能顺利完成,不会出现死锁。

  1. 算法步骤
    • 初始化:记录系统中每种资源的总量(Available)、每个进程对每种资源的最大需求(Max)、每个进程当前已分配到的资源(Allocation)。
    • 计算需求:计算每个进程还需要的资源量(Need = Max - Allocation)。
    • 寻找安全序列:在所有进程中,寻找一个Need小于等于Available的进程。如果找到这样的进程,假设将资源分配给它,更新AvailableAvailable = Available + Allocation),并标记该进程为已完成。重复这个步骤,尝试找到一个完整的安全序列。如果能够找到这样的安全序列,说明系统处于安全状态,可以分配资源;否则,系统处于不安全状态,不能分配资源。

以下是银行家算法的 Python 代码实现:

def isSafe(processes, available, max_need, allocation):
    need = [[max_need[i][j] - allocation[i][j] for j in range(len(available))] for i in range(len(processes))]
    finish = [False] * len(processes)
    work = available.copy()
    safe_sequence = []

    while True:
        found = False
        for i in range(len(processes)):
            if not finish[i] and all(need[i][j] <= work[j] for j in range(len(available))):
                for j in range(len(available)):
                    work[j] += allocation[i][j]
                finish[i] = True
                safe_sequence.append(i)
                found = True
        if not found:
            break

    if all(finish):
        print("System is in safe state. Safe sequence:", safe_sequence)
        return True
    else:
        print("System is not in safe state.")
        return False


# 示例数据
processes = [0, 1, 2]
available = [3, 3, 2]
max_need = [
    [7, 5, 3],
    [3, 2, 2],
    [9, 0, 2]
]
allocation = [
    [0, 1, 0],
    [2, 0, 0],
    [3, 0, 2]
]

isSafe(processes, available, max_need, allocation)


资源分配拒绝策略

  1. 基于剩余资源的拒绝:当一个进程请求资源时,系统首先检查剩余资源是否足够满足该进程的需求。如果剩余资源量小于进程的请求量,直接拒绝该请求。例如,系统剩余内存空间为 100MB,一个进程请求 200MB 内存,系统直接拒绝该请求,以避免因资源不足导致的潜在死锁。

  2. 基于进程优先级和剩余资源的拒绝:结合进程的优先级和剩余资源情况进行决策。如果剩余资源能够满足一个高优先级进程的部分需求,而一个低优先级进程请求全部资源,系统可能拒绝低优先级进程的请求,优先满足高优先级进程的部分需求。这样可以在一定程度上保证高优先级进程的运行,同时避免死锁。例如,系统中有两个进程,高优先级进程 P1 需要 10 个资源,已分配 5 个,低优先级进程 P2 需要 8 个资源,已分配 0 个,此时系统剩余资源为 6 个。系统可能拒绝 P2 的请求,先满足 P1 的剩余 5 个资源需求,使 P1 能够继续运行并最终释放资源,避免死锁。

死锁检测与恢复

死锁检测算法

  1. 资源分配图算法(RAG 算法改进):在之前介绍的资源分配图检测环的基础上,增加资源分配和请求的动态变化处理。当一个进程请求资源或释放资源时,更新资源分配图,并重新检测图中是否存在环。例如,进程 P1 释放了资源 R1,R1 被分配给进程 P2,此时更新资源分配图,然后使用深度优先搜索等算法检测是否存在环。如果存在环,说明可能出现死锁。

  2. 基于状态的检测算法:记录系统中每个进程的状态(如运行、等待资源等)以及资源的分配情况。定期检查系统状态,看是否存在进程相互等待资源且没有资源可用的情况。例如,系统中有进程 P1、P2、P3,P1 等待 P2 占有的资源 R1,P2 等待 P3 占有的资源 R2,P3 等待 P1 占有的资源 R3,同时系统中没有其他可用资源,这种情况就表明可能出现死锁。

死锁恢复方法

  1. 终止进程:选择一个或多个死锁进程并终止它们。可以选择优先级最低的进程、占用资源最多的进程或者已经运行时间最短的进程等作为终止对象。例如,系统检测到进程 P1、P2、P3 死锁,P1 是一个后台批处理进程,优先级较低,选择终止 P1,释放它所占有的资源,使 P2 和 P3 能够继续运行。

  2. 资源剥夺:从死锁进程中剥夺部分资源,分配给其他进程,以打破死锁。例如,进程 P1、P2 死锁,P1 占有资源 R1 和 R2,P2 占有资源 R3 并请求 R1,P1 占有 R1 并请求 R3。可以剥夺 P1 的 R1 资源,分配给 P2,从而打破死锁。在剥夺资源时,要考虑进程的恢复问题,例如被剥夺资源的进程可能需要重新初始化部分状态。

  3. 回滚进程:将死锁进程回滚到某个之前的检查点状态,然后重新执行。检查点记录了进程在某个时刻的状态和资源分配情况。例如,进程 P 在运行过程中与其他进程发生死锁,系统将 P 回滚到之前设置的检查点状态,此时 P 可能持有较少的资源,重新执行时可能不会再陷入死锁。但回滚进程需要额外的系统开销来记录和恢复检查点状态。