https://park-dev-diary.tistory.com/26
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(2)
https://park-dev-diary.tistory.com/25 Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(1) https://park-dev-diary.tistory.com/21 Eventsim 빌드 및 실행하기 이 모든 과정은 MacOS 기준을 작성되었습니다! 1. JAVA
park-dev-diary.tistory.com
그 이전에 Pandas로 진행하다가 안되었던 이유는 간단했다.
df = pd.read_json(file_path, lines=True)
를 주석처리하지 않고 그대로 작업을 해서 결국엔 chunk, ijson 둘다 실행하기 이전에 큰 데이터를 읽게 되어
터져버리게 되었던 것이였다.
주석처리하고 실행하니, Pandas로도 Insert 작업엔 크게 무리가 없었다.
그리고 그렇게 넣은 데이터들을 aggreate 하는데
Pandas, Spark 두 라이브러리를 활용하여 진행했는데
Spark 코드는 이렇다.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, from_unixtime, to_date, count
from pyspark.sql import functions as F
# Spark 세션 초기화
spark = SparkSession.builder \
.appName("JsonToMySQLDailyAggregate") \
.config("spark.jars", "/Users/parkjeongmin/workspace/event-project/mysql-connector-java-8.0.28.jar") \
.config("spark.driver.extraClassPath", "/Users/parkjeongmin/workspace/event-project/mysql-connector-java-8.0.28.jar") \
.getOrCreate()
# 스키마 정의
schema = StructType([
StructField("ts", LongType(), True),
StructField("userId", StringType(), True),
StructField("sessionId", IntegerType(), True),
StructField("page", StringType(), True),
StructField("auth", StringType(), True),
StructField("method", StringType(), True),
StructField("status", IntegerType(), True),
StructField("level", StringType(), True),
StructField("itemInSession", IntegerType(), True),
StructField("location", StringType(), True),
StructField("userAgent", StringType(), True),
StructField("lastName", StringType(), True),
StructField("firstName", StringType(), True),
StructField("registration", LongType(), True),
StructField("gender", StringType(), True),
StructField("tag", StringType(), True),
StructField("artist", StringType(), True),
StructField("song", StringType(), True),
StructField("length", FloatType(), True)
])
# JSON 파일 읽기
df = spark.read.json("event.data.json", schema=schema)
# 타임스탬프를 날짜로 변환
df = df.withColumn("date", to_date(from_unixtime(col("ts") / 1000)))
# 일 단위로 집계
daily_agg_df = df.groupBy("date", "level", "location", "gender").agg(
F.countDistinct("userId").alias("unique_user_count"), # 고유한 사용자 수
F.countDistinct("sessionId").alias("unique_session_count"), # 고유한 세션 수
F.count("song").alias("total_song_plays"), # 총 노래 재생 횟수
F.sum("length").alias("total_play_time") # 총 재생 시간
)
# 데이터 MySQL에 저장
daily_agg_df.write \
.format("jdbc") \
.option("url", "jdbc:mysql://localhost/mydatabase") \
.option("dbtable", "user_daily_song_count") \
.option("user", "root") \
.option("password", "my-secret-pw") \
.mode("append") \
.save()
# Spark 세션 종료
spark.stop()
Spark로 실행한 코드는 매우 정상적으로 잘 돌아갔다.
그리고 문제의 Pandas.
앞서 Pandas로 데이터들을 넣으려고 할 때, 읽는 데이터가 너무 큰 데이터라 터져버렸기에
Chunksize를 줘서 조각내서 데이터를 읽고 agg 작업을 진행하였는데
대략적인 코드는 이러하다.
aggregated_data = pd.DataFrame()
# JSON 파일을 청크 단위로 읽고 집계
for chunk in pd.read_json(file_path, lines=True, chunksize=chunk_size):
chunk['date'] = pd.to_datetime(chunk['ts'], unit='ms').dt.date
aggregated_chunk = chunk.groupby(['date', 'level', 'location', 'gender']).agg({
'userId': pd.Series.nunique,
'sessionId': pd.Series.nunique,
'song': 'count',
'length': 'sum'
}).rename(columns={
'userId': 'unique_user_count',
'sessionId': 'unique_session_count',
'song': 'total_song_plays',
'length': 'total_play_time'
}).reset_index()
# aggregated_data.append(aggregated_chunk)
aggregated_data = pd.concat([aggregated_data, aggregated_chunk], ignore_index=True)
# 모든 청크의 집계 결과를 하나의 DataFrame으로 합침
# final_aggregated_df = pd.concat(aggregated_data)
# 데이터베이스에 저장
try:
aggregated_data.to_sql(con=database_connection, name='user_daily_song_count2', if_exists='append', index=False)
except Exception as e:
print(f"Error occurred: {e}")
위 코드의 문제점은 각 청크데이터로만 agg을 시도하였고, 결국엔 조각조각 합계를 시도한 후에,
마지막에 합쳐버려서 종합적으로는 조각된 합계들을 뭉쳐서 저장하게 되었던 것이였다.
만일 정상적으로 잘 집계가 되면, 전체적인 데이터 수는 30만개 정도로 나오게 되는데
위 Pandas 코드를 실행하게 되면, 300만개 정도의 데이터가 Insert 되는 현상이 발생한다.
이 부분을 어떻게 집계해야하는지, Chunk된 조각데이터들을 합치기 위해서는 새로운 DataFrame를 만들고
Chunk데이터를 지속적으로 UpSert 하는 느낌으로 진행되어야 할 것 같았다.
그래서 여러 삽질을 하다가, 모든 청크의 집계결과를 하나의 DataFrame으로 만들어 놓고,
이 DataFrame를 또 한번 집계를 수행하는 방식을 진행하였다.
chunk_size = 1000
aggregated_chunks = []
# JSON 파일을 청크 단위로 읽고 집계
for chunk in pd.read_json(file_path, lines=True, chunksize=chunk_size):
chunk['date'] = pd.to_datetime(chunk['ts'], unit='ms').dt.date
aggregated_chunk = chunk.groupby(['date', 'level', 'location', 'gender']).agg({
'userId': pd.Series.nunique,
'sessionId': pd.Series.nunique,
'song': 'count',
'length': 'sum'
}).rename(columns={
'userId': 'unique_user_count',
'sessionId': 'unique_session_count',
'song': 'total_song_plays',
'length': 'total_play_time'
}).reset_index()
aggregated_chunks.append(aggregated_chunk)
# 모든 청크의 집계 결과를 하나의 DataFrame으로 합침
final_aggregated_df = pd.concat(aggregated_chunks, ignore_index=True)
# 최종 집계 수행
final_aggregated_df = final_aggregated_df.groupby(['date', 'level', 'location', 'gender']).sum().reset_index()
# 데이터베이스에 저장
try:
final_aggregated_df.to_sql(con=database_connection, name='user_daily_song_count2', if_exists='append', index=False)
except Exception as e:
print(f"Error occurred: {e}")
이렇게 하니, 정상적으로 데이터가 집계되어 들어가게 되었음을 확인하였다.
이제 위 비교군을 가지고 Pandas와 Spark의 성능을 비교할 수 있는 비교군 코드를 만들 수 있게 되었다.
아무튼 이렇게 만들어진 데이터를 가지고 시각화를 해보자.. 이젠..
'오픈소스' 카테고리의 다른 글
M1으로 docker 로 설치하다 실패해 local로 설치하는 과정 (0) | 2023.12.15 |
---|---|
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(4) (0) | 2023.12.07 |
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(2) (0) | 2023.11.25 |
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(1) (0) | 2023.11.19 |
SuperSet 설치하면서 발생한 오류들 (1) | 2023.11.12 |