Spring batch가 무엇인지 작성한 글이 아닌
어떻게 활용했는지에 대한 글입니다!!

상황

이번 프로젝트는 AI 기반 맛집 추천 프로젝트로, 데이터 수집이 매우 중요하다.

Image

Image

API를 사용할까 하며 이런저런 계산도 해봤지만
팀원들과 여러번의 회의 끝에 결국
공공데이터 + 이 정보 기반으로 검색 크롤링
이 방법이 최선이라는 결론에 도달했다.

그리고 방학 기간부터 개발을 시작했는데
크게 문제점이 3가지 발생했다.

CSV : 오류, 속도

크롤링 들어가기 전부터 쉽지 않았다.

첫번째로 오류가 많이 발생했다.
파일마다 컬럼 이름도 다르고,정보가 들쭉날쭉 했다.

중요한건 어떤 레코드가 실패했는지 알수가 없었고, 안정적으로 모든 데이터를 db에 추가하기 힘들었다.

두번째로 속도가 많이 느렸다.
하나의 레코드를 읽고 처리해서 쓰는데 약 0.01초가 걸렸다.

일반음식점 70만개, 휴게음식점 20만개 총 90만개의 레코드를 추가하기는 너무 느렸다.

크롤링 : 진행상황 확인, 실패 기록

효율적으로 크롤링을 진행하기 위해서
어디까지 진행되었는지, 어떤 레코드가 왜 실패했는지 체계적으로 기록해야했다.

크롤링을 진행하는 서비스에 구현하는 방법도 가능했지만
더 좋은 방법을 찾고 싶었다.

요구사항 : 특정 시간에 작업

특정 시간에 작업을 진행해야 하는 요구사항이 있었다.

예를 들어

  1. 사진 정보가 중요한 서비스이기 때문에
    특정 시간에 데이터를 추가하는 작업이 필요했다.

  2. 금주의 투표 기능을 개발해야 했는데,
    매주 일요일 자정에 한주동안 쌓인 투표를 백업하고,
    초기화 후 금주의 투표를 다시 시작해야 하는 요구사항이 있었다.

Spring batch

이 모든걸 해결할 방법을 찾다가 Spring batch를 알게 되었다.

  1. CSV 파일 read, write 배치 처리
  2. Listener를 활용해 실패 기록
  3. Quartz를 활용해 특정 시간에 작업

Spring batch : CSV -> DB

1,2185,일반음식점 추가 step,1,2025-01-19 03:53:27.587107,2025-01-19 03:53:27.590110,2025-01-19 04:01:38.464664,COMPLETED,2183,2182247,1485250,696997,0,0,0,0,COMPLETED,"",2025-01-19 04:01:38.465663

Image

70만개 레코드를 추가하는 작업이 약 10초밖에 안걸렸다

0.01 * 70만개 = 7000초 = 116분 = 1시간 56분

Batch를 사용하지 않은 코드도 개선할 여지가 있었겠지만
약 700배 성능향상.. 효과가 좋았다

코드

BatchConfig

@Configuration
@RequiredArgsConstructor
public class RestaurantBatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final RestaurantReaderConfig restaurantReaderConfig;
    private final RestaurantProcessorConfig restaurantProcessorConfig;
    private final RestaurantWriterConfig restaurantWriterConfig;

    @Bean
    public Job restaurantCSVJob() {
        return new JobBuilder("restaurantJob", jobRepository)
                .start(restaurantCSVStep())
                .next(restaurantCSVStep2())
                .build();
    }


    @Bean
    public Step restaurantCSVStep() {
        return new StepBuilder("일반음식점 추가 step", jobRepository)
                .<RestaurantCSVDto, Restaurant>chunk(1000, transactionManager)
                .reader(restaurantReaderConfig.csvReader())
                .processor(restaurantProcessorConfig.csv2DBProcessor())
                .writer(restaurantWriterConfig.restaurantItemWriter())
                .build();
    }

    @Bean
    public Step restaurantCSVStep2() {
        return new StepBuilder("휴게음식점 추가 step", jobRepository)
                .<RestaurantCSVDto, Restaurant>chunk(1000, transactionManager)
                .reader(restaurantReaderConfig.csvReader2())
                .processor(restaurantProcessorConfig.csv2DBProcessor())
                .writer(restaurantWriterConfig.restaurantItemWriter())
                .build();
    }
}



Reader

@Configuration
@RequiredArgsConstructor
public class RestaurantReaderConfig {

    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public FlatFileItemReader<RestaurantCSVDto> csvReader() {
        return new FlatFileItemReaderBuilder<RestaurantCSVDto>()
                .name("restaurantCSVReader")
                .encoding("EUC-KR")
                .resource(new ClassPathResource("일반음식점.csv"))

                .lineMapper(getMapper())
                .linesToSkip(1)
                .build();
    }

    @Bean
    public FlatFileItemReader<RestaurantCSVDto> csvReader2() {
        return new FlatFileItemReaderBuilder<RestaurantCSVDto>()
                .name("restaurantCSVReader")
                .encoding("EUC-KR")
                .resource(new ClassPathResource("휴게음식점.csv"))

                .lineMapper(getMapper())
                .linesToSkip(1)
                .build();
    }
}

Processor


@Configuration
@RequiredArgsConstructor
public class RestaurantProcessorConfig {

    private final ModelMapper modelMapper;
    private final SeleniumService seleniumService;

    private final NaverReviewFeatureRepository naverReviewFeatureRepository;
    private final NaverReviewFeatureCountRepository naverReviewFeatureCountRepository;

    @Bean
    public ItemProcessor<RestaurantCSVDto, Restaurant> csv2DBProcessor() {
        return new ItemProcessor<RestaurantCSVDto, Restaurant>() {
            @Override
            public Restaurant process(RestaurantCSVDto item) throws Exception {

                if (item.getBusinessStatusName().equals("폐업")) {
                    return null;
                }

                OpenDataInformation openDataInformation = modelMapper.map(item, OpenDataInformation.class);
                return Restaurant.builder()
                        .openDataInformation(openDataInformation)
                        .build();
            }
        };
    }

Writer

@Configuration
@RequiredArgsConstructor
public class RestaurantWriterConfig {

    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public JpaItemWriter<Restaurant> restaurantItemWriter() {
        return new JpaItemWriterBuilder<Restaurant>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

}

Spring batch 크롤링 : DB -> DB

크롤링 + Spring batch 관련 정보가 인터넷에 없어서 약간 헤맸다.

아이디어는 간단하다.

  • DB에 있는 식당을 read

  • Processor에서 크롤링 수행
    크롤링 Service는 식당 이름, 주소를 주면 크롤링 수행, DTO 반환
    실패시 해당 레코드 rollback (chunk size = 1)
    Listener에서 실패 기록 (어떤 레코드, 왜 실패했는지)

  • Writer에서 update


Processor, Writer, Reader만 간단하게 작성하면
작업을 언제 실행했는지, 지금까지 몇번 진행되었는지, 몇개의 레코드를 write, skip, rolback했는지
모든 정보가 기록되고, chunck size로 간편하게 트랜잭션 관리까지 할 수 있어 편리하다.



생겼던 문제 (트랜잭션)

Listener를 넣었는데도 불구하고 실패한 레코드가 기록되지 않았다.
해겷하기 위해 로그를 보니
트랜잭션을 롤백하며 실패한 레코드가 기록되지 않았다.

이유
Spring batch는 기본적으로 chunck를 단위로 트랜잭션을 관리한다.

그래서 실패하는 경우에 chunck 안에서 read -> process Restaurant(실패) -> write Failed Record(listen) -> rollback

해결하기 위해 Listener에서 발생하는 트랜잭션은
독립적인 트랜잭션으로 관리해야한다.

    @Transactional(propagation = REQUIRES_NEW)
    @Override
    public void onProcessError(Restaurant item, Exception e) {

        if (e.getMessage().length() > 1024) {
            e = new Exception(e.getMessage().substring(0, 1024));
        }

        FailedRecord failedRecord = FailedRecord.builder()
                .recordType("PROCESS")
                .recordDataId(item.getRestaurantId())
                .errorMessage(e.getMessage())
                .build();

        failedRecordRepository.save(failedRecord);
    }

코드

BatchConfig



@Configuration
@RequiredArgsConstructor
public class CrawlBatchConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final RestaurantReaderConfig restaurantReaderConfig;
    private final RestaurantProcessorConfig restaurantProcessorConfig;
    private final RestaurantWriterConfig restaurantWriterConfig;

    private final RestaurantProcessListener restaurantProcessListener;
    private final RestaurantWriteListener restaurantWriteListener;

    @Bean
    public Job crawlJob() {
        return new JobBuilder("crawlJob", jobRepository)
                .incrementer(new RunIdIncrementer())
                .start(crawlStep())
                .build();
    }

    @Bean
    public Step crawlStep() {
        return new StepBuilder("crawlStep", jobRepository)
                .<Restaurant, Restaurant>chunk(1, transactionManager)
                .reader(restaurantReaderConfig.dbRestaurantReader())
                .processor(restaurantProcessorConfig.db2DBCrawlProcessor())
                .writer(restaurantWriterConfig.restaurantItemWriter())

//                .taskExecutor(new SimpleAsyncTaskExecutor()) // 병렬처리

                .faultTolerant()
                .skip(Exception.class)
                .skipLimit(300)

                // 실패 레코드 기록
                .listener(restaurantProcessListener)
                .listener(restaurantWriteListener)

                .build();
    }


}

Reader

    @Bean
    public JpaPagingItemReader<Restaurant> dbRestaurantReader() {

        JpaPagingItemReader<Restaurant> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(5);

        reader.setMaxItemCount(300);


        reader.setQueryString("SELECT r FROM Restaurant r " +
                                      "WHERE r.openDataInformation.fullAddress LIKE :address" +
                                      " AND r.crawlComplete = false" +
                                      " AND r.restaurantId NOT IN (SELECT f.recordDataId FROM FailedRecord f)");
        reader.setParameterValues(Map.of("address", "%마포구%"));

        return reader;
    }

Writer

@Configuration
@RequiredArgsConstructor
public class RestaurantWriterConfig {

    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public JpaItemWriter<Restaurant> restaurantItemWriter() {
        return new JpaItemWriterBuilder<Restaurant>()
                .entityManagerFactory(entityManagerFactory)
                .build();
    }

}

Listener

@Component
@RequiredArgsConstructor
public class RestaurantProcessListener implements ItemProcessListener<Restaurant, Restaurant> {

    private final FailedRecordRepository failedRecordRepository;


    @Transactional(propagation = REQUIRES_NEW)
    @Override
    public void onProcessError(Restaurant item, Exception e) {

        if (e.getMessage().length() > 1024) {
            e = new Exception(e.getMessage().substring(0, 1024));
        }

        FailedRecord failedRecord = FailedRecord.builder()
                .recordType("PROCESS")
                .recordDataId(item.getRestaurantId())
                .errorMessage(e.getMessage())
                .build();

        failedRecordRepository.save(failedRecord);
    }
}


@Component
@RequiredArgsConstructor
public class RestaurantWriteListener implements ItemWriteListener<Restaurant> {

    private final FailedRecordRepository failedRecordRepository;


    @Transactional(propagation = REQUIRES_NEW)
    @Override
    public void onWriteError(Exception exception, Chunk<? extends Restaurant> items) {

        items.getItems().forEach(item -> {
            FailedRecord failedRecord = FailedRecord.builder()
                    .recordType("WRITE")
                    .recordDataId(item.getRestaurantId())
                    .errorMessage(exception.getMessage())
                    .build();

            failedRecordRepository.save(failedRecord);
        });
    }
}

project 카테고리의 다른 글

Categories:

Updated: