如何在Java中实现多线程生产者消费者模式

使用synchronized和wait/notify实现生产者消费者模式,通过共享缓冲区协调线程:当缓冲区满时生产者等待,空时消费者等待,调用notifyAll唤醒对应线程。

在Java中实现多线程生产者消费者模式,核心是让生产者线程向共享缓冲区添加数据,消费者线程从缓冲区取出数据,同时保证线程安全和避免资源浪费。可以通过wait()notify()或使用BlockingQueue来高效实现。

使用synchronized + wait/notify

这是基础实现方式,通过同步方法控制对共享资源的访问。

关键点:当缓冲区满时,生产者等待;当缓冲区空时,消费者等待。

示例代码:

定义一个共享缓冲区:

class SharedBuffer {
    private final int[] buffer;
    private int index = 0;

    public SharedBuffer(int size) {
        this.buffer = new int[size];
    }

    public synchronized void produce(int value) throws InterruptedException {
        while (index == buffer.length) {
            wait(); // 缓冲区满,生产者等待
        }
        buffer[index++] = value;
        System.out.println("生产: " + value);
        notifyAll(); // 唤醒所有等待线程(包括消费者)
    }

    public synchronized int consume() throws InterruptedException {
        while (index == 0) {
            wait(); // 缓冲区空,消费者等待
        }
        int value = buffer[--index];
        System.out.println("消费: " + value);
        notifyAll(); // 唤醒生产者
        return value;
    }
}

生产者线程:

class Producer implements Runnable {
    private final SharedBuffer buffer;

    public Producer(SharedBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            try {
                buffer.produce(i);
                Thread.sleep(500); // 模拟生产耗时
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

消费者线程:

class Consumer implements Runnable {
    private final SharedBuffer buffer;

    public Consumer(SharedBuffer buffer) {
        this.buffer = buffer;
    }

    @Override
    public void run() {
        for (int i = 0; i < 5; i++) {
            try {
                buffer.consume();
                Thread.sleep(800); // 消费较慢
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

测试运行:

public class ProducerConsumerDemo {
    public static void main(String[] args) {
        SharedBuffer buffer = new SharedBuffer(3);
        Thread p = new Thread(new Producer(buffer));
        Thread c = new Thread(new Consumer(buffer));

        p.start();
        c.start();
    }
}

使用BlockingQueue简化实现

更推荐的方式是使用java.util.concurrent.BlockingQueue,它内部已处理了线程等待与唤醒逻辑。

优势:无需手动管理wait/notify,代码简洁且不易出错。

示例代码:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PCUsingQueue {
    private final BlockingQueue queue = new ArrayBlockingQueue<>(3);

    class Producer implements Runnable {
        @Override
        public void run() {
            for (int i = 1; i <= 5; i++) {
                try {
                    queue.put(i); // 自动阻塞如果队列满
                    System.out.println("生产: " + i);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    class Consumer implements Runnable {
        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                try {
                    Integer value = queue.take(); // 自动阻塞如果队列空
                    System.out.println("消费: " + value);
                    Thread.sleep(800);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public static void main(String[] args) {
        PCUsingQueue pc = new PCUsingQueue();
        new Thread(pc.new Producer()).start();
        new Thread(pc.new Consumer()).start();
    }
}

注意事项

使用wait()时必须配合synchronized,否则会抛出异常。

循环判断条件应使用while而不是if,防止虚假唤醒导致问题。

notifyAll()notify()更安全,能唤醒所有等待线程,避免死锁。

实际开发中优先选择BlockingQueue实现,如ArrayBlockingQueueLinkedBlockingQueue等,它们是线程安全的,性能更好。

基本上就这些。