相比集合,流提供了一种可以在更高的概念级别上指定计算任务的数据视图。使用流时,只需指定需要完成的任务,而无需指定如何实现。

例如,要计算某属性的平均值,可以指定数据源与目标属性,然后由流库优化计算(使用多线程等)。

1. 从迭代到流

// 从迭代到流
// 创建 List
List<String> words = new ArrayList<>();
 
// 迭代写法
int count = 0;
for (String w : words) {
    if (w.length() > 10) count++;
}
 
// 流写法
long count = words.stream().filter(w -> w.length() > 10).count();

Collection 接口的 stream():将集合转换为流。

filter():生成一个新流,包含满足指定条件的元素。

count():返回流中的元素数量。

流写法只需看方法名就能明白代码的作用,可读性更好。

.stream() 改为 .parallelStream() 或对已有流调用 .parallel() 可以指定以并行方式处理。

流遵循“做什么而非怎么做”的原则,只要结果正确,操作的实现与顺序是任意的,能够优化计算。

流与集合类似,都可以获取与转换数据,它们的区别:

  • 流不存储元素,其所操作的对象可以位于集合中,也可以按需生成。
  • 流不会修改源数据,例如,filter 不会从源中删除元素,而是生成一个新的、只包含满足条件的元素的流。
  • 流的操作尽可能惰性执行(直到需要计算结果时才执行计算),例如,要求查找前 5 个符合要求的单词时,流将在找到第 5 个目标后立即停止计算。

2. 流的创建

静态方法 Stream::of:将数组转换为流。

Stream<String> words = Stream.of(contents.split("\\PL+"));
 
// 其参数个数可变, 可以创建有任意数量引元的流
Stream<String> song = Stream.of("gently", "down", "the", "stream");

静态方法 Arrays::stream:将数组转换为流,且可指定只使用一个子数组。

Stream<String> someWords = Arrays.stream(contents, 0, 10);

静态方法 Stream::empty:创建不含任何元素的流。

Stream<String> silence = Stream.empty();

Stream 接口有两个用于创建无限流的静态方法:

  • generate():接收一个不含任何引元的方法(技术上,是一个 Supplier<T> 接口对象),通常在需要一个流类型的值时调用。
Stream<String> echos = Stream.generate(() -> "Echo");
// 生成一个随机数的流
Stream<Double> randoms = Stream.generate(Math::random);
  • iterate():接收一个“种子”值与一个方法(技术上,是一个 UnaryOperator<T> 接口对象),迭代式地对前一次的结果再次调用该方法。
Stream<BigInteger> integers = Stream.iterate(BigInteger.ZERO, n -> n.add(BigInteger.ONE));

在生成的流中,第 1 个元素为种子 BigInteger.ZERO,第 2 个为 f(BigInteger.ZERO),第 3 个为 f(f(BigInteger.ZERO)),以此类推。

谓词(predicate):函数式接口 Predicate(见 [4 2. 3) 函数式接口])与内部类 的实例,一个返回 truefalse 的表达式。

谓词可以描述 iterate 的迭代操作应何时结束,用以生成有限流。

BigInteger aMillion = new BigInteger("1,000,000");
Predicate<BigInteger> lessThanAMillion = n -> n.compareTo(aMillion) < 0; // 谓词
Stream<BigInteger> integers = Stream.iterate(BigInteger.ZERO, filter(lessThanAMillion), n -> n.add(BigInteger.ONE));
 
// 用内嵌 lambda 改写为
Stream<BigInteger> integers = Stream.iterate(BigInteger.ZERO, n -> n.compareTo(aMillion) < 0, n -> n.add(BigInteger.ONE));

如果某次迭代的结果没有通过谓词的检查条件,流的生成将终止。

方法Stream.ofNullable:由指定对象生成一个短流,如果该对象为null,流的长度为0,否则为1(只包含该对象),它常与方法flatMap结合(见[11 7. 7) 将Optional转换为流])。

方法toList:将流中的元素收集到一个List中。与count相同,toList将在流的生成终止时执行。

方法limit:将流的最大长度限制为指定整数,超出时,流的生成将终止。

/*limit的使用*/
Stream<Double> randoms = Stream.generate(Math::random).limit(10);

将其他类型的对象转换为流:

//非集合的Iterable对象
StreamSupport.stream(iterable.spliterator(), false);
 
//Iterator对象
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);

流操作分为中间操作与终止操作:

  • 中间操作:返回一个流的操作,例如筛选、映射、排序,可以连接形成一个“流水线”,但不会生成结果。
  • 终止操作:返回一个非流的操作,例如forEach、reduce、collect,可以触发中间操作的执行,并生成结果。

还分为干涉操作与非干涉操作:

  • 干涉操作(intermediate operation):在“流水线”中修改流中的元素或产生副作用的中间操作,包括filter、distinct、sorted、map、flatMap等,它们可以修改流的元素或结构,或者产生副作用,例如修改全局变量、打印日志等。
  • 非干涉操作(non-interference operation):不修改流的元素或结构、而只是生成一个新流的操作,包括limit、skip、forEach、reduce、collect等终止操作以及peek等中间操作。

应避免在同一个“流水线”中使用多个干涉操作,因为这将导致难以理解和调试的行为。使用干涉操作时,应单独调用,并将结果传递给下一个“流水线”。相反,应尽可能通过链式调用将多个非干涉操作组合在一起,以提高代码的可读性和可维护性。

/*流操作的使用*/
List<String> wordList = ...;
Stream<String> words = wordList.stream();
 
//不推荐的代码,不过可以正常运行
wordList.add("END");
long n = words.distinct().count();
 
//错误的代码
words.forEach(s -> {} if (s.length() < 12) wordList.remove(s); {});
/*流的使用*/
import java.io.*;
import java.math.*;
import java.nio.file.*;
import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.*;
 
public class CreatingStreams
{
    public static <T> void show(String title, Stream<T> stream)
    {
        final int SIZE = 10;
        List<T> firstElements = stream
            .limit(SIZE + 1)
            .toList();
        System.out.print(title + ": ");
        for (int i = 0; i < firstElements.size(); i++)
        {
            if (i > 0) System.out.print(", ");
            if (i < SIZE) System.out.print(firstElements.get(i));
            else System.out.print("...");
        }
        System.out.println();
    }
    
    public static void main(String[] args) throws IOException
    {
        Path path = Path.of("D:\\NOTICE.txt");
        var contents = Files.readString(path);
        
        Stream<String> words = Stream.of(contents.split("\\PL+"));
        show("words", words);
        Stream<String> song = Stream.of("a", "an", "and", "the");
        show("song", song);
        Stream<String> silence = Stream.empty();
        show("silence", silence);
        
        Stream<String> echos = Stream.generate(() -> "Echo");
        show("echos", echos);
        
        Stream<Double> randoms = Stream.generate(Math::random);
        show("randoms", randoms);
        
        Stream<BigInteger> integers = Stream.iterate(BigInteger.ONE,
            n -> n.add(BigInteger.ONE));
        show("integers", integers);
        
        Stream<String> greetings = "Hello\nGuten Tag\nBonjour".lines();
        show("greetings", greetings);
        
        Stream<String> wordsAnotherWay = Pattern.compile("\\PL+").splitAsStream(contents);
        show("wordsAnotherWay", wordsAnotherWay);
        
        try (Stream<String> lines = Files.lines(path))
        {
            show("lines", lines);
        }
        
        Iterable<Path> iterable = FileSystems.getDefault().getRootDirectories();
        Stream<Path> rootDirectories = StreamSupport.stream(iterable.spliterator(), false);
        show("rootDirectories", rootDirectories);
        
        Iterator<Path> iterator = Path.of("D:\\NOTICE.txt").iterator();
        Stream<Path> pathComponents = StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
        show("pathComponents", pathComponents);
    }
}

3. map与flatMap

方法map:按指定转换逻辑转换所有流元素,生成一个新流。

/*map的使用*/
Stream<String> lowercaseWords = words.stream().map(String::toLowerCase);
 
//用lambda改写为
Stream<String> lowercaseWords = words.stream().map(s -> s.toLowerCase());

假定方法codePoints,它返回一个String流(Stream<String>),包含指定字符串的所有的Unicode码点,例如调用 codePoints("Hello🌐") 将得到流 ["H", "e", "l", "l", "o", "🌐"]

如果调用 .map(codePoints)

Stream<Stream<String>> result = words.stream().map(w -> codePoints(w));

将得到一个嵌套流(Stream<Stream<T>>,例如 [...["h", "i"], ["m", "e"]...])。

方法flatMap:类似于map,不过最后将多个流合并为一个。

Stream<String> flatResult = words.stream().flatMap(w -> codePoints(w));

实际上,String类中有方法codePoints,它返回一个IntStream,调用 "Hello🌐".codePoints() 将返回IntStream [72, 101, 108, 108, 111, 32, 127760]

IntStream与Stream<Integer>略有不同:

  • IntStream为基本类型流,是一种特殊类型的流,专用于处理基本类型元素。它提供一组专门处理基本类型元素的操作方法,例如求和、平均值、最大值、最小值。
  • Stream<Integer>为包装类型流,是通用的泛型流,可以处理任何类型的元素。它提供一组可以处理任何类型元素的操作方法,例如过滤、映射、排序。
  • 基本类型流在处理基本类型元素时比包装类型流更高效,而包装类型流适用范围更广。

方法mapTo(Int|Long|Double):将包装类型流转换为基本类型流。

/*基本类型流与包装类型流的使用*/
//基本类型流
IntStream intStream = IntStream.of(1, 2, 3, 4, 5);
int sum = intStream.sum();
 
//包装类型流
Stream<Integer> integerStream = Stream.of(1, 2, 3, 4, 5);
int sum = integerStream.mapToInt(Integer::intValue).sum();

例如,将字符串流中元素长度转换为一个IntStream:

Stream<String> words = ...;
IntStream lengths = words.mapToInt(String::length);

方法mapToObj/boxed:将基本类型流转换为包装类型流。

/*用String.codePoints与mapToObj实现自定义方法codePoints*/
public static Stream<String> codePoints(String s)
{
    return s.codePoints().mapToObj(cp -> new String(new int [] { cp }, 0, 1));
}
/*boxed的使用*/
Stream<Integer> integers = IntStream.range(0, 100)
                                    .boxed();

使用flatMap时需要提供一个方法,为每个流元素生成一个新流,这样做效率较低。

方法mapMulti提供了另一种选择,与其产生由结果构成的流,它将所有生成的结果通过调用方法accept传递给一个收集器(collector/consumer)(一个函数式接口Consumer对象)。

/*Consumer的定义*/
public interface Consumer<T>
{
    void accept(T, t);
}

调用mapMulti时,需要提供一个在流元素与收集器上调用的方法,其调用结果将传递给收集器。

/*mapMulti的使用*/
Consumer<Integer> collector = ...;
List<Integer> numbers = List.of(1, 2, 3, 4, 5);
numbers.stream()
       .mapMulti((num, collector) ->
            {
                collector.accept(num * 2);
                collector.accept(num * 3);
            })
       .forEach(System.out::println);

4. 子流与组合流

方法limit:限制流的长度,调用 stream.limit(n) 将生成一个新流,只包含源流的前 n 个元素。

/*limit的使用*/
Stream<Double> randoms = Stream.generate(Math::random).limit(100);

方法skip:从头开始跳过流中指定数量的元素,调用 stream.skip(n) 将生成一个新流,不包含源流的前 n 个元素。如果源流中元素数量不大于 n,则生成空流。

/*skip的使用*/
Stream<String> words = Stream.of(contents.split("\\PL+").skip(1));

方法takeWhile:类似于filter,takeWhile也接收一个谓词作为参数,用于过滤流元素。

filter与takeWhile的区别:

  • filter对所有流元素进行检查,保留满足谓词的元素,并将其传递给下一个操作。
  • takeWhile对流元素依次检查,一旦遇到不满足谓词的元素,将停止处理,并返回在其之前的所有元素(即连续的若干个满足谓词的元素)。
/*takeWhile的使用*/
Stream<String> str = Stream.of("1", "23", "54", "6", "Jack");
Stream<String> consecutiveIncreasingNumbers = str.takeWhile(
    s -> "0123456789".contains(s)); //["1", "23"]

方法dropWhile:类似于takeWhile,dropWhile也对流元素依次检查,不过它在遇到不满足谓词的元素时将停止检查并返回从该元素开始的所有元素(即去除连续的若干个满足谓词的元素)。

/*dropWhile的使用*/
Stream<String> str = Stream.of("1", "23", "54", "6", "Jack");
Stream<String> notConsecutiveIncreasingNumbers = str.dropWhile(
    s -> "0123456789".contains(s)); //["54", "6", "Jack"]

静态方法Stream.concat:拼接两个流。

/*Stream.concat的使用*/
Stream<String> combined = Stream.concat(Stream.of("Hello", " "),
                                        Stream.of("Java"));

第一个流参数不能为无限流(语法上合法,但如果这样做,第二个流将无法参与拼接)。

5. 其他转换

方法distinct:生成一个新流,从原流中剔除重复元素。

/*distinct的使用*/
Stream<String> uniqueWords = Stream.of("a", "b", "c", "a")
                                   .distinct();

方法sorted:生成一个新流,将原流的元素排序。

/*sorted的使用*/
Stream<String> longestFirst = words.sorted(Comparator.comparing(String::length)
                       .reversed());

方法peek:生成一个新流,元素与原流相同,不过每次获取元素时对其调用指定函数。在调试时很方便。

/*peek的使用*/
Object[] powers = Stream.iterate(1.0, p -> p * 2)
                        .peek(e -> System.out.println("Fetching" + e))
                        .limit(20).toArray();

peek在其后的中间操作(如map、filter)被调用时才执行,且不会在终止操作之后执行。

6. 约简

约简(reduction):将流元素聚合为一个非流值,以便后续使用。约简是一种终止操作。

上文中的count即为一种约简,其他约简还有max、min等,它们的返回值类型为Optional<T>(见[11 7. Optional类型])。

/*max的使用*/
Optional<String> largest = words.max(String::compareToIgnoreCase);
System.out.println("largest: " + largest.orElse(""));

方法findFirst:返回流的第一个元素,常与filter组合使用。

/*findFirst的使用*/
Optional<String> firstStartsWithQ = words.filter(s -> s.startsWith("Q"))
                                         .findFirst(); //首个以Q开头的词

方法findAny:类似于findFirst,不过不限制元素位置为第一个。

/*findAny的使用*/
Optional<String> anyStartsWithQ = words.parallel()
                                       .filter(s -> s.startsWith("Q"))
                                       .findAny(); //某个以Q开头的词

在串行流中,findAny与findFirst都将返回第一个元素;但在这里,words被转换为并行流处理,findAny将返回最快完成任务的线程所处理的元素,不一定是第一个元素。

方法anyMatch:检查流中是否存在满足谓词的元素,返回值类型为boolean。

/*anyMatch的使用*/
boolean ifAnyWordStartsWithQ = words.parallel()
                                    .anyMatch(s -> s.startsWith("Q"));

方法allMatch/noneMatch:检查是否所有元素/没有元素满足谓词。

7. Optional

Java 8中引入了Optional类,用于表示一个值可能存在、也可能不存在的情况。它是一个容器对象,可以包含一个非空的值或者空值。与值引用相比,Optional对象可以更加优雅地处理可能存在空值的情况。

1) 获取Optional对象

有效地使用Optional的关键是要使用一种正确的方法,它在值存在时使用该值,而在值不存在时产生一个替代物。

方法Optional.of:创建一个包含非空值的Optional对象。

/*Optional.of的使用*/
String str = "Java";
Optional<String> optional = Optional.of(str);

值为空时,调用 Optional.of 将抛出NullPointerException。

方法Optional.ofNullable:根据值是否为空,创建一个包含值或空值的Optional对象。调用 Optional.ofNullable(obj) 将在 obj 非null时返回 Optional.of(obj),否则返回 Optional.empty()

在值为空时,通常以默认值代替:

  • 方法orElse:指定Optional对象值为空时的默认值。
/*orElse的使用*/
String result = optional.orElse("");
  • 方法orElseGet:接收一个Supplier对象(见[4 2. 3) 函数式接口),在Optional对象值为空时返回该Supplier对象的返回值。
/*orElseGet的使用*/
String result = optional.orElseGet(
    () -> System.getProperty("myapp.default"));
  • 方法orElseThrow:在Optional对象值为空时抛出异常。
/*orElseThrow的使用*/
String result = optional.orElseThrow(IllegalStateException::new);

2) 使用Optional对象

方法ifPresent:接收一个方法,如果Optional对象值非空,将其传递给该方法,否则什么都不做。

例如,在该值存在时将其添加到结果集合中:

/*ifPresent的使用*/
optional.ifPresent(v -> results.add(v));
//或者
optional.ifPresent(results::add);

方法ifPresentOrElse:接收两个方法,分别在Optional对象值非空/为空时调用。

/*ifPresentOrElse的使用*/
optional.ifPresentOrElse(
    v -> System.out.println("Found " + v),
    () -> logger.warning("No match"));

方法or:将空Optional替换为一个可替代的Optional,后者以惰性方式计算。

/*or的使用*/
Optional<String> result = optional.or(
    () -> alternatives.stream()
                      .findFirst());

optional 非空时将其赋给 result,否则赋以lambda的计算结果。

3) “流水线化”处理Optional对象

map与filter也可以处理Optional对象。类似于流操作方法的链式调用,Optional对象的操作方法也可以“流水线化”。

/*链式调用Optional对象的操作方法*/
Optional<String> transformed = optional.map(String::toUpperCase)
                                       .filter(s -> s.contains("E"));
                                       //optional不满足谓词时transformed为空
 
transformed.map(results::add); //transformed非空时添加到结果集合

假设有一个返回Optional<T>对象的方法f,以及一个目标类型为T、返回Optional<U>对象的方法g,组合调用 s.f().g() 无法工作,因为 s.f() 返回值类型为Optiona<T>而非T。此时需调用flatMap。

Optional<U> result = s.f().flatMap(T::g);

4) 不适合使用Optional对象的情况

方法get:在Optional对象非空时获得其中包装的元素,否则抛出一个NoSuchElementException。

//1
Optional<T> optional = ...;
optional.get()./*某种方法*/;
 
//2
T value = ...;
value./*某种方法*/;
 
//1并不比2更安全

方法isPresent/isEmpty:检查某个Optional对象是否为空。

//1
if (optional.isPresent()) optionalValue.get()./*某种方法*/;
 
//2
if (value != null) value./*某种方法*/;
 
//1并不比2更安全

正确使用Optional类型:

  • Optional类型变量永远不应为null。
  • 不要使用Optional类型的字段。在类内,应使用null表示缺失的字段。
  • 不要使用Optional类型的方法参数,而应编写包含与不包含该参数的两个重载方法。
  • 不要在集合中使用Optional对象,并且不要用作Map的键。

5) 将Optional转换为流

方法stream:将一个Optional<T>对象转换为一个具有0个或1个元素的Stream<T>对象。

假定一个用户ID流以及方法lookup:

Optional<User>
lookup(String id)

要在获取用户流时跳过无效ID,可以过滤后对有效ID调用get:

Stream<String> ids = ...;
Stream<User> users = ids.map(Users::lookup)
                        .filter(Optional::isPresent)
                        .map(Optional::get);

这样用到了上文中警告过需要慎用的isPresent与get,以下调用更加优雅:

Stream<User> users = ids.map(Users::lookup)
                        .flatMap(Optional::stream);

假定方法Users.classicLookup(id)将返回一个User对象或null,可以过滤掉null:

Stream<User> users = ids.map(Users::classicLookup)
                        .filter(Objects::nonNull);

也可以调用flatMap:

Stream<User> users = ids.flatMap(
    id -> Stream.ofNullable(Users.classicLookup(id)));
//或者
Stream<User> users = ids.map(Users::classicLookup)
    .flatMap(Stream::ofNullable);

8. 收集结果

对流处理完毕后,可以调用方法iterator创建用以访问元素的经典迭代器,也可以调用方法forEach对每个元素应用指定函数:

stream.forEach(System.out::println);

在并行流上,forEach将按任意顺序访问元素,如需按元素的原顺序处理,可以调用方法forEachOrdered,不过同时也将失去并行处理的优势。

方法toList/toArray/…:由流元素生成List/Array/…。

由于泛型数组无法在运行时创建,调用 stream.toArray() 将返回一个Object[]数组,可以将目标类型传递给数组构造器:

String[] result = stream.toArray(String[]::new);

方法collect:将流元素收集到另一个目标中,它接收一个Collector接口对象。

收集器(collector):收集众多元素并生成单一结果的对象,Collectors类提供了大量用于生成常见收集器的工厂方法。

在toList/toSet被添加到Java 16之前,必须使用由方法Collectors.toList/toSet生成的收集器:

List<String> result = stream.collect(Collectors.toList());
 
Set<String> result = stream.collect(Collectors.toSet());

如需控制获得的Set的种类,可以调用方法toCollection并传递一个目标类型的构造器方法引用。

TreeSet<String> result = stream.collect(Collectors.toCollection(TreeSet::new));

方法joining:将所有流元素连接为一个字符串。

/*joining的使用*/
String result = stream.collect(Collectors.joining());
 
String result = stream.collect(Collectors.joining(", ")); //设置分隔符

如果流中包含非字符串元素,需先转换为字符串:

String result = stream.map(Object::toString)
                      .collect(Collectors.joining(", "));

方法summarizing(Int|Long|Double):将流的结果约简为总和、计数、平均值、最大值或最小值。

IntSummaryStatistics summary = stream.collect(
    Collectors.summarizingInt(String::length));
double averageWordLength = summary.getAverage();
double maxWordLength = summary.getMax();

方法toMap有两个方法引元,用以产生key与value。

public record Person(int id, String name) {}
...
Map<Integer, String> idToName = people.collect(
    Collectors.toMap(Person::id, Person::name));
Map<Integer, Person> idToPerson = people.collect(
    Collectors.toMap(Person::id, Function.identity()));

当key冲突时,collector将抛出一个IllegalStateException,可以提供第三个方法引元以覆盖,它应该返回已有值、新值或它们的组合。

如需指定类型为TreeMap,可以提供第四个构造器方法引元。

Map<Integer, Person> idToPerson = people.collect(
    Collectors.toMap(
        Person::id,
        Function.identity(),
        (existingValue, newValue) -> { throw new IllegalStateException(); },
        TreeMap::new));

9. 群组与分区

P27-28

10. 收集器方法

方法groupingBy:用于根据给定的条件对流中的元素进行分组操作,接收一个方法参数作为分组依据,返回一个Collector对象,后者可以用于Stream.collect。

/*groupingBy的使用*/
List<String> words
    = Arrays.asList("apple", "banana", "cat", "dog", "elephant");
Map<Integer, List<String>> groups
    = words.stream()
           .collect(Collectors.groupingBy(String::length));
System.out.println(groups);
/*控制台输出*/
{3=[cat, dog], 5=[apple], 6=[banana], 8=[elephant]}

Java提供了多种可以将收集到的元素约简为数字的收集器方法:

  • 方法counting:返回收集到的元素的个数。
  • 方法summing(Int|Long|Double)/averaging(Int|Long|Double):接收一个方法引元,对下游元素调用,返回其和/平均值。
  • 方法maxBy/minBy:接收一个comparator,返回下游元素中的最大值/最小值。

方法collectingAndThen:在收集器后添加一个最终处理操作。

例如,如需获取互异的结果总数,可以将其收集到Set中,然后对后者调用size:

/*collectingAndThen的使用*/
Map<Character, Integer> stringCountsByStartingLetter = strings.collect(
    groupingBy(s -> s.charAt(0),
        collectingAndThen(toSet(), Set::size)));

方法mapping:对收集到的元素调用指定方法,并将结果传递给其下游收集器。

例如,对字符串按首字母分组,在每组内部计算字符串长度,并将这些长度收集到Set中:

/*mapping的使用*/
Map<Character, Set<Integer>> stringLengthsByStartingLetter = strings.collect(
    groupingBy(s -> s.charAt(0),
        mapping(String::length, toSet())));

方法filtering:将一个filter应用到每个组上。

Map<String, Set<City>> largeCitiesByState = cities.collect(
    groupingBy(City::state,
        filtering(c -> c.population() > 50,0000,
            toSet())));

方法teeing:将两个收集器的结果合并为一个。

例如,需要收集城市名并计算平均人口,可以用teeing执行两次计算,然后将结果组合:

/*teeing的使用*/
record Pair<S, T>(S first, T second) {}
Pair<List<String>, Double> result = cities.filter(c -> c.state().equals("NW"))
.collect(teeing(
    mapping(City::name, toList()), //第一收集器
    averagingDouble(City::population), //第二收集器
    (list, avg) -> new Result(list, avg))); //结果组合方法

收集器的组合使用是一种强大的方式,但也可能导致复杂的表达式,最佳用法是与groupingBy和partitioningBy一起处理下游Map中的值,否则,应直接对流调用map、reduce、count、max与min等方法。

11. 约简操作

方法reduce:将流中的元素按照指定的操作进行累积、合并或求值,接收一个二元操作参数,并从前两个元素开始持续对流元素操作。

给定约简操作 op,约简将产生 s_1 op s_2 op s_3 op ...,其中 s_i op s_{i + 1} 表示方法调用 op(s_i, s_{i + 1})

例如,对所有流元素求和:

/*reduce的使用*/
List<Integer> values = ...;
Optional<Integer> sum = values.stream()
                              .reduce((x, y) -> x + y);

此处的lambda可以改写为方法引用 .reduce(Integer::sum)。当 values 为空时,reduce将返回一个Optional对象。

约简的实际应用有求和、乘积、字符串连接、求最值、求并集/交集等。

对并行流约简时,约简操作必须满足结合律,即 (x op y) op z 需等于 x op (y op z),以保证结果与元素顺序无关。

幺元值:满足 e op x = x 的值 e,通常作为计算起点,例如,0为加法的幺元值。

/*幺元值的使用*/
Integer sum = values.stream()
                    .reduce(0, (x, y) -> x + y);

与上文不同,由于此处使用了幺元值0,当 values 为空时,reduce将返回封装了0的包装器类对象,而不是Optional对象。

12. 基本类型流

概念见[11 3. map与flatMap]

方法IntStream.of/Arrays.stream:创建IntStream。

IntStream stream = IntStream.of(1, 1, 2, 3, 5);
//或者
int[] values = {1, 1, 2, 3, 5};
IntStream stream = Arrays.stream(values, 0, 4);

对象类型流的方法generate与iterate同样适用于基本类型流。

静态方法range/rangeClosed:生成步长为1的整数区间。

/*range/rangeClosed的使用*/
IntStream zeroToNinetyNine = IntStream.range(0, 100);
IntStream zeroToOneHundred = IntStream.rangeClosed(0, 100);

13. 并行流

方法parallelStream:生成并行流。

/*parallelStream的使用*/
List<String> words = ...;
Stream<String> parallesWords = words.parallelStream();

方法parallel:由已有流生成并行流。

/*parallel的使用*/
Stream<String> words = ...;
Stream<String> parallelWords = words.parallel();

执行终止操作时,如果流处于并行模式,所有中间操作都将并行化。

使用并行流的原则为其返回结果必须与顺序操作的返回结果一致,这要求无状态的、可以以任意顺序执行的操作。

例如,对字符串流中的短单词计数:

var shortWords = new int[12];
words.parallelStream().forEach(
    s -> { if (s.length() < 12) shortWords[s.length()]++; });
System.out.println(Arrays.toString(shortWords));

这段代码非常糟糕,传递给forEach的lambda将在多个并发线程中运行,每一个线程都将更新同一个数组 shortWords,造成混乱的竞争。

要安全地并行化此操作,可以以长度对字符串分组,然后分别计数:

Map<Integer, Long> shortWordCounts
    = words.parallelStream()
           .filter(s -> s.length() < 12)
           .collect(groupingBy(
               String::length,
               counting()));