Kotlin多线程同步与并发控制
Kotlin 多线程同步与并发控制
多线程基础概念
在深入探讨 Kotlin 的多线程同步与并发控制之前,先回顾一下多线程的基本概念。线程是程序执行中的一个单一顺序控制流程,而多线程则允许在一个程序中同时运行多个线程,每个线程都可以独立执行不同的任务。这种并行执行可以提高程序的效率,特别是在处理 I/O 操作、计算密集型任务等场景下。
在 Kotlin 中,多线程编程基于 Java 的线程模型,因为 Kotlin 运行在 Java 虚拟机(JVM)之上。Java 的线程模型提供了基础的线程创建、启动、暂停和终止等操作,Kotlin 在这个基础上提供了更简洁和现代的语法来处理多线程相关的任务。
线程创建与启动
在 Kotlin 中创建线程有几种常见的方式。
-
继承 Thread 类
class MyThread : Thread() { override fun run() { println("Thread is running") } } fun main() { val myThread = MyThread() myThread.start() }
在上述代码中,定义了一个
MyThread
类继承自Thread
类,并覆盖了run
方法。run
方法中的代码就是线程要执行的任务。在main
函数中创建了MyThread
的实例并调用start
方法启动线程。start
方法会使线程进入就绪状态,等待 CPU 调度执行run
方法中的代码。 -
实现 Runnable 接口
class MyRunnable : Runnable { override fun run() { println("Runnable is running") } } fun main() { val myRunnable = MyRunnable() val thread = Thread(myRunnable) thread.start() }
这里定义了
MyRunnable
类实现Runnable
接口,Runnable
接口只有一个run
方法。在main
函数中,创建MyRunnable
实例并将其作为参数传递给Thread
构造函数来创建线程,然后调用start
方法启动线程。 -
使用 Kotlin 的 Thread 构造函数
fun main() { val thread = Thread { println("Thread created with lambda is running") } thread.start() }
这种方式利用 Kotlin 的 lambda 表达式直接在
Thread
构造函数中定义线程的执行逻辑,更加简洁。
多线程同步问题
当多个线程同时访问和修改共享资源时,就会出现多线程同步问题。例如,假设有两个线程同时对一个共享的计数器进行加一操作,如果没有适当的同步机制,可能会导致数据不一致。
示例:多线程对共享变量操作
class Counter {
var count = 0
}
fun main() {
val counter = Counter()
val thread1 = Thread {
for (i in 1..1000) {
counter.count++
}
}
val thread2 = Thread {
for (i in 1..1000) {
counter.count++
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Final count: ${counter.count}")
}
在上述代码中,Counter
类有一个 count
变量,thread1
和 thread2
两个线程同时对 count
进行加一操作。理论上,最终的 count
值应该是 2000,但由于多线程并发访问,实际运行结果可能小于 2000。这是因为当一个线程读取 count
的值,还未进行加一操作时,另一个线程也读取了相同的值,导致部分加一操作丢失。
同步机制
为了解决多线程同步问题,Kotlin 借助 Java 的同步机制,主要包括以下几种方式。
synchronized 关键字
-
同步方法
class SynchronizedCounter { var count = 0 synchronized fun increment() { count++ } } fun main() { val counter = SynchronizedCounter() val thread1 = Thread { for (i in 1..1000) { counter.increment() } } val thread2 = Thread { for (i in 1..1000) { counter.increment() } } thread1.start() thread2.start() thread1.join() thread2.join() println("Final count: ${counter.count}") }
在
SynchronizedCounter
类中,increment
方法被声明为synchronized
。这意味着当一个线程进入increment
方法时,它会获取this
对象的锁,其他线程如果想进入该方法,必须等待锁被释放。这样就保证了同一时间只有一个线程可以执行increment
方法,从而避免了数据不一致的问题。 -
同步块
class BlockSynchronizedCounter { var count = 0 val lock = Any() fun increment() { synchronized(lock) { count++ } } } fun main() { val counter = BlockSynchronizedCounter() val thread1 = Thread { for (i in 1..1000) { counter.increment() } } val thread2 = Thread { for (i in 1..1000) { counter.increment() } } thread1.start() thread2.start() thread1.join() thread2.join() println("Final count: ${counter.count}") }
这里使用
synchronized
块,通过一个lock
对象作为锁。当一个线程进入synchronized(lock)
块时,会获取lock
对象的锁,保证同一时间只有一个线程可以执行块内的代码。使用synchronized
块的好处是可以更细粒度地控制同步范围,而不是像同步方法那样对整个方法进行同步。
ReentrantLock
ReentrantLock
是 Java 提供的一种更灵活的锁机制,在 Kotlin 中同样可以使用。它提供了比 synchronized
关键字更强大的功能,例如可中断的锁获取、公平锁等。
import java.util.concurrent.locks.ReentrantLock
class ReentrantLockCounter {
private val lock = ReentrantLock()
var count = 0
fun increment() {
lock.lock()
try {
count++
} finally {
lock.unlock()
}
}
}
fun main() {
val counter = ReentrantLockCounter()
val thread1 = Thread {
for (i in 1..1000) {
counter.increment()
}
}
val thread2 = Thread {
for (i in 1..1000) {
counter.increment()
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Final count: ${counter.count}")
}
在上述代码中,ReentrantLockCounter
类使用 ReentrantLock
来保护 count
变量的操作。在 increment
方法中,首先调用 lock.lock()
获取锁,执行完操作后在 finally
块中调用 lock.unlock()
释放锁,确保即使在操作过程中出现异常,锁也能被正确释放。
并发控制工具
除了同步机制,Kotlin 还可以利用一些并发控制工具来管理多线程的执行。
CountDownLatch
CountDownLatch
允许一个或多个线程等待其他线程完成一组操作。
import java.util.concurrent.CountDownLatch
fun main() {
val latch = CountDownLatch(3)
val thread1 = Thread {
println("Thread 1 is doing some work")
Thread.sleep(1000)
println("Thread 1 is done")
latch.countDown()
}
val thread2 = Thread {
println("Thread 2 is doing some work")
Thread.sleep(1500)
println("Thread 2 is done")
latch.countDown()
}
val thread3 = Thread {
println("Thread 3 is doing some work")
Thread.sleep(2000)
println("Thread 3 is done")
latch.countDown()
}
thread1.start()
thread2.start()
thread3.start()
println("Main thread is waiting for all threads to finish")
latch.await()
println("All threads have finished, main thread can continue")
}
在这个例子中,CountDownLatch
初始化为 3,thread1
、thread2
和 thread3
线程在完成工作后调用 latch.countDown()
使计数器减一。主线程调用 latch.await()
等待计数器变为 0,即所有线程完成工作后才继续执行。
CyclicBarrier
CyclicBarrier
用于让一组线程互相等待,直到所有线程都到达某个点(屏障点),然后再一起继续执行。
import java.util.concurrent.CyclicBarrier
fun main() {
val barrier = CyclicBarrier(3) {
println("All threads have reached the barrier")
}
val thread1 = Thread {
println("Thread 1 is doing some work")
Thread.sleep(1000)
println("Thread 1 is reaching the barrier")
try {
barrier.await()
} catch (e: Exception) {
e.printStackTrace()
}
println("Thread 1 is continuing after the barrier")
}
val thread2 = Thread {
println("Thread 2 is doing some work")
Thread.sleep(1500)
println("Thread 2 is reaching the barrier")
try {
barrier.await()
} catch (e: Exception) {
e.printStackTrace()
}
println("Thread 2 is continuing after the barrier")
}
val thread3 = Thread {
println("Thread 3 is doing some work")
Thread.sleep(2000)
println("Thread 3 is reaching the barrier")
try {
barrier.await()
} catch (e: Exception) {
e.printStackTrace()
}
println("Thread 3 is continuing after the barrier")
}
thread1.start()
thread2.start()
thread3.start()
}
这里 CyclicBarrier
初始化参数为 3,表示需要 3 个线程到达屏障点。当所有线程调用 barrier.await()
到达屏障点时,会执行 CyclicBarrier
构造函数中传入的 Runnable 代码块,然后所有线程继续执行后续代码。而且 CyclicBarrier
可以重复使用,不像 CountDownLatch
只能使用一次。
Semaphore
Semaphore
是一个计数信号量,它控制同时访问某个资源的线程数量。
import java.util.concurrent.Semaphore
fun main() {
val semaphore = Semaphore(2)
val thread1 = Thread {
try {
semaphore.acquire()
println("Thread 1 has acquired the semaphore")
Thread.sleep(2000)
println("Thread 1 is releasing the semaphore")
semaphore.release()
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
val thread2 = Thread {
try {
semaphore.acquire()
println("Thread 2 has acquired the semaphore")
Thread.sleep(1500)
println("Thread 2 is releasing the semaphore")
semaphore.release()
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
val thread3 = Thread {
try {
semaphore.acquire()
println("Thread 3 has acquired the semaphore")
Thread.sleep(1000)
println("Thread 3 is releasing the semaphore")
semaphore.release()
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
thread1.start()
thread2.start()
thread3.start()
}
在上述代码中,Semaphore
初始化为 2,意味着最多允许 2 个线程同时获取信号量。线程通过 semaphore.acquire()
获取信号量,如果没有可用的信号量,线程会被阻塞,直到有其他线程调用 semaphore.release()
释放信号量。
线程安全的数据结构
在多线程编程中,使用线程安全的数据结构可以避免很多同步问题。Kotlin 可以使用 Java 提供的线程安全数据结构,例如 ConcurrentHashMap
、CopyOnWriteArrayList
等。
ConcurrentHashMap
ConcurrentHashMap
是线程安全的哈希表,它允许多个线程同时读,并且在一定程度上支持并发写。
import java.util.concurrent.ConcurrentHashMap
fun main() {
val map = ConcurrentHashMap<String, Int>()
val thread1 = Thread {
for (i in 1..10) {
map.put("Key$i", i)
}
}
val thread2 = Thread {
for (i in 11..20) {
map.put("Key$i", i)
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Map contents: $map")
}
在这个例子中,ConcurrentHashMap
确保了多个线程可以安全地同时对其进行读写操作,而不需要额外的同步机制(虽然某些复杂操作可能仍需要同步)。
CopyOnWriteArrayList
CopyOnWriteArrayList
是一个线程安全的列表,它在进行写操作(如添加、删除元素)时,会创建一个原列表的副本,在副本上进行操作,然后将原列表引用指向新的副本。读操作则直接读取原列表,这样读操作和写操作可以并发执行,并且读操作不会被写操作阻塞。
import java.util.concurrent.CopyOnWriteArrayList
fun main() {
val list = CopyOnWriteArrayList<String>()
val thread1 = Thread {
for (i in 1..10) {
list.add("Element$i")
}
}
val thread2 = Thread {
for (element in list) {
println("Thread 2 read: $element")
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("List contents: $list")
}
在上述代码中,thread1
向 CopyOnWriteArrayList
中添加元素,thread2
同时读取列表元素,由于 CopyOnWriteArrayList
的特性,thread2
可以安全地读取列表,而不会受到 thread1
写操作的影响。
并发集合框架
Kotlin 标准库也提供了一些并发集合框架,如 ConcurrentHashSet
、ConcurrentLinkedQueue
等,这些集合在多线程环境下使用更加方便和安全。
ConcurrentHashSet
ConcurrentHashSet
是线程安全的哈希集合,它基于 ConcurrentHashMap
实现。
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentHashSet
fun main() {
val set = ConcurrentHashSet<String>()
val thread1 = Thread {
for (i in 1..10) {
set.add("Element$i")
}
}
val thread2 = Thread {
for (i in 11..20) {
set.add("Element$i")
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Set contents: $set")
}
在这个例子中,ConcurrentHashSet
保证了多个线程可以安全地同时向集合中添加元素,不会出现数据不一致的问题。
ConcurrentLinkedQueue
ConcurrentLinkedQueue
是一个线程安全的无界队列,它使用链表实现,支持高效的并发插入和删除操作。
import java.util.concurrent.ConcurrentLinkedQueue
fun main() {
val queue = ConcurrentLinkedQueue<String>()
val thread1 = Thread {
for (i in 1..10) {
queue.add("Element$i")
}
}
val thread2 = Thread {
var element: String?
while (queue.isNotEmpty()) {
element = queue.poll()
element?.let { println("Thread 2 polled: $it") }
}
}
thread1.start()
thread2.start()
thread1.join()
thread2.join()
println("Queue is empty: ${queue.isEmpty()}")
}
在上述代码中,thread1
向 ConcurrentLinkedQueue
中添加元素,thread2
从队列中取出元素。ConcurrentLinkedQueue
保证了在多线程环境下插入和删除操作的线程安全性。
线程池
线程池是管理和复用线程的一种机制,可以避免频繁创建和销毁线程带来的开销。Kotlin 可以使用 Java 的线程池框架,如 ThreadPoolExecutor
以及一些便捷的工厂方法来创建线程池。
创建线程池
-
使用 ThreadPoolExecutor
import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit fun main() { val corePoolSize = 2 val maximumPoolSize = 4 val keepAliveTime = 10L val unit = TimeUnit.SECONDS val workQueue: BlockingQueue<Runnable> = LinkedBlockingQueue() val executor = ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue ) for (i in 1..10) { val task = Runnable { println("Task $i is running on thread ${Thread.currentThread().name}") Thread.sleep(2000) } executor.submit(task) } executor.shutdown() try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow() if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { println("Pool did not terminate") } } } catch (interrupted: InterruptedException) { executor.shutdownNow() Thread.currentThread().interrupt() } }
在上述代码中,通过
ThreadPoolExecutor
的构造函数创建了一个线程池。corePoolSize
表示线程池的核心线程数,即线程池在正常情况下保持的线程数量;maximumPoolSize
表示线程池允许的最大线程数;keepAliveTime
和unit
定义了非核心线程在没有任务执行时的存活时间;workQueue
是一个阻塞队列,用于存放等待执行的任务。通过executor.submit(task)
向线程池提交任务,任务会在有可用线程时执行。最后通过executor.shutdown()
和awaitTermination
方法关闭线程池并等待所有任务执行完毕。 -
使用 Executors 工厂方法
import java.util.concurrent.Executors import java.util.concurrent.ExecutorService fun main() { val executor: ExecutorService = Executors.newFixedThreadPool(3) for (i in 1..10) { val task = Runnable { println("Task $i is running on thread ${Thread.currentThread().name}") Thread.sleep(2000) } executor.submit(task) } executor.shutdown() try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow() if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { println("Pool did not terminate") } } } catch (interrupted: InterruptedException) { executor.shutdownNow() Thread.currentThread().interrupt() } }
这里使用
Executors.newFixedThreadPool(3)
创建了一个固定大小为 3 的线程池。newFixedThreadPool
是Executors
类提供的便捷工厂方法,它创建的线程池核心线程数和最大线程数相等,并且线程池中的线程会一直存活,不会因为空闲而被销毁。
异步编程与协程
在 Kotlin 中,除了传统的多线程编程方式,还引入了协程来进行异步编程。协程提供了一种更简洁、更高效的异步编程模型,它基于轻量级线程(纤程),可以在不阻塞主线程的情况下执行异步任务。
协程基础
-
创建和启动协程
import kotlinx.coroutines.* fun main() = runBlocking { val job = launch { println("Coroutine is running") delay(2000) println("Coroutine has finished") } job.join() }
在上述代码中,使用
launch
函数创建了一个协程。launch
函数返回一个Job
对象,可以用于控制协程的生命周期。runBlocking
函数用于阻塞当前线程,直到其内部的协程执行完毕。delay
函数用于暂停协程的执行,模拟异步操作。 -
协程的返回值
import kotlinx.coroutines.* fun main() = runBlocking { val result = async { println("Calculating result") delay(2000) 42 }.await() println("Result is $result") }
这里使用
async
函数创建了一个带有返回值的协程。async
函数返回一个Deferred
对象,通过调用await
方法可以获取协程的返回值。await
方法会暂停当前协程,直到async
协程执行完毕并返回结果。
协程与多线程
虽然协程是轻量级的异步编程模型,但在底层它可以利用线程池来执行异步任务。Kotlin 提供了不同的调度器来控制协程在哪些线程上执行。
-
Dispatchers.Default
import kotlinx.coroutines.* fun main() = runBlocking { launch(Dispatchers.Default) { println("Coroutine is running on ${Thread.currentThread().name}") delay(2000) println("Coroutine has finished") } println("Main thread is not blocked") }
Dispatchers.Default
调度器使用一个共享的后台线程池,适用于 CPU 密集型任务。在上述代码中,协程在Dispatchers.Default
调度器上执行,主线程不会被阻塞。 -
Dispatchers.IO
import kotlinx.coroutines.* fun main() = runBlocking { launch(Dispatchers.IO) { println("Coroutine is running on ${Thread.currentThread().name}") delay(2000) println("Coroutine has finished") } println("Main thread is not blocked") }
Dispatchers.IO
调度器也使用一个线程池,但专门用于 I/O 操作,例如文件读写、网络请求等。它的线程池会根据负载动态调整线程数量,以优化 I/O 性能。
协程同步
在协程中也会遇到同步问题,特别是当多个协程同时访问共享资源时。Kotlin 协程提供了一些同步机制来解决这些问题。
-
Mutex
import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex val mutex = Mutex() var sharedValue = 0 fun main() = runBlocking { val job1 = launch { mutex.lock() try { for (i in 1..1000) { sharedValue++ } } finally { mutex.unlock() } } val job2 = launch { mutex.lock() try { for (i in 1..1000) { sharedValue++ } } finally { mutex.unlock() } } job1.join() job2.join() println("Final shared value: $sharedValue") }
在上述代码中,
Mutex
用于保护sharedValue
变量的操作。mutex.lock()
用于获取锁,mutex.unlock()
用于释放锁,确保同一时间只有一个协程可以修改sharedValue
。 -
Semaphore
import kotlinx.coroutines.* import kotlinx.coroutines.sync.Semaphore val semaphore = Semaphore(2) fun main() = runBlocking { val job1 = launch { semaphore.acquire() try { println("Job 1 has acquired the semaphore") delay(2000) println("Job 1 is releasing the semaphore") } finally { semaphore.release() } } val job2 = launch { semaphore.acquire() try { println("Job 2 has acquired the semaphore") delay(1500) println("Job 2 is releasing the semaphore") } finally { semaphore.release() } } val job3 = launch { semaphore.acquire() try { println("Job 3 has acquired the semaphore") delay(1000) println("Job 3 is releasing the semaphore") } finally { semaphore.release() } } job1.join() job2.join() job3.join() }
这里
Semaphore
初始化为 2,意味着最多允许 2 个协程同时获取信号量,从而控制并发访问。
通过以上对 Kotlin 多线程同步与并发控制的全面介绍,包括传统的多线程机制、并发控制工具、线程安全的数据结构、线程池以及协程等内容,开发者可以根据具体的应用场景选择合适的方式来实现高效、安全的多线程编程。无论是处理 CPU 密集型任务、I/O 操作还是复杂的并发业务逻辑,Kotlin 都提供了丰富的工具和灵活的编程模型来满足需求。