Chunk
- 여러 개의 아이템을 하나로 묶은 하나의 덩어리, 블록
- 한번에 하나씩 아이템을 입력받아, chunk단위의 덩어리로 만든 후, 트랜잭션을 처리함. 즉 chunk단위의 commit과 rollback이 이루어짐
- 대용량 데이터를 한번에 처리하는게 아니라, 청크 단위로 쪼개어서 작업함
=> 이렇게 청크 단위로 쪼개서 작업하는 이유는,
1. 메모리 효율성 : 한꺼번에 너무 많은 데이터를 메모리에 올리면 메모리 부족 문제가 발생할 수 있음. 청크 단위로 적절한 양의 데이터를 읽어 메모리 사용량을 줄일 수 있음.
2. 트랜잭션 관리 : 부분적으로 처리하다가, 오류가 나면 롤백 처리 하는 등 일관성 유지 가능
3. 쓰기 효율성 : ItemWriter에서는 청크를 모아서 한 번에 쓰기 작업을 처리하는데, 데이터를 청크 단위로 모아서 쓰면, 불필요한 I/O작업을 줄일 수 있음.
Chunk<I>
- ItemReader로 읽은 하나의 아이템을 Chunk에서 정한 개수만큼 반복해서 저장
- 청크 단위로 읽어 들인 입력 데이터 모음
Chunk<O>
- output용 chunk
- ItemReader로부터 전달받은 Chunk<I>를 참조해서 ItemProcessor에서 적절하게 가공, 필터링한다음 ItemWriter에서 전달하는 타입
- 청크 단위로 트랜잭션을 실행하는 프로세스임.
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<String, String>chunk(5)
.reader(new ListItemReader<>(Arrays.asList("item1", "item2"))
.processor(new ItemProcessor<String, String>() {
@Override
public String process(String item) throws Exception {
Thread.sleep(300);
System.out.println("item = " + item);
return "my" + item;
}
})
.writer(new ItemWriter<String>() {
public void write(List<? extends String> items) throws Exception {
Thread.sleep(300);
System.out.println("items = " + items);
}
})
.build();
}
ChunkOrientedTasklet
- 스프링 배치에서 제공하는 Tasklet의 구현체 / Chunk 지향 프로세싱을 담당함
- ItemReader, ItemWriter, ItemProcessor를 사용하여 Chunk 기반의 데이터 입출력처리를 담당함
- TaskletStep에 의해 반본적으로 실행됨
- ChunkOrientedTasklet이 실행될 때마다 매번 새로운 트랜잭션이 생성되어 처리가 이루어짐
- Exception이 발생할 경우, 해당 Chunk는 롤백이 됨 -> 이전에 커밋한 청크는 완료된 상태가 유지됨.
public Step chunkStep() {
return stepBuilderFactory.get("chunkStep")
.<I,O>chunk(10)
.<I,O>chunk(CompletionPolicy)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.stream(ItemStream())
.readerIsTransactionalQueueu()
.listener(ChunkListener)
.build();
}
ChunkProvider
- ItemReader를 사용해서 소스로부터 아이템을 Chunk Size만큼 읽어서 Chunk단위로 만들어 제공하는 도메인 객체
- Chunk<I>를 만들고, 내부적으로 반복문을 사용해서 ItemReader.read()를 계속 호출하면서 item을 chunk에 쌓음
- ChunkProvider가 호출될 때마다 새로운 Chunk생성
=> 반복문을 돌면서, Chunk size만큼 item을 읽으면서 반복문 종료 및 ChunkProcessor로 넘어감
- 기본 구현체로 SimpleChunkProvider와 FaultTolerantChunkProvider가 있음
ChunkProcessor
- ItemProcessor를 사용해서 Item을 변형, 가공, 필터링하고 Itemwriter를 사용해서 Chunk데이터를 저장, 출력함.
- Chunk<O>를 만들고, 앞에서 넘어온 Chunk<I>의 Item을 한 건씩 처리함.
- ItemProcessor의 처리가 완료되면, Chunk<O>에 있는 List<Item>을 ItemWriter에게 전달함
- ItemWriter가 처리가 완료되면, Chunk 트랜잭션이 종료하게 되고, Step반복문에서 ChunkOrientedTasklet이 새롭게 실행된다
- ItemWriter는 Chunk size만큼 데이터를 Commit 처리함 -> Chunk Size는 곧 Commit Interval임
- 기본 구현체로 SimpleChunkProcessor와 FaultTolerantChunkProcessor가 있음
청크 기반 처리의 전체 흐름
- ChunkProvider가 데이터를 청크 단위로 읽어옴.
- ChunkProvider는 ItemReader를 사용하여 소스에서 데이터를 읽어와서 Chunk<I>를 생성함.
- ChunkProcessor가 읽어온 데이터를 처리함.
- ChunkProcessor는 ItemProcessor를 사용하여 Chunk<I>를 받아 각 아이템을 처리하고, 결과를 Chunk<O>로 변환함.
- ItemWriter가 처리된 데이터를 출력함.
- ItemWriter는 Chunk<O>를 받아서 최종 데이터를 데이터베이스나 파일 등에 저장.
ItemReader
- 다양한 입력으로부터 데이터를 읽어서 제공하는 인터페이스
- ChunkOrientedTasklet 실행시 필수적 요소
ItemWriter
- Chunk단위로 데이터를 받아 일괄 출력 작업을 함
- 아이템 하나가 아닌 아이템 리스트를 전달받음
- ChunkOrientedTasklet 실행시 필수적 요소임.
- void write(List<? extends T> items)
=> 출력 데이터를 아이템 리스트로 받아 처리함
=> 출력이 완료되고, 트랜잭션이 종료되면 새로운 Chunk 단위 프로세스로 이동함.
ItemProcessor
- 데이터를 출력하기 전에 데이터를 가공, 변형, 필터링하는 역할
- ItemReader 및 ItemWriter와 분리되어 비즈니스 로직을 구현 가능하다
- ItemReader로부터 받은 아이템을 특정 타입으로 변환하여 ItemWriter에 넘겨줌
- 필수는 아님.
청크 프로세스 아키텍처
출처 )
https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98
'Spring' 카테고리의 다른 글
Spring Batch - ItemProcessor, ItemWriter (0) | 2024.07.06 |
---|---|
Spring Batch - ItemReader (0) | 2024.06.29 |
Spring Batch - Flow (0) | 2024.06.15 |
스프링 배치 - Job / Step (0) | 2024.06.05 |
Spring Batch - 개요, 도메인 이해 (0) | 2024.05.26 |