并行的Stream流

13

并行的Stream流

串行的Stream流

/**
     * 串行流
     */
    @Test
    public void test01(){
        Stream.of(5,6,8,3,1,6)
                .filter(s->{
                    System.out.println(Thread.currentThread() + "" + s);
                    return s > 3;
                }).count();
    }

获取并行流的两种方式

/**
     * 获取并行流的两种方式
     */
    @Test
    public void test02(){
        List<Integer> list = new ArrayList<>();
        // 通过List 接口 直接获取并行流
        Stream<Integer> integerStream = list.parallelStream();
        // 将已有的串行流转换为并行流
        Stream<Integer> parallel = Stream.of(1, 2, 3).parallel();
    }

并行流操作

/**
     * 并行流操作
     */
    @Test
    public void test03(){

        Stream.of(1,4,2,6,1,5,9)
                .parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理
                .filter(s->{
                    System.out.println(Thread.currentThread() + " s=" +s);
                    return s > 2;
                }).count();
    }

并行流中的数据安全问题

/**
     * 并行流中的数据安全问题
     */
    @Test
    public void test01(){
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }
        System.out.println(list.size());
        List<Integer> listNew = new ArrayList<>();
        // 使用并行流来向集合中添加数据
        list.parallelStream()
                //.forEach(s->listNew.add(s));
                .forEach(listNew::add);
        System.out.println(listNew.size());
    }

加同步锁

/**
     * 加同步锁
     */
    @Test
    public void test02(){
        List<Integer> listNew = new ArrayList<>();
        Object obj = new Object();
        IntStream.rangeClosed(1,1000)
                .parallel()
                .forEach(i->{
                    synchronized (obj){
                        listNew.add(i);
                    }

                });
        System.out.println(listNew.size());
    }

使用线程安全的容器

/**
     * 使用线程安全的容器
     */
    @Test
    public void test03(){
        Vector v = new Vector();
        Object obj = new Object();
        IntStream.rangeClosed(1,1000)
                .parallel()
                .forEach(i->{
                    synchronized (obj){
                        v.add(i);
                    }

                });
        System.out.println(v.size());
    }

将线程不安全的容器转换为线程安全的容器

/**
     * 将线程不安全的容器转换为线程安全的容器
     */
    @Test
    public void test04(){
        List<Integer> listNew = new ArrayList<>();
        // 将线程不安全的容器包装为线程安全的容器
        List<Integer> synchronizedList = Collections.synchronizedList(listNew);
        Object obj = new Object();
        IntStream.rangeClosed(1,1000)
                .parallel()
                .forEach(i->{
                        synchronizedList.add(i);
                });
        System.out.println(synchronizedList.size());
    }