You will be fine

<Lambda> 7. JAVA8 in Action - Chapter 7

by BFine
반응형

JAVA8 in Action - part2

Chapter 7 - 병렬데이터 처리와 성능

7.1.0 병렬스트림

  • 외부반복을 내부반복 으로 바꾸면 네이티브 자바 라이브러리가 스트림 요소의 처리를 제어 할 수 있다.

    개발자가 컬렉션 데이터 처리 속도를 높이려고 따로 고민할 필요가 없다.

  • parallelStream 을 호출하면 병렬스트림이 생성된다. 이는 여러 청크로 분할한 스트림이다.

  • 병렬 스트림을 이용하면 멀티코어 프로세서들 이 각각의 청크를 처리하도록 할당할 수 있다.

7.1.1 순차스트림을 병렬 스트림으로 변환하기

  • parallelsequential 중에 가장 마지막 에 선언된 것이 실행된다.
  • 순차 스트림에 parallel 을 호출하면 스트림 자체에는 아무 변화가 일어나지 않는다.
  • 그 이유는 내부적으로 연산이 병렬로 수행해야 함을 의미하는 boolean 플래그 가 설정된다.
  • 병렬 스트림은 내부적으로 ForkJoinPool 을 사용한다. 기본적으로 프로세서 수에 상응하는 스레드를 갖는다.

7.1.2 스트림 성능 측정

  • for 루프는 저수준으로 작동해 기본값의 박싱이 없기 때문에 수행속도가 빠르다.
  • iterate 연산은 결과에 따라 다음 함수 입력이 달라지기 때문에 청크로 분할하기 어렵다.
  • 기본형Stream은 쉽게 청크로 분할할 수 있는 숫자 범위를 생산한다.
  • 병렬화를 이용하기 위해서는 몇 단계의 과정이 필요하다.
    • 스트림을 재귀적으로 분할
    • 각 서브스트림을 서로 다른 스레드의 리듀싱 연산으로 할당
    • 결과값을 다시 하나로 합침
  • 코어간 데이터 전송시간보다 오래 걸리는 작업만 병렬로 처리하는 것이 바람직하다.

7.1.3 병렬 스트림의 올바른 사용법

  • 많이 잘못 사용하는 부분은 공유된 상태 를 바꾸는 알고리즘을 사용하기 때문에 발생한다.
  • 병렬 계산에서는 공유된 가변 상태를 피해야 한다.

7.1.4 병렬 스트림 효과적으로 사용하기

  • 언제나 병렬 스트림이 순차스트림보다 빠른 것 은 아니다.
  • 오토박싱 문제에 주의하여야 한다.
  • limit , findFirst 처럼 요소 순서에 의존 하는 연산은 병렬처리에서 성능이 떨어진다.
  • 소량의 데이터에서는 병렬 스트림이 도움되지 않는다.
  • 최종 연산의 병합과정 비용을 살펴봐야 한다.

7.2.0 포크/조인 프레임워크

  • 병렬화할 수 있는 작업을 재귀적 으로 작은 작업으로 분할한다.
  • 그 후에 서브태스크 각각의 결과를 합쳐서 전체 결과를 만들도록 설계한다.
  • 포크/조인 프레임워크는 작업자 스레드에 분산 할당하는 ExecutorService 인터페이스를 구현한다.
  • 포크 는 태스크를 분할하는 것을 의미하고 조인 은 결과를 합치는 것을 의미한다.
  • 하나의 작업을 여러 개로 분할하여 큐에 담고 스레드는 접근하여 자신의 큐로 옮긴다.
  • 이때 스레드가 다른스레드 큐의 작업에도 접근 가능하다.
  • 포크조인에 대한 이해 참고

7.2.1 RecursiveTask 활용

  • 스레드 풀을 이용하려면 RecursiveTask 의 서브클래스를 만들어야 한다.
  • RecursiveTask를 정의하려면 compute 를 구현해야한다.
  • compute 메서드는 서브태스크로 더이상 분할할 수 없을때 각각의 서브태스크 결과 만들 로직을 정의한다.
  • 둘이상의 ForkJoinPool을 쓰지 않는다. 한번만 인스턴스화 해서 싱글턴 으로 저장한다.
  • 포크/조인 프레임워크로 병렬합계 수행
      public class Main{
        public static void main(String[] args) throws InterruptedException, ExecutionException {
          int[] arr = IntStream.range(1, 11).toArray();
          ForkJoinTask<Integer> task = new Cal(arr);
          int res = new ForkJoinPool().invoke(task);
          // 일을 Pool에 넣는다. 모든프로세서가 자유롭게 접근한다.
          System.out.println(res);
        }
      }
     
     
      class Cal extends RecursiveTask<Integer>{
        private final int[] numbers;
        private final int start;
        private final int end;
        private final int THRESHOLD = 2;
     
        private Cal(int[] numbers, int start, int end) {
          this.numbers = numbers;
          this.start = start;
          this.end = end;
        }
        public Cal(int[] numbers) {
          this(numbers,0,numbers.length);
        }
        @Override
        protected Integer compute() {
          int length = end - start;
          if(length <= THRESHOLD) {
          // 지금 이 스레드에서 처리하겠다. THRESHOLD까지
            return computeSeq();
          }
     
          Cal left = new Cal(numbers,start,start+length/2);
          left.fork(); // 비동기로 left 객체를 서브태스크로 분할
          Cal right = new Cal(numbers,start+length/2,end);
          int rightResult = right.compute();// 오른쪽 재귀
          int leftResult = left.join();
          // 위에서 서브태스크로 분할하여 다른스레드의 처리가 올때까지 기다렸다 합침
          System.out.println("조인함 : "+(leftResult+rightResult)
                    +" 처리쓰레드 : "+Thread.currentThread().getName());
          return leftResult+rightResult;
        }
     
        private int computeSeq() {
          int sum = 0;
          for(int i = start; i < end; i++) {
            sum+= numbers[i];
          }
          System.out.println("합계함 : "+start+" 끝 : "+(end-1)+" 결과 : "+sum
              +" 처리쓰레드 : "+Thread.currentThread().getName());
          return sum;
        }
     
        /********************************************************
          출력
          ----
     
          합계함 : 8 끝 : 9 결과 : 19 처리쓰레드 : ForkJoinPool-1-worker-1
          합계함 : 5 끝 : 6 결과 : 13 처리쓰레드 : ForkJoinPool-1-worker-3
          합계함 : 0 끝 : 1 결과 : 3 처리쓰레드 : ForkJoinPool-1-worker-4
          합계함 : 2 끝 : 2 결과 : 3 처리쓰레드 : ForkJoinPool-1-worker-3
          합계함 : 7 끝 : 7 결과 : 8 처리쓰레드 : ForkJoinPool-1-worker-1
          조인함 : 27 처리쓰레드 : ForkJoinPool-1-worker-1
          합계함 : 3 끝 : 4 결과 : 9 처리쓰레드 : ForkJoinPool-1-worker-2
          조인함 : 40 처리쓰레드 : ForkJoinPool-1-worker-1
          조인함 : 12 처리쓰레드 : ForkJoinPool-1-worker-2
          조인함 : 15 처리쓰레드 : ForkJoinPool-1-worker-2
          조인함 : 55 처리쓰레드 : ForkJoinPool-1-worker-1
          55
     
         *******************************************************/
     
      }
     

7.2.2 포크/조인 프레임워크 제대로 사용하는 방법

  • 두 서브태스크가 모두 시작된 다음에 join 을 호출해야한다.

  • 순차코드에서 병렬 계산을 시작할때만 invoke 를 사용한다.

  • 한쪽 작업에는 compute를 사용하는 것이 효과적이다.

    같은 스레드를 재사용하여 불필요한 서브태스크 할당을 피할 수 있음

  • 순차처리 보다 무조건 빠르지 않다. 서브태스크 실행시간이 포킹시간 보다 길어야한다.

7.2.3 작업 훔치기

  • 코어 개수와 상관없이 적절한 크기로 분할된 태스크를 포킹하는 것이 바람직하다.
  • 작업훔치기 기법에서는 스레드를 공정하게 분할한다.
  • 스레드는 이중 연결리스트를 가지고 있어 이 꼬리를 참조하여 다른 스레드의 작업을 가져온다.

7.3.0 Spliterator

  • 자바8에서 제공하는 인터페이스
  • Spliterator 는 분할할 수 있는 반복자라는 의미이다.
  • Iterator 처럼 요소 탐색기능을 제공하지만 병렬작업 에 특화된 특징이 있다.
    • tryAdvance 는 요소를 하나씩 소비하면서 요소가 남아있으면 반환한다.
    • trySplit 는 일부 요소를 분할해서 새로운 Spliterator를 생성한다.
    • estimateSize 는 탐색해야할 요소수를 반환한다.

7.3.1 분할과정

  • 스트림을 여러 스트림으로 분할하는 과정은 재귀적 으로 일어난다.
  • trySplit 의 결과가 null 일때까지 반복한다.
  • charaterisitic 은 Spliterator 자체 특성 집합을 포함하는 int를 반환한다.
    • ORDERED, DISINCT, SIZED 등 (p.248 참고)

7.3.2 커스텀 Spliterator 구현

  • 탐색하려는 데이터를 포함하는 스트림을 어떻게 병렬화할 것인지 정의

출처

  • Java8 in Action


반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기