입수는 Datalake로! (feat. Iceberg)

조승완 · 토스 Data Engineer
2024년 10월 29일

안녕하세요. 오늘은 토스 데이터 플랫폼팀에서 데이터 효율성을 높이기 위해 도입한 ‘Iceberg’에 대해 이야기해 보려고 합니다. Iceberg에 대한 기본적인 정보는 다른 곳에서도 쉽게 찾아볼 수 있지만, 저는 특히 유지보수와 운영 측면에 집중해서 이야기하려 합니다.

최근 데이터의 양과 다양성이 급격히 증가하면서, 효율적인 데이터 파이프라인 구축의 중요성이 그 어느 때보다 커졌는데요. 특히, 실시간 데이터 조회와 수정, 운영 비용 절감, 스키마 진화의 간소화, 쿼리 성능 최적화와 같은 도전 과제들이 주요 이슈로 떠오르고 있습니다.

이러한 문제를 해결하기 위해, 토스 데이터 플랫폼 팀은 작년 하반기부터 올해 상반기까지 ‘Iceage 프로젝트’를 진행하며 DataLake에 Iceberg 포맷을 도입해 효율적인 데이터 파이프라인을 구축했습니다. 이번 글에서는 Iceberg 도입을 통해 얻은 경험을 바탕으로, 유지보수와 운영 과정에서의 실질적인 팁과 인사이트를 공유하려 합니다. 글에 공유된 모든 예시는 Spark(버전 3.5.2)와 Iceberg(버전 1.5.2) 기준으로 작성되었습니다.

Iceage 프로젝트의 목표

저희 팀의 최종 목표는 다양한 데이터 소스(Kafka, CDC 등)로부터 입수된 데이터를 Iceberg 포맷으로 관리하여, 실시간으로 데이터를 조회하고 수정할 수 있는 효율적인 데이터 파이프라인을 만드는 것입니다. 이를 통해 운영 비용을 절감하고, 데이터 처리 효율성을 높이며, 스키마 진화와 쿼리 성능을 최적화하고자 했습니다.

Iceage 프로젝트 목표

좀 더 구체적으로 말씀드리면:

  1. 준실시간 데이터 조회와 수정 지원: 입수 후 5분에서 15분 이내에 데이터를 조회하고 수정할 수 있는 구조를 만들어서 사용자에게 실시간 데이터를 제공하고자 했습니다.
  2. 효율적인 리소스 활용과 비용 절감: JSON으로 입수되는 데이터를 Parquet, Kudu로 적재하기 위한 비용을 절감하고, 리소스를 효율적으로 사용할 수 있는 파이프라인을 구축했습니다.
  3. 스키마 진화의 간소화: Iceberg의 스키마 에볼루션 기능을 활용해 스키마 변경 시 발생하는 복잡한 커뮤니케이션을 줄이고 리소스 사용을 최소화했습니다.
  4. 쿼리 성능 최적화: Iceberg의 히든 파티셔닝과 파티션 에볼루션을 통해 쿼리 성능을 최적화할 수 있었습니다.
  5. 운영 효율성 향상: Kafka Connect 기반의 입수 작업 메타데이터와 리니지 관리를 통해 운영 효율성을 크게 향상시켰습니다.
  6. 데이터 일관성 및 무결성 유지: Iceberg의 트랜잭션 지원 덕분에 데이터의 일관성과 무결성을 유지할 수 있었습니다.
  7. Iceberg 메타데이터를 이용한 테이블 모니터링 시스템 구축: Iceberg의 메타데이터를 활용해 테이블의 상태를 실시간으로 모니터링하고, 문제 발생 시 빠르게 대응할 수 있는 시스템을 구축했습니다.

문제 정의

기존 데이터 파이프라인에는 어떤 문제가 있었고, 왜 위와 같은 목표를 세우게 됐는지 살펴보겠습니다.

문제 #1: Kafka 데이터 처리와 CDC 입수의 문제점

기존 데이터 처리에서는 Kafka와 CDC 데이터 입수 과정에서 여러 가지 문제가 있었습니다.

  1. Kafka 데이터 처리 문제

    Kafka 데이터를 JSON 형식으로 입수하고 Parquet 형식으로 변환하는 과정에서 고정된 리소스를 사용해야 했습니다. 데이터가 증가하면 리소스도 끊임없이 늘어나야 했기 때문에 비용 효율적이지 않았습니다.

    데이터를 실시간으로 제공하기보다는 한 시간 단위로 모아 배치 처리하는 방식이었기 때문에 사용자 입장에서는 최신 데이터를 빠르게 얻기가 어려웠습니다.

  2. CDC 데이터를 Kudu로 받는 문제

    CDC 데이터를 실시간으로 처리하는 데 Kudu를 사용했는데, 큰 테이블의 경우 Kudu에 데이터를 적재하는 과정에서 예상보다 많은 지연이 발생했습니다. 이런 지연은 특히 실시간 데이터 제공의 품질에 악영향을 미쳤죠.

    Kudu에서는 파티션이나 스키마를 변경할 때마다 추가적인 커뮤니케이션과 작업이 필요해, 그로 인해 많은 리소스와 시간이 소모되었습니다.

문제 #2: 테이블 작업과 파티션 운영의 문제점

기존 데이터 환경에서 테이블 작업을 진행하다가 데이터를 읽을 수 없는 경우가 발생하거나, 파티션 관리에 어려움이 많았습니다. Kudu와 Parquet를 사용할 때 각기 다른 파티션 관리의 제약이 있었고, 이러한 문제들로 인해 쿼리 성능을 최적화하기 어려웠습니다.

특히, Parquet는 특정 파티션 필드 값 때문에 데이터 적재에 제약이 있었고, 파티셔닝을 통한 효율적인 데이터 관리가 쉽지 않았습니다. 파티셔닝이 잘못되면 데이터가 누락되거나, 쿼리 성능을 개선하기 위한 파티션 필터링이 적용되지 않아 많은 비용이 발생할 수 있었습니다.

문제 #3: 운영 자동화의 필요성

기존에는 수많은 데이터 입수 파이프라인을 수동으로 관리해야 했기 때문에, 관리에 많은 시간이 소요되었고 운영 부담도 상당히 컸습니다. 각 파이프라인을 개별적으로 모니터링하고 조정해야 했기에 비효율적이었으며, 그로 인해 데이터의 일관성을 유지하기도 어려웠습니다. 이러한 상황에서는 실시간 데이터 제공의 품질이 저하될 수밖에 없었으며, 문제를 해결하기 위해 데이터 파이프라인의 자동화가 필수적이었습니다.

Iceberg란 무엇인가요?

Iceberg의 구조 (출처: https://iceberg.apache.org/spec/#overview)

Iceberg는 오픈 테이블 포맷으로, 데이터를 효율적으로 관리하고 실시간으로 쿼리할 수 있도록 돕는 도구입니다. Iceberg의 설계 핵심은 데이터와 메타데이터를 분리하여 저장하는 구조입니다. 이를 통해 데이터의 파티셔닝, 스키마 진화, 트랜잭션 처리 등을 유연하게 관리할 수 있게 되었죠.

기존의 데이터 레이크 솔루션들은 Parquet와 같은 파일 포맷에서 다양한 파티셔닝 문제를 안고 있었고, 데이터 스키마가 변경될 때마다 많은 수정이 필요해 비효율적이었습니다. Iceberg는 이러한 문제를 해결하고자 Netflix에서 설계한 오픈소스 프로젝트로, 대규모 데이터 환경에서도 효율적으로 운영될 수 있도록 개발되었습니다.

Iceberg의 가장 큰 장점은 스키마 진화와 파티션 관리의 유연성입니다. 데이터를 새로운 형태로 쉽게 변경할 수 있으며, 파티셔닝 전략을 자유롭게 수정할 수 있어 쿼리 성능을 크게 개선할 수 있습니다. 또한, ACID 트랜잭션을 지원하여 데이터의 일관성과 무결성을 보장합니다. Iceberg의 구조적 설계 덕분에 데이터가 변경되더라도 메타데이터가 이를 추적하고 관리하기 때문에 안정성과 성능을 동시에 유지할 수 있습니다.

이러한 유연한 설계는 Iceberg를 실시간 데이터 처리복잡한 데이터 환경에서 효과적으로 사용할 수 있게 해주며, Netflix를 비롯한 여러 대규모 데이터 시스템에서 널리 사용되고 있습니다.

Iceberg의 장점

  1. 준실시간 데이터 제공: Iceberg 포맷의 효율성 덕분에 데이터 입수 후 5분에서 15분 이내에 실시간으로 데이터를 조회할 수 있게 되었어요.
  2. 비용 효율성: 기존(Parquet, Kudu) 에 비해 유지, 운영 비용을 줄일 수 있었고, 리소스도 더 효율적으로 사용할 수 있었습니다.
  3. 스키마 진화 간소화: Iceberg의 스키마 에볼루션 기능을 통해 스키마 변경에 따르는 복잡한 과정을 간소화할 수 있었죠.
  4. 처리 효율성 향상: Iceberg의 히든 파티셔닝과 파티션 에볼루션을 지원하여 데이터를 더 효율적으로 관리할 수 있었습니다.
  5. 쿼리 성능 최적화: 더 나은 통계 관리와 파일 나열 오버헤드를 줄여 쿼리 성능을 크게 개선할 수 있었습니다.

입수 자동화

데이터 파이프라인 운영에서 자동화는 필수적입니다. Netflix의 사례에서 알 수 있듯, 복잡한 데이터 매쉬 환경에서도 SQL 기반으로 스트리밍 데이터를 관리하고 자동화를 통해 운영의 복잡성을 줄이고 있습니다. 저희도 이를 참고하여 Kafka Connect 기반의 자동화된 입수 작업을 구축해 데이터 파이프라인의 운영 효율성을 높였습니다.

자동화된 입수파이프라인 요청예시

저희의 자동화된 입수 시스템은 메타데이터와 리니지 관리를 통해 실시간 모니터링이 가능하도록 설계되었습니다. 문제가 발생하면 빠르게 대응할 수 있으며, 사내 메신저 알림을 통해 운영 이슈에도 즉각적으로 대처할 수 있는 시스템을 구축해 운영 성숙도를 크게 향상시켰습니다. 이러한 자동화 덕분에 사람의 개입을 최소화하면서도 데이터 파이프라인의 확장성을 크게 높일 수 있었습니다.

DataLake로 입수되는 Iceberg 모니터링

사내 메신저로 즉각적인 대응
모니터링하고 있는 모든 지표

이 자동화 도구 덕분에 약 천여 개의 입수 파이프라인을 단 세 명이 관리할 수 있게 되었습니다. 운영 인력이 적더라도 실시간 데이터 제공의 품질을 유지하며 안정적으로 데이터를 관리할 수 있었던 것도 이러한 자동화 시스템 덕분입니다.

자동화 시스템은 위에 보이는 이미지와 같이 다양한 지표를 모니터링하고 있는데요. 각 지표가 뭔지, 그리고 왜 필요한지 간략히 설명드리겠습니다.

  • manifest_count: 테이블에 포함된 매니페스트 파일의 개수입니다. 매니페스트가 많을수록 메타데이터 크기가 증가할 수 있습니다.
  • total_manifest_size_mb: 모든 매니페스트 파일의 총 크기(메가바이트)입니다. 메타데이터의 크기와 관련된 중요한 지표입니다.
  • total_manifest_entries: 매니페스트 파일에 포함된 총 엔트리 수로, 테이블 내 데이터 파일에 대한 정보를 나타냅니다.
  • snapshot_interval_ms: 스냅샷 간의 시간 간격(밀리초)입니다. 스냅샷 생성 주기를 모니터링하여 너무 짧거나 길지 않도록 관리합니다.
  • snapshot_size_mb: 최근 생성된 스냅샷의 크기(메가바이트)입니다. 스냅샷 크기가 지나치게 클 경우 성능 저하로 이어질 수 있습니다.
  • total_data_size_mb: 테이블에 저장된 전체 데이터의 크기(메가바이트)입니다. 테이블의 규모를 파악하는 데 사용됩니다.
  • total_data_files: 테이블에 포함된 데이터 파일의 총 개수입니다.
  • total_deleted_files: 테이블에서 삭제된 파일의 총 개수입니다. 불필요한 삭제 파일이 쌓이지 않도록 모니터링이 필요합니다.
  • latency_ms: 데이터 입수 및 처리의 평균 지연 시간(밀리초)입니다. 성능 저하를 방지하기 위해 지연 시간을 모니터링합니다.
  • avg_data_files_per_snapshot: 스냅샷당 평균 데이터 파일 개수입니다. 스냅샷에 포함된 데이터 파일의 개수를 파악하여 효율적인 관리를 도울 수 있습니다.
  • avg_deleted_files_per_snapshot: 스냅샷당 평균 삭제 파일 개수입니다. 삭제 파일이 너무 많을 경우 읽기성능 저하가 발생할 수 있어 유지보수 작업이 필요할 수 있습니다.
  • table_size_mb: 테이블의 총 크기(메가바이트)입니다. 테이블의 전체 규모를 나타내며, 크기 관리에 유용합니다.
  • partition_count: 테이블에 포함된 파티션의 총 개수입니다. 파티션 수는 쿼리 성능 및 데이터 관리에 중요한 영향을 미칩니다.
  • avg_partition_size_mb: 파티션당 평균 크기(메가바이트)입니다. 파티션 크기가 작을 경우 파티션 제거를 고려할 수 있습니다.
  • max_partition_size_mb: 가장 큰 파티션의 크기(메가바이트)입니다. 이 값이 128MB또는 256MB이하로 유지된다면, 파티션을 제거하는 것도 고려할 수 있습니다.
  • avg_partition_record_count: 파티션당 평균 레코드 수입니다. 파티션 내 레코드의 균형을 유지하여 성능 최적화를 도울 수 있습니다.
  • latest_partition_name: 최근 생성된 파티션의 이름입니다. 파티션 생성 주기와 관련된 정보를 제공하여 데이터 갱신 상태를 확인할 수 있습니다.
  • row_count: 테이블에 저장된 전체 레코드 수입니다. 데이터의 전체 규모를 파악하는 데 유용합니다.
  • position_delete_file_count: 위치 삭제 파일의 총 개수입니다. 데이터 정리 및 삭제 관리에 중요한 역할을 합니다.
  • position_delete_total_size_mb: 위치 삭제 파일의 총 크기(메가바이트)입니다. 삭제 파일이 불필요하게 커지지 않도록 관리해야 합니다.
  • equality_delete_file_count: 동일성 조건으로 삭제된 파일의 총 개수입니다. 데이터 유지보수와 정리에 필요한 정보를 제공합니다.
  • equality_delete_total_size_mb: 동일성 조건으로 삭제된 파일의 총 크기(메가바이트)입니다. 삭제 파일의 크기를 모니터링하여 불필요한 공간 사용을 줄입니다.

Iceberg 유지보수 및 운영 팁

Iceberg를 유지하고 최적화하는 작업은 데이터 엔지니어에게 중요한 과제입니다. Iceberg의 메타데이터는 읽기 성능에 큰 영향을 미치기 때문에 주기적으로 정리하고 최적화해야 합니다. 이를 위해 저희는 모니터링과 자동화된 Slack 알림 시스템을 구축해, 메타데이터 상태를 실시간으로 점검하고 필요시 바로 대응할 수 있도록 했습니다.

자동화된 모니터링 및 알림 시스템

저희 팀은 Iceberg 테이블의 읽기 성능 저하를 방지하기 위해 특정 지표가 일정 임계값을 초과할 경우 Slack 알림을 통해 즉각적으로 대응할 수 있도록 설정했습니다. 모니터링하는 주요 지표는 다음과 같습니다:

  • Iceberg 테이블에 불필요한 파티션이 설정되어 있어 스몰 파일이 다수 존재하는지
  • 파티션 설정이 추가로 필요한 테이블이 있는지
  • 유지보수 작업으로 Position/Equality Delete 파일이 제대로 정리되지 않고 있는지

이와 같은 알림을 설정하기 위해 DataLake에 있는 모든 Iceberg 테이블의 메타데이터를 주기적으로 파싱하고, 이를 바탕으로 지표를 수집하여 관리하고 있습니다. 필요한 메트릭은 아래의 쿼리를 통해 추출할 수 있습니다.

snapshot_df = spark.sql(f"SELECT * FROM spark_catalog.{table}.snapshots")
manifest_df = spark.sql(f"SELECT * FROM spark_catalog.{table}.manifests")
data_files_df = spark.sql(f"SELECT * FROM spark_catalog.{table}.files")
partitions_df = spark.sql(f"SELECT * FROM spark_catalog.{table}.partitions")
row_count_df = spark.sql(f"select count(1) as row_count from spark_catalog.{table}")
delete_files_df = spark.sql(f"SELECT * FROM spark_catalog.{table}.delete_files")
Iceberg 테이블 모니터링 알람

메타데이터 관리

Iceberg는 데이터 변경 사항을 스냅샷으로 관리하는데, 시간이 지남에 따라 스냅샷이 쌓여 메타데이터가 커지고, 이로 인해 성능이 저하될 수 있습니다. 이를 방지하기 위해 저희 팀은 정기적인 메타데이터 관리와 최적화 작업을 진행하고 있습니다.

  • 스냅샷 정리 및 메타데이터 최적화: 주기적으로 expire_snapshots 명령을 사용하여 오래된 스냅샷을 제거하고, rewrite_manifests를 통해 메타데이터를 최적화합니다. 이를 통해 메타데이터 크기를 줄여 쿼리 성능을 유지하고, 전체적인 운영 효율성을 높일 수 있습니다. 다만, 작업 중 간혹 org.apache.iceberg.exceptions.NotFoundException: File does not exist Avro와 같은 오류가 발생할 수 있는데, 이는 특정 스냅샷 파일이 사라졌을 때 생기는 문제입니다. 이를 방지하기 위해 저희 팀은 최근 2개의 스냅샷을 유지한 상태에서 오래된 스냅샷을 제거하는 방식으로 관리하고 있습니다.
    # 오래된 스냅샷 제거 마지막 개의 스냅샷 유지
    spark.sql(f"CALL system.expire_snapshots(table => '{table}', older_than => TIMESTAMP '{older_then_now}', retain_last => 2)")
    options_map = "map('target-file-size-bytes', '241658240', 'max-file-size-bytes', '536870912', 'partial-progress.enabled', 'true')"
    spark.sql(f"CALL system.rewrite_data_files(table => '{table}', where => '`{partition_column}`>=\"{partition_day}\"', options => {options_map})")
  • 정기적인 데이터 정리 작업: 데이터 파이프라인의 안정성과 성능을 유지하려면 정기적인 데이터 정리가 필수적입니다. 저희는 remove_orphan_files, rewrite_data_files 같은 명령을 자동화하여 스케줄링하고, Airflow와 같은 워크플로우 관리 도구를 활용해 입수 작업 후 정리 작업이 자동으로 이루어지도록 설정했습니다.
    • 특정 파티션 정리: 큰 테이블의 경우, 전체 데이터를 재정리하는 대신 특정 파티션만 rewrite_data_files 명령을 사용해 유지보수 시간을 단축할 수 있었습니다. 예를 들어, 특정 날짜 이후의 데이터만 정리할 수 있습니다.
    • remove_orphan_files 주의사항: Iceberg 파일이 현재 쓰기 작업 중이라면, 그 파일은 orphan 상태로 인식될 수 있습니다. 이때 remove_orphan_files 명령을 실행하면 쓰기 작업이 실패할 수 있으며, 이는 메타데이터 손상으로 이어져 테이블을 읽지 못하게 됩니다.
      spakr.sql(f"CALL system.remove_orphan_files(table => '{table}')"
    • CDC 테이블 유지보수: CDC 테이블처럼 delete 파일이 발생하는 경우, 유지보수 작업을 진행할 때 입수 작업을 잠시 중단하는 것이 안전합니다. 데이터 쓰기 작업과 rewrite 작업이 동시에 이루어지면 메타데이터가 손상될 위험이 있기 때문입니다. 또한, delete 파일을 효율적으로 정리하기 위해 delete-file-threshold 옵션을 0으로 설정하여 불필요한 파일을 제거할 수 있습니다.
      spark.sql(f"CALL system.rewrite_data_files(table => '{table}', options => map('target-file-size-bytes', '251658240', 'delete-file-threshold', '0'))")

효율적인 데이터 쓰기

Iceberg에서 대규모 테이블에 파티션을 추가할 때, 데이터를 효율적으로 읽기 위해서는 Iceberg의 write.distribution-mode 설정이 매우 중요합니다. 이 설정을 통해 데이터 저장 방식을 최적화할 수 있으며, 몇 가지 주의할 점이 있습니다.

데이터 분산 저장: 데이터를 단순히 순차적으로 저장하면 특정 파티션이나 파일에 데이터가 집중될 수 있습니다. 이 경우 특정 파티션을 집중적으로 읽을 때 I/O 부하가 특정 노드에 몰리면서 성능이 저하될 수 있습니다. write.distribution-mode를 사용하면 데이터를 고르게 분산해 저장할 수 있어, 여러 노드에 걸쳐 데이터를 병렬로 처리함으로써 읽기 성능을 향상시킬 수 있습니다. Iceberg에서는 다음 세 가지 분포 옵션을 제공합니다:

  • none: 기본 설정으로, Spark에서 셔플링이나 정렬을 수행하지 않는 방식입니다.
  • range: 지정된 컬럼의 값 범위에 따라 데이터를 분배합니다. 예를 들어, 날짜 컬럼을 기준으로 설정하면 날짜별로 데이터를 분산시켜 저장하므로 특정 날짜 범위의 데이터를 빠르게 로드할 수 있습니다.
  • hash: 해시 함수를 사용해 데이터를 분산시킵니다. 특정 컬럼을 기준으로 해싱하여 여러 파일이나 파티션에 데이터를 균등하게 분배함으로써 I/O 병목 현상을 줄일 수 있습니다.

  • 적절한 옵션 선택: 작은 사이즈의 테이블에서는 hash 옵션을 사용하면 읽기 작업이 빠를 수 있지만, 사이즈가 매우 큰 테이블에서는 hash로 설정할 경우 오히려 읽기 작업이 느려질 수 있습니다. 이때 range 설정을 사용하면 성능 저하를 완화할 수 있습니다. 또한, locally 설정은 셔플링 없이 데이터를 쓸 수 있지만, 이로 인해 작은 파일들이 많이 생성될 수 있어 상황에 맞게 조정이 필요합니다.
환경에 따라 각 설정의 효과가 다를 수 있으므로, 사용 환경에 맞는 최적의 값을 찾기 위해 테스트를 거쳐 설정하는 것이 중요합니다.
-- 분포 모드 설정 예시

-- 'write.distribution-mode'='none'
ALTER TABLE {table} WRITE UNORDERED;

-- 'write.distribution-mode'='none'
ALTER TABLE {table} WRITE LOCALLY ORDERED BY `{partition_col}`;

-- 'write.distribution-mode'='range'
ALTER TABLE {table} WRITE ORDERED BY `{partition_col}`;

-- 'write.distribution-mode'='hash' (셔플링 없이 로컬 정렬)
ALTER TABLE {table} WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY `{partition_col}`;

-- 'write.distribution-mode'='hash' (데이터 분산)
ALTER TABLE {table} WRITE DISTRIBUTED BY PARTITION;

Partition Pruning 문제 해결

Iceberg에서 파티션 프루닝이 잘 작동하지 않는 경우, 이는 파티션 컬럼의 타입과 쿼리에서 사용한 조건의 타입이 일치하지 않는 데서 비롯될 수 있습니다. 예를 들어, 파티션 컬럼이 String 타입인데 Date 타입으로 쿼리를 작성하면 프루닝이 제대로 이루어지지 않아 성능이 저하될 수 있습니다.

  • 타입 일치의 중요성: 아래는 파티션 컬럼이 String 타입일 때 Date 타입으로 쿼리한 예시입니다. 이 경우, 전체 데이터 파일 크기가 124.12TB로 불필요하게 많은 데이터를 읽어야 합니다.
    -- 파티션 컬럼이 String이지만, Date 타입으로 쿼리한 경우
    EXPLAIN SELECT partition_day, COUNT(1) AS cnt
    FROM {table} WHERE partition_day >= DATE_SUB(CURRENT_DATE(), 3)
    GROUP BY partition_day;
    
    -- 결과
    ...생략
    | 00:SCAN HDFS [{table}]                                         |
    |    HDFS partitions=1/1 files=655173 size=124.12TB                                  |
    |    predicates: partition_day >= DATE '2024-06-30'                                  |
    |    row-size=12B cardinality=74.23G
  • 올바른 타입 사용: 파티션 컬럼과 쿼리 조건의 타입을 일치시킴으로써 성능을 개선할 수 있습니다. String으로 캐스팅하여 쿼리한 경우, 읽어야 할 데이터의 크기가 3.7TB로 줄어듭니다.
    -- 파티션 컬럼 타입과 맞추어 String으로 캐스팅한 경우
    EXPLAIN SELECT partition_day, COUNT(1) AS cnt
    FROM {table} WHERE partition_day >= CAST(DATE_SUB(CURRENT_DATE(), 3) AS STRING)
    GROUP BY partition_day;
    
    -- 결과
    ...생략
    | 00:SCAN HDFS [{table}]                                         |
    |    HDFS partitions=1/1 files=19054 size=3.70TB                                     |
    |    skipped Iceberg predicates: partition_day >= '2024-06-30'                       |
    |    row-size=12B cardinality=21.57G

이러한 방식으로 write.distribution-mode설정과 파티션 프루닝을 적절히 활용하면, Iceberg 테이블의 쓰기 및 읽기 성능을 최적화할 수 있습니다. 테이블 크기와 데이터 특성에 맞는 설정을 통해 성능 저하를 방지하고, 효율적인 데이터 운영을 이끌어낼 수 있습니다.

테이블 복구 및 재구축

Iceberg 테이블을 운영하다 보면, 여러 가지 이유로 테이블을 복구하거나 다시 설정해야 하는 상황이 발생할 수 있습니다. 다음은 테이블 복구 및 재구축을 위해 사용되는 주요 방법들입니다.

  • 카탈로그에서 테이블이 제거된 경우: 데이터와 메타데이터 파일이 정상적으로 존재하지만, 카탈로그에서 테이블이 삭제된 경우 register_table 명령을 사용해 테이블을 다시 등록할 수 있습니다. 이 방법을 통해 기존 데이터 구조를 유지하면서 빠르게 테이블을 복구할 수 있습니다. 더 자세한 내용은 공식 문서에서 확인해주세요.
    # 기존 메타데이터 파일을 사용해 테이블을 등록
    spark.sql("CALL system.register_table(table => 'db.sample', metadata_file => 'hdfs://{metadata_path}/metadata.json')")
  • 데이터 파일만 존재하는 경우: 테이블의 데이터 파일은 남아 있지만, 메타데이터가 손실된 경우 add_files 명령을 사용해 데이터 파일을 새롭게 Iceberg 테이블로 등록할 수 있습니다. 이를 통해 Parquet 등 다른 포맷으로 저장된 데이터를 손쉽게 Iceberg 포맷으로 전환할 수 있습니다. 더 자세한 내용은 공식문서를 참고해주세요.
    # 데이터 파일을 Iceberg 테이블로 추가
    spark.sql("CALL system.add_files(table => 'db.sample', source_table => 'parquet.`hdfs://{path}/data`')")
  • 기존 테이블을 Iceberg로 변환: 다른 포맷으로 저장된 기존 테이블을 Iceberg 테이블로 변경하고 싶을 때는 migrate 명령을 사용할 수 있습니다. 이 명령을 통해 기존 데이터를 유지하면서 Iceberg의 장점을 활용할 수 있습니다. 더 자세한 내용은 공식문서에서 확인해주세요.
    # 기존 테이블을 Iceberg 포맷으로 마이그레이션
    spark.sql("CALL catalog_name.system.migrate('db.sample')")

이러한 방법들은 운영 환경에서 발생할 수 있는 다양한 상황에 유연하게 대응할 수 있도록 도와줍니다. Iceberg의 복구 및 재구축 기능을 통해 데이터 손실 없이 테이블을 관리하고, 운영 중 발생할 수 있는 문제들을 효과적으로 해결할 수 있습니다.

또한, 저희 팀은 데이터 입수 작업을 Kafka Connect 기반으로 설정 파일을 통해 관리하면서 메타데이터와 데이터 리니지를 체계적으로 관리했습니다. 이를 통해 스키마나 파티션 변경 시 발생할 수 있는 문제들을 보다 쉽게 해결할 수 있었고, 운영 효율성도 높일 수 있었습니다. 특히, 데이터 파이프라인에서 발생하는 다양한 변경 사항에 유연하게 대응할 수 있어 테이블 복구 및 재구축 작업과 연계하여 안정적인 데이터 운영 환경을 유지할 수 있었습니다.

이와 같은 Iceberg의 복구 및 관리 기능을 통해, 데이터 손실 없이 테이블을 유지하고, 스키마 변경과 같은 변화에도 유연하게 대처할 수 있었습니다.

Kafka로 입수된 데이터의 리니지와 영향도

앞으로의 계획

저희 팀은 지금까지 DataOps 도구를 고도화하여, 최소한의 인력으로 수많은 파이프라인을 효율적으로 운영할 수 있는 구조를 구축해 왔습니다. 이제 한 걸음 더 나아가, 데이터 메시 구현을 위한 셀프서비스 플랫폼과 데이터 게이트웨이를 개발하는 데 집중하려 합니다.

우선, 셀프서비스 플랫폼을 SaaS 형태로 제공하여, 데이터 팀뿐만 아니라 다양한 사용자가 손쉽게 데이터 파이프라인을 구성하고 데이터를 자유롭게 활용할 수 있는 환경을 만들고자 합니다. 이 플랫폼은 각 도메인 팀이 독립적으로 데이터를 생성하고 관리할 수 있도록 지원하며, 데이터 입수, 처리, 저장을 손쉽게 관리할 수 있는 도구를 제공합니다. 이를 통해 중앙 집중식 관리의 부담을 줄이고, 각 도메인이 필요한 데이터를 직접 책임지고 운영할 수 있도록 돕고자 합니다.

동시에, 데이터를 필요로 하는 팀들이 손쉽게 접근하고 활용할 수 있도록 데이터 게이트웨이도 개발 중입니다. 데이터 게이트웨이는 각 도메인에서 생성된 데이터가 조직 전체에 걸쳐 효율적으로 공유되고 사용될 수 있게 하여, 데이터 활용의 효율성을 극대화합니다. 이를 통해 조직 내에서 필요한 데이터가 언제든지 쉽게 접근 가능하도록 하여 데이터의 가치를 높일 계획입니다.

저희의 최종 목표는 데이터 접근성과 사용성을 더욱 높이는 것이며, 이를 통해 데이터의 가치를 극대화하고자 합니다. 데이터 메시의 철학을 바탕으로, 각 도메인이 주도적으로 데이터를 생성하고 관리하며, 필요에 따라 자유롭게 데이터를 활용할 수 있는 유연하고 효율적인 데이터 환경을 만들고자 합니다.

마무리하며

Iceberg를 도입하면서 기존의 다양한 문제들을 해결하고, 데이터 파이프라인의 효율성을 크게 개선할 수 있었습니다. 실시간 데이터 제공, 비용 절감, 쿼리 성능 최적화 등 여러 이점을 통해 데이터 인프라의 안정성과 성능을 동시에 확보할 수 있었습니다.

저희의 경험이 데이터 파이프라인 최적화와 데이터 레이크 구축에 관심 있는 분들께 도움이 되기를 바랍니다. Iceberg를 활용하여 복잡한 데이터 환경을 더 효율적이고 유연하게 관리해 보세요.

댓글 0댓글 관련 문의: toss-tech@toss.im
연관 콘텐츠
㈜비바리퍼블리카 Copyright © Viva Republica, Inc. All Rights Reserved.