JDK8流式操作Stream

2017/09/01 JavaEE

Stream 的特性

  • 不是数据结构
  • 它没有内部存储,它只是用操作管道从 source(数据结构、数组、generator function、IO channel)抓取数据。
  • 它也绝不修改自己所封装的底层数据结构的数据。例如 Stream 的 filter 操作会产生一个不包含被过滤元素的新 Stream,而不是从 source 删除那些元素。
  • 所有 Stream 的操作必须以 lambda 表达式为参数
  • 不支持索引访问
  • 你可以请求第一个元素,但无法请求第二个,第三个,或最后一个。不过请参阅下一项。
  • 很容易生成数组或者 List
  • 惰性化
  • 很多 Stream 操作是向后延迟的,一直到它弄清楚了最后需要多少数据才会开始。
  • Intermediate 操作永远是惰性化的。
  • 并行能力
  • 当一个 Stream 是并行化的,就不需要再写多线程代码,所有对它的操作会自动并行进行的。
  • 可以是无限的
    • 集合有固定大小,Stream 则不必。limit(n) 和 findFirst() 这类的 short-circuiting 操作可以对无限的 Stream 进行运算并很快完成。

流利用了这种最强大的计算原理:组合。
遵循 “深度优先” 而不是 “宽度优先” 的执行战略(跟踪一个数据元素在整个管道中的深度),会让被处理的操作在缓存中变得更 “热”,所以您可以将更多时间用于计算,花更少时间来等待数据

流的构成

使用一个流,包括三个基本步骤:

获取数据源(source) → 数据转换 → 执行操作获取结果

每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道

生成 Stream Source

  • 从 Collection 和数组
    • Collection.stream()
    • Collection.parallelStream()
    • Arrays.stream(T array) or Stream.of()
  • 从 BufferedReader
    • java.io.BufferedReader.lines()
  • 静态工厂
    • java.util.stream.IntStream.range()
      • IntStream、LongStream、DoubleStream。当然我们也可以用 Stream<Integer>Stream<Long>Stream<Double>,但是 boxing 和 unboxing 会很耗时,所以特别为这三种基本数值型提供了对应的 Stream。
    • java.nio.file.Files.walk()
  • 自己构建
    • java.util.Spliterator
  • 其它
    • Random.ints()
    • BitSet.stream()
    • Pattern.splitAsStream(java.lang.CharSequence)
    • JarFile.stream()
// 构造流的几种常见方法
// 1. Individual values
Stream stream = Stream.of("a", "b", "c");
// 2. Arrays
String [] strArray = new String[] {"a", "b", "c"};
stream = Stream.of(strArray);
stream = Arrays.stream(strArray);
// 3. Collections
List<String> list = Arrays.asList(strArray);
stream = list.stream();
// 数值流的构造
IntStream.of(new int[]{1, 2, 3}).forEach(System.out::println);
IntStream.range(1, 3).forEach(System.out::println);
IntStream.rangeClosed(1, 3).forEach(System.out::println);

自己生成流

Stream.generate

通过实现 Supplier 接口,你可以自己来控制流的生成。这种情形通常用于随机数、常量的 Stream,或者需要前后元素间维持着某种状态信息的 Stream。把 Supplier 实例传递给 Stream.generate() 生成的 Stream,默认是串行(相对 parallel 而言)但无序的(相对 ordered 而言)。

由于它是无限的,在管道中,必须利用 limit 之类的操作限制 Stream 大小。

Random seed = new Random();
Supplier<Integer> random = seed::nextInt;
Stream.generate(random).limit(10).forEach(System.out::println);
//Another way
IntStream.generate(() -> (int) (System.nanoTime() % 100)).
        limit(10).forEach(System.out::println);
public static void main(String[] args) {
    Stream.generate(new PersonSupplier()).
            limit(10).
            forEach(p -> System.out.println(p.getName() + ", " + p.getNo()));
}

private static class PersonSupplier implements Supplier<Person> {
    private int index = 0;
    private Random random = new Random();
    @Override
    public Person get() {
        return new Person(index++, "StormTestUser" + index, random.nextInt(100));
    }
}
Stream.iterate

iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。

与 Stream.generate 相仿,在 iterate 时候管道必须有 limit 这样的操作来限制 Stream 大小。

Stream.iterate(0, n -> n + 3).limit(10). forEach(x -> System.out.print(x + " "));
// 0 3 6 9 12 15 18 21 24 27

流的操作类型

  • Intermediate(中间操作):一个流可以后面跟随零个或多个 intermediate 操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。
    • 转换操作都是 lazy 的,多个转换操作只会在 Terminal 操作的时候融合起来,一次循环完成
    • map (mapToInt, flatMap 等)
    • filter
    • distinct
    • sorted
    • peek
    • limit
    • skip
    • parallel
    • sequential
    • unordered
  • Terminal(终止操作):一个流只能有一个 terminal 操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以这必定是流的最后一个操作。Terminal 操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个 side effect。
    • forEach
    • forEachOrdered
    • toArray
    • reduce
    • collect
    • min
    • max
    • count
    • anyMatch
    • allMatch
    • noneMatch
    • findFirst
    • findAny
    • iterator
  • short-circuiting
    • 对于一个 intermediate 操作,如果它接受的是一个无限大(infinite/unbounded)的 Stream,但返回一个有限的新 Stream。
    • 对于一个 terminal 操作,如果它接受的是一个无限大的 Stream,但能在有限的时间计算出结果。
    • anyMatch
    • allMatch
    • noneMatch
    • findFirst
    • findAny
    • limit

将流管道表达为一系列功能转换,有助于实施一些有用的执行战略,比如惰性、并行性、短路和操作融合。

map (mapToInt, flatMap 等)

对于 forEach() 版本,多个线程会同时尝试访问一个结果容器,而对于并行 collect(),每个线程拥有自己的本地结果容器,会在以后合并其中的结果。

一对一

List<Integer> nums = Arrays.asList(1, 2, 3, 4);
List<Integer> squareNums = nums.stream()
        .map(n -> n * n)
        .collect(Collectors.toList());

多对一

Stream<List<Integer>> inputStream = Stream.of(
        Arrays.asList(1),
        Arrays.asList(2, 3),
        Arrays.asList(4, 5, 6)
);
Stream<Integer> outputStream = inputStream
        .flatMap((childList) -> childList.stream());
outputStream.forEach(System.out::println);

reduce

String concatenated = strings.stream().reduce("", String::concat);
int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);
// 缩减不需要仅应用于整数和字符串,它可以应用于您想要将元素序列缩减为该类型的单个元素的任何情形。
Comparator<Person> byHeight = Comparator.comparing(Person::getNo);
BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight);
Optional<Person> tallest = people.stream().reduce(tallerOf);
Person person = tallest.get();

collect

收集的参数方法

  • 创建:一种生成空结果容器的途径
  • 填充:一种将新元素合并到结果容器中的途径
  • 合并结果容器:一种合并两个结果容器的途径
Set<String> uniqueStrings = strings.stream()
                                   .collect(HashSet::new,
                                            HashSet::add,
                                            HashSet::addAll);
// 高效收集字符串
StringBuilder concat = strings.stream()
                              .collect(() -> new StringBuilder(),
                                       (sb, s) -> sb.append(s),
                                       (sb, sb2) -> sb.append(sb2));
StringBuilder concat = strings.stream()
                              .collect(StringBuilder::new,
                                       StringBuilder::append,
                                       StringBuilder::append);
Collectors
方法 作用
toList() 将元素收集到一个 List 中。
toSet() 将元素收集到一个 Set 中。
toCollection(Supplier<Collection>) 将元素收集到一个特定类型的 Collection 中。
toMap(Function<T, K>, Function<T, V>) 将元素收集到一个 Map 中,依据提供的映射函数将元素转换为键值。
summingInt(ToIntFunction<T>) 计算将提供的 int 值映射函数应用于每个元素(以及 long 和 double 版本)的结果的总和。
summarizingInt(ToIntFunction<T>) 计算将提供的 int 值映射函数应用于每个元素(以及 long 和 double 版本)的结果的 sum、min、max、count 和 average。
reducing() 向元素应用缩减(通常用作下游收集器,比如用于 groupingBy)(各种版本)。
partitioningBy(Predicate<T>) 将元素分为两组:为其保留了提供的预期的组和未保留预期的组。
partitioningBy(Predicate<T>, Collector) 将元素分区,使用指定的下游收集器处理每个分区。
groupingBy(Function<T,U>) 将元素分组到一个 Map 中,其中的键是所提供的应用于流元素的函数,值是共享该键的元素列表。
groupingBy(Function<T,U>, Collector) 将元素分组,使用指定的下游收集器来处理与每个组有关联的值。
minBy(BinaryOperator<T>) 计算元素的最小值(与 maxBy() 相同)。
mapping(Function<T,U>, Collector) 将提供的映射函数应用于每个元素,并使用指定的下游收集器(通常用作下游收集器本身,比如用于 groupingBy)进行处理。
joining() 假设元素为 String 类型,将这些元素联结到一个字符串中(或许使用分隔符、前缀和后缀)。
counting() 计算元素数量。(通常用作下游收集器。)
Map<Seller, Txn> biggestTxnBySeller =
    txns.stream()
        .collect(groupingBy(Txn::getSeller,
                            maxBy(comparing(Txn::getAmount))));// 无需将每个键的元素收集到列表中,而是将它们提供给另一个收集器

List<Integer> integers = Arrays.asList(1, 2, 3, 4, 4, 21, 23, 33, 2);
Map<Boolean, Long> collect = integers.stream()
        .collect(
                Collectors.partitioningBy(
                        e -> e.compareTo(20) > 0,
                        Collectors.counting()
                )
        );
// {false=6, true=3}

Map<Integer, Double> collect = orders.stream()
                .collect(
                        Collectors.groupingBy(
                                Order::getRegion,
                                Collectors.averagingDouble(Order::getPrice
                                )
                        )
                );

// 使用 Streams 计算单词数量直方图
Pattern whitespace = Pattern.compile("\\s+");
Map<String, Integer> wordFrequencies =
    reader.lines()
          .flatMap(s -> whitespace.splitAsStream())
          .collect(groupingBy(String::toLowerCase),
                              Collectors.counting());
Arrays.stream(params.split("&"))
                .map(e -> e.split("="))
                .collect(
                        Collectors.toMap(
                                t -> t[0],
                                t -> t[1]
                        )
                );
自定义Collector

toList()实现

// Collector 接口
// 输入类型 T、累加器类型 A 和最终的返回类型 R(A 和 R 通常是相同的)
public interface Collector<T, A, R> {
    /** Return a function that creates a new empty result container */
    Supplier<A> supplier();
    /** Return a function that incorporates an element into a container */
    BiConsumer<A, T> accumulator();
    /** Return a function that merges two result containers */
    BinaryOperator<A> combiner();
    /** Return a function that converts the intermediate result container
        into the final representation */
    Function<A, R> finisher();
    /** Special characteristics of this collector */
    Set<Characteristics> characteristics();
}
// 实现
public static <T> Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}

summarizingInt() 收集器使用的 IntSummaryStatistics 类

public class IntSummaryStatistics implements IntConsumer {
    private long count;
    private long sum;
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;
    public void accept(int value) {
        ++count;
        sum += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }
    public void combine(IntSummaryStatistics other) {
        count += other.count;
        sum += other.sum;
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
    }
    // plus getters for count, sum, min, max, and average
}
// summarizingInt() 收集器工厂
public static <T> Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
    return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(
            IntSummaryStatistics::new,
            (r, t) -> r.accept(mapper.applyAsInt(t)),
            (l, r) -> { l.combine(r); return l; },
            CH_ID);
}

filter

// 这段代码首先把每行的单词用 flatMap 整理到新的 Stream,然后保留长度不为 0 的,就是整篇文章中的全部单词了。
List<String> output = reader.lines()
	    .flatMap(
	            line -> Stream.of(line.split(REGEXP))
	    )
	    .filter(word -> word.length() > 0)
	    .collect(Collectors.toList());

forEach

一般认为,forEach 和常规 for 循环的差异不涉及到性能,它们仅仅是函数式风格与传统 Java 风格的差别。
forEach 不能修改自己包含的本地变量值,也不能用 break/return 之类的关键字提前结束循环。
forEach 是 terminal 操作,因此它执行后,Stream 的元素就被“消费”掉了,你无法对一个 Stream 进行两次 terminal 运算
相反,具有相似功能的 intermediate 操作 peek 可以达到上述目的

roster.stream()
    .filter(p -> p.getGender() == Person.Sex.MALE)
    .forEach(p -> System.out.println(p.getName()));

reduce

这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。
字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce

有起始值的 reduce() 都返回具体的对象。没有起始值的 reduce(),由于可能没有足够的元素,返回的是 Optional

缩减(也称为折叠)技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作

// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D")
        .reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0)
        .reduce(Double.MAX_VALUE, Double::min);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4)
        .reduce(0, Integer::sum);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4)
        .reduce(Integer::sum).get();
// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F")
        .filter(x -> x.compareTo("Z") > 0)
        .reduce("", String::concat);

int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);
String concatenated = strings.stream().reduce("", String::concat);// 效率低 使用StringBuilder的collect方式

limit/skip

limit 返回 Stream 的前面 n 个元素;skip 则是扔掉前 n 个元素(它是由一个叫 subStream 的方法改名而来)。

limit/skip 无法达到 short-circuiting 目的的,就是把它们放在 Stream 的排序操作后,原因跟 sorted 这个 intermediate 操作有关:此时系统并不知道 Stream 排序后的次序如何,所以 sorted 中的操作看上去就像完全没有被 limit 或者 skip 一样。
对一个 parallel 的 Steam 管道来说,如果其元素是有序的,那么 limit 操作的成本会比较大,因为它的返回对象必须是前 n 个也有一样次序的元素。取而代之的策略是取消元素间的次序,或者不要用 parallel Stream。

List<String> personList2 = persons.stream().
                map(Person::getName)
                .limit(10)
                .skip(3)
                .collect(Collectors.toList());
// name1
// name2
// name3
// name4
// name5
// name6
// name7
// name8
// name9
// name10
// [name4, name5, name6, name7, name8, name9, name10]

sorted

对 Stream 的排序通过 sorted 进行,它比数组的排序更强之处在于你可以首先对 Stream 进行各类 map、filter、limit、skip 甚至 distinct 来减少元素数量后,再排序,这能帮助程序明显缩短执行时间。
这种优化是有 business logic 上的局限性的:即不要求排序后再取值。

List<Person> personList2 = persons.stream()
		.limit(2)
		.sorted(
			(p1, p2) -> p1.getName().compareTo(p2.getName())
			)
		.collect(Collectors.toList());

min/max/distinct

min 和 max 的功能也可以通过对 Stream 元素先排序,再 findFirst 来实现,但前者的性能会更好,为 O(n),而 sorted 的成本是 O(n log n)。同时它们作为特殊的 reduce 方法被独立出来也是因为求最大最小值是很常见的操作。

// 找出最长一行的长度
BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log"));
int longest = br.lines()
	.mapToInt(String::length)
	.max()
	.getAsInt();
br.close();

match

  • allMatch:Stream 中全部元素符合传入的 predicate,返回 true
  • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true
  • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true

它们都不是要遍历全部元素才能返回结果

List<Person> persons = new ArrayList();
persons.add(new Person(1, "name" + 1, 10));
persons.add(new Person(2, "name" + 2, 21));
persons.add(new Person(3, "name" + 3, 34));
persons.add(new Person(4, "name" + 4, 6));
persons.add(new Person(5, "name" + 5, 55));
boolean isAllAdult = persons.stream().
 allMatch(p -> p.getAge() > 18);
System.out.println("All are adult? " + isAllAdult);
boolean isThereAnyChild = persons.stream().
 anyMatch(p -> p.getAge() < 12);
System.out.println("Any child? " + isThereAnyChild);
// All are adult? false
// Any child? true

流的转换

// 流转换为其它数据结构
// 1. Array
String[] strArray1 = stream.toArray(String[]::new);
// 2. Collection
List<String> list1 = stream.collect(Collectors.toList());
List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
Set set1 = stream.collect(Collectors.toSet());
Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
// 3. String
String str = stream.collect(Collectors.joining()).toString();

用 Collectors 来进行 reduction 操作

java.util.stream.Collectors 类的主要作用就是辅助进行各类有用的 reduction 操作,例如转变输出为 Collection,把 Stream 元素进行归组。

groupingBy/partitioningBy

partitioningBy 其实是一种特殊的 groupingBy,它依照条件测试的是否两种结果来构造返回的数据结构,get(true) 和 get(false) 能即为全部的元素对象。

Map<Integer, List<Person>> personGroups = Stream.generate(new PersonSupplier())
        .limit(100)
        .collect(Collectors.groupingBy(Person::getNo));// 通过No分组
Iterator it = personGroups.entrySet().iterator();
while (it.hasNext()) {
    Map.Entry<Integer, List<Person>> persons = (Map.Entry) it.next();
    System.out.println("No " + persons.getKey() + " = " + persons.getValue().size());
}

private static class PersonSupplier implements Supplier<Person> {
    private int index = 0;
    private Random random = new Random();

    @Override
    public Person get() {
        return new Person(index++, "StormTestUser" + index, random.nextInt(100));
    }
}

Map<Boolean, List<Person>> children = Stream.generate(new PersonSupplier())
		.limit(100)
		.collect(Collectors.partitioningBy(p -> p.getAge() < 18));// 通过条件分组
System.out.println("Children number: " + children.get(true).size());
System.out.println("Adult number: " + children.get(false).size());

Spliterator

流来源有一种称为 Spliterator 的抽象来描述
JDK 中的 Collection 实现都已配备了高质量的 Spliterator 实现。

  • 包含多个元素的 ArrayList 始终可以干净且均匀地进行拆分;
  • LinkedList 的拆分效率一直很差;
  • 而且基于哈希值和基于树的数据集通常能够进行比较不错的拆分。
// tryAdvance() 方法尝试处理单个元素。如果没有元素,tryAdvance() 只会返回 false;
// 否则,它会前移游标,将当前元素传递给所提供的处理函数并返回 true。
boolean tryAdvance(Consumer<? super T> action);
// forEachRemaining() 方法处理所有剩余的元素,将它们一次一个地传递给所提供的处理函数。
void forEachRemaining(Consumer<? super T> action);
// trySplit() 的行为是尝试将剩余元素拆分为两个部分,这两部分最好具有类似的大小,
// 返回 null 意味着使用这个 Spliterator 作为来源的流将无法利用并行性来加速计算
Spliterator<T> trySplit();

流标志

来源阶段的流标志来自 spliterator 的 characteristics 位图

在某些情况下,Streams 可以使用来源和之前的操作的知识来完全省略某个操作。

流标志 解释
SIZED 流的大小已知。
DISTINCT 依据用于对象流的 Object.equals() 或用于原语流的 ==,流的元素将有所不同。
SORTED 流的元素按自然顺序排序。
ORDERED 流有一个有意义的遇到顺序

每个中间操作都对流标志具有已知的影响;一个操作可设置、清除或保留每个标志的设置。

  • filter() 操作保留 SORTED 和 DISTINCT 标志,但清除 SIZED 标志;
  • map() 操作清除 SORTED 和 DISTINCT 标志,但保留 SIZED 标志;
  • sorted() 操作保留 SIZED 和 DISTINCT 标志,但注入 SORTED 标志。

影响 spliterator 质量的考虑因素

  • spliterator 是否报告了准确的大小?
  • spliterator 能否拆分输入?
  • 它能否将输入拆分为几乎相等的部分?
  • 所拆分部分的大小是否可预测(通过 SUBSIZED 特征反映)?
  • spliterator 是否报告了所有相关特征?

导致并行执行效率降低的多个因素

  • 来源的拆分成本很高或拆分不均。
  • 合并部分结果的成本很高。
  • 问题不允许足够的可利用并行性。
  • 数据布局导致糟糕的访问位置。
  • 没有足够的数据来克服并行性的启动成本。

创建 spliterator 的最简单方法(但会导致最差的结果质量)是将 Iterator 传递给 Spliterators.spliteratorUnknownSize()。
通过将 Iterator 和一个大小传递给 Spliterators.spliterator 来获得稍微好点的 spliterator。
但是如果流性能很重要(尤其是并行性能),可以实现完整的 Spliterator 接口(包括所有适用的特征)。
集合类(比如 ArrayList、TreeSet 和 HashMap)的 JDK 来源提供了一些高质量的 spliterator 示例

Optional

使用 Optional 代码的可读性更好,而且它提供的是编译时检查,能极大的降低 NPE 这种 Runtime Exception 对程序的影响
Stream 中的 findAny、max/min、reduce 等方法等返回 Optional 值。还有例如 IntStream.average() 返回 OptionalDouble 等等。

Optional.ofNullable(text).ifPresent(System.out::println);
Optional.ofNullable(text).map(String::length).orElse(-1);

Optional<User> user...
return user.map(u -> u.getOrders()).orElse(Collections.emptyList())
 
//上面避免了我们类似 Java 8 之前的做法
if(user.isPresent()) {
  return user.get().getOrders();
} else {
  return Collections.emptyList();
}

并行和串行

所有流操作都可以顺序或并行执行,但请记住,并行性并不是高性能的原因。并行执行可能比顺序执行更快、一样快或更慢。最好首先从顺序流开始,在您知道您能够获得提速(并从中受益)时才应用并行性。

执行流程

  1. 发起终止操作时,流实现会挑选一个执行计划
    • 中间操作可划分为无状态(filter()、map()、flatMap())
    • 有状态(sorted()、limit()、distinct())
  2. 管道按顺序执行,或者并行执行,
    • 包含所有无状态操作,那么它可以在一轮中计算。
    • 否则,管道会划分为多个部分(在有状态操作边界上划分)并分多轮计算。
  3. 终止操作
    • 如果终止操作是非短路操作,那么可以批量处理数据(使用来源 spliterator 的 forEachRemaining() 方法,进一步减少访问每个元素的开销);
      • 非短路(reduce()、collect()、forEach())
    • 如果它是短路操作,则必须一个元素处理一次(使用 tryAdvance())。
      • 短路(allMatch()、findFirst())

并行和串行处理

顺序执行,Streams 构造了一个 “机器” — 一个 Consumer 对象链,其结构与管道结构相符
其中每个 Consumer 对象知道下一个阶段;当它收到一个元素(或被告知没有更多元素)时,它会将 0 或多个元素发送到链中的下一个阶段。

与 filter() 阶段有关联的 Consumer 将过滤器谓词应用于输入元素,并将它发送或不发送到下一个阶段;
与 map() 阶段有关联的 Consumer 将映射函数应用于输入元素,并将结果发送到下一个阶段。

与有状态操作(比如 sorted())有关联的 Consumer 会缓冲元素,直到它看到输入的末尾,然后将排序的数据发送到下一个阶段。
机器中的最后一个阶段将实现终止操作。如果此操作生成了结果,比如 reduce() 或 toArray(),该阶段可充当此结果的累加器。

blocks.stream()
      .map(block -> block.squash())
      .filter(block -> block.getColor() != YELLOW)
      .forEach(block -> block.display());

一个流机器的动画图像。

并行执行将会执行类似的操作,但不会创建单个机器,每个工作线程将会获取自己的机器副本并将其数据节提供给它,然后将每个线程机器的结果与其他机器的结果合并,生成最终结果。

顺序

JDK 集合的 spliterator 会根据集合的规范来设置此标志;一些中间操作可能注入 ORDERED (sorted()) 或清除它 (unordered())。

  • 许多操作(无状态中间操作和一些终止操作(比如 reduce())),遵守遇到顺序不会产生任何实际成本。
  • 其他操作(有状态中间操作,其语义与遇到顺序关联的终止操作,比如 findFirst() 或 forEachOrdered()),在并行执行中遵守遇到顺序的责任可能很重大
    • 如果流有一个已定义的遇到顺序,但该顺序对结果没有意义,那么可以通过使用 unordered() 操作删除 ORDERED 标志,加速包含顺序敏感型操作的管道的顺序执行。

更多参见

Fork/Join框架


IBM developer works

Search

    Post Directory