오픈소스

Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(3)

JMDev 2023. 12. 3. 00:29

https://park-dev-diary.tistory.com/26

 

Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(2)

https://park-dev-diary.tistory.com/25 Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(1) https://park-dev-diary.tistory.com/21 Eventsim 빌드 및 실행하기 이 모든 과정은 MacOS 기준을 작성되었습니다! 1. JAVA

park-dev-diary.tistory.com

 

 

그 이전에 Pandas로 진행하다가 안되었던 이유는 간단했다.

df = pd.read_json(file_path, lines=True)

를 주석처리하지 않고 그대로 작업을 해서 결국엔 chunk, ijson 둘다 실행하기 이전에 큰 데이터를 읽게 되어 

터져버리게 되었던 것이였다.

 

주석처리하고 실행하니, Pandas로도 Insert 작업엔 크게 무리가 없었다.

 

그리고 그렇게 넣은 데이터들을 aggreate 하는데

Pandas, Spark 두 라이브러리를 활용하여 진행했는데

 

Spark 코드는 이렇다.

 

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, FloatType
from pyspark.sql.functions import col, from_unixtime, to_date, count
from pyspark.sql import functions as F
# Spark 세션 초기화
spark = SparkSession.builder \
    .appName("JsonToMySQLDailyAggregate") \
    .config("spark.jars", "/Users/parkjeongmin/workspace/event-project/mysql-connector-java-8.0.28.jar") \
    .config("spark.driver.extraClassPath", "/Users/parkjeongmin/workspace/event-project/mysql-connector-java-8.0.28.jar") \
    .getOrCreate()

# 스키마 정의
schema = StructType([
    StructField("ts", LongType(), True),
    StructField("userId", StringType(), True),
    StructField("sessionId", IntegerType(), True),
    StructField("page", StringType(), True),
    StructField("auth", StringType(), True),
    StructField("method", StringType(), True),
    StructField("status", IntegerType(), True),
    StructField("level", StringType(), True),
    StructField("itemInSession", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("userAgent", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("firstName", StringType(), True),
    StructField("registration", LongType(), True),
    StructField("gender", StringType(), True),
    StructField("tag", StringType(), True),
    StructField("artist", StringType(), True),
    StructField("song", StringType(), True),
    StructField("length", FloatType(), True)
])

# JSON 파일 읽기
df = spark.read.json("event.data.json", schema=schema)

# 타임스탬프를 날짜로 변환
df = df.withColumn("date", to_date(from_unixtime(col("ts") / 1000)))

# 일 단위로 집계
daily_agg_df = df.groupBy("date", "level", "location", "gender").agg(
    F.countDistinct("userId").alias("unique_user_count"),  # 고유한 사용자 수
    F.countDistinct("sessionId").alias("unique_session_count"),  # 고유한 세션 수
    F.count("song").alias("total_song_plays"),  # 총 노래 재생 횟수
    F.sum("length").alias("total_play_time")  # 총 재생 시간
)

# 데이터 MySQL에 저장
daily_agg_df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://localhost/mydatabase") \
    .option("dbtable", "user_daily_song_count") \
    .option("user", "root") \
    .option("password", "my-secret-pw") \
    .mode("append") \
    .save()

# Spark 세션 종료
spark.stop()

 

Spark로 실행한 코드는 매우 정상적으로 잘 돌아갔다.

 

그리고 문제의 Pandas.

 

앞서 Pandas로 데이터들을 넣으려고 할 때, 읽는 데이터가 너무 큰 데이터라 터져버렸기에

Chunksize를 줘서 조각내서 데이터를 읽고 agg 작업을 진행하였는데

대략적인 코드는 이러하다.

 

aggregated_data = pd.DataFrame()

# JSON 파일을 청크 단위로 읽고 집계
for chunk in pd.read_json(file_path, lines=True, chunksize=chunk_size):
    chunk['date'] = pd.to_datetime(chunk['ts'], unit='ms').dt.date
    aggregated_chunk = chunk.groupby(['date', 'level', 'location', 'gender']).agg({
        'userId': pd.Series.nunique,
        'sessionId': pd.Series.nunique,
        'song': 'count',
        'length': 'sum'
    }).rename(columns={
        'userId': 'unique_user_count',
        'sessionId': 'unique_session_count',
        'song': 'total_song_plays',
        'length': 'total_play_time'
    }).reset_index()
    # aggregated_data.append(aggregated_chunk)

    aggregated_data = pd.concat([aggregated_data, aggregated_chunk], ignore_index=True)

# 모든 청크의 집계 결과를 하나의 DataFrame으로 합침
# final_aggregated_df = pd.concat(aggregated_data)

# 데이터베이스에 저장
try:
    aggregated_data.to_sql(con=database_connection, name='user_daily_song_count2', if_exists='append', index=False)
except Exception as e:
    print(f"Error occurred: {e}")

위 코드의 문제점은 각 청크데이터로만 agg을 시도하였고, 결국엔 조각조각 합계를 시도한 후에,

마지막에 합쳐버려서 종합적으로는 조각된 합계들을 뭉쳐서 저장하게 되었던 것이였다.

 

만일 정상적으로 잘 집계가 되면, 전체적인 데이터 수는 30만개 정도로 나오게 되는데

위 Pandas 코드를 실행하게 되면, 300만개 정도의 데이터가 Insert 되는 현상이 발생한다.

 

이 부분을 어떻게 집계해야하는지, Chunk된 조각데이터들을 합치기 위해서는 새로운 DataFrame를 만들고

Chunk데이터를 지속적으로 UpSert 하는 느낌으로 진행되어야 할 것 같았다.

그래서 여러 삽질을 하다가,  모든 청크의 집계결과를 하나의 DataFrame으로 만들어 놓고,

이 DataFrame를 또 한번 집계를 수행하는 방식을 진행하였다.

 

chunk_size = 1000

aggregated_chunks = []

# JSON 파일을 청크 단위로 읽고 집계
for chunk in pd.read_json(file_path, lines=True, chunksize=chunk_size):
    chunk['date'] = pd.to_datetime(chunk['ts'], unit='ms').dt.date
    aggregated_chunk = chunk.groupby(['date', 'level', 'location', 'gender']).agg({
        'userId': pd.Series.nunique,
        'sessionId': pd.Series.nunique,
        'song': 'count',
        'length': 'sum'
    }).rename(columns={
        'userId': 'unique_user_count',
        'sessionId': 'unique_session_count',
        'song': 'total_song_plays',
        'length': 'total_play_time'
    }).reset_index()
    aggregated_chunks.append(aggregated_chunk)

# 모든 청크의 집계 결과를 하나의 DataFrame으로 합침
final_aggregated_df = pd.concat(aggregated_chunks, ignore_index=True)

# 최종 집계 수행
final_aggregated_df = final_aggregated_df.groupby(['date', 'level', 'location', 'gender']).sum().reset_index()

# 데이터베이스에 저장
try:
    final_aggregated_df.to_sql(con=database_connection, name='user_daily_song_count2', if_exists='append', index=False)
except Exception as e:
    print(f"Error occurred: {e}")

 

이렇게 하니, 정상적으로 데이터가 집계되어 들어가게 되었음을 확인하였다.

 

이제 위 비교군을 가지고 Pandas와 Spark의 성능을 비교할 수 있는 비교군 코드를 만들 수 있게 되었다.

아무튼 이렇게 만들어진 데이터를 가지고 시각화를 해보자.. 이젠..