MK
摩柯社区 - 一个极简的技术知识社区
AI 面试

Java并发数据结构的选择与使用

2022-04-092.3k 阅读

Java并发数据结构概述

在Java的并发编程领域,选择合适的数据结构对于程序的性能、正确性以及可维护性至关重要。不同的并发场景对数据结构有着不同的要求,例如,在高并发读而写操作较少的场景下,我们需要选择一种能高效支持读操作的数据结构;而在读写操作频率较为均衡的情况下,又需要另寻合适的方案。Java提供了丰富的并发数据结构,这些数据结构针对不同的并发访问模式进行了优化,以满足多样化的应用需求。

并发容器的分类

  1. 线程安全的集合类:Java早期通过对传统集合类(如VectorHashtable)进行同步包装来实现线程安全。例如,Vector中的每个方法都被synchronized关键字修饰,这使得在多线程环境下,同一时间只有一个线程能够访问该Vector的方法,从而保证了线程安全。然而,这种粗粒度的同步方式在高并发场景下会带来严重的性能瓶颈,因为所有线程都需要竞争同一把锁。
  2. 并发包中的容器:自Java 5.0引入java.util.concurrent包(简称JUC包)后,提供了一系列高性能的并发容器。这些容器采用了更细粒度的锁机制或者无锁算法,大大提高了并发性能。例如,ConcurrentHashMap使用分段锁技术,将数据分成多个段,每个段都有自己的锁,从而允许不同线程同时访问不同段的数据,提高了并发读和写的效率。

线程安全的集合类

Vector

Vector是Java最早提供的线程安全的动态数组。它继承自AbstractList,实现了List接口。Vector的方法大多被synchronized关键字修饰,这意味着在多线程环境下,同一时间只有一个线程能够调用Vector的方法。以下是一个简单的Vector使用示例:

import java.util.Vector;

public class VectorExample {
    public static void main(String[] args) {
        Vector<String> vector = new Vector<>();
        vector.add("element1");
        vector.add("element2");
        for (String element : vector) {
            System.out.println(element);
        }
    }
}

虽然Vector保证了线程安全,但由于其粗粒度的同步机制,在高并发场景下,大量线程竞争同一把锁会导致性能下降。特别是在写操作频繁时,这种性能瓶颈更为明显。

Hashtable

Hashtable是线程安全的哈希表,它实现了Map接口。与Vector类似,Hashtable的方法也是通过synchronized关键字进行同步的。下面是一个Hashtable的使用示例:

import java.util.Hashtable;

public class HashtableExample {
    public static void main(String[] args) {
        Hashtable<String, Integer> hashtable = new Hashtable<>();
        hashtable.put("key1", 1);
        hashtable.put("key2", 2);
        for (String key : hashtable.keySet()) {
            System.out.println("Key: " + key + ", Value: " + hashtable.get(key));
        }
    }
}

Hashtable同样存在与Vector类似的性能问题,粗粒度的同步使得它在高并发环境下表现不佳。此外,Hashtable不允许键或值为null,这在某些场景下可能会带来不便。

并发包中的容器

ConcurrentHashMap

  1. 分段锁机制ConcurrentHashMap是Java并发包中非常重要的一个类,它采用了分段锁(Segment)机制来提高并发性能。在早期的实现中,ConcurrentHashMap将数据分成多个段(默认16个),每个段都有自己独立的锁。当一个线程访问某个段的数据时,只会锁住该段,而其他段的数据仍然可以被其他线程并发访问。例如,假设有两个线程thread1thread2thread1操作ConcurrentHashMap中属于段1的数据,thread2操作属于段2的数据,由于两个段的锁是独立的,所以thread1thread2可以同时进行操作,互不干扰。
  2. 读写性能:在高并发读场景下,ConcurrentHashMap具有出色的性能。因为读操作通常不需要获取锁(除了在某些情况下需要进行扩容等操作),多个线程可以同时读取不同段的数据,从而实现高效的并发读。对于写操作,虽然需要获取相应段的锁,但由于分段锁的粒度较细,相比Hashtable的全局锁,并发写的性能也有了显著提升。以下是一个ConcurrentHashMap的使用示例:
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> concurrentHashMap = new ConcurrentHashMap<>();
        concurrentHashMap.put("key1", 1);
        concurrentHashMap.put("key2", 2);
        for (String key : concurrentHashMap.keySet()) {
            System.out.println("Key: " + key + ", Value: " + concurrentHashMap.get(key));
        }
    }
}
  1. 扩容机制:当ConcurrentHashMap中的元素数量达到一定阈值(负载因子与容量的乘积)时,会触发扩容操作。在扩容过程中,ConcurrentHashMap采用了一种复杂但高效的机制,允许部分数据的迁移在多个线程间并行进行,而不是像传统哈希表那样一次性全部迁移。这种方式减少了扩容对性能的影响,使得在高并发场景下,扩容操作能够更加平滑地进行。

CopyOnWriteArrayList

  1. 写时复制机制CopyOnWriteArrayList是一个线程安全的List实现,它采用了写时复制(Copy - On - Write,简称COW)的机制。当对CopyOnWriteArrayList进行写操作(如添加、删除元素)时,会先复制一份原数组,在新的数组上进行操作,操作完成后再将原数组引用指向新数组。而读操作则直接读取原数组,不需要加锁。这种机制使得读操作非常高效,因为读操作不会被写操作阻塞。例如,假设有多个线程同时读取CopyOnWriteArrayList,而此时有一个线程进行写操作,写操作会在新的数组上进行,不会影响其他线程的读操作。
  2. 适用场景CopyOnWriteArrayList适用于读操作远远多于写操作的场景。比如,在一些配置信息的存储场景中,配置信息一旦加载后很少被修改,但可能会被多个线程频繁读取,这种情况下CopyOnWriteArrayList就非常适用。以下是一个CopyOnWriteArrayList的使用示例:
import java.util.concurrent.CopyOnWriteArrayList;

public class CopyOnWriteArrayListExample {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        list.add("element1");
        list.add("element2");
        for (String element : list) {
            System.out.println(element);
        }
    }
}
  1. 局限性:然而,CopyOnWriteArrayList也有其局限性。由于写操作需要复制数组,所以写操作的开销较大,不适合写操作频繁的场景。此外,由于读操作读取的是写操作之前的数组副本,所以在写操作完成到读操作感知到变化之间存在一定的延迟,这在一些对数据一致性要求极高的场景下可能不适用。

CopyOnWriteArraySet

  1. 基于CopyOnWriteArrayList实现CopyOnWriteArraySet是一个线程安全的Set实现,它内部是基于CopyOnWriteArrayList实现的。与CopyOnWriteArrayList类似,CopyOnWriteArraySet也采用了写时复制的机制来保证线程安全。当向CopyOnWriteArraySet中添加元素时,会先检查集合中是否已存在该元素,如果不存在,则将元素添加到新复制的数组中。读操作同样直接读取原数组,无需加锁。
  2. 唯一性保证:作为Set的实现,CopyOnWriteArraySet保证了集合中元素的唯一性。这是通过在添加元素时进行检查来实现的。以下是一个CopyOnWriteArraySet的使用示例:
import java.util.concurrent.CopyOnWriteArraySet;

public class CopyOnWriteArraySetExample {
    public static void main(String[] args) {
        CopyOnWriteArraySet<String> set = new CopyOnWriteArraySet<>();
        set.add("element1");
        set.add("element2");
        set.add("element1"); // 重复添加,不会生效
        for (String element : set) {
            System.out.println(element);
        }
    }
}
  1. 应用场景CopyOnWriteArraySet适用于读多写少且需要保证元素唯一性的场景,例如,在一些缓存场景中,需要存储唯一的缓存标识,并且这些标识很少被修改,但会被频繁读取,CopyOnWriteArraySet就是一个不错的选择。

ConcurrentLinkedQueue

  1. 无锁队列ConcurrentLinkedQueue是一个基于链表实现的无锁线程安全队列。它采用了乐观锁的机制,通过CAS(Compare - And - Swap)操作来实现线程安全。CAS操作是一种原子操作,它比较内存中的值与预期值,如果相等则将内存中的值更新为新值。在ConcurrentLinkedQueue中,入队和出队操作都通过CAS操作来修改链表的节点引用,而不需要使用传统的锁机制。例如,在入队操作时,会通过CAS操作将新节点添加到链表的尾部。
  2. 性能优势:由于避免了锁的使用,ConcurrentLinkedQueue在高并发场景下具有较高的性能。多个线程可以同时进行入队和出队操作,而不会因为锁的竞争而阻塞。这使得它在一些需要高效处理队列任务的场景中非常适用,比如生产者 - 消费者模型中的任务队列。以下是一个ConcurrentLinkedQueue的使用示例:
import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueExample {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.add("element1");
        queue.add("element2");
        String element = queue.poll();
        System.out.println("Polled element: " + element);
    }
}
  1. 使用注意事项:虽然ConcurrentLinkedQueue在高并发场景下性能优越,但由于它是无锁的,在某些复杂场景下可能会出现一些微妙的问题。例如,在遍历队列时,由于队列可能在遍历过程中被其他线程修改,所以遍历结果可能不是完全准确的。在使用时需要根据具体场景进行权衡。

LinkedBlockingQueue

  1. 阻塞队列LinkedBlockingQueue是一个基于链表实现的阻塞队列。它实现了BlockingQueue接口,支持在队列为空时阻塞读操作,在队列满时阻塞写操作。LinkedBlockingQueue内部使用了两把锁,一把用于读操作,一把用于写操作,这种设计减少了读和写之间的竞争。例如,当一个线程尝试从空队列中读取元素时,它会被阻塞,直到有其他线程向队列中添加了元素;当一个线程尝试向满队列中添加元素时,它也会被阻塞,直到有其他线程从队列中取出了元素。
  2. 适用场景LinkedBlockingQueue非常适合用于生产者 - 消费者模型。生产者线程可以将任务添加到队列中,而消费者线程可以从队列中取出任务进行处理。如果队列已满,生产者线程会等待;如果队列已空,消费者线程会等待。以下是一个简单的生产者 - 消费者模型示例,使用LinkedBlockingQueue作为任务队列:
import java.util.concurrent.LinkedBlockingQueue;

class Producer implements Runnable {
    private final LinkedBlockingQueue<Integer> queue;

    public Producer(LinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            try {
                queue.put(i);
                System.out.println("Produced: " + i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

class Consumer implements Runnable {
    private final LinkedBlockingQueue<Integer> queue;

    public Consumer(LinkedBlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Integer element = queue.take();
                System.out.println("Consumed: " + element);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

public class LinkedBlockingQueueExample {
    public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(5);
        Thread producerThread = new Thread(new Producer(queue));
        Thread consumerThread = new Thread(new Consumer(queue));
        producerThread.start();
        consumerThread.start();
        try {
            producerThread.join();
            consumerThread.interrupt();
            consumerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. 队列容量LinkedBlockingQueue可以设置容量,也可以不设置容量(默认为Integer.MAX_VALUE)。设置容量可以避免队列无限增长,从而防止内存溢出等问题。在使用时需要根据实际需求合理设置队列容量。

ArrayBlockingQueue

  1. 基于数组的阻塞队列ArrayBlockingQueue是一个基于数组实现的有界阻塞队列。与LinkedBlockingQueue类似,它也实现了BlockingQueue接口,支持在队列为空时阻塞读操作,在队列满时阻塞写操作。ArrayBlockingQueue在创建时需要指定队列的容量,一旦指定,容量不可变。例如:
import java.util.concurrent.ArrayBlockingQueue;

public class ArrayBlockingQueueExample {
    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);
        try {
            queue.put(1);
            queue.put(2);
            Integer element = queue.take();
            System.out.println("Taken element: " + element);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. 锁机制ArrayBlockingQueue内部使用一把锁来控制读和写操作,这与LinkedBlockingQueue的两把锁机制有所不同。虽然一把锁的设计相对简单,但在高并发场景下,读和写操作可能会相互竞争锁,从而影响性能。不过,在一些读和写操作频率相对均衡且并发度不是特别高的场景下,ArrayBlockingQueue仍然能够提供较好的性能。
  2. 适用场景:由于ArrayBlockingQueue是有界的,它适用于需要限制任务数量的场景,比如在一些资源有限的系统中,为了防止任务堆积导致系统资源耗尽,可以使用ArrayBlockingQueue来限制任务队列的大小。

PriorityBlockingQueue

  1. 优先队列PriorityBlockingQueue是一个无界的线程安全优先队列。它实现了BlockingQueue接口,元素按照自然顺序或者自定义的比较器顺序进行排序。在PriorityBlockingQueue中,每次取出的元素都是队列中优先级最高的元素。例如,如果元素实现了Comparable接口,那么元素会按照compareTo方法定义的顺序进行排序;如果在创建PriorityBlockingQueue时提供了自定义的Comparator,则会按照Comparator定义的规则进行排序。
  2. 使用示例:以下是一个PriorityBlockingQueue的使用示例,其中元素是自定义的Task类,根据任务的优先级进行排序:
import java.util.concurrent.PriorityBlockingQueue;

class Task implements Comparable<Task> {
    private int priority;
    private String name;

    public Task(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Task other) {
        return Integer.compare(this.priority, other.priority);
    }

    @Override
    public String toString() {
        return "Task{" +
                "priority=" + priority +
                ", name='" + name + '\'' +
                '}';
    }
}

public class PriorityBlockingQueueExample {
    public static void main(String[] args) {
        PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
        queue.add(new Task(3, "task3"));
        queue.add(new Task(1, "task1"));
        queue.add(new Task(2, "task2"));
        Task task = queue.poll();
        System.out.println("Polled task: " + task);
    }
}
  1. 注意事项:由于PriorityBlockingQueue是无界的,在向队列中添加元素时不会阻塞,除非队列内存耗尽。在使用时需要注意控制队列的大小,避免内存溢出问题。同时,由于元素的排序需要进行比较操作,所以元素的比较操作应该尽量高效,以避免影响队列的性能。

DelayQueue

  1. 延迟队列DelayQueue是一个无界的阻塞队列,其中的元素只有在延迟时间到期后才能被取出。DelayQueue中的元素必须实现Delayed接口,该接口继承自Comparable接口,除了compareTo方法外,还需要实现getDelay方法,用于获取元素的剩余延迟时间。例如,假设有一个任务需要在10秒后执行,我们可以将该任务封装成一个实现了Delayed接口的对象,并添加到DelayQueue中,DelayQueue会在10秒后将该任务取出。
  2. 使用示例:以下是一个简单的DelayQueue使用示例,模拟了一个延迟任务的执行:
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayedTask implements Delayed {
    private long delayTime;
    private long startTime;
    private String taskName;

    public DelayedTask(long delayTime, String taskName) {
        this.delayTime = delayTime;
        this.startTime = System.currentTimeMillis();
        this.taskName = taskName;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        long remainingDelay = delayTime - elapsedTime;
        return unit.convert(remainingDelay, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "DelayedTask{" +
                "taskName='" + taskName + '\'' +
                '}';
    }
}

public class DelayQueueExample {
    public static void main(String[] args) {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        queue.add(new DelayedTask(3000, "task1"));
        queue.add(new DelayedTask(1000, "task2"));
        try {
            while (true) {
                DelayedTask task = queue.take();
                System.out.println("Executing task: " + task);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
  1. 应用场景DelayQueue常用于实现定时任务、缓存过期等功能。例如,在一个缓存系统中,可以将缓存项封装成实现Delayed接口的对象,并添加到DelayQueue中,当缓存项的延迟时间到期后,DelayQueue会将其取出,从而可以进行缓存清理等操作。

并发数据结构的选择原则

  1. 读写频率:如果读操作远远多于写操作,CopyOnWriteArrayListCopyOnWriteArraySetConcurrentHashMap(在高并发读场景下)是不错的选择。因为这些数据结构对读操作进行了优化,CopyOnWriteArrayListCopyOnWriteArraySet读操作无需加锁,ConcurrentHashMap读操作通常也不需要获取锁。而如果读写操作频率较为均衡,ConcurrentHashMap仍然是一个较好的选择,因为它的分段锁机制能够较好地支持并发读写。如果写操作远远多于读操作,需要考虑数据结构的写性能,例如LinkedBlockingQueueArrayBlockingQueue等阻塞队列,虽然它们在写操作时可能会有一定的阻塞,但在高并发写且需要队列功能的场景下是比较合适的。
  2. 数据一致性要求:对于数据一致性要求极高的场景,CopyOnWriteArrayListCopyOnWriteArraySet可能不太适用,因为它们存在写操作到读操作感知变化的延迟。而ConcurrentHashMapConcurrentLinkedQueue等数据结构在一致性方面相对较好,能够满足大多数场景的需求。如果需要严格的顺序一致性,在选择数据结构时需要更加谨慎,可能需要结合锁机制或者使用一些专门的并发数据结构来保证顺序。
  3. 是否需要队列功能:如果应用场景需要队列功能,如生产者 - 消费者模型,那么ConcurrentLinkedQueueLinkedBlockingQueueArrayBlockingQueue等队列数据结构是首选。ConcurrentLinkedQueue是无锁队列,性能较高,但在遍历等操作上可能有一些局限性;LinkedBlockingQueueArrayBlockingQueue是阻塞队列,适用于需要控制队列容量和阻塞读写的场景,其中LinkedBlockingQueue基于链表实现,ArrayBlockingQueue基于数组实现,各有特点。
  4. 是否需要优先队列或延迟队列功能:如果需要按照元素的优先级进行处理,PriorityBlockingQueue是合适的选择;如果需要实现定时任务或缓存过期等功能,DelayQueue则是不二之选。这些特殊功能的队列能够满足特定场景下的需求,在选择时需要根据实际业务逻辑来确定。

总结

在Java并发编程中,选择合适的并发数据结构是至关重要的。不同的数据结构具有不同的特点和适用场景,通过深入理解它们的原理、性能以及使用注意事项,可以在开发高并发应用程序时做出明智的选择。无论是线程安全的传统集合类,还是JUC包中的高性能并发容器,都为我们提供了丰富的工具,帮助我们构建高效、可靠的并发应用。在实际项目中,需要根据具体的业务需求,综合考虑读写频率、数据一致性要求、队列功能等因素,选择最适合的数据结构,以实现最优的并发性能和程序正确性。同时,不断学习和研究新的并发数据结构以及优化技术,也是提升并发编程能力的关键。