You will be fine

<Spring Batch> 5. ItemProcessor, ItemWriter

by BFine
반응형

출처 : http://www.yes24.com/Product/Goods/99422216

 

스프링 배치 완벽 가이드 2/e - YES24

스프링 배치의 `Hello, World!`부터 최근 플랫폼의 발전에 따른 클라우드 네이티브 기술을 활용한 배치까지 폭넓은 스프링 배치 활용 방법과 이와 관련된 유용한 내용을 다룬다. 또한 스프링 프레임

www.yes24.com

 

가. ItemProcessor

 a. 무엇인가?

  -  ItemProcessor는 Chunk 기반 배치에서 처리&변환(Output)를 담당하고 있다.  

     => 비필수이며 보통 개발자의 비지니스로직이 들어간다.

  -  입력으로 들어오는 Item이 null 일경우 .process 메서드에 접근하지 못한다.

 

 b. 특징

  -  Adapter가 제공되어 기존 서비스 재사용가능하다.

  -  ScriptItemProcessor로 특정 스크립트 실행 가능하다.

  -  chain 형태의 ItemProcessor 목록 만들기 가능하다.

  -  Input 데이터와 Output 데이터가 같은 타입일 필요없다.

    

 c. 종류

  -  다른 부분들과는 다르게 다양하지 않지만 여러 Processor들을 구현체로 제공하고 있다.

 

d. 로직을 여러 개의 ItemProcessor를 처리하고 싶을때

  -  ItemProcessor 하나에 모든 로직을 처리하기에 너무 코드가 길어지거나 세분화해 재사용하고 싶을 수 있다.

  -  이를 위해 스프링 배치는 CompositeItemProcessor 구현체를 제공하고 있다.

  -  코드 보면 알 수 있듯이 제네릭이 모두 와일드카드(<?>) 로 되어있어 타입에 대한 유연성이 있다.

     => 첫번째 ItemProcessor에서 Input, 마지막 ItemProcessor에서 Output에 타입만 맞춰 주면 된다.

  -  간단한 예제를 만들어보면 (단순 문자열 붙이기)

  @Bean
  public Step step(JpaPagingItemReader<Input> jpaPagingItemReader){

        CompositeItemProcessor<Input,Output> itemProcessor = new CompositeItemProcessor<>();

        List<ItemProcessor<?,?>> delegates = new ArrayList<>();
        delegates.add(new FirstItemProcessor());
        delegates.add(new SecondItemProcessor());
        delegates.add(new FinalItemProcessor());

        itemProcessor.setDelegates(delegates);


        return stepBuilderFactory.get("Chunk")
                .<Input,Output>chunk(1)
                .reader(jpaPagingItemReader)
                .processor(itemProcessor)
                .writer(new ItemWriter<Output>() {
                    @Override
                    public void write(List<? extends Output> items) throws Exception {
                        Thread.sleep(1000*1);
                        System.out.print("### Writer ");
                        items.forEach(System.out::println);
                    }
                })
                .build();
    }




public class FirstItemProcessor implements ItemProcessor<Input, Input> {

    @Override
    public Input process(Input item) throws Exception {
        item.setValue("Hello ");
        System.out.println(" 첫번째");
        return item;
    }
}

public class SecondItemProcessor implements ItemProcessor<Input, Input> {

    @Override
    public Input process(Input item) throws Exception {
        item.setValue(item.getValue()+"World");
        System.out.println(" 두번째");
        return item;
    }
}

public class FinalItemProcessor implements ItemProcessor<Input, Output> {

    @Override
    public Output process(Input item) throws Exception {
        item.setValue(item.getValue()+".");
        System.out.println(" 마지막 ");
        return new Output(item.getId(),item.getValue());
    }
}

 

e. 객체 별로 다른 ItemProcessor가 필요할때

  -  배치처리를 할때 Input 객체 별로 다른 처리 해야 하는 경우가 있을 수 있다.

  -  이때 ItemProcessor의 구현체 중 ClassifierCompositeItemProcessor 사용하여 간단하게 처리 할 수 있다. 

     => Classifier 필드를 통해 분류처리를 하여 item 별로 다른 ItemProcessor 를 적용 할 수 있다. 

    private final AItemProcessor aItemProcessor;
    private final BItemProcessor bItemProcessor;
    private final OtherItemProcessor otherItemProcessor;

    @Bean
    public Step step(//생략){

        ClassifierCompositeItemProcessor<Input, Output> processor = new ClassifierCompositeItemProcessor<>();

         processor.setClassifier(input -> {
            switch (input.getType()){
                case "A" :
                    return aItemProcessor;
                case "B" :
                    return bItemProcessor;
                default:
                    return otherItemProcessor;
            }
        });

        return stepBuilderFactory.get("Chunk")
                .<Input,Output>chunk(10)
                .reader(jpaPagingItemReader)
                .processor(processor)
                .writer(jpaItemWriter)
                .build();
    }
    

@Component
public class AItemProcessor implements ItemProcessor<Input,Output> {
    @Override
    public Output process(Input item) throws Exception {
        System.out.println("## A 처리 - "+item);
        return null;
    }
}

@Component
public class BItemProcessor implements ItemProcessor<Input,Output> {
    @Override
    public Output process(Input item) throws Exception {
        System.out.println("## B 처리 - "+item);
        return null;
    }
}


@Component
public class OtherItemProcessor implements ItemProcessor<Input,Output> {

    @Override
    public Output process(Input item) throws Exception {
        System.out.println("## Other - "+item);
        return null;
    }
}

나. ItemWriter

 a. 무엇인가?

  -  Chunk 기반 배치 프로세스중 쓰기를 담당하는 부분이다. (DB or 파일 등)

 

 b. 특징

  -  다른 부분들과 다르게 item을 건건이 처리하지 않고 Chunk 단위로 처리한다.

     => 파라미터를 보면 item 리스트인것을 볼 수 있다.

 

 c. 종류

  -  ItemReader와 동일하게 다양한 종류의 Writer를 제공하고 있다.

 

 d. JpaItemWriter

  -  JPA를 이용하여 DB 데이터를 입력할 수 있도록 ItemWriter 구현체를 제공하고 있다.

  -  아쉬운점은 아래 코드를 보면 item을 하나씩 처리해 bulk insert/update를 처리 할 수 없다.

      => ItemWriter를 상속받아 Custom으로 처리하거나 JdbcItemWriter를 사용해야 한다..

   - 간단하게 예제를 만들어 보고 실행해보았다.

  @Bean
    public Step step(JpaPagingItemReader<Input> jpaPagingItemReader, JpaItemWriter<Output> jpaItemWriter){
                    
        return stepBuilderFactory.get("Chunk")
                .<Input,Output>chunk(3)
                .reader(jpaPagingItemReader)
                .processor(new ItemProcessor<Input, Output>() {
                    @Override
                    public Output process(Input item) throws Exception {

                        Thread.sleep(1000*1);
                        System.out.println("### Process "+(++process)+"번 "+item);
                        return Output.builder()
                                .value(item.getValue())
                                .build();
                    }
                }).writer(jpaItemWriter)
                .build();
    }

    @Bean
    public JpaItemWriter<Output> jpaItemWriter(){
      return new JpaItemWriterBuilder<Output>()
                .entityManagerFactory(entityManagerFactory)
                .usePersist(true)
                .build();
    }

반응형

블로그의 정보

57개월 BackEnd

BFine

활동하기