性能篇系列—stream详解,h5stream性能
性能篇系列—stream详解,h5stream性能
Stream API
- Stream API通过Lambda表达式对集合进行各种非常便利高效的聚合操作,或者大批量数据操作
// java.util.Collection
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
@Data
class Student {
private Integer height;
private String sex;
}
Map<String, List<Student>> map = Maps.newHashMap();
List<Student> list = Lists.newArrayList();
// 传统的迭代方式
for (Student student : list) {
if (student.getHeight() > 160) {
String sex = student.getSex();
if (!map.containsKey(sex)) {
map.put(sex, Lists.newArrayList());
}
map.get(sex).add(student);
}
}
// Stream API,串行实现
map = list.stream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));
// Stream API,并行实现
map = list.parallelStream().filter((Student s) -> s.getHeight() > 160).collect(Collectors.groupingBy(Student::getSex));
优化遍历
Stream操作分类
- 无状态操作:元素的处理不受之前元素的影响
- 有状态操作:该操作只有拿到所有元素之后才能继续下去
- 短路操作:遇到某些符合条件的元素就可以得到最终结果
- 非短路操作:必须处理完所有元素才能得到最终结果
Stream源码实现
- BaseStream定义了流的基本接口方法,如spliterator、isParallel等
- Stream定义了流的常用操作方法,如map、filter等
- 内部定义了Head、StatelessOp和StatefulOp三个内部类,实现了BaseStream和Stream的接口方法
- ReferencePipeline最终会将整个Stream流操作组装成一个调用链
- 而调用链上的每个Stream操作的上下文关系就是通过Sink接口来定义实现的
Stream操作叠加
- Head类主要用来定义数据源操作,初次调用.stream()时,会初次加载Head对象
- 接着加载中间操作,分为StatelessOp对象和StatefulOp对象
- 此时的Stage并没有执行,而是通过AbstractPipeline生成了中间操作的Stage链表
- 当调用终结操作时,会生成一个最终的Stage
- 通过这个Stage触发之前的中间操作,从最后一个Stage开始,递归产生一个Sink链
样例
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()
.filter(name -> name.startsWith("张"))
.mapToInt(String::length)
.max()
.toString();
names是ArrayList集合,names.stream会调用集合类基础接口Collection的stream方法
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
Collection.stream方法会调用StreamSupport.stream方法,方法中初始化了一个ReferencePipeline的Head内部类对象
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
调用filter和map,两者都是无状态的中间操作,因此并没有执行任何操作,只是分别创建了一个Stage来标识用户的每一次操作
通常情况下,Stream的操作需要一个回调函数,所以一个完整的Stage是由数据来源、操作、回调函数组成的三元组表示
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
@Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
new StatelessOp会调用父类AbstractPipeline的构造函数,该构造函数会将前后的Stage联系起来,生成一个Stage链表
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
if (previousStage.linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
previousStage.linkedOrConsumed = true;
previousStage.nextStage = this; // 将当前的Stage的next指针指向之前的Stage
this.previousStage = previousStage; // 赋值当前Stage当全局变量previousStage
this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
this.sourceStage = previousStage.sourceStage;
if (opIsStateful())
sourceStage.sourceAnyStateful = true;
this.depth = previousStage.depth + 1;
}
创建Stage时,会包含opWrapSink方法,该方法把一个操作的具体实现封装在Sink类中,Sink采用处理->转发的模式来叠加操作
调用max,会调用ReferencePipeline的max方法
由于max是终结操作,会创建一个TerminalOp操作,同时创建一个ReducingSink,并且将操作封装在Sink类中
@Override
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
return reduce(BinaryOperator.maxBy(comparator));
}
最后调用AbstractPipeline的wrapSink方法,生成一个Sink链表,Sink链表中的每一个Sink都封装了一个操作的具体实现
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
当Sink链表生成完成后,Stream开始执行,通过Spliterator迭代集合,执行Sink链表中的具体操作
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
Stream并行处理
List<String> names = Arrays.asList("张三", "李四", "王老五", "李三", "刘老四", "王小二", "张四", "张五六七");
String maxLenStartWithZ = names.stream()
.parallel()
.filter(name -> name.startsWith("张"))
.mapToInt(String::length)
.max()
.toString();
Stream的并行处理在执行终结操作之前,跟串行处理的实现是一样的,在调用终结方法之后,会调用TerminalOp.evaluateParallel
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
合理使用Stream
小结
- 中间操作又根据元素之间状态有无干扰分为有状态操作和无状态操作,实现了链式结构中的不同阶段
- Stream在执行中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来,最终由终结操作触发
- 生成一个数据处理链表,通过Java 8的Spliterator迭代器进行数据处理
- 对中间操作的处理跟串行处理的方式是一样的,但在终结操作中,Stream将结合ForkJoin框架对集合进行切片处理
我是小架,我们下篇文章再见!
相关文章
- 暂无相关文章
用户点评