博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
FutureTask详解
阅读量:7070 次
发布时间:2019-06-28

本文共 7861 字,大约阅读时间需要 26 分钟。

CAS:http://huangyunbin.iteye.com/blog/1942369

Future:http://blog.csdn.net/liulipuo/article/details/39029643

知识储备CAS

CAS基于冲突检测的无锁并发策略,性能也较高。CAS操作有3个操作数,内存值M,预期值E,新值U,如果M==E,则将内存值修改为B,否则啥都不做。

public class TestUnsafe {  private static Unsafe unsafe;  static{    try {      //通过反射获取rt.jar下的Unsafe类      Field field = Unsafe.class.getDeclaredField("theUnsafe");      field.setAccessible(true);        unsafe = (Unsafe) field.get(null);     } catch (Exception e) {      System.out.println("Get Unsafe instance occur error"+ e);      }  }   public static void main(String[] args) throws Exception    {        Class clazz = Target.class;        Field[] fields = clazz.getDeclaredFields();        System.out.println("fieldName:fieldOffset");        for (Field f : fields) {            // 获取属性偏移量,可以通过这个偏移量给属性设置            System.out.println(f.getName() + ":" + unsafe.objectFieldOffset(f));        }        Target target = new Target();        Field intFiled  =  clazz.getDeclaredField("intParam")  ;        int a=(Integer)intFiled.get(target ) ;        System.out.println("原始值是:"+a);        //intParam的字段偏移是12 原始值是3 我们要改为10        System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10));        int b=(Integer)intFiled.get(target) ;        System.out.println("改变之后的值是:"+b);        //这个时候已经改为10了,所以会返回false        System.out.println(unsafe.compareAndSwapInt(target, 12, 3, 10));  //false      //判断target对象的偏移量24位置,即strParam,如果该变量值=null,则给strParam赋值5      System.out.println(unsafe.compareAndSwapObject(target, 24, null, "5"));     //true  }  }    class Target{  int intParam=3;    long longParam;    String strParam;    String strParam2;  }

 测试结果:

FutureTask

futureTask的异步计算功能就不介绍了,主要分析源码,下图是futureTask的结构:

 

NEW                            新建            0

COMPLETING             执行中        1
NORMAL                    正常            2
EXCEPTIONAL            异常            3
CANCELLED                取消            4
INTERRUPTING        中断中        5
INTERRUNPED            被中断        6
state的状态变化可以有四种方式
NEW->COMPLETING->NORMAL                            正常完成的流程
NEW->COMPLETING->EXCEPTIONAL                出现异常的流程
NEW->CANCELED                                                  被取消
NEW->INTERRUNPING->INTERRRUNPTED        被中断

创建FutureTask

public FutureTask(Callable
callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; //state初始化NEW状态 this.state = NEW; // ensure visibility of callable }

run方法:

1  public void run() { 2 //如果当前state不是创建状态,且已经在运行。则直接return。 3         if (state != NEW || 4             !UNSAFE.compareAndSwapObject(this, runnerOffset, 5                                          null, Thread.currentThread())) 6             return; 7 //开始执行任务 8         try { 9             Callable
c = callable;10 //创建状态,且callable不为空,开始执行调用call()11 if (c != null && state == NEW) {12 V result;13 boolean ran;14 try {15 result = c.call();16 ran = true;17 } catch (Throwable ex) {18 result = null;19 ran = false;20 setException(ex);21 }22 //成功调用任务,执行set()方法23 if (ran)24 set(result);25 }26 } finally {27 // runner must be non-null until state is settled to28 // prevent concurrent calls to run()29 //无论是否成功,都要把runner设置为null30 runner = null;31 // state must be re-read after nulling runner to prevent32 // leaked interrupts33 int s = state;34 if (s >= INTERRUPTING)35 handlePossibleCancellationInterrupt(s);36 }37 }

接下来看一下24行的set方法:

1 protected void set(V v) { 2 //CAS设置state  NEW  ->  COMPLETING 3         if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 4 //结果赋值outcome 5             outcome = v; 6 //CAS将state设置为NORMAL 7             UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 8 //唤醒所有的等待线程,同时设置callable=null,调用done(),具体实现后面详细介绍 9             finishCompletion();10         }11     }

cancel()取消

1     public boolean cancel(boolean mayInterruptIfRunning) { 2 //如果state已经不是NEW 或者NEW不能设置成INTERRUPTING , 则直接返回false 。 3         if (!(state == NEW && 4               UNSAFE.compareAndSwapInt(this, stateOffset, NEW, 5                   mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) 6             return false; 7         try {    // in case call to interrupt throws exception 8 //如果是可中断 那么就 调用系统中断方法 然后把状态设置成INTERRUPTED 9             if (mayInterruptIfRunning) {10                 try {11                     Thread t = runner;12                     if (t != null)13 //中断执行线程14                         t.interrupt();15                 } finally { // final state16 //最后设置state为INTERRUPTED17                     UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);18                 }19             }20         } finally {21             finishCompletion();22         }23         return true;24     }

再来看一下get操作:

public V get(long timeout, TimeUnit unit)        throws InterruptedException, ExecutionException, TimeoutException {        if (unit == null)            throw new NullPointerException();        int s = state;//如果还没完成,并且等待timeout也没有完成,则抛出超时异常        if (s <= COMPLETING &&            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)            throw new TimeoutException();        return report(s);    }

awaitDone()操作来如何实现超时,已经完成但未超时,则结束等待返回结果,看具体代码:

private int awaitDone(boolean timed, long nanos)        throws InterruptedException {//超时时间点        final long deadline = timed ? System.nanoTime() + nanos : 0L;        WaitNode q = null;        boolean queued = false;//循环执行        for (;;) {//中断处理            if (Thread.interrupted()) {                removeWaiter(q);                throw new InterruptedException();            }            int s = state;//首先判断是否完成,如果完成,直接返回state结果            if (s > COMPLETING) {                if (q != null)                    q.thread = null;                return s;            }            else if (s == COMPLETING) // cannot time out yet                Thread.yield();//未完成执行,则将当前线程(主线程)设置成等待节点并放入到等待队列中            else if (q == null)                q = new WaitNode(); //将waiters设置成q的next节点,并替换,改操作只做一次,将main线程加入到队列中,只加一次            else if (!queued)                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,                                                     q.next = waiters, q);//如果启动了超时等待            else if (timed) {//计算剩余超时时间                nanos = deadline - System.nanoTime();//超时,移除队列中的等待线程,返回state,上层判断state,并报出超时异常                if (nanos <= 0L) {                    removeWaiter(q);                    return state;                }//还未超时,则等待nanos时间,然而如果task执行接受,并不会一直等待,在run()中的set()中,会执行finishCompleted(),将对待队列线性唤醒,调用LockSupport.unpark()方法来唤醒等待线程                LockSupport.parkNanos(this, nanos);            }            else//未启用超时功能,在未完成callable,则无限期等待                LockSupport.park(this);        }    }

 最后看一下finishCompletion()方法,调用callable,是如何唤醒main线程

private void finishCompletion() {        // assert state > COMPLETING;//遍历等待队列        for (WaitNode q; (q = waiters) != null;) {//将waiters的头结点中线程设置null,即移除某个等待线程            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {                for (;;) {                    Thread t = q.thread;//如果等待线程不为null  调用LockSupport.unpark(t)唤醒,然后操作下一个等待线程                    if (t != null) {                        q.thread = null;                        LockSupport.unpark(t);                    }                    WaitNode next = q.next;                    if (next == null)                        break;                    q.next = null; // unlink to help gc                    q = next;                }                break;            }        }        done();        callable = null;        // to reduce footprint    }

至此,FutureTask的源码解析已经完成,接下来看一下开源对于异步处理的解决方案。

 

举一反三:

ChannelFuture

ListenableFuture

 

转载于:https://www.cnblogs.com/gaojy/p/7237167.html

你可能感兴趣的文章
Oracle 切割字符查询
查看>>
结构体内存对齐具体解释
查看>>
Cocos Code IDE里xcodeprojectlua脚本更新
查看>>
Windows Developer Day - Adaptive Cards
查看>>
WPF自学入门(九)WPF自定义窗口基类
查看>>
ThreadLocal,LinkedBlockingQueue,线程池 获取数据库连接2改进
查看>>
mongodb的数据库操作
查看>>
继续吐槽在net下没有合适的Disk Cache之使用EhCache
查看>>
RxJava操作符之Share, Publish, Refcount
查看>>
js拦截全局ajax请求
查看>>
数据库基础常用知识
查看>>
在Linux,误删磁盘分区怎么恢复呢【转】
查看>>
LEARN SWIFT
查看>>
jenkins指定具体项目具体分支进行构建部署
查看>>
关于音频文件的上传
查看>>
数据仓库学习笔记
查看>>
js蛋疼的Class(获取class对象)
查看>>
powershell入门教程-v0.3版
查看>>
nginx的proxy_cache缓存配置
查看>>
论文笔记:Variational Capsules for Image Analysis and Synthesis
查看>>