<Spring Batch> 5. ItemProcessor, ItemWriter
by BFine
출처 : http://www.yes24.com/Product/Goods/99422216
가. ItemProcessor
a. 무엇인가?
- ItemProcessor는 Chunk 기반 배치에서 처리&변환(Output)를 담당하고 있다.
=> 비필수이며 보통 개발자의 비지니스로직이 들어간다.
- 입력으로 들어오는 Item이 null 일경우 .process 메서드에 접근하지 못한다.
b. 특징
- Adapter가 제공되어 기존 서비스 재사용가능하다.
- ScriptItemProcessor로 특정 스크립트 실행 가능하다.
- chain 형태의 ItemProcessor 목록 만들기 가능하다.
- Input 데이터와 Output 데이터가 같은 타입일 필요없다.
c. 종류
- 다른 부분들과는 다르게 다양하지 않지만 여러 Processor들을 구현체로 제공하고 있다.
d. 로직을 여러 개의 ItemProcessor를 처리하고 싶을때
- ItemProcessor 하나에 모든 로직을 처리하기에 너무 코드가 길어지거나 세분화해 재사용하고 싶을 수 있다.
- 이를 위해 스프링 배치는 CompositeItemProcessor 구현체를 제공하고 있다.
- 코드 보면 알 수 있듯이 제네릭이 모두 와일드카드(<?>) 로 되어있어 타입에 대한 유연성이 있다.
=> 첫번째 ItemProcessor에서 Input, 마지막 ItemProcessor에서 Output에 타입만 맞춰 주면 된다.
- 간단한 예제를 만들어보면 (단순 문자열 붙이기)
@Bean
public Step step(JpaPagingItemReader<Input> jpaPagingItemReader){
CompositeItemProcessor<Input,Output> itemProcessor = new CompositeItemProcessor<>();
List<ItemProcessor<?,?>> delegates = new ArrayList<>();
delegates.add(new FirstItemProcessor());
delegates.add(new SecondItemProcessor());
delegates.add(new FinalItemProcessor());
itemProcessor.setDelegates(delegates);
return stepBuilderFactory.get("Chunk")
.<Input,Output>chunk(1)
.reader(jpaPagingItemReader)
.processor(itemProcessor)
.writer(new ItemWriter<Output>() {
@Override
public void write(List<? extends Output> items) throws Exception {
Thread.sleep(1000*1);
System.out.print("### Writer ");
items.forEach(System.out::println);
}
})
.build();
}
public class FirstItemProcessor implements ItemProcessor<Input, Input> {
@Override
public Input process(Input item) throws Exception {
item.setValue("Hello ");
System.out.println(" 첫번째");
return item;
}
}
public class SecondItemProcessor implements ItemProcessor<Input, Input> {
@Override
public Input process(Input item) throws Exception {
item.setValue(item.getValue()+"World");
System.out.println(" 두번째");
return item;
}
}
public class FinalItemProcessor implements ItemProcessor<Input, Output> {
@Override
public Output process(Input item) throws Exception {
item.setValue(item.getValue()+".");
System.out.println(" 마지막 ");
return new Output(item.getId(),item.getValue());
}
}
e. 객체 별로 다른 ItemProcessor가 필요할때
- 배치처리를 할때 Input 객체 별로 다른 처리 해야 하는 경우가 있을 수 있다.
- 이때 ItemProcessor의 구현체 중 ClassifierCompositeItemProcessor 사용하여 간단하게 처리 할 수 있다.
=> Classifier 필드를 통해 분류처리를 하여 item 별로 다른 ItemProcessor 를 적용 할 수 있다.
private final AItemProcessor aItemProcessor;
private final BItemProcessor bItemProcessor;
private final OtherItemProcessor otherItemProcessor;
@Bean
public Step step(//생략){
ClassifierCompositeItemProcessor<Input, Output> processor = new ClassifierCompositeItemProcessor<>();
processor.setClassifier(input -> {
switch (input.getType()){
case "A" :
return aItemProcessor;
case "B" :
return bItemProcessor;
default:
return otherItemProcessor;
}
});
return stepBuilderFactory.get("Chunk")
.<Input,Output>chunk(10)
.reader(jpaPagingItemReader)
.processor(processor)
.writer(jpaItemWriter)
.build();
}
@Component
public class AItemProcessor implements ItemProcessor<Input,Output> {
@Override
public Output process(Input item) throws Exception {
System.out.println("## A 처리 - "+item);
return null;
}
}
@Component
public class BItemProcessor implements ItemProcessor<Input,Output> {
@Override
public Output process(Input item) throws Exception {
System.out.println("## B 처리 - "+item);
return null;
}
}
@Component
public class OtherItemProcessor implements ItemProcessor<Input,Output> {
@Override
public Output process(Input item) throws Exception {
System.out.println("## Other - "+item);
return null;
}
}
나. ItemWriter
a. 무엇인가?
- Chunk 기반 배치 프로세스중 쓰기를 담당하는 부분이다. (DB or 파일 등)
b. 특징
- 다른 부분들과 다르게 item을 건건이 처리하지 않고 Chunk 단위로 처리한다.
=> 파라미터를 보면 item 리스트인것을 볼 수 있다.
c. 종류
- ItemReader와 동일하게 다양한 종류의 Writer를 제공하고 있다.
d. JpaItemWriter
- JPA를 이용하여 DB 데이터를 입력할 수 있도록 ItemWriter 구현체를 제공하고 있다.
- 아쉬운점은 아래 코드를 보면 item을 하나씩 처리해 bulk insert/update를 처리 할 수 없다.
=> ItemWriter를 상속받아 Custom으로 처리하거나 JdbcItemWriter를 사용해야 한다..
- 간단하게 예제를 만들어 보고 실행해보았다.
@Bean
public Step step(JpaPagingItemReader<Input> jpaPagingItemReader, JpaItemWriter<Output> jpaItemWriter){
return stepBuilderFactory.get("Chunk")
.<Input,Output>chunk(3)
.reader(jpaPagingItemReader)
.processor(new ItemProcessor<Input, Output>() {
@Override
public Output process(Input item) throws Exception {
Thread.sleep(1000*1);
System.out.println("### Process "+(++process)+"번 "+item);
return Output.builder()
.value(item.getValue())
.build();
}
}).writer(jpaItemWriter)
.build();
}
@Bean
public JpaItemWriter<Output> jpaItemWriter(){
return new JpaItemWriterBuilder<Output>()
.entityManagerFactory(entityManagerFactory)
.usePersist(true)
.build();
}
'공부 > Spring Batch' 카테고리의 다른 글
<Spring Batch> 6. Exception에 대한 skip 처리하기 (0) | 2022.01.12 |
---|---|
<Spring Batch> 4. Chunk 기반 배치, ItemReader (0) | 2021.10.02 |
<Spring Batch> 3. 배치 테이블 Embedded DB(H2)로 따로 관리하기 (1) | 2021.09.27 |
<Spring Batch> 2. JobRepository, Config Customizing (0) | 2021.09.22 |
<Spring Batch> 1. JobParameters, ExecutionContext (0) | 2021.09.21 |
블로그의 정보
57개월 BackEnd
BFine