JDK 8
Stream
Stream提供对集合(Collection)、数组(Array)进行一些更高级的操作功能,比如排序、筛选、取最值等。适合高效的聚合操作(aggregate operation)和大批量数据操作(bulk data operation)。
Stream提供串行和并行两种模式进行聚合操作,并发模式能够充分利用多核优势,使用fork/join并行方式来拆分任务。
Stream API无需编写一行多线程代码,就可以很方便地写出高性能的并发程序。
Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:
- 1.0-1.4 中的 java.lang.Thread
- 5.0 中的 java.util.concurrent
- 6.0 中的 Phasers 等
- 7.0 中的 Fork/Join 框架
- 8.0 中的 Lambda
Stream 的另外一大特点是,数据源本身可以是无限的。
流的构成
获取一个数据源(source)-> 数据转换 -> 执行操作获取结果
每次转换,原有Stream对象不变,返回一个新的Stream对象。
示例
实例:
有一个交易类:Transaction,这个类中有交易号(id)、交易类型(type)和交易金额(value)这两个字段,现在需要我们将交易类型为grocery 的所有交易筛选出来,然后返回交易值降序排好序的交易号的集合,代码具体如下:
在jdk1.7之前我们会这样写:
// 找出type为grocery的交易集合
List<Transaction> groceryTransactions = new Arraylist<>();
for(Transaction t: transactions){
if(t.getType() == Transaction.GROCERY){
groceryTransactions.add(t);
}
}
// 按降序排好序
Collections.sort(groceryTransactions, new Comparator(){
public int compare(Transaction t1, Transaction t2){
return t2.getValue().compareTo(t1.getValue());
}
});
// 找出交易id
List<Integer> transactionIds = new ArrayList<>();
for(Transaction t: groceryTransactions){
transactionsIds.add(t.getId());
}
使用java 8 Stream后的代码就变成了这样:
List<Integer> transactionsIds = transactions.parallelStream().
filter(t -> t.getType() == Transaction.GROCERY).
sorted(comparing(Transaction::getValue).reversed()).
map(Transaction::getId).
collect(toList());
获取Stream对象
有三种主要方式获取Stream对象
- Stream.of() Stream类的静态方法构建
- Arrays.stream() 数组转换成流
- list.stream() 集合转换成流
两种操作类型
Stream有两种操作类型:
- Intermediate 中间操作,操作完返回的还是一个Stream对象,可以继续对其执行Stream操作
map(flatMap), filter, distinct, sorted, limit, skip, parallel
- Terminal 终端操作,操作完是一个最终结果,无法再进行Stream操作
forEach, toArray, reduce, collect, min, max, count, iterator
还有一种操作类型
- Short-circuiting 中间操作的一种,操作完也是一个有限的Stream对象
anyMatch, findFirst, limit
大多数Stream操作都支持lambda表达式
map
Stream对象里的每个元素中执行一遍操作再返回一个Stream,相对于一个隐式的迭代器(Iterator)<R> Stream<R> map(Function<? super T, ? extends B> mapper)
。
// 将wordList集合中的元素全部转换成大写,然后包装成List集合返回。
List<String> output = wordList.stream().map(String::toUpperCase).collect(Collectors.toList());
map
就是将一种类型的值转换成另外一种类型的值。即将一个Stream对象转为另一个Stream对象,不能将多个Stream对象转为一个Stream对象。
flatMap
flatMap
提供将多个Stream对象转为一个Stream对象。
flatMap
将Stream的每个元素转换成一个Stream对象。
常见实例:
// 拆分单词为单个字符,并去掉重复字符
String[] words = {"hello", "world"};
// 错误写法:返回的是一个列表数组
List<String[]> strings = Arrays.stream(words).
map(e -> e.split("")).
distinct().
collect(Collectors.toList());
strings.forEach(System.out::println); // 这里输出的是两个数组对象
// 正确写法:返回的是正确结果
List<String> strings1 = Arrays.stream(words).
map(e -> e.split("")).
flatMap(Arrays::stream).
distinct().
collect(Collectors.toList());
strings1.forEach(System.out::println);
flatMap(Arrays::stream)
将map返回的列表数组对象转为一个Stream对象。
函数原型为<R> Stream<R> flatMap(Function<? super T,? extends Stream<? extends R>> mapper)
filter
对集合进行过滤,筛选出满足函数条件的Stream对象Stream<T> filter(Predicate<? super T> predicate)
。filter是一个中间操作,所以只调用filter不会有实际计算。
forEach
类似map,对Stream上每一个元素执行一遍函数表达式void forEach(Consumer<? super E> action)
。forEach是一个terminal操作,所以会立即执行。
peek
类似map,对Stream上每一个元素执行一遍函数表达式,但是函数没有返回值,即不会创建新的Stream对象,直接返回原Stream对象。
map/peek/forEach的区别
三个方法都是对Stream进行迭代操作,map/peek会返回Stream对象,forEach不会。map会创建新的Stream,peek返回原Stream。
reduce
组合Stream对象所以元素,比如拼接字符,计算和,最大,最小值。
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
limit/skip
limit返回Stream的前N个元素,skip过滤掉Stream的前N个元素。
sorted
对Stream的元素进行排序,参数为比较函数表达式。
排序函数有两个,一个是用自然排序,一个是使用自定义比较器排序,函数原型为Stream<T> sorted()
和Stream<T> sorted(Comparator<? super T> comparator)
。
sorted 时间复杂度O(NlogN)
min/max/distinct
min/max 时间复杂度O(N)
Match
- allMatch Stream中所有元素符合,返回true
- anyMatch Stream中只要有一个元素符合,返回true
- noneMatch Stream中所有元素到不符合,返回true
内部实现原理
BaseStream
Stream和Collections的区别:
- 无存储,Stream不是一种数据结构
- 惰性执行,Stream上的操作不会立即执行
对Stream的操作分为两类,中间操作和结束操作:
- 中间操作是惰性执行, 调用中间操作只会生成一个标记了该操作的新Stream,
- 结束操作会触发实际计算,计算发生时会把所有中间操作以pipeline的方式执行,这样可以减少迭代次数。
区分中间操作和结束操作,最简单的方法,就是看方法有没返回值,有返回值的大部分是中间操作,否则是结束操作。
Pipeline
目的,减少迭代次数。
Stream是一个接口,具体的实现类为ReferencePipeline(抽象类)。
ReferencePipeline定义了三个静态内部类:Head, StatelessOp, StatefulOp。
stream()对象实际是一个ReferencePipeline.Head实例。
filter() 方法,返回的是一个ReferencePipeline.StatelessOp实例。
每个节点都包含上一节点的this引用,就这样把一个个中间操作拼接到了控制数据流入的Head后面,但是并没有开始做任何数据处理的动作。
各个节点初始化的时候还会将当前节点的引用传递给上一个节点,各个节点组成了一个双向链表的结构。
Stream是延时执行的,即惰性求值实现。
Sink是一个Consumer接口,又定义了begin、end方法。
Stream执行的整个过程:
首先将Collection转为Stream,作为流水线头,然后将各个中间节点连接起来。每个中间操作节点都定义了自己对应的Sink,并重写了makeSink方法用来返回自己的Sink实例。直到终止操作节点出现时才开始将Sink实例化并串起来。
Stream的实现原理
Stream底层是由Pipeline实现的,流水线结构,由各个操作节点组成。Pipeline有三类节点Head头节点、中间操作节点、Terminal终止节点。
Stream启动的时候先开始组装Pipeline,创建各个中间操作节点,每个节点都会引用上一节点,同时也会将当前节点的引用传递给上一个节点,形成双向链表结构。当节点全部创建完成后,表示Pipeline组装完毕。接下来开始启动Pipiline,从Terminal节点开始倒序实例化Sink对象(Sink是一个Consumer类型的函数式接口),当所有Sink对象创建完成后,从Head的下一个节点开始执行Sink实例,Sink实例提供三个方法begin、accept、end。begin用于通知事情要开始,end用于通知事情要结束了,accept执行方法。从第一个中间操作节点执行,如果是无状态的节执行完或接收到通知会立即传递给下一个节点,对于有状态的节点要等数据处理完成后,才end通知并将数据传递给下一个节点。
并行原理
Stream的并行处理是基于ForkJoin框架的
Optional
Optional
构造方法
JDK 提供三个静态方法来构造一个Optional:
-
Optional.of(T value),该方法通过一个非 null 的 value 来构造一个 Optional,返回的 Optional 包含了 value 这个值。对于该方法,传入的参数一定不能为 null,否则便会抛出 NullPointerException。
-
Optional.ofNullable(T value),该方法和 of 方法的区别在于,传入的参数可以为 null , 如果为 null 的话,返回的就是 Optional.empty()。
-
Optional.empty(),该方法用来构造一个空的 Optional,即该 Optional 中不包含值,其实底层实现还是 如果 Optional 中的 value 为 null 则该 Optional 为不包含值的状态,然后在 API 层面将 Optional 表现的不能包含 null 值,使得 Optional 只存在 包含值 和 不包含值 两种状态。
ifPresent
如果Optional有值,则执行函数
// getUserById(id) 可能返回null
Optional<User> user = Optional.ofNullable(getUserById(id));
// 如果user不为null,则输出username;否则不执行任何操作
user.ifPresent(u -> System.out.println("Username is: " + u.getUsername()));
orElse
如果Optional没有值,则返回orElse方法返回的参数。
// 如果getUserById为null,则默认创建一个User对象
User user = Optional
.ofNullable(getUserById(id))
.orElse(new User(0, "Unknown"));
System.out.println("Username is: " + user.getUsername());
orElseThrow
如果Optional没有值,抛出异常
@RequestMapping("/{id}")
public User getUser(@PathVariable Integer id) {
Optional<User> user = userService.getUserById(id);
// 如果user为null,则执行orElseThrow方法
return user.orElseThrow(() -> new EntityNotFoundException("id 为 " + id + " 的用户不存在"));
}
增强类型推断
JDK 5 新增了泛型,JDK 7 新增了泛型类型推断,JDK 8 增强了类型推断。
增强类型推断:
- 支持通过方法上下文推断泛型的目标类型
- 支持在方法的调用链中,传递推断类型。
lambda
lambda如何实现的/实现原理?与函数式接口的关系?
编译后会生成一个私有的静态函数
与函数式接口的关系?
一个lambda表达式必须要与一个函数式接口对应。
lambda实现原理
编译后生成一个Lambda类
,该类实现了具体的抽象方法,声明变为private static静态私有的(方便内部类调用),但是并没有声明实现函数式接口。Lambda类
定义了一个内部类$Lambda$1
,该内部类实现了函数式接口,并重写了抽象方法,在该方法内部直接调用了外部Lambda
父类实现的具体方法。
以下为lambda实现原理的伪代码
@FunctionalInterface
interface Print<T> {
public void print(T x);
}
// 外部类没有实现Print接口,但是实现了具体的print操作。
public class Lambda {
public static void PrintString(String s, Print<String> print) {
print.print(s);
}
// 静态私有方法,内部类可以直接调用
private static void lambda$0(String x) {
System.out.println(x);
}
// 内部类实现了Print接口
final class $Lambda$1 implements Print{
@Override
public void print(Object x) {
// 直接调用外部类的方法实现
lambda$0((String)x);
}
}
public static void main(String[] args) {
PrintString("test", new Lambda().new $Lambda$1());
}
}
实现类生成后,如何调用呢?
函数式接口
只包含一个抽象方法的接口,成为函数式接口,该接口使用@FunctionalInterface
注解,可以用来检查它是否是一个函数式接口。
函数式接口,只能定义一个方法,定义多个方法,编译期会报错。但是, 可以定义默认方法、静态方法和重写Object里的pulibc方法。
它跟lambda的关系?
Java内置四个核心函数式接口:
- Consumer
消费型接口,表示只接收一个输入参数,并没有返回值的操作。接口方法:void accept(T t)。 - Supplier
供给型接口,返回一个类型为T的对象。接口方法:T get()。 - Function<T, R> 函数型接口,表示接收一个输入参数,并提供一个结果函数。接口方法:R apply(T t)。
- Predicate
断言型接口,确定类型为T的对象是否满足约束,并返回Boolean值。接口方法:boolean test(T t)。
JDK 8 中 java.lang.Runnable
接口添加了@FunctionalInterface
注解,表示Runnable
为一个函数式接口,可以通过lambda表达式来创建该接口的对象。
函数式接口可以通过lambda
表达式,方法引用,构造方法引用来实例化。
为什么引入函数式接口?
- 代码简洁
- 多核友好
ConcurrentHashMap
在java.util.concurrent
包下,定义了非常多的静态内部类。并且大量使用Unsafe类。
ConcurrentHashMap(int, float, int)
tableSizeFor(int): int
:返回一个比给定整数大且最接近的2的幂次方整数,如给定10,返回2的4次方16。该方法的核心就是让给定整数的二进制所有为都变为1。
put(K, V): V
:该方法没有加上synchronized,key和value都不允许为null。首先对key.hashCode进行hash操作,得到key的hash值。判断集合是否为空,如果为空则初始化集合initTable()
。集合不为空,则根据计算出的hash值作为集合的坐标,获取对应坐标的值,使用tabAt(tab, i = (n - 1) & hash)
方法来获取值,该方法相当于tab[(n - 1) & hash]
。tabAt内部使用Unsafe.getObjectVolatitle来获取。因为table是volatitle修饰的,在Java内存模型中,每个线程都有一个工作内存,里面存储着table的副本,为了保证线程每次拿到table中的元素值最新的。如果对应坐标的值不存在,则新建该坐标值casTabAt
也是使用Unsafe.compareAndSwapObject方法。取到了坐标值,如果该坐标值正在被移动即其他线程整个在对其扩容则一起进行扩容操作。
remove(Object): V
get(Object): V
containsKey(Object): boolean
keySet().iterator()
代码结构
JDK 1.8 的ConcurrentHashMap代码结构发生了超级大的改变,与 JDK 1.7 之前的完全不一样了。这里先找几个我们认识的。
- Node map节点,最小存储单元
- ForwardingNode 指向正在扩容的map
- ReservationNode 节点占位
- CounterCell 用于size统计的,当发生更新冲突时,使用该对象
- TreeNode 树节点的Node
- TreeBin
- *Iterator 各种迭代器
- *View
- ForEach*Task 各种任务
- Search*Task
- Reduce*Task
- MapReduce*Task
构造函数
提供了5个构造函数,大部分使用默认无参构造函数,直接实例化一个map。
成员变量
跟树相关的变量
- TREEIFY_THRESHOLD 8 链表大于8,转为树
- UNTREEIFY_THRESHOLD 6 树节点小于6,转为链表
- MIN_TREEIFY_CAPACITY 64 map容量大于64才开始进行树优化
跟统计相关的变量
- volatile long baseCount
- volatile CounterCell[] counterCells
跟节点相关的变量
- volatile Node<K, V>[] table; map
- volatile Node<K, V>[] nextTable; 扩容时候使用
跟集合初始化和扩容相关的变量
- volatile int sizeCtl 如果为-1,表示集合正在初始化或扩容(负数的数值-1表示目前操作的线程个数);大于0,表示集合的容量
put
直接开始put操作,这里先只分析第一层次。
put操作主要是通过CAS+synchronized进行插入的,通过for自旋确保插入成功
- 计算key的哈希值hash
- 如果是第一次插入,对table进行初始化,通过CAS方式进行
- 通过hash找到其在数组中的位置,获取对应的Node节点,如果Node为null,直接CAS插入
- 如果Node不为null,synchronized加锁;检查Node类型,如果是链表,则遍历链表插入;如果是树类型,则执行树的插入;
- 在遍历链表过程中会统计节点的数量,成功插入后,检查数量值是否超过设置的阈值8,如果超过了,则将链表转为树;
- 最后容量+1
以上步骤,如果执行失败会继续for循环,直到操作成功。所以1.8的ConcurrentHashMap.put操作是通过自旋+CAS+synchronized方式实现的。
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 1. 计算key哈希值
int hash = spread(key.hashCode());
int binCount = 0; // 统计链表节点个数,用于检查是否要转为树型结构
// for无限循环(目前有几种情况会再次循环:1. 初始化table后;2.casTabAt更新失败后;3.一起扩容后;4. 插入失败)
for (Node<K, V>[] tab = table; ; ) {
Node<K, V> f; // f 为key哈希计算出的对应Node
int n, i, fh;
if (tab == null || (n = tab.length) == 0) {
// 2. 第一次put才初始化table,初始化完重新for循环,确认table不为null
tab = initTable();
} else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // tabAt方法:计算key在table中的位置,找到对应Node节点
// 3. 如果key对应的Node为null,说明还没插入过(1次hash到node,如果node为null,则通过CAS进行更新)
if (casTabAt(tab, i, null, // casTabAt 将key插入,这里用CAS方式来更新
new Node<K, V>(hash, key, value, null))) {
// 4. 插入成功,结束for循环
break; // no lock when adding to empty bin
}
} else if ((fh = f.hash) == MOVED) {
// 5. 如果当前Node正在扩容,加入一起扩容
// ForwardingNode类型的节点,hash值固定为MOVED 即 -1,表示一个占位说明。Node节点的hash为-1时,就说明当前节点正在扩容。
tab = helpTransfer(tab, f);
} else {
// 6. 如果Node不为null,要就要继续遍历了。(JDK 1.8 之前,是一个链表,那就是遍历链表,头插入)
V oldVal = null;
// 7. 到这里才使用synchronized关键字同步(1次hash到node,如果node不为null,则通过synchronized进行更新)
synchronized (f) {
if (tabAt(tab, i) == f) { // 再解释一遍,tabAt用于获取tab数组下标为i的数据
// 8. 这里先知道下 hash值大于0说明是Node是链表类型节点
if (fh >= 0) {
binCount = 1; // 统计遍历了多少个节点
// 开始遍历Node链表
for (Node<K, V> e = f; ; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 找到对应的Node
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value; // 直接替换值,将旧值返回
break;
}
Node<K, V> pred = e;
if ((e = e.next) == null) { // 取下一个节点
pred.next = new Node<K, V>(hash, key, // 如果下一个节点为null,将新节点插入到链尾
value, null);
break;
}
// 继续循环下一个节点
}
} else if (f instanceof TreeBin) {
// 说明Node是树结构类型
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, // 用树的方式插入节点
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 不等于0,说明已经加锁并插入成功了
if (binCount != 0) {
// 这里检查下Node的节点数是否超过设置的阈值8
if (binCount >= TREEIFY_THRESHOLD)
// 这里不是直接转,而是先判断如果当然table容量小于64,则会对数组进行扩容,而不是转为树。
treeifyBin(tab, i); // 如果超过64,才将链表转为树
if (oldVal != null)
return oldVal;
break;
}
}
}
// 容量count+1
addCount(1L, binCount);
return null;
}
get
再来看看get操作
- 计算key的hash值
- 根据hash取得Node节点,如果当前Node节点就是所要找的值,直接返回
- 检查Node节点类型,如果是树类型,直接通过树进行查找;
- 否者,遍历链表
public V get(Object key) {
Node<K, V>[] tab;
Node<K, V> e, p;
int n, eh;
K ek;
// 1. 计算key的哈希值
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 && // 检查table是否已初始化
(e = tabAt(tab, (n - 1) & h)) != null) { // 根据hash值取到对应Node节点
if ((eh = e.hash) == h) {
// 如果hash相等,equals也相等
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
} else if (eh < 0) {
// 如果hash不相等,检查是否是树节点,如果是,就用树查找
return (p = e.find(h, key)) != null ? p.val : null;
}
// 否则,这是一个链表节点,遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
再看put
上面只分析了put操作的第一层,现在往下分析,也分析几个主要的
initTable
通过CAS来创建table
- 检查sizeCtl值,判断是否有其它线程已经开始对table进行初始化了
- 如果有,yield进入等待,让出CPU
- 如果没有进行初始化,通过CAS将sizeCtl更新为-1,表示当前线程要对table进行初始化
- 更新成功后,创建table,并计算出sizeCtl值,返回
helpTransfer
addCount
Future
CompletableFuture
异步串行执行,并可以等待前面任务结果返回
实现了implements Future<T>, CompletionStage<T>
两个接口
- Future接口说明它具有异步执行能力
- CompletionStage接口后面再解释