Java8Stream流Api学习



注意:

  • parallelStream 并行流,线程不安全。而且执行效率要看你的电脑cup总数,默认线程数和cpu总数相当。
  • stream 单管,效率不及parallelStream

常见函数:

  • filter()、map()、flatMap()、mapToInt()、sorted()、reduce()、limit()、skip()、groupingBy()、allMatch()、findFirst()、sum()、max()、min()、average()、distinct()、count(返回流中元素个数)等……

首先我们先创建基础支撑数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public class Main {

public static void main(String[] args) {
Employee e1 = new Employee(3, 43, "M", null);
Employee e2 = new Employee(2, 23, "F", null);
Employee e3 = new Employee(4, 53, "M", null);
Employee e4 = new Employee(8, 33, "M", null);
Employee e5 = new Employee(9, 23, "F", null);
List<Employee> list = Arrays.asList(e1, e2, e3, e4, e5);
}
}

/**
* Employee类
**/
public class Employee {
private Integer id;
private Integer age;
private String gender;
List<Employee> children;

/**
* filter的谓词逻辑:
* 年龄大于30
*/
public static Predicate<Employee> ageGreaterThan30 = x -> x.getAge() > 30;
public static Predicate<Employee> genderM = x -> x.getGender().equals("M");


@Override
public String toString() {
return "Employee{" +
"id=" + id +
", age=" + age +
", gender='" + gender + '\'' +
", children=" + children +
'}';
}

public Employee(Integer id, Integer age, String gender, List<Employee> children) {
this.id = id;
this.age = age;
this.gender = gender;
this.children = children;
}

public List<Employee> getChildren() {
return children;
}

public void setChildren(List<Employee> children) {
this.children = children;
}

public void setId(Integer id) {
this.id = id;
}

public void setAge(Integer age) {
this.age = age;
}

public void setGender(String gender) {
this.gender = gender;
}

public Integer getId() {
return id;
}

public Integer getAge() {
return age;
}

public String getGender() {
return gender;
}

}

数组创建流

构造流:Stream.of()、Arrays.stream()、Arrays.asList()

1、Stream.of()是通用的,而Arrays.stream()不是

2、Arrays.stream():适用于int [],long []和double []类型的原始数组,并分别返回IntStream,LongStream和DoubleStream

3、Stream.of():返回类型T的通用Stream(Stream)

1
2
3
4
5
6
7
8
9
10
11
12
String[] arr = {"Geek", "for", "Geeks"};
// 可以用与遍历数组
Stream<String> array = Arrays.stream(arr);
Stream<String> stream = Stream.of(arr);
// %s:字符串的占位符
array.forEach(str -> System.out.printf("%s \t", str));
stream.forEach(str -> System.out.printf("%s ", str));
// 将数组转换成集合
List<String> collect = Arrays.asList(arr).parallelStream().collect(Collectors.toList());
System.err.println(new ArrayList<>(Arrays.asList(arr)));
// 快于new ArrayList<>(Arrays.asList(arr))
System.out.println("多线程:" + collect);

Java 装箱流

1、数值流

三个原始类型流:

  • IntStream

  • DoubleStream

  • LongStream

以IntStream为例:

  • IntStream.of()
  • IntStream.range()前闭后开、不包含结束位,startInclusive >= endExclusive,range(1, 100) :[1, 100)
  • IntStream.rangeClosed()闭区间、包含结束位,startInclusive > endExclusive,rangeClosed(1, 100) :[1, 100]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1,2,3
IntStream.of(new int[]{1, 2, 3}).forEach(e -> {
System.out.println("IntStream.of:" + e);
});
// 1,2: 不包含结束位,startInclusive >= endExclusive
IntStream.range(1, 3).forEach(e -> {
System.out.println("IntStream.range:" + e);
});
// 包含结束位,startInclusive > endExclusive
// 1,2,3:IntStream rangeClosed (int startInclusive,int endInclusive)
// IntStream:原始int值元素的序列。startInclusive:包含的初始值。endInclusive:包含的上限。
IntStream.rangeClosed(1, 3).forEach(e -> {
System.out.println("IntStream.rangeClosed:" + e);
});
//计算1~100之间的数字中偶数的个数
long count = IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0).count();
System.out.println(count);

//将stream转换为IntStream
int totalScore = list.stream().mapToInt(Employee::getAge).sum();
System.out.println(totalScore);

//计算平均年龄
OptionalDouble avgAge = list.stream().mapToInt(Employee::getAge).average();
System.out.println(avgAge.getAsDouble());

2、Collectors类处理

将字符串流转换为列表

1
2
List<String> strings = Stream.of("this", "is", "a", "list", "of", "strings")
.collect(Collectors.toList());

然而,同样的过程并不适合处理 基本类型流

解决方法:

一:利用 boxed 方法,将 IntStream 转换为 Stream

1
2
3
List<Integer> ints = IntStream.of(3, 1, 4, 1, 5, 9)
.boxed()
.collect(Collectors.toList());

二:利用 mapToObj 方法,将基本类型流中的每个元素转换为包装类的一个实例

1
2
3
List<Integer> ints = IntStream.of(3, 1, 4, 1, 5, 9)
.mapToObj(Integer::valueOf)
.collect(Collectors.toList())

三:采用 collect 方法的三参数形式

1
2
3
4
5
6
7
8
9
10
Supplier: 是 ArrayList<Integer> 的构造函数。
accumulator: 累加器(accumulator)为 add 方法,表示如何为列表添加单个元素。
combiner: 仅在并行操作中使用的组合器(combiner)是 addAll 方法,它能将两个列表合二为一
-----------------------------------------
<R> R collect(Supplier<R> supplier,
ObjIntConsumer<R> accumulator,
BiConsumer<R,R> combiner)

List<Integer> ints = IntStream.of(3, 1, 4, 1, 5, 9)
.collect(ArrayList<Integer>::new, ArrayList::add, ArrayList::addAll);

延伸:将 IntStream 转换为 intArray

1
int[] intArray = IntStream.of(3, 1, 4, 1, 5, 9).toArray();

流转换成其他数据结构

1
2
3
4
5
6
7
8
9
// 1. Array
String[] strArray1 = stream.toArray(String[]::new);
// 2. Collection
List<String> list1 = stream.collect(Collectors.toList());
List<String> list2 = stream.collect(Collectors.toCollection(ArrayList::new));
Set set1 = stream.collect(Collectors.toSet());
Stack stack1 = stream.collect(Collectors.toCollection(Stack::new));
// 3. String
String str = stream.collect(Collectors.joining()).toString();

过滤、排序:filter、sorted

1
2
3
4
5
6
7
8
9
List<Employee> demo = list.parallelStream()
// 枚举类型:x -> x.getAge() > 30
.filter(Employee.ageGreaterThan30)
// Employee::getId => e -> e.getId()
// comparing(Employee::getId) 正序
// comparing(Employee::getId).reversed:倒序
.sorted(comparing(Employee::getId))
// 将stream流转换成list集合
.collect(Collectors.toList());

排序、map映射:filter、map

1
2
3
4
5
6
7
List<Integer> demo = list.stream()
.filter(Employee.ageGreaterThan30)
.map(e -> {
// 平方数
return e.getId() * e.getId();
})
.collect(Collectors.toList());

求取操作后的id的和:mapToInt、sum

1
2
3
4
int sum = list.stream()
// Integer -> int
.mapToInt(Employee::getId)
.sum();

一对多的扁平化处理成一行数据:flatMap

flatMap 把 input Stream 中的 层级结构扁平化,就是将最底层元素抽出来放到一起,最终 output 的。新 Stream 里面已经没有 List 了,都是直接的数字,然后通过 collect(Collectors.toList()转成list集合。

1
2
3
4
5
6
7
8
9
Stream<List<Integer>> inputStream = Stream.of(
Collections.singletonList(1),
Arrays.asList(2, 3),
Arrays.asList(4, 5, 6)
);

List<Integer> outputStream = inputStream.
flatMap(Collection::stream).collect(Collectors.toList());
// [1, 2, 3, 4, 5, 6]

留下偶数

1
2
3
4
5
6
7
8
Integer[] sixNums = {1, 2, 3, 4, 5, 6};
String[] evens =
Stream.of(sixNums).filter(n -> n % 2 == 0)
// Number,String的父级都是Object
.map(Object::toString).
toArray(String[]::new);
// 不然就是hashcode地址
System.out.println(Arrays.toString(evens));

findFirst()

findFirst():总是返回 Stream 的第一个元素,或者空
findAny():找到其中一个元素 (使用 stream() 时找到的是第一个元素;使用 parallelStream() 并行时找到的是其中一个元素)。

使用 Optional 代码的可读性更好,而且它提供的是 编译时检查,能极大的降低 NPE 这种 Runtime Exception 对程序的影响,或者迫使程序员更早的在编码阶段处理空值问题,而不是留到运行时再发现和调试。

1
2
3
Optional<Employee> optional = list.parallelStream().findFirst();
// optional.get(): 获取到optional中的对象
Optional.of(optional.get()).ifPresent(System.out::println);

reduce()

这个方法的主要作用是把 Stream 元素组合起来。它提供一个起始值(种子),然后依照运算规则(BinaryOperator),和前面 Stream 的第一个、第二个、第 n 个元素组合。从这个意义上说,字符串拼接、数值的 sum、min、max、average 都是特殊的 reduce。例如 Stream 的 sum 就相当于

Integer sum = integers.reduce(0, (a, b) -> a+b); 或

Integer sum = integers.reduce(0, Integer::sum);

没有起始值的情况,这时会把 Stream 的前面两个元素组合起来,返回的是 Optional, 通过 .get()获取数据类型

1
2
3
4
5
6
7
8
9
10
11
12
13
// 字符串连接,concat = "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 求最小值,minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
// 求和,sumValue = 10, 有起始值
int sumValue = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum);
// 求和,sumValue = 10, 无起始值
sumValue = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get();
// 过滤,字符串连接,concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F")
// compareTo:比参数对象大,则返回+1,如果相等,则返回0,如果比参数小,则返回-1
.filter(x -> x.compareTo("Z") > 0)
.reduce("", String::concat);

limit()、skip()

  • limit 返回 Stream 的前面 n 个元素
  • skip 则是扔掉前 n 个元素(它是由一个叫 subStream 的方法改名而来)。
1
2
3
4
5
6
7
8
9
10
List<Employee> employees = new ArrayList();
for (int i = 1; i <= 10000; i++) {
// [emp4, emp5, emp6, emp7, emp8, emp9, emp10]
Employee employee = new Employee((int) System.currentTimeMillis(), i, "emp" + i, null);
employees.add(employee);
}
// 一万条数据中取10条,扔掉前3条数据,返回集合。
List<String> personList2 = employees.stream().
map(Employee::getGender).limit(10).skip(3)
.collect(Collectors.toList());

找出最长一行的长度:max

1
2
3
4
5
6
7
BufferedReader br = new BufferedReader(new FileReader("c:\\SUService.log"));
int longest = br.lines().
mapToInt(String::length).
max().
getAsInt();
br.close();
System.out.println(longest);

match()

  • allMatch:Stream 中全部元素符合传入的 predicate,返回 true。不需要遍历所有的,只要一个元素不满足条件,就 skip 剩下的所有元素,返回 false

  • anyMatch:Stream 中只要有一个元素符合传入的 predicate,返回 true

  • noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// true
boolean isAllAdult = list.stream().
allMatch(p -> p.getAge() > 18);
System.out.println("All are adult? " + isAllAdult);

// false
boolean isThereAnyChild = list.stream().
anyMatch(p -> p.getAge() < 12);
System.out.println("Any child? " + isThereAnyChild);

// false
boolean noneMatch = list.stream()
.noneMatch(p -> p.getAge() > 0);
System.out.println("Any child? " + noneMatch);

生成随机整数:random

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Random random = new Random();
Supplier<Integer> num = random::nextInt;
Stream.generate(num).limit(10).forEach(System.out::println);
//Another way
System.out.println((int) (Math.random() * 5));

// // long streamSize生成的元素个数, int randomNumberOrigin 起始位,int randomNumberBound 结束位
// 五个位于 1(包括)和 10(不包括)之间的整型随机数
random.ints(5, 1, 10)
.sorted()
.forEach(System.out::print);

// 例如:[11, 16, 14, 19, 10] int、doubles、longs也可
List<Integer> listOfInts = random.ints(5, 10, 20)
.collect(LinkedList::new, LinkedList::add, LinkedList::addAll);

Random random = new Random();
//
List<Integer> listOfInts = random.ints(5, 10, 20)
.collect(LinkedList::new, LinkedList::add, LinkedList::addAll);
// 将Integer转为int型,去前三条求和
int sum2 = listOfInts.stream().mapToInt(Integer::intValue).limit(3).sum();

函数生成流

iterate 跟 reduce 操作很像,接受一个种子值,和一个 UnaryOperator(例如 f)。然后种子值成为 Stream 的第一个元素,f(seed) 为第二个,f(f(seed)) 第三个,以此类推。

1
2
3
4
5
6
7
8
9
10
11
12
/* 生成一个等差数列 */
// 生成流,第一个为0,后面的依次加1
Stream.iterate(0, n -> n + 1)

// 0 3 6 9 12 15 18 21 24 27
Stream.iterate(0, n -> n + 3).limit(10).forEach(x -> System.out.print(x + " "));

// 生成流,为 0 到 1 的随机双精度数
Stream.generate(Math :: random)

// 生成流,元素全为 1
Stream.generate(() -> 1)

按条件归组

  • groupingBy:根据什么分组

  • partitioningBy:其实是一种 特殊的 groupingBy,它依照条件测试的是否两种结果来构造返回的数据结构,get( true) 和 get(false) 能即为全部的元素对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 按照年龄分组
// {33=[Employee{id=8, age=33, gender='M', children=null}], 53=[Employee{id=4, age=53, gender='M', children=null}], 23=[Employee{id=2, age=23, gender='F', children=null}, Employee{id=9, age=23, gender='F', children=null}], 43=[Employee{id=3, age=43, gender='M', children=null}]}
Map<Integer, List<Employee>> groupingBy = list.stream()
.collect(Collectors.groupingBy(Employee::getAge));
System.out.println("groupingBy:" + groupingBy);

// 通过key值获取23岁的集合列表
// [Employee{id=2, age=23, gender='F', children=null}, Employee{id=9, age=23, gender='F', children=null}]
List<Employee> employeesOf23 = groupingBy.get(23);
System.out.println("employeesOf23:" + employeesOf23);

// 按照是否大于30岁分组
// {false=[Employee{id=2, age=23, gender='F', children=null}, Employee{id=9, age=23, gender='F', children=null}],
// true=[Employee{id=3, age=43, gender='M', children=null}, Employee{id=4, age=53, gender='M', children=null}, Employee{id=8, age=33, gender='M', children=null}]}
Map<Boolean, List<Employee>> collect = list.stream()
.collect(Collectors.partitioningBy(p -> p.getAge() > 30));
System.out.println("partitioningBy:" + collect);

去重

  1. 集合对象:根据字段去重
1
2
3
4
5
6
7
8
9
// 根据 projectCode去重 , collectingAndThen:Collectors操作后附加的整理步骤
List<ProjectStageDto> dataList = projectStageDtoList.stream().collect(
Collectors.collectingAndThen(
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getProjectcode()))), ArrayList::new));

// 不能是Map<String, Object>
List<Map<String, String>> filterList = quickWayList.stream().collect(
Collectors.collectingAndThen(Collectors.toCollection(
() -> new TreeSet<>(Comparator.comparing(item -> item.get("id")))), ArrayList::new));
  1. 集合元素,也可针对String类型的去重
1
2
3
4
5
6
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 5, 5, 5, 6, 7);

List<Integer> distinctNumbers = numbers.stream()
.distinct()
.collect(Collectors.toList());
System.out.println(distinctNumbers);//1, 2, 3, 4, 5, 6, 7

并行流(ParallelStream)使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TestParallelStream {
public static void main(String[] args) {
printFun();
}

public static void printFun() {
List<Integer> integersList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
integersList.add(i);
}
//普通集合 存储
List<Integer> parallelStorage = new ArrayList<>();
//同步集合 存储
List<Integer> parallelStorage2 = Collections.synchronizedList(new ArrayList<>());
//通过并行流存入普通集合parallelStorage中
integersList
.parallelStream()
.filter(i -> i % 2 == 0)
.forEach(i -> parallelStorage.add(i));
System.out.println("开始打印普通集合parallelStorage长度:"+parallelStorage.size());
parallelStorage
.stream()
.forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
System.out.print("------------------------------------");
System.out.println();
//通过并行流存入同步集合parallelStorage2中
integersList
.parallelStream()
.filter(i -> i % 2 == 0)
.forEach(i -> parallelStorage2.add(i));
System.out.println("开始打印同步集合parallelStorage 长度:"+parallelStorage2.size());
parallelStorage2
.stream()
.forEachOrdered(e -> System.out.print(e + " "));
}
}

获取到的结果可能是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
开始打印普通集合parallelStorage
66 62 64 16 12 14 22 24 18 32 20 82 34 36 84 86 6 8 28 30 10 78 80 4 76 26 0 2 68 70 44 56 58 60 54 48 50 46 52 40 42 38 74 72 96 98 90 92 94 88
------------------------------------
开始打印同步集合parallelStorage
66 90 56 92 58 82 88 62 60 64 84 54 86 72 78 80 50 52 32 76 94 34 36 98 68 70 28 30 74 26 44 16 12 48 14 6 8 46 22 24 4 18 0 2 20 96 10 40 42 38

或者


开始打印普通集合parallelStorage
66 62 64 72 74 68 70 32 34 36 28 30 26 44 48 46 40 42 38 4 0 2 null null 12 14 24 18 20 54 90 82 92 50 52 84 88 86 78 80 96 76 98 94 10 6 56 8 58 60
------------------------------------
开始打印同步集合parallelStoragejava
66 62 64 32 56 90 34 16 92 36 58 72 12 88 28 30 74 14 60 68 26 70 54 22 50 44 52 24 18 48 40 20 42 46 82 38 6 8 10 96 84 86 4 94 98 0 2 78 80 76

获取的值有null值并且长度不等于50,按照理想来说的话: filter(i -> i % 2 == 0)只会获取50个数值,长度竟然变化的,甚至可能会出现ArrayIndexOutOfBoundsException,越界了。。。。。

因为长度以及数据的存放最根本的方法应该就是add的方法

1
2
3
4
5
6
7
8
9
10
11
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return <tt>true</tt> (as specified by {@link Collection#add})
*/
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

我们再看ensureCapacityInternal(minCapacity)方法里的grow(minCapacity);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
transient Object[] elementData;

private void grow(int minCapacity) {
// overflow-conscious code
int oldCapacity = elementData.length;
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
// 这个就会导致多线程下list的数组被修改,从而会有索引越界情况。
elementData = Arrays.copyOf(elementData, newCapacity);
}

借用大佬的解释图片也可知道:

如何解决上面的问题呢???

方法1:使用一个线程安全的数组

1
Collections.synchronizedList(new ArrayList<>())

方法二:直接使用collect();这种收集起来所有元素到新集合是线程安全的

1
2
3
List<Integer> collect = integersList
.parallelStream()
.filter(i -> i % 2 == 0).collect(Collectors.toList());

原理:parallelStream底层依赖Fork/Join框架

1
2
3
4
5
Collector<T, ?, List<T>> toList() {
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}

Fork/Join的思想是分治,先拆分任务,再合并结果,每个任务都用单独的线程去处理。所以虽然它同样使用ArrayList,但是我们看到他会为每个线程都创建一个ArrayList对象,最后用addAll方法把它们合并起来,每个线程操作的是自己的集合对象,自然不会有线程安全问题。

总结:这归根结底是一个线程安全问题,与parallelStream并行处理并没有半毛钱的关系,因此在使用多线程处理问题时候要充分考虑一下集合处理的安全问题,最根本的原因还是集合使用的错误

参考链接:

https://blog.csdn.net/u013099854/article/details/113887099

https://blog.csdn.net/y666666y/article/details/110393889

Optional

ofNullable

为空,创建一个userInfoDTO对象;不为空根据UserInfoDTO创建

1
2
3
Optional<UserInfo> optional1 = Optional.ofNullable(userInfoDTO); // 不为空

Optional<UserInfo> optional1 = Optional.ofNullable(null); // 为空
1
2
3
4
5
Optional.ofNullable(userInfoDTO).map(item -> {
User user = new User();
// ...
return save(survey);
});

参考:https://blog.csdn.net/aitangyong/article/details/54564100

参考链接:
IBM Developer:java8streamapi学习
IBM Developer:stream学习

打赏
  • 版权声明: 本网站所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  1. © 2020-2021 Lauy    湘ICP备20003709号-1

请我喝杯咖啡吧~

支付宝
微信