神刀安全网

并发基础知识之线程间的协作

多线程除了我们前面讲的竞争,其实还有协作。就像我们人一样,不但要竞争,也要学会合作,这样才能进步。这篇文章我们就讲讲多线程协作的基本机制wait/notify。同时使用多线程实现生产者/消费者模式。

1.协作的场景

多线程协作的场景有很多,比如:

  • 经典的生产者/消费者模式:生产者消费者通过共享队列实现协作,生产者往队列中放数据,消费者向队列中取数据,当队列满了的时候,生产者就不能再放了,当队列空了的时候,消费者就不能取了。

  • 同时开始:比如百米赛跑,所有的运动员必须等待裁判吹哨子之后才能开始跑。在一些程序,尤其是模拟仿真程序,要求多个线程同时开始。

  • 等待结束:主从协作模式也是一种比较常见的协作模式,主线程将任务分成若干小任务,为每个小任务创建一个线程,主线程必须等待所有子线程都运行结束后才能继续。

  • 集合点:比如班级去春游,在集合点必须等到所有同学都来齐了,才能去下一个旅游点,不能抛下任何一个同学,这是不负责任的。反映在程序中,比如并行迭代计算,每个线程负责一部分计算,然后在集合点等待其他线程完成,一起交付数据。


2.wait/notify方法的介绍

wait/notify方法是类Object中的方法,而Object又是所有类的父类,这样就使得所有的对象都可以调用这两个方法。

主要有两个wait方法

public final void wait() throws InterruptedException public final void wait(long timeout) throws InterruptedException 

不带参数或者参数为0,表示无限期等待。

同时这里说一下wait和sleep的区别:wait会释放对象锁,而且不能主动唤醒,需要其他线程去唤醒它。而sleep不会释放对象锁,且到了设置的时间就会主动唤醒。

前面的文章中有说过,每个对象都有一个锁和一个等待队列,当一个线程尝试去获取对象的锁失败时,就会加入该对象的等待队列。其实对象还有另一个队列,叫条件队列,专门用于线程间的协作。

当一个线程调用wait方法后就会把当前线程加入条件队列中并阻塞,等待一个线程去把它唤醒。唤醒使用notify方法,主要有下面形式:

public final void notify() public final void notifyAll() 

notify方法是在条件队列中随机选择一个线程,将其从条件队列中移除并唤醒。notifyAll方法,顾名思义,就是把条件队列中的线程都唤醒。

wait的具体过程如下

1.把当前线程放入条件队列,释放对象锁,线程状态变为WAITING或TIMED_WAITING。

2.等待时间到或者其他线程调用notify/notifyAll方法唤醒,从条件队列中移除,但要重新竞争锁

3.如果获得了对象锁,则线程状态变为RUNNABLE,并从wait方法调用中返回。否则进入等待队列,线程状态变为BLOCKED,只有获得锁之后才从wait方法调用中返回。

说明:线程被唤醒不一定就能立刻获得锁。

ps:wait/notify方法只能在synchronized块中被调用。如果调用wait/notify方法时,当前线程没有获得对象锁,则会抛出异常。


3.生产者/消费者模式

生产者/消费者模式中,协作的共享变量是队列,队列满了,则生产者就wait,队列空了,则消费者就wait。

/**  * 生产者消费者的共享队列  */ public class MyBlockingQueue<E> {     private int length;     private Queue<E> queue;     public MyBlockingQueue(int length) {         this.length = length;         queue = new ArrayDeque<E>(length);     }      public synchronized void put(E e) throws InterruptedException {         while(queue.size() == length) {             wait();         }         queue.add(e);         notifyAll();     }      public synchronized E get() throws InterruptedException {         while(queue.isEmpty()) {             wait();         }         E e = queue.poll();         notifyAll();         return e;     } } 

该类是生产者消费者的共享队列,有两个方法,分别是生产者放数据的put方法,以及消费者取数据的get方法。都加了synchronized进行修饰。两个方法中都使用了wait方法,等待的条件不一样,但会加入相同的等待条件队列,所以这里要使用notifyAll方法,因为notify只能唤醒一个线程,如果唤醒的是同类线程那就完蛋了。

只能有一个条件等待队列,这是wait/notify机制的局限性。

/**  * 生产者线程  */ public class Producer extends Thread {     MyBlockingQueue<String> queue;     public Producer(MyBlockingQueue<String> queue) {         this.queue = queue;     }          @Override     public void run() {         int num = 0;         try {             while(true) {                 String value = String.valueOf(num);                 queue.put(value);                 System.out.println("producer put " + value);                 num++;                 Thread.sleep((int)(Math.random() * 1000));             }         } catch (InterruptedException e) {             e.printStackTrace();         }              } } 
/**  * 消费者线程  */ public class Consumer extends Thread {     MyBlockingQueue<String> queue;     public Consumer(MyBlockingQueue<String> queue) {         this.queue = queue;     }          @Override     public void run() {         try {             while(true) {                 String value = queue.get();                 System.out.println("Consumer get " + value);                 Thread.sleep((int)(Math.random() * 1000));             }         } catch (InterruptedException e) {             e.printStackTrace();         }              } } 

搞一个主程序运行

public class ProducerAndConsumer {     public static void main(String[] args) {         MyBlockingQueue<String> queue = new MyBlockingQueue<>(10);         new Producer(queue).start();         new Consumer(queue).start();     } } 

运行后会交替的打印生产者线程和消费者线程的存取信息。

这里我们使用了ArrayDeque,Java提供了专门的阻塞队列实现

  • 接口BlockingQueue和BlockingDeque
  • 基于数组的实现类ArrayBlockingQueue
  • 基于链表的实现类LinkedBlockingQueue和LinkedBlockingDeque
  • 基于堆的实现类PriorityBlockingQueue

在实际开发中,应优先使用这些类。

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » 并发基础知识之线程间的协作

分享到:更多 ()