博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java笔记(十四) 并发基础知识
阅读量:5065 次
发布时间:2019-06-12

本文共 28003 字,大约阅读时间需要 93 分钟。

并发基础知识

一、线程的基本概念

线程表示一条单独的执行流,它有自己的程序计数器,有自己的栈。

1.创建线程

1)继承Thread

Java中java.lang.Thread这个类表示线程,一个类可以继承Thread并重写run方法来实现一个线程:

public class MyThread extends Thread{    @Override    public void run() {        System.out.println("thread name: " + Thread.currentThread().getName() +                " thread id: " + Thread.currentThread().getId());        System.out.println("Running my thread!");    }    public static void main(String[] args) {        MyThread thread = new MyThread();        //启动线程        thread.start();        /*thread name: Thread-0 thread id: 11        Running my thread!*/    }}

2)实现Runnable接口

public class MyRunnable implements Runnable{    @Override    public void run() {        System.out.println("thread name:" + Thread.currentThread().getName());    }    public static void main(String[] args) {        System.out.println("Main thread name : "                + Thread.currentThread().getName()); //Main thread name : main        Thread thread = new Thread(new MyRunnable()); //thread name:Thread-0        thread.start();    }}

2.线程的基本属性和方法

1)id和name

id: 一个递增整数,每创建一个线程就加一

name:默认值是Thread-后跟一个编号,name可以在Thread的构造方法中指定,可以通过setName方法进行设置。

2)优先级

优先级从1到10,默认为5,相关方法:

public final void setPriority(int newPriority)public final int getPriority()

在编程中不要过分依赖优先级。

3)状态

线程有一个状态概念,Thread获取状态方法:

public State getState()

返回值类型为Thread.State,它是一个枚举值:

public enum State {NEW, //线程还没调用start//调用start后线程在执行run方法且没有阻塞时状态为RUNNABLE,//不过,这并不代表CPU一定在执行该线程的代码,可能正在执行也可能在//等待操作系统分配时间片,只是它在等待其他条件。RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED; //线程已经结束运行}

Thread还有一个方法,返回线程是否alive:

//线程被启动后,run方法运行结束前,返回值都是truepublic final native boolean isAlive()

4.是否是daemon线程 

一般情况下整个程序只有在所有的线程都结束后,线程才会退出。

而daemon线程不一样,它是守护线程,当它辅助的主线程结束时,它也会结束。

public final void setDaemon(boolean on)public final boolean isDaemon()

5.sleep方法

Thread有一个静态方法,调用该方法会让当前线程睡眠指定时间,单位是毫秒。

public static native void sleep(long millis) throws InterruptedException;

在睡眠期间,该线程会让出CPU,但睡眠时间不是确切的给定毫秒数,可能有一定偏差。

睡眠期间,线程可以被中断,如果被中断,sleep会抛异常。

6.yield方法 

public static native void yield();

调用该方法意思是:我现在不急着占用CPU,你可以先让其他线程运行。

不过这对系统调度器也仅仅是建议,调度器如何处理不一定,它可能忽略该调用。

7.join方法 

在前面的MyThread例子中,MyThread可能没执行完,main线程就可能执行完了。

Thread有一个join方法,可以让调用join的线程等待该线程的结束。

public final void join() throws InterruptedException
//限定等待的最长时间public final synchronized void join(long millis) throws InterruptedException
MyThread thread = new MyThread();thread.start();//让main线程在子线程调用结束后再退出,相当于阻塞了main线程thread.join();

3.共享内存及可能出现的问题

虽然每个线程表示一条单独的执行流,有自己的程序计数器和栈,

但线程之间可以共享内存,它们可以访问和操作相同的对象。

public class ShareMemoryDemo {    private static int shared = 0;    private static void incrShared() {        shared ++;    }    static class ChildThread extends Thread {        List
list; public ChildThread(List
list) { this.list = list; } @Override public void run() { incrShared(); list.add(Thread.currentThread().getName()); } } public static void main(String[] args) throws InterruptedException { ArrayList
list = new ArrayList<>(); ChildThread t1 = new ChildThread(list); ChildThread t2 = new ChildThread(list); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(shared); System.out.println(list); /*2 [Thread-0, Thread-1]*/ }}

ChildThread的run方法访问了共享变量shared和list。

执行流、内存和程序代码之间的关系:

1)不同的执行流可以访问和操作相同的变量

2)不同的执行流可以执行相同的程序代码

所以,在分析代码执行过程时,理解代码在被哪个线程执行是很重要的

3)当多条执行流执行相同的程序代码时,每条执行流都有自己的栈,方法中

的参数和局部变量都有自己的一份。

当多条执行流可以操作相同的变量时,可能会出现意料之外的结果,包括竞态条件和内存可见性问题。

1.竞态条件 

所谓竞态条件(race condition)是指,当多个线程访问和操作同一个对象时,最终结果和执行时序

有关,可能正确也可能不正确。

public class CounterThread extends Thread{    private static int counter = 0;    @Override    public void run() {        for (int i = 0; i < 1000; i++) {            counter++;        }    }    public static void main(String[] args) throws InterruptedException {        int num = 1000;        Thread[] threads = new Thread[num];        for (int i = 0; i< num; i++) {            threads[i] = new CounterThread();            threads[i].start();        }        for (int i = 0; i < 1000; i++) {            threads[i].join();        }        System.out.println(counter); //998400    }}

期望结果应该是10000000,实际结果为998400,为什么呢?

因为counter++这个操作不是原子操作,它分为了3个步骤:

1)去counter的当前值

2)在当前值上加1

3)将新值重新赋值给counter

两个线程可能同时执行第一步,取到了相同的counter值。

2.内存的可见性 

多个线程可以共享和访问和操作相同的变量,但一个线程对一个共享变量的修改,

另一个线程不一定能马上见到,甚至永远见不到。

public class VisibilityDemo {    private static boolean shutdown = false;    static class MyThread extends Thread {        @Override        public void run() {            while (!shutdown) {                //do nothing            }            System.out.println("exit myThread");        }    }    public static void main(String[] args) throws InterruptedException {        new MyThread().start();        Thread.sleep(1000);        shutdown = true;        System.out.println("exit main!");    }}

期望的结果是两个线程都退出,但实际执行时,很可能会发现myThread永远不退出,

也就是说在myThread执行流看来,shutdown永远为false,即使main线程已经更改为了ture。

这就是内存可见性问题,在计算机系统中,除了内存,数据还会被缓存在CPU的寄存器以及

各级缓存中,当访问一个变量时,可能从CPU寄存器和各级缓存取,而不是从内存,当

修改一个变量时,也可能是先修改到缓存中,稍后再同步更新到内存中。在单线程程序中,这

一般不是问题,但在多线程程序中,尤其是有多个CPU的情况下,这就是严重问题。一个线程对

内存的修改,另一个线程看不到,一是修改没有及时同步到到内存,二是另一个线程根本就没有从内存中读取。

二、理解synchronized(同步)

1.用法和基本原理

synchronized可以用于:

1)实例方法:

public class Counter {    private int count;    public synchronized void incre() {        count ++;    }    public synchronized int getCount() {        return count;    }}
public class CounterThread extends Thread {    private Counter counter;    public CounterThread(Counter counter) {        this.counter = counter;    }    @Override    public void run() {        for (int i = 0; i < 1000; i++) {            counter.incre();        }    }    public static void main(String[] args) throws InterruptedException {        int num = 1000;        Counter counter = new Counter();        Thread[] threads = new Thread[num];        for (int i = 0; i < num; i++) {            threads[i] = new CounterThread(counter);            threads[i].start();            threads[i].join();        }        System.out.println(counter.getCount());    }}

看上去,synchronized使得同时只能有一个线程执行实例方法,

但这个理解是不确切的。多个线程是可以同时执行一个synchronized方法的,

只要它们访问的对象不同即可。所以,synchronized实例方法保护的是同一个

对象方法的调用,确保同时只能有一个线程执行。大致过程如下:

1)尝试获得锁,如果能够获得锁,继续下一步,否则加入等待队列,阻塞并等待唤醒;

2)执行实例方法代码;

3)释放锁,如果等待队列上有等待线程,从中抽取一个并唤醒,如果有多个等待线程,唤醒

哪一个是不一定的,不保证公平性。

当线程不能获得锁的时候,他会加入等待队列,状态会变为BLOCKED.

注意:一般在保护变量时,需要在所有访问该变量的方法上加上synchronized。

2)静态方法 

synchronized保护的是类对象,而非实例对象。

3)代码块 

public class Counter {    private int count;    public void incre() {        //synchronized括号里面的就是保护对象{}里就是同步执行代码        synchronized (this) {            count ++;        }    }    public synchronized int getCount() {        synchronized (this) {            return count;        }    }}
public class StaticCounter {    private static int count = 0;    public static void incr() {        synchronized (StaticCounter.class) {            count ++;        }    }    public static int getCount() {        synchronized (StaticCounter.class) {            return count;        }    }}
public class Counter {    private int count;    private Object obj = new Object();    public void incre() {        //synchronized同步的对象可以是任意对象,任意对象都有一个锁和等待队列        synchronized (obj) {            count ++;        }    }    public synchronized int getCount() {        synchronized (obj) {            return count;        }    }}

2.进一步理解synchronized

1)可重入性 

对同一个执行线程,在它获得了锁之后,在调用其他需要同样锁的代码时,可以直接调用。

比如,在一个synchronized实例方法内,可以调用同一个实例中的synchronized实例方法。

2)内存可见性 

synchronized除了保证原子操作外,它还有一个重要作用,就是保证内存可见性,

在释放锁时,所有写入都会写入内存,而获得锁后,都会从内存中读取数据。

不过,如果只是为了保证内存可见性,synchronized成本有点高,可以给变量加修饰符volatile代替。

加了volatile后,Java会在操作对应变量时加入特殊指令,保证读写到内存最新值,而非缓存值。

3)死锁 

死锁举例:有a、b两个线程,a线程持有锁A,在等待锁B,b线程持有锁B,在等待锁A,

a、b陷入了互相等待,最后谁都执行不下去。

public class DeadLockDemo {    private static Object lockA = new Object();    private static Object lockB = new Object();    public static void startThreadA() {        Thread t = new Thread(() -> {            synchronized (lockA) {                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                synchronized (lockB) {}            }        });        t.start();    }    public static void startThreadB() {        Thread t = new Thread(() -> {            synchronized (lockB) {                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }                synchronized (lockA) {}            }        });        t.start();    }    public static void main(String[] args) {        startThreadA();        startThreadA();    }}

如何解决死锁问题:首先,应该尽量避免在持有一个锁的同时去申请另一个锁,

如果确实需要多个锁,所有代码都应该按照相同的顺序去申请锁。

3.同步容器

Collections类中有一些方法,可以返回线程安全的同步容器:

public static 
Collection
synchronizedCollection(Collection
c)public static
List
synchronizedList(List
list)public static
Map
synchronizedMap(Map
m)

它们是给所有容器方法加上synchronized来实现安全的,例如:

static class SynchronizedCollection
implements Collection
{ final Collection
c; //Backing Collection final Object mutex; //Object on which to synchronize SynchronizedCollection(Collection
c) { if(c==null) throw new NullPointerException(); this.c = c; mutex = this; }public int size() { synchronized (mutex) {
return c.size();} } public boolean add(E e) { synchronized (mutex) {
return c.add(e);} } public boolean remove(Object o) { synchronized (mutex) {
return c.remove(o);} }//…}

这里的线程安全针对的是容器对象,指的是当多个线程并发访问同一个容器对象时,  

不需要额外的同步操作,也不会出现错误结果。这样是不是就绝对安全了呢?不是

的我们还需要注意以下情况:

1)复合操作 

public class EnhancedMap 
{ Map
map; public EnhancedMap(Map
map){ this.map = Collections.synchronizedMap(map); } public V putIfAbsent(K key, V value){ V old = map.get(key); if(old!=null){ return old; } return map.put(key, value); } public V put(K key, V value){ return map.put(key, value); }//…}

其中的putAbsent方法语义是只有在原方法没有对应键的情况下才添加,

这是一个检查然后在更新的操作,在多线程的情况下,可能有多个线程

执行完了检查这一步,都发现Map中没有对应的键,然后就会都调用put,这就破坏了该方法的语义。

2.伪同步 

那么给该方法加上synchronized就线程安全了吗?

public synchronized V putIfAbsent(K key, V value){    V old = map.get(key);    if(old!=null){        return old;    }    return map.put(key, value);}

答案是否定的,原因是同步对象错了。putAbsent同步使用的是EnhancedMap对象,

而其他方法使用的是Collections当作的map对象。要解决这个问题,所有方法必须

使用相同的锁。所以可以改为:

public V putIfAbsent(K key, V value){    synchronized(map){        V old = map.get(key);        if(old!=null){            return old;        }        return map.put(key, value);    }}

3.迭代 

对于同步容器对象,虽然单个操作是安全的,但迭代并不是:

public class DeadLockDemo {    private static void startModifyThread(final List
list) { Thread modifyThread = new Thread(new Runnable() { @Override public void run() { for(int i = 0; i < 100; i++) { list.add("item " + i); try { Thread.sleep((int) (Math.random() * 10)); } catch (InterruptedException e) { } } } }); modifyThread.start(); } private static void startIteratorThread(final List
list) { Thread iteratorThread = new Thread(new Runnable() { @Override public void run() { while (true) { for(String str : list) { } } } }); iteratorThread.start(); } public static void main(String[] args) { final List
list = Collections .synchronizedList(new ArrayList
()); startIteratorThread(list); startModifyThread(list); } /*Exception in thread "Thread-0" java.util.ConcurrentModificationException at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901) at java.util.ArrayList$Itr.next(ArrayList.java:851) at com.cdert.roadpms.test.utils.DeadLockDemo$2.run(DeadLockDemo.java:34) at java.lang.Thread.run(Thread.java:748)*/}

可以改为:

private static void startIteratorThread2(final List
list) { Thread iteratorThread = new Thread(new Runnable() { @Override public void run() { while(true) { synchronized(list){ for(String str : list) { } } } } }); iteratorThread.start();}

二、线程的基本协作机制

一)协作的场景

1)生产者/消费者模式:生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到

队列上,而消费者从队列取数据或者任务,如果队列长度有限,在队列满的时候,生产者需要等待,

在队列为空的时候,消费者需要等待。

2)同时开始:在一些程序,尤其是仿真程序中,要求多个线程同时开始。

3)等待结束:主线程将任务分解成若干子任务,为每个子任务创建一个线程,主线程在继续执行其他

任务之前需要等待每个子任务执行完毕。

4)异步结果:在主从协作模式中,主线程手工创建子线程往往笔记麻烦,一种常见的模式是把子线程封装

为异步调用,异步调用马上返回,但返回的不是最终结果,而是一个称为Future的对象,通过它可以在随后获得最终结果。

5)集合点:在一些程序中,比如并行迭代计算中,每个线程负责一部分计算,然后在集合点等待其他线程完成,所有线程

到齐后,交还数据和计算结果,再进行下一次迭代。

二、wait/notify

在Java根父类中定义了一些线程协作的基本方法,这些方法

分为两类:wait和notify。wait主要有两个方法:

public final void wait() throws InterruptedException//单位为毫秒表示最长等待时间,wait(0)表示无限期等待,无参wait等于调用wait(0)public final native void wait(long timeout) throws InterruptedException;

每个对象都有一把锁和等待队列,一个线程在进入synchronized代码块时,

会尝试获取锁,如果获取不到就会把当前线程加入等待队列,其实,除了

用于锁的等待队列,每个对象还有另一个等待队列,叫做条件队列,该队列

用于线程间的协作。调用wait就会把当前线程(调用该方法使用的线程)放到条件

队列上并阻塞,表示当前线程无法执行,它需要等待一个条件,这个条件不能由

它自己发起,需要其他线程发起。当其他线程改变条件后,应该调用notify方法。

//notify将会从条件队列中选取一个线程,将其从队列中移除并唤醒public final native void notify();//notifyAll与notify的区别是:它会移除条件队列中的所有线程,并全部唤醒public final native void notifyAll();

简单的协作示例:

public class WaitThread extends Thread{    private volatile boolean fire = false;    @Override    public void run() {        try {            synchronized(this) {                while (!fire) {                    wait();                }            }            System.out.println("fired!");        } catch (Exception e) {            e.printStackTrace();        }    }    public synchronized void fire() {        this.fire = true;        notify();    }    public static void main(String[] args) throws InterruptedException {        WaitThread waitThread = new WaitThread();        waitThread.start();        Thread.sleep(1000);        System.out.println("fire");        waitThread.fire();    }}

两个线程都要访问变量fire,容易出现竞态条件,所有相关代码都要被

synchronized保护。实际上,wait/notify方法只能在synchronized代码

块内被调用,如果调用wait/notify方法时,当前线程没有持有对象锁,会抛出异常。

看看另一种情况:

public class WaitThread extends Thread{    private volatile boolean fire = false;    private volatile int count = 0;    @Override    public void run() {        try {            synchronized(this) {                while (!fire) {                    sleep(1000);                    System.out.println(++count);                }            }            System.out.println("fired");        } catch (Exception e) {            e.printStackTrace();        }    }    //实例对象的锁不会被释放,该方法永远不会被执行    public synchronized void fire() {        System.out.println("fire it");         this.fire = true;    }    public static void main(String[] args) throws InterruptedException {        WaitThread waitThread = new WaitThread();        waitThread.start();        Thread.sleep(1000);        System.out.println("fire");        waitThread.fire();    }}

思考:如果wait必须被synchronized保护,那一个线程在wait时,另一个线程

怎么可能调用到同样被synchronized保护的notify方法(注意两个被保护的synchronized方法是一个实例的)?

它不需要等待锁么?我们需要进一步理解wait的内部过程才能解开谜团!!虽然是在synchronized内,但调用

wait时,线程会释放对象锁。wait的具体过程是:

1)把当前线程放入条件等待队列,释放对象锁,阻塞等待,线程状态变为WAITING或TIMED_WAITING。

2)等待时间到或者被其他线程调用notify/notifyAll从条件队列中移除,这时,要重新竞争对象锁:

如果能够重新获得锁,线程状态变为RUNNABLE,并从wait调用中返回。

否则,该线程加入对象锁等待队列,线程变为BLOCKED,只有在获得锁后才会从wait调用中返回。

线程从wait调用中返回后,不代表其等待条件就一定成立了,它需要重新检查其等待条件,一般

调用模式是:

synchronized (obj) {while(条件不成立)obj.wait();…//执行条件满足后的操作}

比如上例中的代码:

synchronized (this) {    while(!fire) {    wait();    }}

调用notify会把条件队列中的线程唤醒并从队列中移除,但它并不会

释放调用它的代码块获得的对象锁,也就是说,只有在包含notify的

synchronized代码块执行完毕后,等待线程才会从wait调用中返回。

三、生产者\消费者模式

public class MyQueue
{ private Queue
queue = null; private int limit; public MyQueue(int limit) { this.limit = limit; queue = new ArrayDeque
(limit); } //给生产者用的方法,往队列上放数据,满了就wait public synchronized void put(E e) throws InterruptedException { while (queue.size() == limit) wait(); queue.add(e); //由于条件不同但又使用相同的的等待队列,所以 //要用notifyAll而不能调用notify notifyAll(); } //take给消费者用的方法空了就wait public synchronized E take() throws InterruptedException { while (queue.isEmpty()) wait(); E e = queue.poll(); notifyAll(); return e; }}

只能有一个条件等待队列,这是 wait/notify机制的缺陷,这使得对于条件的分析变得复杂。

public class Producer extends Thread{    MyQueue
queue; public Producer(MyQueue
queue) { this.queue = queue; } @Override public void run() { int num = 0; try { while (true) { String task = String.valueOf(num); queue.put(task); System.out.println("produce: " + task); num ++; Thread.sleep(3000); } } catch (InterruptedException e) { e.printStackTrace(); } }}
public class Consumer extends Thread{    MyQueue
queue; public Consumer(MyQueue
queue) { this.queue = queue; } @Override public void run() { try { while (true) { String task = queue.take(); System.out.println("consumer: " + task); Thread.sleep(3000); } } catch (InterruptedException e) { e.printStackTrace(); } }}

三、同时开始

public class FireFlag {    private volatile boolean fired = false;    public synchronized void waitForFire() throws InterruptedException {        while (!fired) {            wait();        }    }    public synchronized void fire() {        this.fired = true;        notifyAll();    }}
public class Racer extends Thread{    private FireFlag flag;    public Racer(FireFlag fireFlag) {        this.flag = fireFlag;    }    @Override    public void run() {        try {            flag.waitForFire();            System.out.println("start run " + Thread.currentThread().getName());        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static void main(String[] args) throws InterruptedException {        int num = 10;        FireFlag flag = new FireFlag();        Thread[] threads = new Thread[num];        for (int i = 0; i < num; i++) {            threads[i] = new Racer(flag);            threads[i].start();        }        Thread.sleep(1000);        flag.fire();    }}

四、等待结束

join方法实际上就是调用了wait方法:

while (isAlive()) {    wait(0);}

只要线程是活的,isAlive()返回true,join就一直等待。谁来通知它呢?

当线程运行结束时,Java系统调用notifyAll来通知。使用join比较麻烦,需要

主线程逐一等待每个子线程。这里我们展示一种新写法:

public class MyLatch {    //未完成的线程个数,初始值为子线程总个数    private int threadsCount;    public MyLatch(int threadsCount) {        this.threadsCount = threadsCount;    }    public synchronized void await() throws InterruptedException {        while (threadsCount > 0) wait();    }    public synchronized void countDown() {        threadsCount --;        if (threadsCount <= 0) this.notifyAll();    }}
public class Worker extends Thread{    MyLatch myLatch;    public Worker(MyLatch myLatch) {        this.myLatch = myLatch;    }    @Override    public void run() {        try {            Thread.sleep(5000);            myLatch.countDown();        } catch (InterruptedException e) {            e.printStackTrace();        }    }    public static void main(String[] args) throws InterruptedException {        int num = 10;        MyLatch myLatch = new MyLatch(num);        Thread[] threads = new Thread[num];        for (int i = 0; i < num; i++) {            threads[i] = new Worker(myLatch);            threads[i].start();        }        myLatch.await();        System.out.println("ready to end !");    }}

五、异步结果

在Java中表示子任务的接口是Callable,声明为:

public interface Callable
{ V call() throws Exception;}

为表示异步调用的结果,定义一个MyFuture接口:

public interface MyFuture 
{ //get方法返回真正的结果,如果结果没有计算完成, //get方法会阻塞直到计算完成 V get() throws Exception ;}

为方便主线程调用子任务,定义一个MyExcecutor类,其中定义一个execute

方法表示执行子任务并返回调用结果。

//利用该方法,对于主线程,就不需要创建并管理子线程了,//并且可以方便地获取异步调用的结果public 
MyFuture
execute(final Callable
task)

实例:

//表示子任务得接口interface MyCallable
{ V call() throws Exception;}
//表示异步调用的结果interface MyFuture
{ //返回真正的结果 V get() throws Exception;}
interface MyExecutor {    
MyFuture
execute(final MyCallable
task);}
//执行任务的子线程public class ExecuteThread
extends Thread{ private V result = null; private Exception exception = null; private volatile boolean done = false; private MyCallable
task; private Object lock; public ExecuteThread(MyCallable
task, Object lock) { this.task = task; this.lock = lock; } @Override public void run() { try { result = task.call(); } catch (Exception e) { exception = e; } finally { synchronized (lock) { done = true; lock.notifyAll(); } } } public V getResult() { return result; } public boolean isDone() { return done; } public Exception getException() { return exception; }}
public class MyExecutorImpl implements MyExecutor{    @Override    public 
MyFuture
execute(MyCallable
task) { final Object lock = new Object(); final ExecuteThread
thread = new ExecuteThread<>(task, lock); MyFuture
myFuture = new MyFuture
(){ @Override public V get() throws Exception{ synchronized (lock) { System.out.println("Try to get result"); while (!thread.isDone()) { try { //阻塞直到任务执行完毕 lock.wait(); } catch (Exception e) {} } if (thread.getException() != null) { throw thread.getException(); } } System.out.println("Got it!!!"); return thread.getResult(); } }; thread.start(); return myFuture; }}
public class MainClass {    public static void main(String[] args) {        MyExecutor executor = new MyExecutorImpl();        MyCallable
task = new MyCallable
() { @Override public Integer call() throws Exception { System.out.println("Start do the call."); int result = (int) (Math.random() * 1000); Thread.sleep(5000); System.out.println("Do the call done."); return result; } }; //异步调用,返回一个MyFuture对象 MyFuture
future = executor.execute(task); //执行其他操作 try { System.out.println("Start do other things."); Thread.sleep(3000); System.out.println("Other things done. "); System.out.println("Ready to get."); //获取异步调用结果 Integer result = future.get(); System.out.println("The result is " + result); } catch (Exception e) { e.printStackTrace(); } /*Start do other things. Do the call done. Other things done. The result is 802*/ }}

六、集合点

public class AssemblePoint {    //未到达集合点的线程数量,初始为所有线程数    private int threadsCount;    public AssemblePoint(int threadsCount) {        this.threadsCount = threadsCount;    }    public synchronized void await() throws InterruptedException {        System.out.println("Now the count is " + threadsCount);        if (threadsCount > 0) {            threadsCount --;            if (threadsCount == 0 ) {                notifyAll();                System.out.println("All the threads done.");            }            else {                while (threadsCount !=0) {                    this.wait();                }            }        }    }}
public class AssemblePointDemo {    static class Tourist extends Thread {        AssemblePoint point;        public Tourist(AssemblePoint point) {            this.point = point;        }        @Override        public void run() {            try {                //模拟各自独自运行                Thread.sleep((long) (Math.random() * 1000));                //该线程运行完毕,集合                point.await();            } catch (Exception e) {                e.printStackTrace();            }        }    }    public static void main(String[] args) {        int num = 10;        Tourist[] threads = new Tourist[num];        AssemblePoint point = new AssemblePoint(num);        for (int i = 0; i < num; i++) {            threads[i] = new Tourist(point);            threads[i].start();        }    }}

三、线程的中断

 一)取消/关闭机制

在Java中停止一个线程的主要机制是中断,中断并不是强迫终止一个线程,它是一种协作机制,

是给线程传递一个取消信号,但是由线程线程决定如何以及何时退出。每个线程都有一个标志位,表示该线程是否被中断了。

//返回线程的中断标志位public boolean isInterrupted()//中断对应的线程public void interrupt()//该方法是静态方法,实际会调用Thread.currentThread()操作当前线程,返回标志位,并清空public static boolean interrupted()

 二)线程中断的反应

interrupt()对线程的影响与线程的状态和正在进行的IO操作有关,

我们主要讨论线程状态:

1)RUNNABLE:线程正在运行或者具备运行的条件只是在等待操作系统调度;

2)WAITING/TIME_WAITING:线程在等待某个条件或者超时;

3)BLOCKED:线程在等待锁,试图进入同步块

4)NEW/TERMINATED:线程还未启动或者已经结束。

1.RUNNABLE 

如果线程在运行中,且没有执行IO操作,interrupt()只是会设置线程的中断标致位,

没有任何其他作用。线程应该在运行过程中合适的位置检查中断标志位,例如:

public class InterruptRunnableDemo extends Thread {    @Override    public void run() {        while(!Thread.currentThread().isInterrupted()) {        }        System.out.println("done ");    }}

2.WAITING/TIME_WAITING 

线程调用join/wait/sleep方法会进入WAITING/TIME_WAITING状态,

在这些状态时,对线程对象调用interrupt会使线程抛出InterruptedException。

抛出异常后,中断标志位会被清空,而不是设置。比如:

public static void main(String[] args) {    Thread thread = new Thread(() -> {        try {            Thread.sleep(1000);        } catch (Exception e) {            System.out.println(Thread.currentThread().isInterrupted()); //false        }    });    thread.start();    try {        Thread.sleep(100);    } catch (Exception e) {}    thread.interrupt();}

捕获InterruptedException,通常表示希望线程结束,线程大致有两种处理方式:

1)向上传递该异常,这使得该方法也变成一个可中断方法,需要调用者进行处理;

2)有些情况不能向上传递异常,比如Thread的run方法,它的声明是固定的,不能

抛出受检异常,这时,应该捕获异常,进行合理的清理操作,清理后,一般应该调

用Thread的interrupt方法设置中断标志位,使得其他代码有方法知道它发生中断。

3.BLOCKED 

如果一个线程在等待锁,对线程调用interrupt只是会设置中断标志位,线程

仍然会处于BLOCKED状态。interrupt并不能使一个正在等待锁的线程正真“中断”。

public class InterruptSynchronizedDemo {    private static Object lock = new Object();    private static class MyThread extends Thread {        @Override        public void run() {            synchronized (lock) {                System.out.println("Coming in "); //never                while (!Thread.currentThread().isInterrupted()) {                }            }            System.out.println("exit");//never        }    }    public static void test() throws InterruptedException {        synchronized (lock) {            MyThread thread = new MyThread();            thread.start();            Thread.sleep(1000);            thread.interrupt();            thread.join();        }    }    public static void main(String[] args) {        //test方法在持有锁的情况下启动线程thread,而线程thread        //也尝试获得锁,所以会进入等待队列。        try {            test();        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

在使用synchronized关键字获取锁的过程中不响应中断请求,这是synchronized的局限性。

4.NEW/TERMINATE

调用interrupt无任何效果。

三)如何正确地取消/关闭线程

interrupt方法不一定会真正中断线程,它只是一种协作机制。

对于以线程提供服务的程序模块而言,它应该封装取消/关闭操作,提供

单独的取消/关闭方法给调用者。

转载于:https://www.cnblogs.com/Shadowplay/p/10035987.html

你可能感兴趣的文章
数据库框架的log4j日志配置
查看>>
lintcode-easy-Remove Element
查看>>
Android 常用开源框架源码解析 系列 (四)Glide
查看>>
操作系统概述
查看>>
mysql 根据地图 坐标 查询 周边景区、酒店
查看>>
<CDQ分治> Jam's problem again [HDU - 5618]
查看>>
mysql重置密码
查看>>
使用request简单爬虫
查看>>
jQuery轮 播的封装
查看>>
一天一道算法题--5.30---递归
查看>>
switchcase的用法
查看>>
React.js 小书 Lesson15 - 实战分析:评论功能(二)
查看>>
Java基础03 构造器与方法重载
查看>>
软件项目经理职责[转](
查看>>
辗转相除求最大公约数
查看>>
Redis 主从集群搭建及哨兵模式配置
查看>>
nginx ------反向代理和负载均衡
查看>>
Linux下安装JDK
查看>>
[HDU] 3711 Binary Number [位运算]
查看>>
908. Smallest Range I
查看>>