안녕하세요! 저는 데이터플랫폼셀에서 데이터 엔지니어로 일하고 있는 김민수입니다. 데이터플랫폼셀은 데브시스터즈의 모든 데이터가 흐르는 DevPlay Data Platform 을 만드는 팀입니다. 저희는 엔지니어링을 통하여 제품에서 발생하는 여러 가지 로그와 데이터가 잘 흐르도록 하는 일을 담당합니다. 이번 글에서는 데이터 분석가가 PySpark SQL을 잘 사용할 수 있도록 DevPlay 플랫폼 데이터 분석에 편의를 제공하는 라이브러리인 DevPlay Analytics를 만든 과정을 몇 편에 걸쳐서 소개하려고 합니다.
PySpark를 통한 빅데이터 분석
Apache Spark는 빅데이터를 위한 병렬 분산 처리 프레임워크 입니다. 그 중, Spark SQL 모듈을 사용하면 HDFS, AWS S3 등 여러 가지 데이터 소스에서 JSON, Parquet, ORC, JDBC 등의 포맷으로 데이터 입출력을 지원하며 SQL 인터페이스를 통해 데이터를 쿼리할 수 있습니다.
Spark는 기본적으로는 Scala API를 통하여 사용할 수 있는데요, PySpark라는 Python API도 제공하고 있어서 Python 환경에 익숙한 데이터 분석가분들도 어느 정도 편하게 사용할 수 있습니다.
DevPlay의 여러 가지 데이터는 Spark를 통하여 병렬 분산 처리를 하여 적재와 분석을 하고 있습니다. 그렇다면 데이터 분석가들이 어떻게 PySpark SQL을 통하여 데이터를 분석할 수 있을까요? 우선, 각 게임에서 발생하는 로그는 데이터 파이프라인을 통해 AWS S3에 최종적으로 적재되고 있습니다. 데이터 분석가는 PySpark를 통하여 이렇게 저장된 로그들(과 데이터)에 접근하여 쿼리를 해볼 수 있고, 그 결과를 다시 S3로 적재할 수 있습니다.
PySpark API를 활용 하여 데이터를 로드하고, 처리 후, 다시 저장하는 예시입니다.
from pyspark.sql import SparkSession, functions as F
# 1. PySpark SQL의 API는 SparSession을 entry point로 접근할 수 있습니다.
spark: SparkSession = SparkSession.builder.getOrCreate()
# 2. SparkSession을 통하여 AWS S3에 Parquet 포맷으로 저장된 로그를
# DataFrame으로 로드합니다.
economy_log: DataFrame = spark.read.parquet("s3a://logging-test-game/log/partition=economy")
# 3. DataFrame을 temp table로 등록해서 SQL로 쿼리합니다.
economy_log.registerTempTable("economy_log")
economy_table: DataFrame = spark.sql("""
SELECT mid, sum(properties.amountDelta) AS amount
FROM economy_log
GROUP BY mid
""")
# 3-1. 또는 DataFrame API를 통하여 쿼리할 수도 있습니다.
economy_table: DataFrame = (
economy_log
.groupBy("mid")
.agg(F.sum(F.col("properties.amountDelta")).alias("amount"))
.select("mid", "amount")
)
# 4. 결과물을 S3로 다시 저장합니다.
economy_table.write.parquet("s3a://analytics-test-game/dw/f_economy")
이 예시와 같은 PySpark를 사용한 데이터 분석은 주로 Jupyterhub을 통하여 진행하고 있습니다.(이것과 관련된 자세한 부분은 추후 Kubernetes 기반의 데이터 플랫폼과 관련된 글에서 다룰 예정입니다)
이렇게 진행된 분석 결과를 재사용할 필요가 있거나 매일 실행해서 저장할 필요가 있으면, 로그 적재 후 매일 분석 작업을 실행해야 합니다. 데이터 플랫폼에서는 Apache Airflow를 통하여 이런 분석 작업들의 워크플로우를 관리하고 스케줄링합니다.
문제 정의
점점 다양한 게임들이 DevPlay Data Platform을 사용하게 되고, 거의 모든 분석에서 PySpark를 사용하게 되면서 몇 가지 문제점을 발견할 수 있었습니다.
문제 1: 많은 경로를 일일히 입력해야 해서 불편하다
DevPlay 플랫폼의 게임 종류가 점점 다양해지고, 또한, 데이터 계층도 많아지면서 각각의 데이터 입출력 경로를 관리하기 어려워졌습니다. 위의 예시에서 s3a://logging-test-game/log/partition=economy
와 같은 형태로 데이터의 경로를 설정하는데, 데이터 분석가는 각 게임과 데이터의 계층에 따라 다른 이 수많은 경로를 기억하고 사용해야 합니다. 그리고 만약 이 데이터 경로의 형태를 migration 할 경우, 데이터 엔지니어는 모든 분석 파일들을 일일이 확인하여 기존 경로의 사용 여부를 확인해야 합니다. 특정 제품만 담당하는 데이터 엔지니어링 팀의 경우 이런 경로 복잡성에 대한 고민을 약간은 덜 수 있을 것 같은데요, 플랫폼에서는 많은 데이터 소비자에게 쉬운 방식으로 데이터 접근을 제공할 필요가 있어서 고민이 되었습니다.
문제 2: credential을 포함한 설정값들을 관리해야 한다
또한, 어떤 데이터에 접근하기 위해서는 다양한 설정값이 필요합니다. 이런 설정은 GitHub private repository에 그냥 적어도 되는 경우도 있지만, 절대 커밋을 하면 안 되고, 심지어는 슬랙과 같은 메시지를 통해서 전달하면 안 되는 경우도 있습니다.
문제 3: 분석 프로젝트가 점점 복잡해져서 노트북과 batch 환경이 달라진다
분석 프로젝트의 규모가 점점 커지면서 개발 효율성을 위해 공통 동작에 대해서는 유틸 함수를 정의하여 사용하게 되었습니다. 예를 들어, 아래 예시에서처럼 Spark DataFrame을 저장할 때 repartition
, saveMode
, partitionBy
등의 옵션을 지정해주어야 하는데, 매번 이런 옵션들을 지정하여 저장하는게 귀찮고 실수로 잘못 설정할 여지도 있습니다.
(economy_table
.repartition(4) # 적절한 개수로 파티션을 나눠줘야 이후 데이터 분석에서 효율적으로 사용
.write
.parquet(
"s3a://analytics-test-game/dw/f_economy",
mode="overwrite", # overwrite 모드로 저장하지 않으면 재적재 시 에러
partitionBy=["date"], # fact table의 경우 date 컬럼으로 파티션됨
)
)
그래서 이 문제를 해결하기 위해, Airflow 위에서 돌아가는 batch 프로젝트에서는 save_dataframe
와 같은 유틸 함수를 정의하여 사용하게 되었습니다.
def save_dataframe(
df: DataFrame,
s3a_address: str,
partition_by: List[str],
repartition: int = 4,
save_mode: str = "overwrite",
):
df \
.repartition(repartition) \
.write \
.parquet(
s3a_address,
mode=save_mode,
partitionBy=partition_by,
)
그런데 이런 Airflow batch 분석 코드에서 사용되는 유틸 함수가 점점 늘어나면서, 같은 유틸 함수를 참조할 수 없는 Jupyter Notebook 환경에서 불편함이 생겼습니다. 결과적으로 1. Jupyter 환경에서 코드를 작성하여 테스트한 후 → 2. 다시 Airflow 환경 버전으로 작성한 다음 → 3. batch 환경에서 테스트를 마쳐야 분석 코드를 batch 환경으로 배포할 수 있게 되면서, 이 사이클이 너무 길고 어려워져 많은 시간이 소요되었습니다. 따라서, 두 환경에서 최대한 같은 방법으로 데이터 분석을 가능케 할 방법이 필요해졌습니다.
문제 4: 애초에 PySpark이 분석가에게 친화적이 지 않다
처음 데브시스터즈에서 Spark을 도입했을 때에는 소프트웨어 엔지니어가 데이터 분석 업무를 같이 담당하고 있었습니다. 따라서 PySpark 의 reference 문서나 소스를 기반으로 분석 코드를 작성하는 게 어색하지 않았습니다. 하지만 점점 SQL 기반의 분석이 익숙하신 분석가분들이 많이 합류하시면서, PySpark를 통한 데이터 분석 방법을 개선시킬 필요가 있었습니다. 단적으로, 제가 이 블로그 글을 쓰기 위해서 PySpark API 문서를 조금 살펴봤는데요, 이 문서들은 소프트웨어 엔지니어에게 PySpark의 컨셉을 설명해주기 위한 문서들로 분석가가 레퍼런스 삼아 분석에 활용하기에는 친화적이지 않다는 생각이 강하게 들었습니다. 그렇기에, 데이터 분석가분들이 이런 척박한 환경에서 데이터 분석을 진행하면서 쌓은 노하우를 사내 Notion에 많이 정리해두기는 했으나, 여전히 러닝 커브가 컸습니다.
DevPlay Analytics 라이브러리 개발
데이터플랫폼셀은 이러한 문제를 해결하기 위해 DevPlay Analytics 라이브러리를 개발했습니다. 이 라이브러리를 통해 일관된 방법으로 데이터 입출력이 가능할 뿐만 아니라, 각종 설정을 데이터 분석 코드상에 남기지 않고 관리할 수 있습니다. 무엇보다도 DevPlay Analytics는 계층별 데이터를 테이블 형태로 추상화하여 제공함으로써, 데이터 분석가에게 가장 친숙한 인터페이스인 SQL를 통한 데이터 접근을 가능케 합니다. 풍부한 분석 예시를 곁들인 레퍼런스 문서 또한 데이터 분석의 편의성을 높여 줍니다.
개발 목표
1. 데이터 계층 추상화
각 계층별 데이터를 모듈로 접근하며 테이블에 쿼리하는 듯한 인터페이스 형태를 구상했습니다. 앞선 예시 코드에 서 데이터 분석가에게 필요한 데이터 API는 "log 계층
의 economy 테이블
을 불러와서 Data Warehouse 계층
의 f_economy 테이블
로 저장한다" 라고 할 수 있습니다. 분석 예시 사례를 편하게 작성할 수 있도록 DevPlay Analytics에서는 데이터 경로 대신 데이터 계층에서 테이블을 불러오는 형태로,
devplay_analytics.log.load("economy") # log 계층의 economy 테이블을 불러온다
devplay_analytics.dw.load("f_economy") # DW 계층의 f_economy 테이블을 불러온다
devplay_analytics.dw.save(df, "f_economy") # DataFrame을 DW 계층의 f_economy 테이블로 저장한다
위와 같은 형상으로 데이터를 접근하도록 하였습니다.
2. SQL 인터페이스
데이터 분석가에게 가장 익숙한 인터페이스인 SQL만으로도 데이터 로드를 할 수 있게 쿼리 인터페이스를 제공하며,
devplay_analytics.sql.query("""
SELECT mid, sum(properties.amountDelta) AS amount
FROM log_economy
GROUP BY mid
""").save(devplay_analytics.dw, "f_economy")
쿼리의 결과를 데이터 계층 모듈과 테이블 이름을 지정하여 저장할 수 있습니다.
3. 레퍼런스 문서 제공
라이브러리의 모든 API를 레퍼런스 문서로 제공하고, 주로 사용하는 분석 패턴들을 기반으로 API 사용 예시를 추가하여, 데이터 분석가분들이나 관련 지식이 있는 엔지니어라면 누구나 레퍼런스 문서만으로도 충분히 데이터 분석을 할 수 있도록 만들고자 하였습니다.
라이브러리의 구조
개발 목표 요구사항을 만족시키기 위하여, 라이브러리에서 제공하여야 하는 기능을 모듈로 정의하고, 각 모듈을 계층 단위로 나누어 설계하였습니다. 여기서 가장 중요한 계층은 데이터 계층인데요, 데이터 계층은 Log 모듈
, Data Warehouse 모듈
등 데이터를 테이블 형태로 조회, 저장하는 기능을 제공합니다. 각 계층은 아래 계층의 기능을 호출하는 방식으로 연동되게 구현하였습니다. 예를 들어, SQL 계층은 데이터 계층을 호출하여 테이블을 로드하는데요, 데이터 계층은 IO 계층을 호출하여 S3로 입출력합니다.
그리고 각 계층은 설정 계층을 호출하여 필요한 설정과 credential을 불러오고 있습니다. 이 설정 파 일은 Vault를 활용하여 credential을 각자 권한에 맞게 렌더링할 수 있는 형태입니다. 게임 종류와 환경에 대한 맥락은 devplay_analytics
라는 객체에 저장하고 있기에, 맥락에 따라 달라지는 설정을 설정 계층에서 불러올 수 있게 구현되어 있습니다.
from devplay_analytics import build
# test_game의 dev환경에 대한 devplay_analytics 객체를 객체 생성하고
devplay_analytics = build("test_game", "dev")
# 해당 환경의 f_economy라는 DW를 로드합니다.
devplay_analytics.dw.load("f_economy")
# 이때, 어떤 경로로부터 데이터를 불러올지는
# DW 모듈에서 적절한 설정을 설정 계층으로부터 읽어서 IO 계층을 호출합니다.
계층별 기능을 구현하면서 기존 분석 환경에서 여러 곳으로 흩어져있던 많은 유틸 기능을 각 역할에 맞게 배치하였습니다. 예를 들어, 데이터 계층에서는 공통적으로 데이터 저장 전에 적절한 파티션 개수를 계산하여 repartition하는 로직을 구현했습니다. 이외에도 분석에서 사용되는 유틸 함수들의 경우 devplay_analytics.util
에서 제공해주어 노트북 환경과 batch 환경 모두에서 사용할 수 있도록 하였습니다.
라이브러리 모듈 구조 최상단의 SQL 모듈에서는 데이터 계층 모듈을 테이블 형태로 SQL로 접근할 수 있게 하였습니다. SQL 모듈에서 데이터 계층 테이블을 참조하면 Spark SQL temp table로 등록한 후 쿼리를 실행합니다. 예를 들어, log_economy
라는 테이블로 쿼리하면 devplay_analytics.log
모듈에서 로드를 하며, dw_billing
이라는 테이블로 쿼리하면 devplay_analytics.dw
모듈에서 데이터를 로드합니다.
devplay_analytics.sql.query("""
SELECT mid, sum(properties.amountDelta) AS amount
FROM log_economy
GROUP BY mid
""").save(devplay_analytics.dw, "f_economy")
그리고 쿼리 결과를 데이터 경로를 따로 설정하지 않고, devplay_analytics.dw
모듈 과 "f_economy"
라는 테이블 이름만 지정하여 저장되도록 하였습니다.
레퍼런스 문서 제공
모든 API에서 docstring으로 함수 사용 방법에 대한 스펙과 예시 코드를 작성하고 Sphinx를 사용하여 레퍼런스 문서로 제공합니다. 또한, pytest doctest 모듈 을 사용해서 모든 docstring을 테스트 코드로도 활용하면서, 레퍼런스로 작성된 코드가 outdate되지 않고 실제로 작동하는 것을 보장합니다.
(DevPlay Analytics 라이브러리의 테스팅과 문서화에 대해서는 다음으로 이어지는 글에서 좀 더 자세히 설명하려고 합니다. 데이터 분석 라이브러리의 테스트 방법에 대해 궁금하신 분들은 다음 글도 꼭 읽어주세요!)
DevPlay Analytics로 작성한 분석 코드
앞선 예시의 분석 코드를 DevPlay Analytics 버전으로 작성하면 아래와 같습니다.
from devplay_analytics import build
from pyspark.sql import SparkSession, functions as F
# 1. DevPlay Analytics 객체를 생성합니다.
devplay_analytics = build("test_game", "prod")
# 2. Log 계층에서 economy 로그를 로드하여 가공한 후 테이블로 만듭니다.
economy_table = devplay_analytics.sql("""
SELECT mid, sum(properties.amountDelta) AS amount
FROM log_economy
GROUP BY mid
""")
# 2-1. 또는 Log 계층에서 economy 로그를 DataFrame으로 로드한 후 가공합니다.
economy_df = (
devplay_analytics.log.load("economy")
.groupBy("mid")
.agg(F.sum(F.col("properties.amountDelta")).alias("amount"))
.select("mid", "amount")
)
# 3. 쿼리 결과를 DW 계층의 economy 테이블로 저장합니다.
economy_table.save(devplay_analytics.dw, "f_economy")
# 3-1. DataFrame의 경우
devplay_analytics.dw.save(economy_df, "f_economy")
데이터 분석가는 어떤 경로에서 데이터를 읽어와서 저장해야 하는지를 신경 쓰지 않고, 각 데이터 계층에서 테이블을 로드/저장하는 방식으로 분석 코드를 작성할 수 있습니다. 그리고 이 분석 코드를 그대로 복사하여 Airflow batch 프로젝트에서 사용해도 동일한 동작을 보장합니다.
결과
DevPlay Analytics 라이브러리 1.0 버전을 릴리즈한 후, 거의 모든 DevPlay 분석 코드가 라이브러리를 통하여 작성되었습니다. 현재는 라이브러리를 통해 매일 약 200종이 넘는 테이블을 적재하고 있습니다.
저희는 데이터 분석가분들의 불편함을 덜어드리기 위하여 라이브러리 개발 목표 OKR을 위와 같이 설정하여 릴리즈 목표로 삼았습니다.
1.0 버전 릴리즈 이후 라이브러리의 주요 사용자인 데이터 분석가분들께서 여러 가지 피드백을 주셨는데요, 그중 장점으로는
- 바닥부터 data를 불러오지 않아도 되어 편리하다
- 라이브러리의 각 기능의 경계가 명확하다
- 문서를 통해서 대부분의 기능을 파악할 수 있다
등을 꼽아주셨습니다. 그래서 최초 설정한 OKR의 Key Result를 어느 정도 달성했다고 결론 내릴 수 있었습니다!!
하지만, 여전히 데이터 분석가분들이 이 라이브러리를 잘 활용하기에는 부족한 부분이 많았습니다. 특히,
- 문서만으로는 어떻게 분석을 할 수 있는지는 알 수가 없다
- 여전히 Python 문법을 익히지 않고는 작업이 어렵다 (실제로 대략 이렇게 실행하면 된다, 정도로 이해하고 사용하는 경우가 많다고 합니다)
등의 부분에서 여전히 많은 불편을 겪고 계셨습니다.
그렇기에, 저희는 여전히 DevPlay Analytics 라이브러리를 포함하여 여러가지 툴들을 활발히 개발하고 있습니다. 데이터플랫폼셀에서는 이런 데이터를 잘 활용하기 위한 도움을 주기 위해 재미있는 문제들을 엔지니어링으로 풀고 있습니다. 데이터가 잘 흐르게 하는 엔지니어링에 관심 있으신 분들은 저희 채용 페이지도 한번 꼭 확인 부탁드려요. :)
긴 글 읽어주셔서 감사합니다.