并行的Stream流
串行的Stream流
/**
* 串行流
*/
@Test
public void test01(){
Stream.of(5,6,8,3,1,6)
.filter(s->{
System.out.println(Thread.currentThread() + "" + s);
return s > 3;
}).count();
}
获取并行流的两种方式
/**
* 获取并行流的两种方式
*/
@Test
public void test02(){
List<Integer> list = new ArrayList<>();
// 通过List 接口 直接获取并行流
Stream<Integer> integerStream = list.parallelStream();
// 将已有的串行流转换为并行流
Stream<Integer> parallel = Stream.of(1, 2, 3).parallel();
}
并行流操作
/**
* 并行流操作
*/
@Test
public void test03(){
Stream.of(1,4,2,6,1,5,9)
.parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理
.filter(s->{
System.out.println(Thread.currentThread() + " s=" +s);
return s > 2;
}).count();
}
并行流中的数据安全问题
/**
* 并行流中的数据安全问题
*/
@Test
public void test01(){
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
System.out.println(list.size());
List<Integer> listNew = new ArrayList<>();
// 使用并行流来向集合中添加数据
list.parallelStream()
//.forEach(s->listNew.add(s));
.forEach(listNew::add);
System.out.println(listNew.size());
}
加同步锁
/**
* 加同步锁
*/
@Test
public void test02(){
List<Integer> listNew = new ArrayList<>();
Object obj = new Object();
IntStream.rangeClosed(1,1000)
.parallel()
.forEach(i->{
synchronized (obj){
listNew.add(i);
}
});
System.out.println(listNew.size());
}
使用线程安全的容器
/**
* 使用线程安全的容器
*/
@Test
public void test03(){
Vector v = new Vector();
Object obj = new Object();
IntStream.rangeClosed(1,1000)
.parallel()
.forEach(i->{
synchronized (obj){
v.add(i);
}
});
System.out.println(v.size());
}
将线程不安全的容器转换为线程安全的容器
/**
* 将线程不安全的容器转换为线程安全的容器
*/
@Test
public void test04(){
List<Integer> listNew = new ArrayList<>();
// 将线程不安全的容器包装为线程安全的容器
List<Integer> synchronizedList = Collections.synchronizedList(listNew);
Object obj = new Object();
IntStream.rangeClosed(1,1000)
.parallel()
.forEach(i->{
synchronizedList.add(i);
});
System.out.println(synchronizedList.size());
}