MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

C#中的任务并行库TPL与并行编程

2024-04-056.1k 阅读

1. C# 并行编程概述

在当今多核处理器普及的时代,充分利用多核资源以提升程序性能变得至关重要。并行编程允许我们将一个大的任务分解为多个小任务,并同时执行这些小任务,从而加快整体的处理速度。在 C# 中,任务并行库(Task Parallel Library,TPL)为我们提供了强大且便捷的并行编程模型。

1.1 并行编程的重要性

传统的顺序编程模型在处理大量数据或复杂计算时,由于单核处理器一次只能执行一个指令,会导致处理时间较长。而并行编程能够让多核处理器的每个核心同时处理不同的任务,大大提高了处理效率。例如,在数据分析、科学计算等领域,并行编程能显著减少处理时间,提高系统的响应速度。

1.2 C# 并行编程的发展历程

早期在 C# 中进行并行编程主要依赖于线程(Thread)类。开发者需要手动创建线程、管理线程的生命周期以及处理线程同步等复杂问题。这种方式虽然灵活,但编写和维护代码的难度较大,容易出现线程安全问题。随着多核处理器的广泛应用,微软在.NET Framework 4.0 中引入了任务并行库(TPL)。TPL 提供了更高级的抽象,使得并行编程更加简单、高效,开发者可以专注于业务逻辑,而无需过多关注底层线程管理。

2. 任务并行库(TPL)基础

2.1 任务(Task)的概念

在 TPL 中,Task 类是核心概念。一个 Task 代表一个异步操作,它可以是一个简单的计算任务,也可以是一个复杂的 I/O 操作。Task 类提供了丰富的方法和属性来管理和控制异步操作的执行。

2.1.1 创建任务

我们可以通过多种方式创建 Task。最基本的方式是使用 Task.Run 方法,它接受一个 Action 委托作为参数,该委托表示要执行的任务。例如:

Task task = Task.Run(() =>
{
    // 任务执行的代码
    Console.WriteLine("Task is running.");
});

在上述代码中,Task.Run 方法创建并启动了一个新任务,任务执行的内容是在控制台输出 "Task is running."。

2.1.2 任务的状态

Task 有多种状态,包括 Created(已创建)、WaitingForActivation(等待激活)、Running(正在运行)、WaitingToRun(等待运行)、Completed(已完成)、Faulted(出错)和 Canceled(已取消)。我们可以通过 Task.Status 属性获取任务当前的状态。例如:

Task task = Task.Run(() =>
{
    // 模拟任务执行
    Thread.Sleep(2000);
});

while (task.Status != TaskStatus.RanToCompletion)
{
    Console.WriteLine($"Task status: {task.Status}");
    Thread.Sleep(500);
}
Console.WriteLine($"Final task status: {task.Status}");

在这段代码中,我们创建了一个任务,并在任务执行过程中不断输出任务的状态,直到任务成功完成。

2.2 任务的延续(Task Continuation)

任务延续允许我们在一个任务完成后执行另一个任务。这在很多场景下非常有用,比如在一个数据处理任务完成后,需要将结果保存到数据库中。

2.2.1 使用 ContinueWith 方法

Task 类提供了 ContinueWith 方法来创建任务延续。ContinueWith 方法接受一个 Task 作为参数,并返回一个新的 Task。例如:

Task<int> task1 = Task.Run(() =>
{
    // 模拟一个返回结果的任务
    Thread.Sleep(2000);
    return 42;
});

Task<string> task2 = task1.ContinueWith(t =>
{
    if (t.IsFaulted)
    {
        return "Task 1 failed.";
    }
    else if (t.IsCanceled)
    {
        return "Task 1 was canceled.";
    }
    else
    {
        return $"Task 1 result: {t.Result}";
    }
});

Console.WriteLine(task2.Result);

在上述代码中,task1 是一个返回整数结果的任务。task2task1 的延续任务,它根据 task1 的执行结果返回不同的字符串。

2.2.2 延续任务的执行条件

ContinueWith 方法还提供了重载,可以指定延续任务的执行条件。例如,我们可以让延续任务只在前置任务成功完成时执行:

Task<int> task1 = Task.Run(() =>
{
    // 模拟一个返回结果的任务
    Thread.Sleep(2000);
    return 42;
});

Task<string> task2 = task1.ContinueWith(t =>
{
    return $"Task 1 result: {t.Result}";
}, TaskContinuationOptions.OnlyOnRanToCompletion);

在这个例子中,TaskContinuationOptions.OnlyOnRanToCompletion 表示 task2 只有在 task1 成功完成时才会执行。

3. 并行循环

并行循环是 TPL 中用于并行化迭代操作的重要工具。在传统的顺序编程中,forforeach 循环是顺序执行的,而并行循环可以将循环体中的操作并行执行,从而提高执行效率。

3.1 Parallel.For 循环

Parallel.For 方法用于并行执行一个 for 风格的循环。它接受循环的起始值、结束值以及一个 Action<int> 委托,该委托表示每次迭代要执行的操作。例如:

Parallel.For(0, 10, i =>
{
    Console.WriteLine($"Iteration {i} is running on thread {Thread.CurrentThread.ManagedThreadId}");
});

在上述代码中,Parallel.For 循环从 0 到 9 并行执行迭代操作,每个迭代会在不同的线程上执行,并输出当前迭代的编号和执行线程的 ID。

3.2 Parallel.ForEach 循环

Parallel.ForEach 方法用于并行执行一个 foreach 风格的循环。它接受一个实现了 IEnumerable<T> 接口的集合以及一个 Action<T> 委托,该委托表示对集合中每个元素要执行的操作。例如:

List<int> numbers = Enumerable.Range(1, 10).ToList();
Parallel.ForEach(numbers, number =>
{
    Console.WriteLine($"Processing number {number} on thread {Thread.CurrentThread.ManagedThreadId}");
});

在这段代码中,Parallel.ForEachnumbers 列表中的每个元素并行执行操作,输出正在处理的数字和执行线程的 ID。

3.3 并行循环的选项

Parallel.ForParallel.ForEach 方法都提供了重载,可以接受 ParallelOptions 对象,通过该对象可以设置并行循环的一些选项。例如,我们可以设置最大并行度:

ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 3;

Parallel.For(0, 10, options, i =>
{
    Console.WriteLine($"Iteration {i} is running on thread {Thread.CurrentThread.ManagedThreadId}");
});

在上述代码中,MaxDegreeOfParallelism 设置为 3,表示最多同时有 3 个迭代在执行,这样可以避免过度并行导致的资源竞争和性能下降。

4. 并行集合

并行集合是 TPL 提供的一组线程安全的集合类,用于在并行编程中高效地处理数据。这些集合类专门设计用于多线程环境,能够自动处理线程同步问题,提高并行操作的性能。

4.1 ConcurrentDictionary

ConcurrentDictionary 是一个线程安全的键值对集合。它允许在多个线程同时对集合进行读写操作,而无需手动进行锁的管理。例如:

ConcurrentDictionary<int, string> dictionary = new ConcurrentDictionary<int, string>();
Task[] tasks = new Task[10];
for (int i = 0; i < tasks.Length; i++)
{
    tasks[i] = Task.Run(() =>
    {
        dictionary.TryAdd(i, $"Value {i}");
    });
}
Task.WaitAll(tasks);

foreach (var pair in dictionary)
{
    Console.WriteLine($"Key: {pair.Key}, Value: {pair.Value}");
}

在上述代码中,我们创建了一个 ConcurrentDictionary,并通过多个任务并行地向集合中添加键值对。最后,我们遍历集合并输出所有的键值对。

4.2 ConcurrentQueue

ConcurrentQueue 是一个线程安全的队列。它支持多个线程同时进行入队和出队操作。例如:

ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
Task[] tasks1 = new Task[10];
for (int i = 0; i < tasks1.Length; i++)
{
    tasks1[i] = Task.Run(() =>
    {
        queue.Enqueue(i);
    });
}
Task.WaitAll(tasks1);

int result;
while (queue.TryDequeue(out result))
{
    Console.WriteLine($"Dequeued: {result}");
}

在这段代码中,我们创建了一个 ConcurrentQueue,并通过多个任务并行地向队列中添加元素。然后,我们从队列中依次取出元素并输出。

4.3 ConcurrentBag

ConcurrentBag 是一个线程安全的无序集合,它允许快速地向集合中添加和移除元素。例如:

ConcurrentBag<int> bag = new ConcurrentBag<int>();
Task[] tasks2 = new Task[10];
for (int i = 0; i < tasks2.Length; i++)
{
    tasks2[i] = Task.Run(() =>
    {
        bag.Add(i);
    });
}
Task.WaitAll(tasks2);

int item;
while (bag.TryTake(out item))
{
    Console.WriteLine($"Took: {item}");
}

在上述代码中,我们创建了一个 ConcurrentBag,并通过多个任务并行地向集合中添加元素。然后,我们从集合中依次取出元素并输出,由于 ConcurrentBag 是无序的,每次取出的元素顺序可能不同。

5. 同步机制与线程安全

在并行编程中,同步机制是确保多线程操作正确执行的关键。如果多个线程同时访问和修改共享资源,可能会导致数据不一致或其他错误。TPL 提供了一些同步机制来帮助我们解决这些问题。

5.1 lock 语句

lock 语句是 C# 中最基本的同步机制。它用于锁定一个对象,确保在同一时间只有一个线程可以执行锁定区域内的代码。例如:

private static object lockObject = new object();
private static int sharedVariable = 0;

Task[] tasks3 = new Task[10];
for (int i = 0; i < tasks3.Length; i++)
{
    tasks3[i] = Task.Run(() =>
    {
        lock (lockObject)
        {
            sharedVariable++;
        }
    });
}
Task.WaitAll(tasks3);
Console.WriteLine($"Final value of sharedVariable: {sharedVariable}");

在上述代码中,我们使用 lock 语句保护对 sharedVariable 的访问,确保每个线程在修改 sharedVariable 时不会出现竞争条件。

5.2 SemaphoreSlim

SemaphoreSlim 是一个轻量级的信号量,它允许我们控制同时访问某个资源的线程数量。例如,假设我们有一个资源只能同时被 3 个线程访问:

SemaphoreSlim semaphore = new SemaphoreSlim(3, 3);
Task[] tasks4 = new Task[10];
for (int i = 0; i < tasks4.Length; i++)
{
    tasks4[i] = Task.Run(async () =>
    {
        await semaphore.WaitAsync();
        try
        {
            // 访问共享资源的代码
            Console.WriteLine($"Thread {Thread.CurrentThread.ManagedThreadId} is accessing the shared resource.");
        }
        finally
        {
            semaphore.Release();
        }
    });
}
Task.WaitAll(tasks4);

在这段代码中,SemaphoreSlim 的初始计数和最大计数都设置为 3,表示最多允许 3 个线程同时访问共享资源。每个线程在访问资源前先调用 WaitAsync 方法获取信号量,访问结束后调用 Release 方法释放信号量。

5.3 Monitor

Monitor 类提供了更底层的同步控制。它的功能与 lock 语句类似,但提供了更多的方法来进行复杂的同步操作。例如,Monitor.EnterMonitor.Exit 方法可以实现与 lock 语句相同的功能:

private static object monitorObject = new object();
private static int monitorSharedVariable = 0;

Task[] tasks5 = new Task[10];
for (int i = 0; i < tasks5.Length; i++)
{
    tasks5[i] = Task.Run(() =>
    {
        Monitor.Enter(monitorObject);
        try
        {
            monitorSharedVariable++;
        }
        finally
        {
            Monitor.Exit(monitorObject);
        }
    });
}
Task.WaitAll(tasks5);
Console.WriteLine($"Final value of monitorSharedVariable: {monitorSharedVariable}");

在上述代码中,我们使用 Monitor.EnterMonitor.Exit 方法来保护对 monitorSharedVariable 的访问,确保线程安全。

6. 异步编程与 TPL

异步编程是现代编程中提高应用程序响应性和性能的重要手段。在 C# 中,异步编程与 TPL 紧密结合,使得编写高效的异步代码变得更加容易。

6.1 asyncawait 关键字

asyncawait 关键字是 C# 异步编程的核心。async 关键字用于标记一个异步方法,await 关键字用于暂停异步方法的执行,直到其等待的 Task 完成。例如:

public async Task<string> AsyncMethod()
{
    await Task.Delay(2000);
    return "Async operation completed.";
}

public async Task Main()
{
    string result = await AsyncMethod();
    Console.WriteLine(result);
}

在上述代码中,AsyncMethod 是一个异步方法,它使用 await 等待 Task.Delay 任务完成,模拟一个耗时操作。Main 方法也是异步的,它调用 AsyncMethod 并等待其结果,然后输出结果。

6.2 异步任务的组合

在实际应用中,我们经常需要组合多个异步任务。TPL 提供了一些方法来方便地实现这一点。例如,Task.WhenAll 方法可以等待多个任务都完成:

Task<string> task3 = Task.Run(() =>
{
    Thread.Sleep(2000);
    return "Task 3 result";
});

Task<string> task4 = Task.Run(() =>
{
    Thread.Sleep(3000);
    return "Task 4 result";
});

Task<string[]> allTasks = Task.WhenAll(task3, task4);
string[] results = allTasks.Result;
foreach (string result in results)
{
    Console.WriteLine(result);
}

在这段代码中,Task.WhenAll 等待 task3task4 都完成,并返回一个包含所有任务结果的数组。我们可以遍历这个数组获取每个任务的结果。

6.3 异步异常处理

在异步编程中,异常处理同样重要。当一个异步任务抛出异常时,我们可以在调用 await 的地方捕获异常。例如:

public async Task AsyncMethodWithException()
{
    throw new Exception("Async method exception");
}

public async Task MainWithException()
{
    try
    {
        await AsyncMethodWithException();
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Caught exception: {ex.Message}");
    }
}

在上述代码中,AsyncMethodWithException 抛出一个异常,MainWithException 方法通过 try - catch 块捕获并处理这个异常。

7. TPL 在实际项目中的应用案例

7.1 数据分析项目

在一个数据分析项目中,需要对大量的销售数据进行统计分析。数据存储在一个文件中,每行代表一笔销售记录。我们可以使用 TPL 来并行处理这些数据,提高分析速度。

string[] lines = File.ReadAllLines("salesdata.txt");
ConcurrentDictionary<string, decimal> productSales = new ConcurrentDictionary<string, decimal>();

Parallel.ForEach(lines, line =>
{
    string[] parts = line.Split(',');
    string product = parts[0];
    decimal amount = decimal.Parse(parts[1]);

    productSales.AddOrUpdate(product, amount, (key, oldValue) => oldValue + amount);
});

foreach (var pair in productSales)
{
    Console.WriteLine($"Product: {pair.Key}, Total Sales: {pair.Value}");
}

在这段代码中,我们使用 Parallel.ForEach 并行处理每一行销售数据,将每个产品的销售额累加到 ConcurrentDictionary 中,最后输出每个产品的总销售额。

7.2 图片处理项目

在一个图片处理项目中,需要对一批图片进行缩放处理。我们可以利用 TPL 并行处理每张图片,加快处理速度。

string[] imageFiles = Directory.GetFiles("images", "*.jpg");
foreach (string file in imageFiles)
{
    Task.Run(() =>
    {
        using (Image image = Image.FromFile(file))
        {
            int newWidth = image.Width / 2;
            int newHeight = image.Height / 2;
            using (Bitmap resizedImage = new Bitmap(newWidth, newHeight))
            {
                using (Graphics graphics = Graphics.FromImage(resizedImage))
                {
                    graphics.DrawImage(image, 0, 0, newWidth, newHeight);
                }
                resizedImage.Save($"resized_{Path.GetFileName(file)}");
            }
        }
    });
}

在上述代码中,我们通过 Task.Run 为每张图片创建一个独立的任务进行缩放处理,从而实现并行化图片处理。

8. TPL 性能优化与注意事项

8.1 性能优化

  • 合理设置并行度:根据系统的硬件资源和任务的特性,合理设置并行度。过高的并行度可能导致资源竞争和上下文切换开销增加,降低性能。例如,在 CPU 密集型任务中,可以根据 CPU 核心数设置最大并行度。
  • 减少锁的使用:锁会限制并行性,尽量使用线程安全的集合类或无锁数据结构来避免频繁使用锁。如 ConcurrentDictionary 等集合类可以在多线程环境下高效工作,无需手动锁。
  • 优化任务粒度:任务粒度不宜过大或过小。过大的任务粒度可能无法充分利用多核资源,过小的任务粒度会导致任务创建和管理的开销增加。应根据任务的实际情况,找到合适的任务粒度。

8.2 注意事项

  • 线程安全:在并行编程中,务必确保对共享资源的访问是线程安全的。即使使用了线程安全的集合类,在进行复杂操作时,也可能需要额外的同步机制。
  • 异常处理:在并行任务中,异常处理需要特别注意。当一个任务抛出异常时,可能会影响整个并行操作。可以使用 Task.WhenAll 并结合 try - catch 块来捕获和处理多个任务中的异常。
  • 资源管理:并行任务可能会消耗大量的系统资源,如内存、CPU 等。要注意及时释放不再使用的资源,避免资源泄漏。例如,在使用完文件、数据库连接等资源后,要及时关闭和释放。

通过合理运用 TPL 的各种特性,注意性能优化和相关注意事项,我们能够在 C# 中高效地进行并行编程,充分利用多核处理器的优势,提升应用程序的性能和响应性。