Java 并发

本文最后更新于 2 分钟前,文中所描述的信息可能已发生改变。

线程基础

并发编程允许程序同时执行多个任务,提高程序效率。Java 通过多线程实现并发。主要类和接口包括:

  • Thread 类:表示线程的类,通过继承 Thread 类来创建线程。
  • Runnable 接口:表示任务的接口,通过实现 Runnable 接口来创建线程。
  • Callable 接口:表示有返回值的任务的接口,通过实现 Callable 接口来创建线程。
  • Executor 框架:用于管理线程池和任务调度,如 ExecutorService、ThreadPoolExecutor。
Java 中创建线程有几种方式?

四种创建方式:

java
// 1. 继承Thread类
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Thread: " + Thread.currentThread().getName());
    }
}

// 2. 实现Runnable接口(推荐)
class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Runnable: " + Thread.currentThread().getName());
    }
}

// 3. 实现Callable接口(有返回值)
class MyCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        return "Callable result: " + Thread.currentThread().getName();
    }
}

// 4. 使用线程池(最佳实践)
public class ThreadCreationDemo {
    public static void main(String[] args) throws Exception {
        // 方式1:继承Thread
        new MyThread().start();

        // 方式2:实现Runnable
        new Thread(new MyRunnable()).start();

        // 方式3:Callable + FutureTask
        FutureTask<String> futureTask = new FutureTask<>(new MyCallable());
        new Thread(futureTask).start();
        System.out.println(futureTask.get()); // 获取返回值

        // 方式4:线程池
        ExecutorService executor = Executors.newFixedThreadPool(2);
        executor.submit(new MyRunnable());
        executor.submit(new MyCallable());
        executor.shutdown();
    }
}
多线程实现方式的区别?
方式是否支持返回值是否可复用应用场景
继承 Thread 类不支持不可复用简单场景
实现 Runnable 接口不支持可复用多线程共享资源
实现 Callable 接口支持可复用需要返回值的任务
使用线程池支持可复用高并发场景
介绍一下线程安全的类
类别类名说明
基础类String, StringBuffer, AtomicIntegerStringBuffer 使用了同步方法,线程安全
集合类Vector, ConcurrentHashMap, CopyOnWriteArrayList使用同步或锁机制保证线程安全
工具类Collections.synchronizedList通过包装实现线程安全的集合
Blocking 队列LinkedBlockingQueue, ArrayBlockingQueue支持生产者-消费者模型,线程安全
原子类AtomicInteger, AtomicLong, AtomicReference使用 CAS(Compare And Swap)实现原子操作

生命周期

线程的生命周期,各个状态之间是如何转换的?

线程状态枚举:

java
public enum State {
    NEW,          // 新建
    RUNNABLE,     // 可运行(就绪+运行)
    BLOCKED,      // 阻塞
    WAITING,      // 等待
    TIMED_WAITING,// 超时等待
    TERMINATED;   // 终止
}

状态转换:

java
public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Object lock = new Object();

        Thread thread = new Thread(() -> {
            synchronized (lock) {
                try {
                    System.out.println("线程开始等待");
                    lock.wait(); // WAITING状态
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        System.out.println("创建后状态: " + thread.getState()); // NEW

        thread.start();
        Thread.sleep(100);
        System.out.println("启动后状态: " + thread.getState()); // WAITING

        synchronized (lock) {
            lock.notify(); // 唤醒线程
        }

        thread.join();
        System.out.println("结束后状态: " + thread.getState()); // TERMINATED
    }
}

volatile

volatile 关键字的作用是什么?

volatile 是 Java 提供的一种轻量级的同步机制,它具有两个特性:可见性和有序性。

可见性: volatile 保证了线程间变量是可见的,即当一个线程修改了共享变量后,该变量会立马同步到主内存,其余线程监听到数据变化后会使得自己缓存的原数据失效,并触发 read 操作读取新修改的变量的值。

有序性: volatile 保证了线程对变量的修改是有序的,即禁止指令重排序

  • 当程序执行到 volatile 变量的读操作或者写操作时,在其前面的操作的更改肯定全部已经进行,且结果已经对后面的操作可见,在其后面的操作肯定还没有进行。
  • 在进行指令优化时,不能将在对 volatile 变量访问的语句放在其后面执行,也不能把 volatile 变量后面的语句放到其前面执行。

应用场景:

  • volatile 不能保证原子性,即不能保证多个线程同时修改一个变量时的线程安全性
  • volatile 不能代替锁,它只能保证可见性和有序性,不能保证原子性。
  • volatile 适用于一个线程写,多个线程读的场景。
volatile 底层是如何实现的?

示例代码:

java
public class VolatileDemo {
    private volatile boolean flag = false;
    private int count = 0;

    // 线程1:写操作
    public void writer() {
        count = 42;      // 1. 普通写
        flag = true;     // 2. volatile写
    }

    // 线程2:读操作
    public void reader() {
        if (flag) {      // 3. volatile读
            int value = count; // 4. 普通读,能看到count=42
        }
    }
}

happens-before 规则:

  1. 程序顺序规则:单线程内,前面的操作 happens-before 后面的操作
  2. volatile 规则:volatile 写 happens-before volatile 读
  3. 传递性:A happens-before B,B happens-before C,则 A happens-before C

因此:count=42 happens-before flag=true happens-before flag读取 happens-before count读取

内存可见性

内存屏障如何实现的?

底层实现:

txt
// volatile写操作的内存屏障
StoreStore屏障
volatile写操作
StoreLoad屏障

// volatile读操作的内存屏障
volatile读操作
LoadLoad屏障
LoadStore屏障

实际应用场景:

java
// 1. 状态标志
public class StatusFlag {
    private volatile boolean shutdown = false;

    public void shutdown() {
        shutdown = true; // 写线程
    }

    public void work() {
        while (!shutdown) { // 读线程,能立即看到变化
            // 执行工作
        }
    }
}

// 2. 双重检查锁定(DCL)
public class Singleton {
    private volatile static Singleton instance;

    public static Singleton getInstance() {
        if (instance == null) { // 第一次检查
            synchronized (Singleton.class) {
                if (instance == null) { // 第二次检查
                    instance = new Singleton(); // volatile防止指令重排
                }
            }
        }
        return instance;
    }
}

并发工具类

CountDownLatch

使用场景:等待多个线程完成

java
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 3;
        CountDownLatch latch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    // 模拟工作
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " 完成工作");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 计数器减1
                }
            }, "Worker-" + i).start();
        }

        latch.await(); // 等待所有线程完成
        System.out.println("所有工作完成,开始汇总");
    }
}

CyclicBarrier

使用场景:多线程协同工作

java
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程都到达屏障,开始下一阶段");
        });

        for (int i = 0; i < threadCount; i++) {
            new Thread(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 到达屏障");
                    barrier.await(); // 等待其他线程
                    System.out.println(Thread.currentThread().getName() + " 继续执行");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "Thread-" + i).start();
        }
    }
}

Semaphore

使用场景:限制并发数量

java
public class SemaphoreDemo {
    private static final Semaphore semaphore = new Semaphore(2); // 最多2个线程

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire(); // 获取许可
                    System.out.println(Thread.currentThread().getName() + " 获得许可");
                    Thread.sleep(2000); // 模拟工作
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    System.out.println(Thread.currentThread().getName() + " 释放许可");
                    semaphore.release(); // 释放许可
                }
            }, "Thread-" + i).start();
        }
    }
}

锁机制

乐观锁与悲观锁的区别?

悲观锁基于悲观的假设,认为共享资源在每次访问时都会发生冲突,因此在每次操作时都会加锁。 这种锁机制会导致其他线程阻塞,直到锁被释放。Java 中的 synchronized 和 ReentrantLock 是悲观锁的典型实现方式。虽然悲观锁能有效避免数据竞争,但在高并发场景下会导致线程阻塞、上下文切换频繁,从而影响系统性能,并且还可能引发死锁问题。

乐观锁基于乐观的假设,认为共享资源在每次访问时不会发生冲突,因此无须加锁,只需在提交修改时验证数据是否被其他线程修改。 Java 中的 AtomicInteger 和 LongAdder 等类通过 CAS(Compare-And-Swap)算法实现了乐观锁。乐观锁避免了线程阻塞和死锁问题,在读多写少的场景中性能优越。但在写操作频繁的情况下,可能会导致大量重试和失败,从而影响性能。

公平锁与非公平锁的区别?
  • 公平锁:按照线程请求的顺序来获取锁,即先到先得。
  • 非公平锁:不考虑线程请求的顺序,有可能后请求的线程先获取到锁。

CAS (Compare And Swap)

  • 定义:CAS 是一种乐观锁,通过比较当前值和期望值是否一样来决定是否更新。
  • 相关类:AtomicInteger、AtomicLong、AtomicReference 等。

问题与方案

问题解决方案
ABA 问题使用版本号或时间戳解决,如 AtomicStampedReference
循环时间长开销大限制重试次数,避免无限循环,如自旋锁
只能保证一个共享变量的原子操作使用 AtomicReference 或加锁

ABA 问题:线程 1 读取数据 A,线程 2 将 A 改为 B,再改回 A,线程 1 再次读取 A,认为 A 未变化。

synchronized

  • 定义:synchronized 是一种悲观锁,通过获取锁来保证同一时刻只有一个线程执行。
  • 特点:可重入、非公平锁、不可中断、性能高。
  • 使用场景:适用于代码块同步、单例模式、简单场景。
synchronized 的锁升级过程是怎样的?

锁升级路径:

无锁 → 偏向锁 → 轻量级锁 → 重量级锁

代码分析:

java
public class SynchronizedDemo {
    private int count = 0;

    // 1. 偏向锁:只有一个线程访问
    public synchronized void increment() {
        count++; // 第一次访问,获得偏向锁
    }

    // 2. 轻量级锁:少量线程竞争,自旋等待
    public void lightweightLock() {
        synchronized (this) {
            // 多个线程竞争,升级为轻量级锁
            // 通过CAS和自旋获取锁
            count++;
        }
    }

    // 3. 重量级锁:竞争激烈,线程阻塞
    public void heavyweightLock() {
        synchronized (this) {
            try {
                // 竞争激烈时,升级为重量级锁
                // 线程进入阻塞状态,由操作系统调度
                Thread.sleep(100);
                count++;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

锁升级的触发条件:

java
// 偏向锁 → 轻量级锁
// 当有第二个线程尝试获取偏向锁时

// 轻量级锁 → 重量级锁
// 当自旋次数超过阈值(默认10次)或自旋线程数超过CPU核数一半时
synchronize 的实现原理?

字节码层面:

java
public class SyncBytecode {
    public synchronized void method1() {
        // 方法级别的synchronized
    }

    public void method2() {
        synchronized (this) {
            // 代码块级别的synchronized
        }
    }
}

// 编译后的字节码:
// method1: ACC_SYNCHRONIZED标志
// method2: monitorenter和monitorexit指令

JVM 层面实现:

java
// 对象头结构(64位JVM)
// Mark Word (64 bits) + Class Pointer (32 bits) + Array Length (32 bits, 数组才有)

// Mark Word在不同锁状态下的存储内容:
// 无锁:hashcode(31) + age(4) + biased_lock(1) + lock(2)
// 偏向锁:thread_id(54) + epoch(2) + age(4) + biased_lock(1) + lock(2)
// 轻量级锁:ptr_to_lock_record(62) + lock(2)
// 重量级锁:ptr_to_heavyweight_monitor(62) + lock(2)

ReentrantLock

  • 定义:ReentrantLock 是一种可重入锁,通过 AQS 实现。
java
public class ReentrantLock implements Lock {
    private final Sync sync;

    // 非公平锁实现
    static final class NonfairSync extends Sync {
        final void lock() {
            // 1. 先尝试CAS获取锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); // 2. 失败则进入AQS队列
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 公平锁实现
    static final class FairSync extends Sync {
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                // 公平锁:检查队列中是否有等待线程
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 重入锁逻辑
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
}
  • 特点:支持公平/非公平锁、可中断、超时获取。
  • 使用场景:适用于复杂场景,如高并发、需要公平锁、可中断、超时获取或多个条件变量等场景。
synchronized 和 ReentrantLock 的区别,具体使用场景,注意事项?

示例代码:

java
public class LockComparison {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;

    // synchronized方式
    public synchronized void syncMethod() {
        count++;
    }

    // ReentrantLock方式
    public void lockMethod() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // 必须在finally中释放
        }
    }

    // ReentrantLock的高级功能
    public void advancedFeatures() throws InterruptedException {
        // 1. 尝试获取锁
        if (lock.tryLock()) {
            try {
                // 获取到锁的逻辑
            } finally {
                lock.unlock();
            }
        }

        // 2. 超时获取锁
        if (lock.tryLock(1, TimeUnit.SECONDS)) {
            try {
                // 1秒内获取到锁的逻辑
            } finally {
                lock.unlock();
            }
        }

        // 3. 可中断的锁获取
        try {
            lock.lockInterruptibly();
            // 可以被interrupt()中断
        } finally {
            lock.unlock();
        }
    }
}

对比区别:

对比项synchronizedReentrantLock
底层实现JVM 内置,基于 MonitorJDK 提供的 API,基于 AQS(AbstractQueuedSynchronizer)
锁类型悲观锁,只能独占(排他锁)支持公平/非公平锁、可中断、超时获取
可重入性✅ 支持✅ 支持
公平性❌ 非公平✅ 支持公平锁和非公平锁
锁的范围作用在方法或代码块灵活,作用于任意代码块
是否可中断❌ 不可中断✅ 可以中断 lockInterruptibly()
是否可超时❌ 不支持超时获取锁✅ tryLock(timeout) 支持超时等待
是否需要手动释放❌ 自动释放(异常时也会释放)✅ 需要 lock() 和 unlock() 手动释放
性能JVM 内部优化,性能高适用于复杂场景,但开销稍大

使用场景:

  • synchronized:适用于代码块同步,单例模式,简单场景下使用,性能较高。
  • ReentrantLock:适用于复杂场景,如高并发,需要公平锁、可中断、超时获取或多个条件变量等场景。

注意事项:

  • synchronized: 无需手动释放锁,异常时会自动释放锁,代码块优先级高于方法
  • ReentrantLock: 需要手动释放锁,finally 块中释放锁,条件变量不易过多使用,影响性能

AQS(AbstractQueuedSynchronizer)原理

示例代码:

java
// AQS的核心结构
public abstract class AbstractQueuedSynchronizer {
    // 同步状态
    private volatile int state;

    // 等待队列的头节点
    private transient volatile Node head;

    // 等待队列的尾节点
    private transient volatile Node tail;

    // 队列节点
    static final class Node {
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        volatile int waitStatus;
    }
}

线程池

线程池的核心参数有哪些?如何合理设置?

七大核心参数:

java
public ThreadPoolExecutor(
    int corePoolSize,           // 1. 核心线程数
    int maximumPoolSize,        // 2. 最大线程数
    long keepAliveTime,         // 3. 空闲线程存活时间
    TimeUnit unit,              // 4. 时间单位
    BlockingQueue<Runnable> workQueue,  // 5. 工作队列
    ThreadFactory threadFactory,        // 6. 线程工厂
    RejectedExecutionHandler handler    // 7. 拒绝策略
) {
    // 构造器实现
}

参数设置:

java
public class ThreadPoolConfig {

    // CPU密集型任务
    public static ExecutorService createCpuIntensivePool() {
        int coreSize = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
            coreSize,                    // 核心线程数 = CPU核数
            coreSize,                    // 最大线程数 = CPU核数
            0L, TimeUnit.MILLISECONDS,   // 不需要额外线程
            new LinkedBlockingQueue<>(), // 无界队列
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "cpu-pool-" + threadNumber.getAndIncrement());
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行策略
        );
    }

    // IO密集型任务
    public static ExecutorService createIoIntensivePool() {
        int coreSize = Runtime.getRuntime().availableProcessors() * 2;
        return new ThreadPoolExecutor(
            coreSize,                    // 核心线程数 = CPU核数 * 2
            coreSize * 2,                // 最大线程数 = CPU核数 * 4
            60L, TimeUnit.SECONDS,       // 空闲线程60秒后回收
            new ArrayBlockingQueue<>(100), // 有界队列,防止OOM
            r -> new Thread(r, "io-pool-" + System.currentTimeMillis()),
            new ThreadPoolExecutor.AbortPolicy() // 抛异常策略
        );
    }
}
工作队列如何选择?

队列类型对比:

java
// 1. ArrayBlockingQueue - 有界队列
BlockingQueue<Runnable> queue1 = new ArrayBlockingQueue<>(100);
// 特点:固定容量,防止OOM,但可能触发拒绝策略

// 2. LinkedBlockingQueue - 无界队列(默认Integer.MAX_VALUE)
BlockingQueue<Runnable> queue2 = new LinkedBlockingQueue<>();
// 特点:容量大,很少触发拒绝策略,但可能OOM

// 3. SynchronousQueue - 直接交换
BlockingQueue<Runnable> queue3 = new SynchronousQueue<>();
// 特点:不存储任务,直接交给线程执行

// 4. PriorityBlockingQueue - 优先级队列
BlockingQueue<Runnable> queue4 = new PriorityBlockingQueue<>();
// 特点:按优先级执行任务
拒绝策略有哪些?

四种内置策略:

java
// 1. AbortPolicy - 抛异常(默认)
new ThreadPoolExecutor.AbortPolicy()

// 2. CallerRunsPolicy - 调用者执行
new ThreadPoolExecutor.CallerRunsPolicy()

// 3. DiscardPolicy - 静默丢弃
new ThreadPoolExecutor.DiscardPolicy()

// 4. DiscardOldestPolicy - 丢弃最老的任务
new ThreadPoolExecutor.DiscardOldestPolicy()

// 5. 自定义策略
RejectedExecutionHandler customHandler = (r, executor) -> {
    // 记录日志
    logger.warn("Task rejected: {}", r.toString());
    // 可以选择重试、存储到数据库等
};

::: detaisl 如何对线程池进行监控? 监控指标:

java
public class ThreadPoolMonitor {
    private final ThreadPoolExecutor executor;

    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    public void printStats() {
        System.out.println("=== 线程池状态 ===");
        System.out.println("核心线程数: " + executor.getCorePoolSize());
        System.out.println("最大线程数: " + executor.getMaximumPoolSize());
        System.out.println("当前线程数: " + executor.getPoolSize());
        System.out.println("活跃线程数: " + executor.getActiveCount());
        System.out.println("队列任务数: " + executor.getQueue().size());
        System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
        System.out.println("总任务数: " + executor.getTaskCount());
    }

    // 定期监控
    public void startMonitoring() {
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        monitor.scheduleAtFixedRate(this::printStats, 0, 30, TimeUnit.SECONDS);
    }
}

:::

ThreadLocal

参考

材料计算常用资源