博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程之基础( 四)
阅读量:6146 次
发布时间:2019-06-21

本文共 25388 字,大约阅读时间需要 84 分钟。

新类库

前面已经把并发编程的基础知识讲的差不多了,这章主要介绍一下JAVA中其它一些关于并发编程的类库,主要有一下几个类库。

  • CountDownLatch
  • CyclicBarrier
  • BlockingQueue
  • ScheduleExecutor
  • Semaphore
  • Exchanger

1. CountDownLatch

  该类主要是同步一个或多个任务,强制一个或多个任务等待其它任务执行的一组操作完成。可以给该对象设置一个初始计数值,当计数值不为0时,调用该对象的await()方法就会阻塞,调用counDown()方法会让计数值减1,当计数值为0时阻塞任务会被唤醒。其典型用法就是将一个程序分成多个独立的任务,并给CountDownLatch设定一个初始值,该初始值应该为首先需要执行的线程的个数(比如赛跑,5个运动员都做好准备之后,裁判才能打枪,这时初始值应该设置为5)。一些任务需要等待其它任务先完成或者其它任务的一部分完成,那么可以待用await()将自己挂起。而另一些任务的某些操作完成时调用countDown()方法来减小计数值,等待计数值为0时,挂起的任务则则认为当前所有的条件以满足继续执行的需要了,则可以继续运行。注意:计数值只能被设置一次且在new的时候就要指定初值,而且该对象只能使用一次,如果想重复使用,请考虑CyclicBarrier

1 package com.dy.xidian; 2  3 import java.util.Random; 4 import java.util.concurrent.CountDownLatch; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.TimeUnit; 8  9 class TaskPortion implements Runnable {10     private static int counter = 0;11     private final int id = counter++;12     private static Random rand = new Random(47);13     private final CountDownLatch latch;14 15     public TaskPortion(CountDownLatch latch) {16         super();17         this.latch = latch;18     }19 20     @Override21     public void run() {22         try {23             doWork();24             latch.countDown();25         } catch (InterruptedException e) {26         }27 28     }29 30     public void doWork() throws InterruptedException {31         TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));32         System.out.println(this + "completed");33     }34 35     public String toString() {36         return String.format("%1$-3d", id);37     }38 }39 40 class WaitingTask implements Runnable {41     private static int counter = 0;42     private final int id = counter++;43     private final CountDownLatch latch;44 45     public WaitingTask(CountDownLatch latch) {46         super();47         this.latch = latch;48     }49 50     @Override51     public void run() {52         try {53             latch.await();54             System.out.println("Latch barrier passed for " + this);55         } catch (InterruptedException e) {56             System.out.println(this + " interrupted");57         }58     }59 60     public String toString() {61         return String.format("WaitingTask %1$-3d ", id);62     }63 }64 public class CountDownLatchDemo {65     static final int SIZE = 100;66     public static void main(String[] args) {67         ExecutorService exec = Executors.newCachedThreadPool();68         CountDownLatch latch = new CountDownLatch(SIZE);69         for (int i = 0; i < 10; i++)70             exec.execute(new WaitingTask(latch));71         for (int i = 0; i < SIZE; i++)72             exec.execute(new TaskPortion(latch));73         System.out.println("Launched all tasks");74         exec.shutdownNow();75     }76 }
View Code

2. CyclicBarrier

  CyclicBarrier与CountDownLatch功能差不多,不同之处就是可以多次使用,等到计数值变为0时,它会自动重置。而且不需要每个线程都去调用类似countDown()这样的方法,因为每调用一个await(),它就会自动将计数值减1。它使用于这种情况:多个线程并行执行工作,大家一致向前推进,所有线程在这个阶段的工作都完成了(所有的线程都调用了await方法),才能进入下一阶段,而对于那些早完成的线程只能先等待了。下面是一个赛马比赛,每个马可以看作一个线程,等所有的马都达到栅栏后,才能开始新一轮的比赛。

1 package com.dy.xidian;  2   3 import java.util.ArrayList;  4 import java.util.List;  5 import java.util.Random;  6 import java.util.concurrent.BrokenBarrierException;  7 import java.util.concurrent.CyclicBarrier;  8 import java.util.concurrent.ExecutorService;  9 import java.util.concurrent.Executors; 10 import java.util.concurrent.TimeUnit; 11  12 class Horse implements Runnable { 13     private static int counter = 0; 14     private final int id = counter++; 15     private int strides = 0; 16     private static Random rand = new Random(47); 17     private static CyclicBarrier barrier; 18  19     public Horse(CyclicBarrier b) { 20         barrier = b; 21     } 22  23     public synchronized int getStriders() { 24         return strides; 25     } 26  27     @Override 28     public void run() { 29         try { 30             while (!Thread.interrupted()) { 31                 synchronized (this) { 32                     strides += rand.nextInt(3); 33                 } 34                 barrier.await(); 35             } 36         } catch (InterruptedException e) { 37             // TODO 38         } catch (BrokenBarrierException e) { 39             throw new RuntimeException(e); 40         } 41     } 42  43     public String toString() { 44         return "Horse " + id + " "; 45     } 46  47     public String tracks() { 48         StringBuilder s = new StringBuilder(); 49         for (int i = 0; i < getStriders(); i++) 50             s.append("*"); 51         s.append(id); 52         return s.toString(); 53     } 54 } 55  56 public class HorseRace { 57     static final int FINISH_LINE = 75; 58     private List
horses = new ArrayList
(); 59 private ExecutorService exec = Executors.newCachedThreadPool(); 60 private CyclicBarrier barrier; 61 62 public HorseRace(int nHorses, final int pause) { 63 barrier = new CyclicBarrier(nHorses, new Runnable() { 64 65 @Override 66 public void run() { 67 StringBuilder s = new StringBuilder(); 68 for (int i = 0; i < FINISH_LINE; i++) { 69 s.append("="); 70 System.out.println(s); 71 for (Horse horse : horses) 72 System.out.println(horse.tracks()); 73 for (Horse horse : horses) 74 if (horse.getStriders() >= FINISH_LINE) { 75 System.out.println(horse + "won!"); 76 exec.shutdownNow(); 77 return; 78 } 79 try { 80 TimeUnit.MILLISECONDS.sleep(pause); 81 } catch (InterruptedException e) { 82 System.out.println("barrier-action sleep interrupted"); 83 } 84 } 85 } 86 }); 87 88 for (int i = 0; i < nHorses; i++) { 89 Horse horse = new Horse(barrier); 90 horses.add(horse); 91 exec.execute(horse); 92 } 93 } 94 95 public static void main(String[] args) { 96 int nHorses = 7; 97 int pause = 200; 98 if (args.length > 0) { 99 int n = new Integer(args[0]);100 nHorses = n > 0 ? n : nHorses;101 }102 if (args.length > 1) {103 int p = new Integer(args[1]);104 pause = p > -1 ? p : pause;105 }106 new HorseRace(nHorses, pause);107 }108 }
View Code

运行结果:

=**0***1*2**3*4***5***6==**0***1*2**3*4***5***6===**0***1*2**3*4***5***6====

  运行结果中的==表示栅栏,数字为每个马的编号,*的个数代表每个马目前跑了多少步。在代码我,我们可以看到在创建CyclicBarrier对象时,我们还给他传递了一个复写了Runnable后的对象,这是我CounDownLatch不同的地方。每当计数器的值为0的是时候,里面的该对象中的run方法会被调用。可能有这样一种情况,当计数值再次变为0时,上次的run方法还没执行完,它会不会创建新的线程重新执行run方法呢?通过测试,这种情况是不会发生的,只有等run执行完,才会去创建新的线程。

 3 BlockingQueue

BlockingQueue是一个接口,用于生产者-消费者模型,是一个线程安全的容器。它的实现类有LinkedBlockingQueue(空间无限,FIFO), ArrayBlockingQueue(空间有限,FIFO),PriorityBlockingQueue(元素等级高的在队头),SynchronousQueue(内部没有缓冲区,生产者线程需要将产品直接交给一个空闲的消费者线程,否则将一直处于阻塞状态)

3.1 DelayQueue

  DelayQueue是一个无界的阻塞队列,用于存放实现了Delayed接口的对象,其中的对象只能在其延迟期满才能从队列中取走。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果没有任何延迟期满的对象,那就不会有任何头元素,这时如果使用take()方法从队列获取对象时会发生阻塞,使用poll时会直接返回null。

1 package com.dy.xidian;  2   3 import java.util.ArrayList;  4 import java.util.List;  5 import java.util.Random;  6 import java.util.concurrent.DelayQueue;  7 import java.util.concurrent.Delayed;  8 import java.util.concurrent.ExecutorService;  9 import java.util.concurrent.Executors; 10 import java.util.concurrent.TimeUnit; 11  12 class DelayedTask implements Runnable, Delayed { 13     private static int counter = 0; 14     private final int id = counter++; 15     private final int delayTime; 16     private final long trigger; 17  18     protected static List
sequeue = new ArrayList
(); 19 20 // System.nanoTime()获取当前时间,结果是纳秒级 21 // TimeUnit.MILLSECONDS.convert(time, TimeUnit.SECONDS) 22 // 时间转换(一般是大单位转小单位),比如计算1s=多少ms之类的 23 // time是时间,TimeUnit.SECONDS是原始单位(s),MILLISECONDS是转换后的单位(ms) 24 public DelayedTask(int delayInMilliseconds) { 25 delayTime = delayInMilliseconds; 26 trigger = System.nanoTime() 27 + TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS); 28 sequeue.add(this); 29 } 30 31 // 重载Delayed接口的getDelay方法,该示例代码给出的是重载的标准形式 32 @Override 33 public long getDelay(TimeUnit unit) { 34 return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS); 35 36 } 37 38 //比较每个对象的触发时间,以确定在队列中的位置 39 @Override 40 public int compareTo(Delayed arg) { 41 DelayedTask that = (DelayedTask) arg; 42 if (trigger < that.trigger) 43 return -1; 44 if (trigger > that.trigger) 45 return 1; 46 return 0; 47 } 48 49 @Override 50 public void run() { 51 System.out.println(this + " "); 52 } 53 54 public String toString() { 55 return String.format("[%1$-4d]", counter) + "Task " + id; 56 } 57 58 public String summary() { 59 return "(" + id + ":" + counter + ")"; 60 } 61 62 public static class EndSentinel extends DelayedTask { 63 private ExecutorService exec; 64 65 public EndSentinel(int delay, ExecutorService e) { 66 super(delay); 67 exec = e; 68 } 69 70 public void run() { 71 for (DelayedTask pt : sequeue) { 72 System.out.println(pt.summary() + " "); 73 } 74 System.out.println(this + " Calling shutdownNow()"); 75 exec.shutdownNow(); 76 } 77 } 78 } 79 80 class DelayTaskConsumer implements Runnable { 81 private DelayQueue
q; 82 public DelayTaskConsumer(DelayQueue
q) { 83 this.q = q; 84 } 85 86 @Override 87 public void run() { 88 try { 89 while (!Thread.interrupted()) 90 q.take().run(); 91 } catch (InterruptedException e) { 92 } 93 System.out.println("Finised DelayedTaskConsumer!"); 94 } 95 96 } 97 public class DelayQueueDemo { 98 public static void main(String[] args) { 99 Random rand = new Random(47);100 ExecutorService exec = Executors.newCachedThreadPool();101 DelayQueue
queue = new DelayQueue
();102 for (int i = 0; i < 20; i++)103 queue.put(new DelayedTask(rand.nextInt(5000)));104 queue.add(new DelayedTask.EndSentinel(5000, exec));105 exec.execute(new DelayTaskConsumer(queue));106 }107 }
View Code

运行结果:

1 [128 ]Task 11  2 [200 ]Task 7  3 [429 ]Task 5  4 [520 ]Task 18  5 [555 ]Task 1  6 [961 ]Task 4  7 [998 ]Task 16  8 [1207]Task 9  9 [1693]Task 2 10 [1809]Task 14 11 [1861]Task 3 12 [2278]Task 15 13 [3288]Task 10 14 [3551]Task 12 15 [4258]Task 0 16 [4258]Task 19 17 [4522]Task 8 18 [4589]Task 13 19 [4861]Task 17 20 [4868]Task 6 21 (0:21) 22 (1:21) 23 (2:21) 24 (3:21) 25 (4:21) 26 (5:21) 27 (6:21) 28 (7:21) 29 (8:21) 30 (9:21) 31 (10:21) 32 (11:21) 33 (12:21) 34 (13:21) 35 (14:21) 36 (15:21) 37 (16:21) 38 (17:21) 39 (18:21) 40 (19:21) 41 (20:21) 42 [5000]Task 20 Calling shutdownNow()43 Finised DelayedTaskConsumer!
View Code

  该程序创建了20个delayedTask对象,这20对象其实是线程对象,然后将这20对象放入DelayedQueue中,同时将这20个对象加入到list中以表明创建的先后顺序。每个线程的延迟期是通过随机数指定的。在DelayedTask中有一个内部类,该类的作用就是遍历list,输出每个线程的信息(id + 延迟期),最后关闭整个线程。DelayedTaskConsumer就是不断从DelayedQueue中取线程对象,然后让其执行。

  关于Delayed接口的实现这里要强调一下,代码中写的是标准形式,也是策略模式的一种简单实现。delayTime是延迟期,需要我们指定。trigger表示这个对象的激活时间(比如到11点整时,其延迟期满),其计算方法就是获取当前时间+延迟期。而getDelay(TimeUnit unit)这个函数是个关键,这个函数会被调用两次:第一次查看延期满的时间点和当前时间之差(比如当前时间9点,延迟期满是在11点),发现是正值,对象需要继续等待;第二次查看时发现是负值(比如当前时间已经到了12点了),返回值为负数,说明对象的延迟期已经到了,可以使用了。 

3.2 PriorityBlockingQueue

队列是按照优先级级顺序排序的,优先级大的在队头。队列中的对象应该实现Comparable接口。在compareTo中,当和其他对象比较时,如果该方法返回负数,那么在队列里面的优先级就比较高。

1 package com.dy.xidian;  2   3 import java.util.ArrayList;  4 import java.util.List;  5 import java.util.Queue;  6 import java.util.Random;  7 import java.util.concurrent.ExecutorService;  8 import java.util.concurrent.Executors;  9 import java.util.concurrent.PriorityBlockingQueue; 10 import java.util.concurrent.TimeUnit; 11  12 class PrioritizedTask implements Runnable, Comparable
{ 13 private Random rand = new Random(47); 14 private static int counter = 0; 15 private final int id = counter++; 16 private final int priority; 17 18 protected static List
sequeue = new ArrayList
(); 19 20 public PrioritizedTask(int priority) { 21 super(); 22 this.priority = priority; 23 sequeue.add(this); 24 } 25 26 @Override 27 public int compareTo(PrioritizedTask that) { 28 if (this.priority > that.priority) 29 return -1; 30 if (this.priority < that.priority) 31 return 1; 32 return 0; 33 } 34 35 @Override 36 public void run() { 37 try { 38 TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); 39 System.out.println(this); 40 } catch (InterruptedException e) { 41 } 42 43 } 44 45 @Override 46 public String toString() { 47 return String.format("[%1$-3d]", priority) + "Task" + id; 48 } 49 50 public String summary() { 51 return "(" + id + " : " + priority + ")"; 52 } 53 54 public static class EndSentinel extends PrioritizedTask { 55 private ExecutorService exec; 56 57 public EndSentinel(ExecutorService e) { 58 super(-1); 59 exec = e; 60 } 61 62 public void run() { 63 int count = 0; 64 for (PrioritizedTask pt : sequeue) { 65 System.out.println(pt.summary()); 66 if (++count % 5 == 0) 67 System.out.println(""); 68 } 69 System.out.println(""); 70 System.out.println(this + " Calling shutdownNow()"); 71 exec.shutdownNow(); 72 } 73 } 74 } 75 76 class PrioritizedTaskProducer implements Runnable { 77 private Random rand = new Random(47); 78 private Queue
queue; 79 private ExecutorService exec; 80 81 public PrioritizedTaskProducer(Queue
queue, ExecutorService exec) { 82 super(); 83 this.queue = queue; 84 this.exec = exec; 85 } 86 87 @Override 88 public void run() { 89 for (int i = 0; i < 10; i++) { 90 queue.add(new PrioritizedTask(rand.nextInt(10))); 91 Thread.yield(); 92 } 93 try { 94 for (int i = 0; i < 10; i++) { 95 TimeUnit.MILLISECONDS.sleep(250); 96 queue.add(new PrioritizedTask(10)); 97 } 98 99 } catch (InterruptedException e) {100 }101 for (int i = 0; i < 10; i++)102 queue.add(new PrioritizedTask(i));103 queue.add(new PrioritizedTask.EndSentinel(exec));104 System.out.println("Finished PrioritizedTaskProducer");105 }106 }107 108 class PrioritizedTaskConsumer implements Runnable {109 private PriorityBlockingQueue
q;110 public PrioritizedTaskConsumer(PriorityBlockingQueue
q) {111 super();112 this.q = q;113 }114 115 @Override116 public void run() {117 try {118 TimeUnit.SECONDS.sleep(1);119 } catch (InterruptedException e1) {120 }121 try {122 while (!Thread.interrupted()) {123 q.take().run();124 }125 } catch (InterruptedException e) {126 System.out.println("Interrupted Execption!");127 }128 System.out.println("Finished PrioritizedTaskConsumer");129 }130 }131 132 public class PriorityBlockingQueueDemo {133 public static void main(String[] args) throws InterruptedException {134 ExecutorService exec = Executors.newCachedThreadPool();135 PriorityBlockingQueue
queue = new PriorityBlockingQueue
();136 exec.execute(new PrioritizedTaskProducer(queue, exec));137 exec.execute(new PrioritizedTaskConsumer(queue));138 }139 }
View Code

 

4.ScheduledThreadPoolExecutor

制定任务计划表,指定主线程运行多少秒(毫秒)后,开启子线程来运行别的任务。

1 package com.dy.xidian; 2  3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 import java.util.concurrent.ScheduledThreadPoolExecutor; 6 import java.util.concurrent.TimeUnit; 7  8 public class TaskTest { 9     static ScheduledThreadPoolExecutor scheduler = null;10     static int index = 0;11 12     public static void main(String[] args) {13 14         // 构造一个ScheduledThreadPoolExecutor对象,并且设置它的容量为5个15         scheduler = new ScheduledThreadPoolExecutor(5);16         MyTask task = new MyTask();17         // 隔2秒后开始执行任务,并且在上一次任务开始后隔一秒再执行一次;18         // stpe.scheduleWithFixedDelay(task, 2, 1, TimeUnit.SECONDS);19         // 隔6秒后执行一次,但只会执行一次。20         for (int i = 0; i < 10; i++)21             scheduler.schedule(task, i + 1, TimeUnit.SECONDS);22     }23 24     private static String getTimes() {25         SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss E");26         Date date = new Date();27         date.setTime(System.currentTimeMillis());28         return format.format(date);29     }30 31     private static class MyTask implements Runnable {32 33         @Override34         public void run() {35             index++;36             System.out.println(getTimes() + " " + index);37             if (index >= 10) {38                 scheduler.shutdownNow();39             }40         }41     }42 }

5.Semaphore

  同类资源只有一个的话,我们可以用Lock或者是synchronized来对它进行互斥访问。当同类资源数量有多个,能够满足多个线程同时操作时,可以考虑到使用信号量来实现互斥访问。

对象池代码

1 package com.dy.xidian; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.Semaphore; 6  7 public class ObjectPool
{ 8 private int size; 9 private List
items = new ArrayList
();10 private volatile boolean[] checkOut;11 private Semaphore available;12 13 public ObjectPool(Class
classObject, int size) {14 this.size = size;15 checkOut = new boolean[size];16 //size表示初始资源数,true表示对请求进行先来先服务操作17 available = new Semaphore(size, true);18 for (int i = 0; i < size; i++) {19 try {20 items.add(classObject.newInstance());21 } catch (Exception e) {22 throw new RuntimeException(e);23 }24 }25 }26 27 public T checkOut() throws InterruptedException {28 //获取信号量,如果没有资源请等待,信号量计数减129 available.acquire();30 return getItem();31 }32 33 private synchronized T getItem() {34 for (int i = 0; i < size; i++) {35 if (!checkOut[i]) {36 checkOut[i] = true;37 return items.get(i);38 }39 }40 return null;41 }42 43 public void checkIn(T x) {44 //归还资源,释放信号量,信号量计数加145 if (releaseItem(x))46 available.release();47 }48 49 private synchronized boolean releaseItem(T item) {50 int index = items.indexOf(item);51 if (index == -1)52 return false;53 if (checkOut[index]) {54 checkOut[index] = false;55 return true;56 }57 return false;58 }59 }
View Code

信号量Demo:

1 package com.dy.xidian; 2  3 import java.util.ArrayList; 4 import java.util.List; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Future; 8 import java.util.concurrent.TimeUnit; 9 10 class Fat {11     private volatile double d = 0;12     private static int counter = 0;13     private final int id = counter++;14     public Fat() {15         for (int i = 0; i < 1000; i++) {16             d += (Math.PI + Math.E) / (double) i;17         }18     }19 20     public void operation() {21         System.out.println(this);22     }23 24     public String toString() {25         return "Fat id: " + id;26     }27 }28 class CheckOutTask
implements Runnable {29 private static int counter = 0;30 private final int id = counter++;31 private ObjectPool
pool;32 33 public CheckOutTask(ObjectPool
pool) {34 this.pool = pool;35 }36 37 @Override38 public void run() {39 try {40 T item = pool.checkOut();41 System.out.println(this + "checked out " + item);42 TimeUnit.SECONDS.sleep(1);43 System.out.println(this + "checked in " + item);44 pool.checkIn(item);45 } catch (InterruptedException e) {46 }47 }48 49 public String toString() {50 return "CheckoutTask" + id + " ";51 }52 }53 54 public class SemaphoreDemo {55 final static int SIZE = 10;56 57 public static void main(String[] args) throws InterruptedException {58 final ObjectPool
pool = new ObjectPool
(Fat.class, SIZE);59 ExecutorService exec = Executors.newCachedThreadPool();60 List
list = new ArrayList
();61 // 创建10个子线程进行签入、签出操作62 for (int i = 0; i < SIZE; i++)63 exec.execute(new CheckOutTask
(pool));64 System.out.println("All checkout Task created");65 66 // 主线程只签出67 for (int i = 0; i < SIZE; i++) {68 Fat f = pool.checkOut();69 System.out.println(i + " : main() thread checked out");70 f.operation();71 list.add(f);72 }73 74 Future
blocked = exec.submit(new Runnable() {75 76 @Override77 public void run() {78 try {79 pool.checkOut();80 } catch (InterruptedException e) {81 System.out.println("checkOut() interrupted");82 }83 }84 });85 TimeUnit.SECONDS.sleep(2);86 blocked.cancel(true);87 System.out.println("Checking in objects in " + list);88 for (Fat fat : list)89 pool.checkIn(fat);90 for (Fat fat : list)91 pool.checkIn(fat);92 exec.shutdown();93 }94 }

  代码中创建了一个对象池,每次从池中获取对象时都要先获取信号量,如果信号量计数小于或等于0,则等待。信号量在创建时需要指定资源数,对象池中最开始有10个对象,则信号量的初始值应为10。在使用完对象后应该归还该对象并释放信号量。对于信号量的使用比较简答些。关于对象池代码则可以作为今后编程中示例代码。

6.Exchanger

  Exchanger可以在两个线程之间交换数据,只能是2个线程,他不支持更多的线程之间互换数据。当线程A调用Exchange对象的exchange()方法后,他会陷入阻塞状态,直到线程B也调用了exchange()方法,然后以线程安全的方式交换数据,之后线程A和B继续运行。

1 package com.dy.xidian;  2   3 import java.util.List;  4 import java.util.concurrent.CopyOnWriteArrayList;  5 import java.util.concurrent.Exchanger;  6 import java.util.concurrent.ExecutorService;  7 import java.util.concurrent.Executors;  8 import java.util.concurrent.TimeUnit;  9  10 import net.mindview.util.BasicGenerator; 11 import net.mindview.util.Generator; 12  13 class ExchangerProducer
implements Runnable { 14 private Generator
generator; 15 private Exchanger
> exchanger; 16 private List
holder; 17 18 /** 19 * 20 * @param exchanger交换器 21 * , 用于交换对象 22 * @param generator产生器 23 * , 产生要交换的数据 24 * @param holder数据容器 25 * , 用来存储产生的数据 26 */ 27 public ExchangerProducer(Exchanger
> exchanger, 28 Generator
generator, List
holder) { 29 super(); 30 this.generator = generator; 31 this.exchanger = exchanger; 32 this.holder = holder; 33 } 34 35 /** 36 * 生产者线程会生成一个满的List,用于交换对象 37 */ 38 @Override 39 public void run() { 40 try { 41 while (!Thread.interrupted()) { 42 for (int i = 0; i < ExchangerDemo.size; i++) 43 holder.add(generator.next()); 44 // 返回值是从消费者那里拿到的数据(其实就是一个空表) 45 holder = exchanger.exchange(holder); 46 } 47 } catch (InterruptedException e) { 48 } 49 } 50 } 51 52 class ExchangerConsumer
implements Runnable { 53 private Exchanger
> exchanger; 54 private List
holder; 55 private volatile T value; 56 57 /** 58 * 59 * @param exchanger交换器 60 * ,用于交换数据 61 * @param holder 62 * 欲交换的对象 63 */ 64 public ExchangerConsumer(Exchanger
> exchanger, List
holder) { 65 super(); 66 this.exchanger = exchanger; 67 this.holder = holder; 68 } 69 70 /** 71 * 消费者不断将表中的元素移除,给生产者一个空表 72 */ 73 @Override 74 public void run() { 75 try { 76 while (!Thread.interrupted()) { 77 holder = exchanger.exchange(holder); 78 for (T x : holder) { 79 value = x; 80 holder.remove(x); 81 } 82 } 83 } catch (InterruptedException e) { 84 } 85 System.out.println("Final value: " + value); 86 } 87 } 88 89 public class ExchangerDemo { 90 static int size = 10; 91 static int delay = 5; 92 93 public static void main(String[] args) throws Exception { 94 if (args.length > 0) 95 size = new Integer(args[0]); 96 if (args.length > 1) 97 delay = new Integer(args[1]); 98 ExecutorService exec = Executors.newCachedThreadPool(); 99 Exchanger
> xc = new Exchanger
>();100 List
producerList = new CopyOnWriteArrayList
(), consumerList = new CopyOnWriteArrayList
();101 exec.execute(new ExchangerProducer
(xc, BasicGenerator102 .create(Fat.class), producerList));103 exec.execute(new ExchangerConsumer
(xc, consumerList));104 TimeUnit.SECONDS.sleep(delay);105 exec.shutdownNow();106 }107 }
View Code

代码中ExchangeProducer不断填充List,然后将这个满表交换为ExchangerConsumer传递给它的空表。

代码中用到的写时拷贝技术可以查考下面链接

 

转载于:https://www.cnblogs.com/xidongyu/p/5319902.html

你可能感兴趣的文章
Mindjet MindManager 2019使用教程:
查看>>
游戏设计的基本构成要素有哪些?
查看>>
详解 CSS 绝对定位
查看>>
AOP
查看>>
我的友情链接
查看>>
NGUI Label Color Code
查看>>
.NET Core微服务之基于Polly+AspectCore实现熔断与降级机制
查看>>
vue组件开发练习--焦点图切换
查看>>
浅谈OSI七层模型
查看>>
Webpack 2 中一些常见的优化措施
查看>>
移动端响应式
查看>>
python实现牛顿法求解求解最小值(包括拟牛顿法)【最优化课程笔记】
查看>>
js中var、let、const的区别
查看>>
腾讯云加入LoRa联盟成为发起成员,加速推动物联网到智联网的进化
查看>>
从Python2到Python3:超百万行代码迁移实践
查看>>
Windows Server已可安装Docker,Azure开始支持Mesosphere
查看>>
简洁优雅地实现夜间模式
查看>>
react学习总结
查看>>
微软正式发布PowerShell Core 6.0
查看>>
Amazon发布新的会话管理器
查看>>