FlatFileItemReader
- DB가 아닌 Resource에서 데이터를 읽어올 수 있도록 구현된 구현체
- 2차원 데이터료 표현된 유형의 파일을 처리하는 ItemReader
- Resource와 LineMapper 두 가지 요소가 필요함
Resource
- FileSystemResource - new FileSystemResource("resource/path/config.xml")
=> 파일 시스템에서 자원을 읽어올 때 사용
- ClassPathResource - new ClassPathResource("classpath:path/config.xml")
=> 클래스패스에서 자원을 읽어올 때 사용
LineMapper
- 파일의 라인 한 줄을 Object로 변환해서 FlatFileItemReader로 리턴함
- 문자열을 토큰화해여 객체로 맵핑하는 과정이 필요함
- LineTokenizer와 FieldSetMapper를 사용하여 처리함
=> FieldSet
- FieldSet은 LineTokenizer에 의해 생성된 필드들의 집합을 나타냄. JDBC의 ResultSet과 유사하며, 특정 인덱스나 필드 이름으로 값을 읽어올 수 있음
=> LineTokenizer
- 파일의 한 라인을 받아서 이를 토큰으로 나누고, FieldSet 객체로 변환함.
=> FieldSetMapper
- FieldSetMapper는 FieldSet 객체를 받아서 원하는 객체로 매핑하여 리턴하는 역할
- resource에 읽을 파일 객체 할당
- delimeted 옵션으로 구분자 지정 가능
FlatFileItemReader - delemitedlinetokenizer
- 구분자 방식의 tokenizer
- 한 개 라인의 String을 구분자 기준으로 나누어 토큰화하는 방식
FlatFileItemReader - FixedLengthTokenizer
- 고정 길이의 필드를 사용하여 라인을 토큰으로 분리
FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
tokenizer.setNames("id", "name", "description"); // 고정 길이 파일의 각 필드 이름
tokenizer.setColumns(
new Range(1, 4),
new Range(5, 14),
new Range(15, 24)
);
JsonItemReader
- Json데이터의 Parsing과 Binding을 JsonObjectReader 인터페이스 구현체에 위임하여 처리하는 ItemReader
- 두가지 구현체를 제공함
1. JacksonJsonObjectReader
2. GsonJsonObjectReader
@Bean
public JsonItemReader<Person> jsonItemReader() {
return new JsonItemReaderBuilder<Person>()
.jsonObjectReader(new JacksonJsonObjectReader<>(Person.class))
.resource(new ClassPathResource("data.json"))
.name("jsonItemReader")
.build();
}
@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("step1")
.<Person, Person>chunk(10)
.reader(jsonItemReader())
.writer(personWriter())
.build();
}
Cursor Based & Paging Based
- 배치 어플리케이션은 실시간적 처리가 어려운 대용량 데이터를 다룸
=> 이때 DB I/O의 성능문제와 메모리 자원의 효율성 문제를 해결할 수 있어야함
=> 이를 해결하기 위해, Cursor Based와 Paging Based 로 해결함
Cursor Based
- JDBC ResultSet의 기본 매커니즘
- 현재 행에 커서를 유지하여, 다음 데이터를 호출하면 다음 행으로 커서를 이동하여 데이터 반환이 이루어지는 Streaming 방식의 I/O
- ResultSet이 open될 때 마다 next()메서드가 호출되어 Database의 데이터가 반환되고 객체와 맵핑 이루어짐
- DB Connection이 연결되면, 배치 처리가 완료될때까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분히 큰 값으로 설정해야함
- 모든 결과를 메모리에 할당함 => 메모리 사용량이 많아짐
Paging Based 처리
- 페이징 단위로 데이터를 조회하는 방식 -> Page Size만큼 한 번에 메모리로 가지고 온 다음 한 개씩 읽음
- 한 페이지를 읽을 때마다 Connection을 맺고 끊음 -> 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 일어나지 않음
- 시작 행 번호를 지정하고, 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용 - Offset, Limit
- 페이징 단위의 결과만 메모리에 할당하기 때문에 메모리 사용량이 적어짐
JdbcCursorItemReader
- cursor기반의 JDBC 구현체
- ResultSet과 함께 사용되며, Datasource에서 Connection을 얻어와 SQL을 실행함
- Thread 안정성을 보장하지 않음 -> 멀티스레드에서 사용할 경우 동시성 이슈가 발생하지 않도록 별도 동기화 처리 필요
@Bean
public JdbcCursorItemReader<Person> jdbcCursorItemReader() {
return new JdbcCursorItemReaderBuilder<Person>()
.dataSource(dataSource())
.name("jdbcCursorItemReader")
.sql("SELECT id, first_name, last_name, age FROM person")
.rowMapper(new BeanPropertyRowMapper<>(Person.class))
.build();
}
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("jdbc:h2:mem:testdb");
dataSource.setUsername("sa");
dataSource.setPassword("");
return dataSource;
}
1. Step 시작
- ItemStream의 open() 호출:
- DB Connection: 데이터베이스 연결을 염
- PrepareStatement: 쿼리를 준비함
- ResultSet: 쿼리 실행 결과를 받음
2. 데이터 읽기
- JdbcCursorItemReader의 read() 호출:
- readCursor(): 커서에서 데이터를 읽음
- RowMapper: ResultSet의 현재 행을 객체로 매핑. 맵핑된 객체를 반환함.
- ResultSet의 next() 호출: 커서를 다음 행으로 이동시킴.
이 과정은 배치 작업의 청크 크기(Chunk Size)에 따라 반복됨
3. Step 종료
- ItemStream의 close() 호출:
- Close ResultSet: ResultSet을 닫음.
- Close PrepareStatement: PreparedStatement를 닫음.
- Close Connection: 데이터베이스 연결을 닫음.
JpaCursorItemReader
- Spring Batch 4.3버전부터 지원
- Cursor기반의 JPA구현체로, EntityMangerFactory객체가 필요함. 쿼리는 JPQL을 사용함
1. Step 시작 시
- ItemStream의 open() 호출:
- Create EntityManager: 엔티티 매니저를 생성
- Create Query: JPQL 또는 네이티브 SQL 쿼리를 생성
- Create ResultStream: 쿼리 실행 결과를 스트림 형태로 생성
2. 데이터 읽기
- JpaCursorItemReader의 read() 호출:
- doRead(): 스트림에서 데이터를 읽음.
- Iterator: 스트림의 현재 위치에서 데이터를 읽어옵니다. 매핑된 객체를 반환함.
- ResultStream의 next() 호출: 스트림의 다음 요소로 이동함.
이 과정은 배치 작업의 청크 크기(Chunk Size)에 따라 반복됨
3. Step 종료 시
- ItemStream의 close() 호출:
- Close EntityManager: 엔티티 매니저를 닫아 자원을 해제
@Bean
public JpaCursorItemReader<Person> jpaCursorItemReader() {
return new JpaCursorItemReaderBuilder<Person>()
.name("jpaCursorItemReader")
.entityManagerFactory(entityManagerFactory())
.queryString("SELECT p FROM Person p")
.build();
}
@Bean
public EntityManagerFactory entityManagerFactory() {
return Persistence.createEntityManagerFactory("example-unit");
}
JdbcPagingItemReader
- Paging 기반의 JDBC 구현체로서, 시작행 번호와 페이지에서 반환할 행수를 지정해서 SQL을 실행함
- offset과 limit을 PageSize에 맞게 자동으로 생성해줌
-> 페이징 단위로 데이터를 조회할 때마다 새로운 쿼리가 실행됨
- 멀티 스레드 환경에서 Thread 안정성을 보장하기 때문에, 별도의 동기화를 할 필요가 없음
- 페이지마다 새로운 쿼리를 실행하기 때문에, 데이터의 순서가 보장될 수 있도록 order by 구문이 필수임.
PagingQueryProvider
- 쿼리 실행에 필요한 쿼리문을 ItemReader에게 제공하는 클래스
- 데이터베이스마다 페이징전략이 다르기 때문에, 각 DB 유형마다 다른 PagingQueryProvider를 사용해야함
- Select 절, from절, sortKey는 필수로 설정해야하며, where,group by절은 필수가 아님
- selectClause와 fromClause, groupClause, sortKeys를 설정하는 부분이 paigingQueryProvider 역할
@Bean
public DataSource dataSource() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName("org.h2.Driver");
dataSource.setUrl("jdbc:h2:mem:testdb");
dataSource.setUsername("seah");
dataSource.setPassword("1111");
return dataSource;
}
@Bean
public JdbcPagingItemReader<Person> pagingItemReader() throws Exception {
return new JdbcPagingItemReaderBuilder<Person>()
.dataSource(dataSource())
.name("pagingItemReader")
.queryProvider(queryProvider())
.pageSize(10)
.rowMapper(new BeanPropertyRowMapper<>(Person.class))
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
queryProvider.setDataSource(dataSource());
queryProvider.setSelectClause("SELECT id, first_name, last_name, age");
queryProvider.setFromClause("FROM person");
queryProvider.setSortKey("id");
return queryProvider;
}
- sortKey를 id로 주어, id를 기준으로 정렬되도록 하고 있음.
public class JdbcPagingItemReader<T> extends AbstractPagingItemReader<T> {
private volatile boolean initialized = false;
private volatile boolean executed = false;
@Override
protected void doReadPage() {
synchronized (this) {
if (!executed) {
// 쿼리 실행 코드 (JdbcTemplate을 사용하여 데이터베이스에서 데이터 읽어오기)
this.executed = true;
}
}
// 페이징 쿼리 실행 및 데이터 읽기
this.results = getJdbcTemplate().query(getQueryProvider().generateSelectQuery(page, pageSize),
new RowMapper<T>() {
@Override
public T mapRow(ResultSet rs, int rowNum) throws SQLException {
return mapRow(rs, rowNum);
}
});
}
}
- JdbcPagingItemReader는 doRead()시 내부적으로 synchronized() 블록이 구현되어있기 때문에, 동기화가 보장되는 것임.
JpaPagingItemReader
- Paging 기반의 JPA 구현체로서, EntityMangerFactory 객체가 필요하며, 쿼리는 JPQL을 사용함
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<T>()
.name(“pagingItemReader")
.pageSize(int count)
.queryString(String JPQL)
.EntityManagerFactory(EntityManagerFactory)
.parameterValue(Map<String, Object> parameters)
.build();
}
출처 -
https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98/dashboard
해당 강의를 보고 작성 및 추가적으로 조사한 내용을 정리하였습니다.
'Spring' 카테고리의 다른 글
스프링 배치 반복 및 오류 제어 (0) | 2024.07.13 |
---|---|
Spring Batch - ItemProcessor, ItemWriter (0) | 2024.07.06 |
Spring Batch - Chunk (0) | 2024.06.22 |
Spring Batch - Flow (0) | 2024.06.15 |
스프링 배치 - Job / Step (0) | 2024.06.05 |