MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Kotlin多线程同步与并发控制

2021-02-252.5k 阅读

Kotlin 多线程同步与并发控制

多线程基础概念

在深入探讨 Kotlin 的多线程同步与并发控制之前,先回顾一下多线程的基本概念。线程是程序执行中的一个单一顺序控制流程,而多线程则允许在一个程序中同时运行多个线程,每个线程都可以独立执行不同的任务。这种并行执行可以提高程序的效率,特别是在处理 I/O 操作、计算密集型任务等场景下。

在 Kotlin 中,多线程编程基于 Java 的线程模型,因为 Kotlin 运行在 Java 虚拟机(JVM)之上。Java 的线程模型提供了基础的线程创建、启动、暂停和终止等操作,Kotlin 在这个基础上提供了更简洁和现代的语法来处理多线程相关的任务。

线程创建与启动

在 Kotlin 中创建线程有几种常见的方式。

  1. 继承 Thread 类

    class MyThread : Thread() {
        override fun run() {
            println("Thread is running")
        }
    }
    
    fun main() {
        val myThread = MyThread()
        myThread.start()
    }
    

    在上述代码中,定义了一个 MyThread 类继承自 Thread 类,并覆盖了 run 方法。run 方法中的代码就是线程要执行的任务。在 main 函数中创建了 MyThread 的实例并调用 start 方法启动线程。start 方法会使线程进入就绪状态,等待 CPU 调度执行 run 方法中的代码。

  2. 实现 Runnable 接口

    class MyRunnable : Runnable {
        override fun run() {
            println("Runnable is running")
        }
    }
    
    fun main() {
        val myRunnable = MyRunnable()
        val thread = Thread(myRunnable)
        thread.start()
    }
    

    这里定义了 MyRunnable 类实现 Runnable 接口,Runnable 接口只有一个 run 方法。在 main 函数中,创建 MyRunnable 实例并将其作为参数传递给 Thread 构造函数来创建线程,然后调用 start 方法启动线程。

  3. 使用 Kotlin 的 Thread 构造函数

    fun main() {
        val thread = Thread {
            println("Thread created with lambda is running")
        }
        thread.start()
    }
    

    这种方式利用 Kotlin 的 lambda 表达式直接在 Thread 构造函数中定义线程的执行逻辑,更加简洁。

多线程同步问题

当多个线程同时访问和修改共享资源时,就会出现多线程同步问题。例如,假设有两个线程同时对一个共享的计数器进行加一操作,如果没有适当的同步机制,可能会导致数据不一致。

示例:多线程对共享变量操作

class Counter {
    var count = 0
}

fun main() {
    val counter = Counter()
    val thread1 = Thread {
        for (i in 1..1000) {
            counter.count++
        }
    }
    val thread2 = Thread {
        for (i in 1..1000) {
            counter.count++
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("Final count: ${counter.count}")
}

在上述代码中,Counter 类有一个 count 变量,thread1thread2 两个线程同时对 count 进行加一操作。理论上,最终的 count 值应该是 2000,但由于多线程并发访问,实际运行结果可能小于 2000。这是因为当一个线程读取 count 的值,还未进行加一操作时,另一个线程也读取了相同的值,导致部分加一操作丢失。

同步机制

为了解决多线程同步问题,Kotlin 借助 Java 的同步机制,主要包括以下几种方式。

synchronized 关键字

  1. 同步方法

    class SynchronizedCounter {
        var count = 0
        synchronized fun increment() {
            count++
        }
    }
    
    fun main() {
        val counter = SynchronizedCounter()
        val thread1 = Thread {
            for (i in 1..1000) {
                counter.increment()
            }
        }
        val thread2 = Thread {
            for (i in 1..1000) {
                counter.increment()
            }
        }
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        println("Final count: ${counter.count}")
    }
    

    SynchronizedCounter 类中,increment 方法被声明为 synchronized。这意味着当一个线程进入 increment 方法时,它会获取 this 对象的锁,其他线程如果想进入该方法,必须等待锁被释放。这样就保证了同一时间只有一个线程可以执行 increment 方法,从而避免了数据不一致的问题。

  2. 同步块

    class BlockSynchronizedCounter {
        var count = 0
        val lock = Any()
        fun increment() {
            synchronized(lock) {
                count++
            }
        }
    }
    
    fun main() {
        val counter = BlockSynchronizedCounter()
        val thread1 = Thread {
            for (i in 1..1000) {
                counter.increment()
            }
        }
        val thread2 = Thread {
            for (i in 1..1000) {
                counter.increment()
            }
        }
        thread1.start()
        thread2.start()
        thread1.join()
        thread2.join()
        println("Final count: ${counter.count}")
    }
    

    这里使用 synchronized 块,通过一个 lock 对象作为锁。当一个线程进入 synchronized(lock) 块时,会获取 lock 对象的锁,保证同一时间只有一个线程可以执行块内的代码。使用 synchronized 块的好处是可以更细粒度地控制同步范围,而不是像同步方法那样对整个方法进行同步。

ReentrantLock

ReentrantLock 是 Java 提供的一种更灵活的锁机制,在 Kotlin 中同样可以使用。它提供了比 synchronized 关键字更强大的功能,例如可中断的锁获取、公平锁等。

import java.util.concurrent.locks.ReentrantLock

class ReentrantLockCounter {
    private val lock = ReentrantLock()
    var count = 0
    fun increment() {
        lock.lock()
        try {
            count++
        } finally {
            lock.unlock()
        }
    }
}

fun main() {
    val counter = ReentrantLockCounter()
    val thread1 = Thread {
        for (i in 1..1000) {
            counter.increment()
        }
    }
    val thread2 = Thread {
        for (i in 1..1000) {
            counter.increment()
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("Final count: ${counter.count}")
}

在上述代码中,ReentrantLockCounter 类使用 ReentrantLock 来保护 count 变量的操作。在 increment 方法中,首先调用 lock.lock() 获取锁,执行完操作后在 finally 块中调用 lock.unlock() 释放锁,确保即使在操作过程中出现异常,锁也能被正确释放。

并发控制工具

除了同步机制,Kotlin 还可以利用一些并发控制工具来管理多线程的执行。

CountDownLatch

CountDownLatch 允许一个或多个线程等待其他线程完成一组操作。

import java.util.concurrent.CountDownLatch

fun main() {
    val latch = CountDownLatch(3)
    val thread1 = Thread {
        println("Thread 1 is doing some work")
        Thread.sleep(1000)
        println("Thread 1 is done")
        latch.countDown()
    }
    val thread2 = Thread {
        println("Thread 2 is doing some work")
        Thread.sleep(1500)
        println("Thread 2 is done")
        latch.countDown()
    }
    val thread3 = Thread {
        println("Thread 3 is doing some work")
        Thread.sleep(2000)
        println("Thread 3 is done")
        latch.countDown()
    }
    thread1.start()
    thread2.start()
    thread3.start()
    println("Main thread is waiting for all threads to finish")
    latch.await()
    println("All threads have finished, main thread can continue")
}

在这个例子中,CountDownLatch 初始化为 3,thread1thread2thread3 线程在完成工作后调用 latch.countDown() 使计数器减一。主线程调用 latch.await() 等待计数器变为 0,即所有线程完成工作后才继续执行。

CyclicBarrier

CyclicBarrier 用于让一组线程互相等待,直到所有线程都到达某个点(屏障点),然后再一起继续执行。

import java.util.concurrent.CyclicBarrier

fun main() {
    val barrier = CyclicBarrier(3) {
        println("All threads have reached the barrier")
    }
    val thread1 = Thread {
        println("Thread 1 is doing some work")
        Thread.sleep(1000)
        println("Thread 1 is reaching the barrier")
        try {
            barrier.await()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        println("Thread 1 is continuing after the barrier")
    }
    val thread2 = Thread {
        println("Thread 2 is doing some work")
        Thread.sleep(1500)
        println("Thread 2 is reaching the barrier")
        try {
            barrier.await()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        println("Thread 2 is continuing after the barrier")
    }
    val thread3 = Thread {
        println("Thread 3 is doing some work")
        Thread.sleep(2000)
        println("Thread 3 is reaching the barrier")
        try {
            barrier.await()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        println("Thread 3 is continuing after the barrier")
    }
    thread1.start()
    thread2.start()
    thread3.start()
}

这里 CyclicBarrier 初始化参数为 3,表示需要 3 个线程到达屏障点。当所有线程调用 barrier.await() 到达屏障点时,会执行 CyclicBarrier 构造函数中传入的 Runnable 代码块,然后所有线程继续执行后续代码。而且 CyclicBarrier 可以重复使用,不像 CountDownLatch 只能使用一次。

Semaphore

Semaphore 是一个计数信号量,它控制同时访问某个资源的线程数量。

import java.util.concurrent.Semaphore

fun main() {
    val semaphore = Semaphore(2)
    val thread1 = Thread {
        try {
            semaphore.acquire()
            println("Thread 1 has acquired the semaphore")
            Thread.sleep(2000)
            println("Thread 1 is releasing the semaphore")
            semaphore.release()
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }
    val thread2 = Thread {
        try {
            semaphore.acquire()
            println("Thread 2 has acquired the semaphore")
            Thread.sleep(1500)
            println("Thread 2 is releasing the semaphore")
            semaphore.release()
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }
    val thread3 = Thread {
        try {
            semaphore.acquire()
            println("Thread 3 has acquired the semaphore")
            Thread.sleep(1000)
            println("Thread 3 is releasing the semaphore")
            semaphore.release()
        } catch (e: InterruptedException) {
            e.printStackTrace()
        }
    }
    thread1.start()
    thread2.start()
    thread3.start()
}

在上述代码中,Semaphore 初始化为 2,意味着最多允许 2 个线程同时获取信号量。线程通过 semaphore.acquire() 获取信号量,如果没有可用的信号量,线程会被阻塞,直到有其他线程调用 semaphore.release() 释放信号量。

线程安全的数据结构

在多线程编程中,使用线程安全的数据结构可以避免很多同步问题。Kotlin 可以使用 Java 提供的线程安全数据结构,例如 ConcurrentHashMapCopyOnWriteArrayList 等。

ConcurrentHashMap

ConcurrentHashMap 是线程安全的哈希表,它允许多个线程同时读,并且在一定程度上支持并发写。

import java.util.concurrent.ConcurrentHashMap

fun main() {
    val map = ConcurrentHashMap<String, Int>()
    val thread1 = Thread {
        for (i in 1..10) {
            map.put("Key$i", i)
        }
    }
    val thread2 = Thread {
        for (i in 11..20) {
            map.put("Key$i", i)
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("Map contents: $map")
}

在这个例子中,ConcurrentHashMap 确保了多个线程可以安全地同时对其进行读写操作,而不需要额外的同步机制(虽然某些复杂操作可能仍需要同步)。

CopyOnWriteArrayList

CopyOnWriteArrayList 是一个线程安全的列表,它在进行写操作(如添加、删除元素)时,会创建一个原列表的副本,在副本上进行操作,然后将原列表引用指向新的副本。读操作则直接读取原列表,这样读操作和写操作可以并发执行,并且读操作不会被写操作阻塞。

import java.util.concurrent.CopyOnWriteArrayList

fun main() {
    val list = CopyOnWriteArrayList<String>()
    val thread1 = Thread {
        for (i in 1..10) {
            list.add("Element$i")
        }
    }
    val thread2 = Thread {
        for (element in list) {
            println("Thread 2 read: $element")
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("List contents: $list")
}

在上述代码中,thread1CopyOnWriteArrayList 中添加元素,thread2 同时读取列表元素,由于 CopyOnWriteArrayList 的特性,thread2 可以安全地读取列表,而不会受到 thread1 写操作的影响。

并发集合框架

Kotlin 标准库也提供了一些并发集合框架,如 ConcurrentHashSetConcurrentLinkedQueue 等,这些集合在多线程环境下使用更加方便和安全。

ConcurrentHashSet

ConcurrentHashSet 是线程安全的哈希集合,它基于 ConcurrentHashMap 实现。

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentHashSet

fun main() {
    val set = ConcurrentHashSet<String>()
    val thread1 = Thread {
        for (i in 1..10) {
            set.add("Element$i")
        }
    }
    val thread2 = Thread {
        for (i in 11..20) {
            set.add("Element$i")
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("Set contents: $set")
}

在这个例子中,ConcurrentHashSet 保证了多个线程可以安全地同时向集合中添加元素,不会出现数据不一致的问题。

ConcurrentLinkedQueue

ConcurrentLinkedQueue 是一个线程安全的无界队列,它使用链表实现,支持高效的并发插入和删除操作。

import java.util.concurrent.ConcurrentLinkedQueue

fun main() {
    val queue = ConcurrentLinkedQueue<String>()
    val thread1 = Thread {
        for (i in 1..10) {
            queue.add("Element$i")
        }
    }
    val thread2 = Thread {
        var element: String?
        while (queue.isNotEmpty()) {
            element = queue.poll()
            element?.let { println("Thread 2 polled: $it") }
        }
    }
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
    println("Queue is empty: ${queue.isEmpty()}")
}

在上述代码中,thread1ConcurrentLinkedQueue 中添加元素,thread2 从队列中取出元素。ConcurrentLinkedQueue 保证了在多线程环境下插入和删除操作的线程安全性。

线程池

线程池是管理和复用线程的一种机制,可以避免频繁创建和销毁线程带来的开销。Kotlin 可以使用 Java 的线程池框架,如 ThreadPoolExecutor 以及一些便捷的工厂方法来创建线程池。

创建线程池

  1. 使用 ThreadPoolExecutor

    import java.util.concurrent.BlockingQueue
    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.ThreadPoolExecutor
    import java.util.concurrent.TimeUnit
    
    fun main() {
        val corePoolSize = 2
        val maximumPoolSize = 4
        val keepAliveTime = 10L
        val unit = TimeUnit.SECONDS
        val workQueue: BlockingQueue<Runnable> = LinkedBlockingQueue()
        val executor = ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            unit,
            workQueue
        )
        for (i in 1..10) {
            val task = Runnable {
                println("Task $i is running on thread ${Thread.currentThread().name}")
                Thread.sleep(2000)
            }
            executor.submit(task)
        }
        executor.shutdown()
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow()
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    println("Pool did not terminate")
                }
            }
        } catch (interrupted: InterruptedException) {
            executor.shutdownNow()
            Thread.currentThread().interrupt()
        }
    }
    

    在上述代码中,通过 ThreadPoolExecutor 的构造函数创建了一个线程池。corePoolSize 表示线程池的核心线程数,即线程池在正常情况下保持的线程数量;maximumPoolSize 表示线程池允许的最大线程数;keepAliveTimeunit 定义了非核心线程在没有任务执行时的存活时间;workQueue 是一个阻塞队列,用于存放等待执行的任务。通过 executor.submit(task) 向线程池提交任务,任务会在有可用线程时执行。最后通过 executor.shutdown()awaitTermination 方法关闭线程池并等待所有任务执行完毕。

  2. 使用 Executors 工厂方法

    import java.util.concurrent.Executors
    import java.util.concurrent.ExecutorService
    
    fun main() {
        val executor: ExecutorService = Executors.newFixedThreadPool(3)
        for (i in 1..10) {
            val task = Runnable {
                println("Task $i is running on thread ${Thread.currentThread().name}")
                Thread.sleep(2000)
            }
            executor.submit(task)
        }
        executor.shutdown()
        try {
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow()
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    println("Pool did not terminate")
                }
            }
        } catch (interrupted: InterruptedException) {
            executor.shutdownNow()
            Thread.currentThread().interrupt()
        }
    }
    

    这里使用 Executors.newFixedThreadPool(3) 创建了一个固定大小为 3 的线程池。newFixedThreadPoolExecutors 类提供的便捷工厂方法,它创建的线程池核心线程数和最大线程数相等,并且线程池中的线程会一直存活,不会因为空闲而被销毁。

异步编程与协程

在 Kotlin 中,除了传统的多线程编程方式,还引入了协程来进行异步编程。协程提供了一种更简洁、更高效的异步编程模型,它基于轻量级线程(纤程),可以在不阻塞主线程的情况下执行异步任务。

协程基础

  1. 创建和启动协程

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val job = launch {
            println("Coroutine is running")
            delay(2000)
            println("Coroutine has finished")
        }
        job.join()
    }
    

    在上述代码中,使用 launch 函数创建了一个协程。launch 函数返回一个 Job 对象,可以用于控制协程的生命周期。runBlocking 函数用于阻塞当前线程,直到其内部的协程执行完毕。delay 函数用于暂停协程的执行,模拟异步操作。

  2. 协程的返回值

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        val result = async {
            println("Calculating result")
            delay(2000)
            42
        }.await()
        println("Result is $result")
    }
    

    这里使用 async 函数创建了一个带有返回值的协程。async 函数返回一个 Deferred 对象,通过调用 await 方法可以获取协程的返回值。await 方法会暂停当前协程,直到 async 协程执行完毕并返回结果。

协程与多线程

虽然协程是轻量级的异步编程模型,但在底层它可以利用线程池来执行异步任务。Kotlin 提供了不同的调度器来控制协程在哪些线程上执行。

  1. Dispatchers.Default

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        launch(Dispatchers.Default) {
            println("Coroutine is running on ${Thread.currentThread().name}")
            delay(2000)
            println("Coroutine has finished")
        }
        println("Main thread is not blocked")
    }
    

    Dispatchers.Default 调度器使用一个共享的后台线程池,适用于 CPU 密集型任务。在上述代码中,协程在 Dispatchers.Default 调度器上执行,主线程不会被阻塞。

  2. Dispatchers.IO

    import kotlinx.coroutines.*
    
    fun main() = runBlocking {
        launch(Dispatchers.IO) {
            println("Coroutine is running on ${Thread.currentThread().name}")
            delay(2000)
            println("Coroutine has finished")
        }
        println("Main thread is not blocked")
    }
    

    Dispatchers.IO 调度器也使用一个线程池,但专门用于 I/O 操作,例如文件读写、网络请求等。它的线程池会根据负载动态调整线程数量,以优化 I/O 性能。

协程同步

在协程中也会遇到同步问题,特别是当多个协程同时访问共享资源时。Kotlin 协程提供了一些同步机制来解决这些问题。

  1. Mutex

    import kotlinx.coroutines.*
    import kotlinx.coroutines.sync.Mutex
    
    val mutex = Mutex()
    var sharedValue = 0
    
    fun main() = runBlocking {
        val job1 = launch {
            mutex.lock()
            try {
                for (i in 1..1000) {
                    sharedValue++
                }
            } finally {
                mutex.unlock()
            }
        }
        val job2 = launch {
            mutex.lock()
            try {
                for (i in 1..1000) {
                    sharedValue++
                }
            } finally {
                mutex.unlock()
            }
        }
        job1.join()
        job2.join()
        println("Final shared value: $sharedValue")
    }
    

    在上述代码中,Mutex 用于保护 sharedValue 变量的操作。mutex.lock() 用于获取锁,mutex.unlock() 用于释放锁,确保同一时间只有一个协程可以修改 sharedValue

  2. Semaphore

    import kotlinx.coroutines.*
    import kotlinx.coroutines.sync.Semaphore
    
    val semaphore = Semaphore(2)
    fun main() = runBlocking {
        val job1 = launch {
            semaphore.acquire()
            try {
                println("Job 1 has acquired the semaphore")
                delay(2000)
                println("Job 1 is releasing the semaphore")
            } finally {
                semaphore.release()
            }
        }
        val job2 = launch {
            semaphore.acquire()
            try {
                println("Job 2 has acquired the semaphore")
                delay(1500)
                println("Job 2 is releasing the semaphore")
            } finally {
                semaphore.release()
            }
        }
        val job3 = launch {
            semaphore.acquire()
            try {
                println("Job 3 has acquired the semaphore")
                delay(1000)
                println("Job 3 is releasing the semaphore")
            } finally {
                semaphore.release()
            }
        }
        job1.join()
        job2.join()
        job3.join()
    }
    

    这里 Semaphore 初始化为 2,意味着最多允许 2 个协程同时获取信号量,从而控制并发访问。

通过以上对 Kotlin 多线程同步与并发控制的全面介绍,包括传统的多线程机制、并发控制工具、线程安全的数据结构、线程池以及协程等内容,开发者可以根据具体的应用场景选择合适的方式来实现高效、安全的多线程编程。无论是处理 CPU 密集型任务、I/O 操作还是复杂的并发业务逻辑,Kotlin 都提供了丰富的工具和灵活的编程模型来满足需求。