MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C语言原子操作与无锁编程在多核环境的应用

2022-03-024.2k 阅读

C语言原子操作基础

在多核环境下编程,数据竞争是一个常见且棘手的问题。传统的锁机制虽然能有效解决数据竞争,但在高并发场景下,锁的争用会带来性能瓶颈。原子操作则提供了一种轻量级的、无需锁机制就能保证数据一致性的方法。

原子操作是指不可被中断的操作,在多核处理器中,这样的操作能够在单条指令内完成,不会被其他线程或处理器干扰。C语言从C11标准开始引入了原子类型和原子操作库<stdatomic.h>,为开发者提供了在多核环境下进行原子操作的能力。

原子类型

<stdatomic.h>定义了一系列原子类型,例如_Atomic(int)表示一个原子整型。这些原子类型与普通类型类似,但对它们的操作是原子性的。以整型为例,普通的int类型在多线程环境下进行读写操作可能会出现数据竞争问题,而_Atomic(int)类型的变量在进行操作时不会被打断,确保了操作的原子性。

#include <stdatomic.h>
#include <stdio.h>

int main() {
    _Atomic(int) atomicVar = 0;
    atomic_store(&atomicVar, 42);
    int value = atomic_load(&atomicVar);
    printf("The value of atomic variable: %d\n", value);
    return 0;
}

在上述代码中,首先定义了一个原子整型变量atomicVar并初始化为0。然后使用atomic_store函数将值42存储到atomicVar中,这个操作是原子的,不会被其他线程干扰。接着通过atomic_load函数原子地加载atomicVar的值并打印出来。

原子操作函数

<stdatomic.h>提供了丰富的原子操作函数,除了前面提到的atomic_storeatomic_load,常见的还有atomic_fetch_addatomic_fetch_sub等。

  1. atomic_store:将一个值存储到原子变量中,其原型为void atomic_store(_Atomic(T) *obj, T desired);,其中obj是指向原子变量的指针,desired是要存储的值。
  2. atomic_load:从原子变量中加载值,其原型为T atomic_load(const _Atomic(T) *obj);,返回原子变量当前的值。
  3. atomic_fetch_add:将一个值加到原子变量上,并返回原子变量的旧值,其原型为T atomic_fetch_add(_Atomic(T) *obj, T arg);obj为指向原子变量的指针,arg是要加的值。
#include <stdatomic.h>
#include <stdio.h>
#include <pthread.h>

_Atomic(int) counter = 0;

void* increment(void* arg) {
    for (int i = 0; i < 10000; ++i) {
        atomic_fetch_add(&counter, 1);
    }
    return NULL;
}

int main() {
    pthread_t threads[10];
    for (int i = 0; i < 10; ++i) {
        pthread_create(&threads[i], NULL, increment, NULL);
    }
    for (int i = 0; i < 10; ++i) {
        pthread_join(threads[i], NULL);
    }
    printf("Final counter value: %d\n", atomic_load(&counter));
    return 0;
}

在这段代码中,定义了一个原子整型counterincrement函数通过atomic_fetch_add函数对counter进行10000次原子性的加1操作。在main函数中创建10个线程来执行increment函数,最后通过atomic_load获取并打印counter的最终值。由于atomic_fetch_add操作是原子的,所以即使多个线程同时执行加1操作,也不会出现数据竞争问题,能保证counter最终值的正确性。

无锁编程概念

无锁编程是一种在多线程环境下避免使用锁机制来实现数据一致性和并发控制的编程范式。在多核环境中,锁的争用会导致线程阻塞,降低系统的并发性能。无锁编程通过使用原子操作和一些特殊的数据结构设计,使得多个线程可以在不等待锁的情况下同时访问和修改共享数据。

无锁编程的优势

  1. 提高并发性能:由于无需等待锁,线程不会因为锁争用而阻塞,从而提高了系统整体的并发处理能力。在高并发场景下,无锁数据结构能够显著提升程序的运行效率。
  2. 避免死锁:死锁是多线程编程中常见的问题,通常是由于多个线程相互等待对方释放锁而导致的。无锁编程不依赖锁机制,从根本上避免了死锁的发生。
  3. 更好的扩展性:随着多核处理器核心数量的增加,锁争用问题会变得更加严重。无锁编程能够更好地适应多核环境的扩展,充分利用多核处理器的性能。

无锁编程的挑战

  1. 复杂的设计和实现:无锁数据结构的设计和实现通常比基于锁的数据结构复杂得多。需要深入理解原子操作和并发编程原理,以确保数据的一致性和正确性。
  2. 调试困难:由于无锁编程中多个线程同时访问和修改数据,调试过程变得更加困难。传统的调试工具和方法在无锁环境下可能无法准确捕捉到问题,需要使用一些专门的并发调试工具。
  3. ABA问题:这是无锁编程中常见的问题,当一个指针被修改为指向另一个对象,然后又被改回原来的对象时,可能会导致错误的判断。解决ABA问题需要额外的机制,如使用版本号等。

无锁数据结构实现

无锁链表

无锁链表是一种常见的无锁数据结构,它允许多个线程同时对链表进行插入和删除操作而无需使用锁。实现无锁链表的关键在于使用原子操作来管理链表节点的指针。

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

typedef struct Node {
    int value;
    struct Node *next;
    _Atomic(int) marked;
} Node;

typedef struct {
    Node *head;
} LockFreeList;

LockFreeList list;

Node* createNode(int value) {
    Node *newNode = (Node*)malloc(sizeof(Node));
    newNode->value = value;
    newNode->next = NULL;
    atomic_store(&newNode->marked, 0);
    return newNode;
}

int insert(int value) {
    while (1) {
        Node *pred = list.head;
        Node *curr = atomic_load(&pred->next);
        while (curr != NULL && atomic_load(&curr->marked)) {
            pred = curr;
            curr = atomic_load(&curr->next);
        }
        if (curr == NULL || curr->value > value) {
            Node *newNode = createNode(value);
            atomic_store(&newNode->next, curr);
            if (atomic_compare_exchange_weak(&pred->next, &curr, newNode)) {
                return 1;
            }
        } else if (curr->value == value) {
            return 0;
        }
    }
}

int main() {
    list.head = createNode(-1);
    pthread_t threads[10];
    for (int i = 0; i < 10; ++i) {
        pthread_create(&threads[i], NULL, (void* (*)(void*))insert, (void*)&i);
    }
    for (int i = 0; i < 10; ++i) {
        pthread_join(threads[i], NULL);
    }
    Node *curr = atomic_load(&list.head->next);
    while (curr != NULL) {
        printf("%d ", curr->value);
        curr = atomic_load(&curr->next);
    }
    printf("\n");
    return 0;
}

在上述代码中,定义了一个无锁链表结构。Node结构体包含节点的值value、指向下一个节点的指针next以及一个用于标记节点是否已删除的原子整型markedLockFreeList结构体只有一个head指针指向链表头。createNode函数用于创建新节点并初始化相关字段。insert函数实现了无锁插入操作,通过循环不断尝试插入新节点,使用atomic_compare_exchange_weak原子操作来确保插入操作的原子性和数据一致性。在main函数中创建10个线程同时向链表中插入数据,最后遍历链表并打印节点值。

无锁队列

无锁队列也是一种常用的无锁数据结构,常用于多线程之间的数据传递。实现无锁队列可以使用循环数组或链表结构,并结合原子操作来管理队列的头和尾指针。

#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define QUEUE_SIZE 10

typedef struct {
    int data[QUEUE_SIZE];
    _Atomic(int) head;
    _Atomic(int) tail;
} LockFreeQueue;

LockFreeQueue queue;

void initQueue() {
    atomic_store(&queue.head, 0);
    atomic_store(&queue.tail, 0);
}

int enqueue(int value) {
    int tail = atomic_load(&queue.tail);
    int nextTail = (tail + 1) % QUEUE_SIZE;
    if (nextTail == atomic_load(&queue.head)) {
        return 0;
    }
    queue.data[tail] = value;
    atomic_store(&queue.tail, nextTail);
    return 1;
}

int dequeue(int *value) {
    int head = atomic_load(&queue.head);
    if (head == atomic_load(&queue.tail)) {
        return 0;
    }
    *value = queue.data[head];
    atomic_store(&queue.head, (head + 1) % QUEUE_SIZE);
    return 1;
}

void* producer(void* arg) {
    for (int i = 0; i < 5; ++i) {
        enqueue(i);
    }
    return NULL;
}

void* consumer(void* arg) {
    int value;
    for (int i = 0; i < 5; ++i) {
        if (dequeue(&value)) {
            printf("Consumed: %d\n", value);
        }
    }
    return NULL;
}

int main() {
    initQueue();
    pthread_t producerThread, consumerThread;
    pthread_create(&producerThread, NULL, producer, NULL);
    pthread_create(&consumerThread, NULL, consumer, NULL);
    pthread_join(producerThread, NULL);
    pthread_join(consumerThread, NULL);
    return 0;
}

在这段代码中,定义了一个基于循环数组的无锁队列。LockFreeQueue结构体包含一个数组data用于存储数据,以及两个原子整型headtail分别表示队列的头和尾。initQueue函数初始化队列的头和尾指针。enqueue函数实现入队操作,首先检查队列是否已满,然后将数据存入队列并更新尾指针。dequeue函数实现出队操作,检查队列是否为空,然后取出数据并更新头指针。在main函数中创建一个生产者线程和一个消费者线程,生产者线程向队列中插入数据,消费者线程从队列中取出数据并打印。

原子操作与无锁编程的性能分析

锁机制与无锁编程的性能对比

在多核环境下,锁机制在高并发场景下会因为锁争用而导致性能下降。当多个线程频繁竞争同一把锁时,大量的时间会消耗在等待锁的获取上,从而降低了系统的整体并发性能。

无锁编程通过原子操作避免了锁争用,使得多个线程可以同时访问和修改共享数据,在高并发场景下通常能够获得更好的性能表现。然而,无锁编程的性能优势并不是绝对的,其性能还受到多种因素的影响,如原子操作的开销、数据结构的复杂度等。

为了对比锁机制和无锁编程的性能,我们可以进行一些简单的性能测试。以下是一个使用互斥锁和原子操作分别实现的计数器示例,并对它们的性能进行对比。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <time.h>

#define THREADS 10
#define ITERATIONS 1000000

_Atomic(int) atomicCounter = 0;
pthread_mutex_t mutex;
int mutexCounter = 0;

void* atomicIncrement(void* arg) {
    for (int i = 0; i < ITERATIONS; ++i) {
        atomic_fetch_add(&atomicCounter, 1);
    }
    return NULL;
}

void* mutexIncrement(void* arg) {
    for (int i = 0; i < ITERATIONS; ++i) {
        pthread_mutex_lock(&mutex);
        mutexCounter++;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main() {
    pthread_t atomicThreads[THREADS];
    pthread_t mutexThreads[THREADS];
    clock_t start, end;

    pthread_mutex_init(&mutex, NULL);

    start = clock();
    for (int i = 0; i < THREADS; ++i) {
        pthread_create(&atomicThreads[i], NULL, atomicIncrement, NULL);
    }
    for (int i = 0; i < THREADS; ++i) {
        pthread_join(atomicThreads[i], NULL);
    }
    end = clock();
    double atomicTime = ((double)(end - start)) / CLOCKS_PER_SEC;
    printf("Atomic counter time: %f seconds\n", atomicTime);

    start = clock();
    for (int i = 0; i < THREADS; ++i) {
        pthread_create(&mutexThreads[i], NULL, mutexIncrement, NULL);
    }
    for (int i = 0; i < THREADS; ++i) {
        pthread_join(mutexThreads[i], NULL);
    }
    end = clock();
    double mutexTime = ((double)(end - start)) / CLOCKS_PER_SEC;
    printf("Mutex counter time: %f seconds\n", mutexTime);

    pthread_mutex_destroy(&mutex);
    return 0;
}

在上述代码中,分别使用原子操作和互斥锁实现了计数器功能。atomicIncrement函数使用atomic_fetch_add原子操作对atomicCounter进行自增,mutexIncrement函数使用互斥锁保护mutexCounter的自增操作。在main函数中,分别创建10个线程来执行这两个函数,并使用clock函数记录执行时间。通过对比atomicTimemutexTime可以看出,在这种高并发场景下,原子操作实现的计数器性能优于基于互斥锁的计数器。

影响无锁编程性能的因素

  1. 原子操作开销:虽然原子操作避免了锁争用,但本身也有一定的开销。不同的原子操作指令在不同的处理器架构上的执行时间和资源消耗可能不同。例如,一些复杂的原子操作可能需要多个处理器周期才能完成,这会影响无锁编程的性能。
  2. 数据结构复杂度:无锁数据结构的设计复杂度会影响其性能。例如,无锁链表的插入和删除操作需要使用原子操作来管理指针,实现相对复杂。如果数据结构设计不合理,可能会导致额外的原子操作开销,从而降低性能。
  3. 线程数和竞争程度:随着线程数的增加,无锁编程的性能优势可能会更加明显,但当竞争程度过高时,即使是无锁数据结构也可能会因为频繁的原子操作冲突而性能下降。例如,在一个高度竞争的无锁队列中,多个线程同时进行入队和出队操作,可能会导致原子操作的重试次数增加,从而影响性能。

解决ABA问题

ABA问题的产生

ABA问题是无锁编程中常见的问题。在使用指针或引用进行数据操作时,当一个指针被修改为指向另一个对象,然后又被改回原来的对象时,可能会导致错误的判断。例如,在无锁链表中,一个节点被删除后,其内存被重新分配并用于另一个节点,然后这个新节点又被插入到链表中,此时如果其他线程根据指针来判断节点的状态,可能会误认为该节点没有被删除过。

解决ABA问题的方法

  1. 使用版本号:一种常见的解决ABA问题的方法是为每个对象添加一个版本号。每次对象的状态发生变化时,版本号加1。在进行原子操作时,不仅要比较指针的值,还要比较版本号。只有当指针和版本号都匹配时,才认为操作是合法的。
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

typedef struct Node {
    int value;
    struct Node *next;
    _Atomic(int) version;
} Node;

typedef struct {
    Node *head;
} LockFreeList;

LockFreeList list;

Node* createNode(int value) {
    Node *newNode = (Node*)malloc(sizeof(Node));
    newNode->value = value;
    newNode->next = NULL;
    atomic_store(&newNode->version, 0);
    return newNode;
}

int insert(int value) {
    while (1) {
        Node *pred = list.head;
        Node *curr = atomic_load(&pred->next);
        int predVersion = atomic_load(&pred->version);
        int currVersion = atomic_load(&curr->version);
        while (curr != NULL && atomic_load(&curr->marked)) {
            pred = curr;
            curr = atomic_load(&curr->next);
            predVersion = atomic_load(&pred->version);
            currVersion = atomic_load(&curr->version);
        }
        if (curr == NULL || curr->value > value) {
            Node *newNode = createNode(value);
            atomic_store(&newNode->next, curr);
            int newPredVersion = predVersion + 1;
            if (atomic_compare_exchange_weak(&pred->next, &curr, newNode) &&
                atomic_compare_exchange_weak(&pred->version, &predVersion, newPredVersion)) {
                return 1;
            }
        } else if (curr->value == value) {
            return 0;
        }
    }
}

int main() {
    list.head = createNode(-1);
    pthread_t threads[10];
    for (int i = 0; i < 10; ++i) {
        pthread_create(&threads[i], NULL, (void* (*)(void*))insert, (void*)&i);
    }
    for (int i = 0; i < 10; ++i) {
        pthread_join(threads[i], NULL);
    }
    Node *curr = atomic_load(&list.head->next);
    while (curr != NULL) {
        printf("%d ", curr->value);
        curr = atomic_load(&curr->next);
    }
    printf("\n");
    return 0;
}

在上述代码中,为Node结构体添加了一个原子整型version用于记录版本号。在insert函数中,每次进行原子操作时,不仅比较指针,还比较版本号,确保操作的正确性。

  1. 使用标记位:另一种方法是使用标记位来记录对象的状态变化。例如,在无锁链表中,可以为每个节点添加一个标记位,当节点被删除时设置标记位,这样在进行操作时可以根据标记位来判断节点是否被删除过,从而避免ABA问题。
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

typedef struct Node {
    int value;
    struct Node *next;
    _Atomic(int) marked;
} Node;

typedef struct {
    Node *head;
} LockFreeList;

LockFreeList list;

Node* createNode(int value) {
    Node *newNode = (Node*)malloc(sizeof(Node));
    newNode->value = value;
    newNode->next = NULL;
    atomic_store(&newNode->marked, 0);
    return newNode;
}

int insert(int value) {
    while (1) {
        Node *pred = list.head;
        Node *curr = atomic_load(&pred->next);
        while (curr != NULL && atomic_load(&curr->marked)) {
            pred = curr;
            curr = atomic_load(&curr->next);
        }
        if (curr == NULL || curr->value > value) {
            Node *newNode = createNode(value);
            atomic_store(&newNode->next, curr);
            if (atomic_compare_exchange_weak(&pred->next, &curr, newNode)) {
                return 1;
            }
        } else if (curr->value == value) {
            return 0;
        }
    }
}

int main() {
    list.head = createNode(-1);
    pthread_t threads[10];
    for (int i = 0; i < 10; ++i) {
        pthread_create(&threads[i], NULL, (void* (*)(void*))insert, (void*)&i);
    }
    for (int i = 0; i < 10; ++i) {
        pthread_join(threads[i], NULL);
    }
    Node *curr = atomic_load(&list.head->next);
    while (curr != NULL) {
        printf("%d ", curr->value);
        curr = atomic_load(&curr->next);
    }
    printf("\n");
    return 0;
}

在这段代码中,通过为Node结构体添加marked标记位,在删除节点时设置该标记位,在插入和遍历链表时根据标记位判断节点状态,有效避免了ABA问题。

总结

在多核环境下,C语言的原子操作和无锁编程为开发者提供了一种高效的并发编程方式。原子操作通过保证操作的原子性,避免了数据竞争问题,而无锁编程则通过使用原子操作和特殊的数据结构设计,避免了锁争用带来的性能瓶颈。

无锁数据结构如无锁链表和无锁队列的实现,展示了如何在不使用锁的情况下实现高效的并发数据结构。然而,无锁编程也面临着一些挑战,如复杂的设计和实现、调试困难以及ABA问题等。

通过性能分析,我们了解到在高并发场景下,无锁编程通常能够获得比锁机制更好的性能,但性能还受到原子操作开销、数据结构复杂度以及线程竞争程度等多种因素的影响。同时,通过使用版本号或标记位等方法,我们可以有效地解决无锁编程中的ABA问题。

在实际应用中,开发者需要根据具体的场景和需求,权衡锁机制和无锁编程的优缺点,选择合适的并发编程方式,以实现高效、可靠的多核程序。