https://park-dev-diary.tistory.com/25
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(1)
https://park-dev-diary.tistory.com/21 Eventsim 빌드 및 실행하기 이 모든 과정은 MacOS 기준을 작성되었습니다! 1. JAVA 설치하기 터미널에서 java -version 명령어로 설치되었는지 확인 후 없다면 아래 사이트를
park-dev-diary.tistory.com
이전 글에서 시각화 진행할 예정이였는데, 위에서 해준대로 데이터 마이그레이션하면 데이터 INSERT 도중에 에러가 나서
옮겨야할 데이터를 전부 가져오지 못하는 상황이 발생했다.
당장 급하게 찾은 해결방안은 chunk_size 만큼 지정해서, 저장하는 도중 오류가 발생하더라도
지속적으로 INSERT되는 코드를 작성하였다.
import pandas as pd
from sqlalchemy import create_engine, types
import time
# 시작 시간 기록
start_time = time.time()
# JSON 파일 로드
file_path = 'test.data.json' # JSON 파일 경로
df = pd.read_json(file_path, lines=True)
# 데이터 정제 및 전처리 (예시)
# df['new_column'] = df['existing_column'].apply(some_function) # 새로운 컬럼 추가 또는 변환
# MySQL 데이터베이스 연결
database_username = 'root'
database_password = 'my-secret-pw'
database_ip = 'localhost'
database_name = 'mydatabase'
database_connection = create_engine(f'mysql+pymysql://{database_username}:{database_password}@{database_ip}/{database_name}')
table_name = 'user_song_count'
dtype={
'ts': types.BigInteger,
'userId': types.VARCHAR(length=255),
'sessionId': types.Integer,
'page': types.VARCHAR(length=255),
'auth': types.VARCHAR(length=255),
'method': types.VARCHAR(length=255),
'status': types.Integer,
'level': types.VARCHAR(length=255),
'itemInSession': types.Integer,
'location': types.VARCHAR(length=255),
'userAgent': types.VARCHAR(length=255),
'lastName': types.VARCHAR(length=255),
'firstName': types.VARCHAR(length=255),
'registration': types.BigInteger,
'gender': types.VARCHAR(length=255),
'tag': types.VARCHAR(length=255),
'artist': types.VARCHAR(length=500),
'song': types.VARCHAR(length=255),
'length': types.Float
}
# 데이터베이스에 데이터 저장
chunk_size = 100 # 적절한 청크 크기 설정
for i in range(0, len(df), chunk_size):
chunk = df[i:i + chunk_size]
try:
chunk.to_sql(con=database_connection, name=table_name, if_exists='append', index=False, dtype=dtype, chunksize=chunk_size)
except Exception as e:
print(f"Error occurred with chunk starting at row {i}: {e}")
# 오류 발생 시 로그를 남기고 다음 청크로 계속 진행
continue
# 실행 완료 후 경과 시간 출력
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")
# 데이터베이스 연결 종료 (선택적)
# database_connection.dispose()
위와 같이 수정하고 데이터의 대부분을 INSERT 할 수 있었다.
조금 찝찝해서 해당 부분에서 발생하고 있는 오류들을 바라보니,
마이그레이션 대상인 데이터 파일에 artist의 길이가 255보다 더 길어지는 데이터 값이 있었던 모양이다.
그래서 우선 artist length값도 조금 더 증가시켜줬다.
그렇게 일단 데이터를 추가하는 것은 완료하였다.
하지만 데이터의 크기가 조금 아쉬웠다.
이벤트심으로 만든 데이터크기가 대략적으로 500MB정도 되는데, 이걸 10배 정도 더 키워서
이 부분을 결정적으로 시각적인 데이터로 표현할 때, 어느정도로 부화가 있는지를 체크하고 싶었다.
그래서 이벤트심을 통해 다시 한번 데이터를 생성하고,
위와 같은 작업을 반복하여 데이터를 MySQL로 마이그레이션 해주었다.
bin/eventsim --config examples/example-config.json --tag contral -n 25000 --start-time "2015-06-01T00:00:00" --end-time "2015-12-01T00:00:00" --growth-rate 0.25 --userid 1 --randomseed 1 event.data.json
End-time과 contral -n (유저수) 인자값만 수정하면 기존 default 데이터보다 훨씬 많은 데이터들을 얻을 수 있다.
위 명령어로 만들어진 데이터의 용량은 대략 7기가 정도 된다.
해당 데이터를 다시 한번 Pandas를 활용해서 MySQL에 넣어주면 될 것 같다.
라고 생각하고 진행한 결과 우선 Pandas으로 7기가 이상을 MySQL에 INSERT 작업을 하는 중에
지속적으로 Killed 되어서 DB로 넣어지지 않았다.
찾아보니 Python에서 커다란 데이터를 pandas로 데이터프레임을 만드는 과정에서 너무 커버리면
메모리 아웃이 발생하여 killed 된다고 한다.
그래서 청크로도 쪼개보고, df에서 파라미터에 chunk 값도 넣어봐주고, json를 스트림으로 저장할 수 있게 도와주는
ijson를 설치해서 저장도 해보고, 버퍼개념을 활용해서 실행해봤지만, 소용없었다.
아래는 나의 삽질이 녹아있는 코드이다. 전부 killed이 발생하였다.
import ijson
import pandas as pd
from sqlalchemy import create_engine, types
from sqlalchemy.exc import SQLAlchemyError
import time
# 시작 시간 기록
start_time = time.time()
# JSON 파일 로드
file_path = 'event.data.json' # JSON 파일 경로
df = pd.read_json(file_path, lines=True)
# 데이터 정제 및 전처리 (예시)
# df['new_column'] = df['existing_column'].apply(some_function) # 새로운 컬럼 추가 또는 변환
# MySQL 데이터베이스 연결
database_username = 'root'
database_password = 'my-secret-pw'
database_ip = 'localhost'
database_name = 'mydatabase'
database_connection = create_engine(f'mysql+pymysql://{database_username}:{database_password}@{database_ip}/{database_name}')
table_name = 'user_song_count'
dtype={
'ts': types.BigInteger,
'userId': types.VARCHAR(length=255),
'sessionId': types.Integer,
'page': types.VARCHAR(length=255),
'auth': types.VARCHAR(length=255),
'method': types.VARCHAR(length=255),
'status': types.Integer,
'level': types.VARCHAR(length=255),
'itemInSession': types.Integer,
'location': types.VARCHAR(length=255),
'userAgent': types.VARCHAR(length=255),
'lastName': types.VARCHAR(length=255),
'firstName': types.VARCHAR(length=255),
'registration': types.BigInteger,
'gender': types.VARCHAR(length=255),
'tag': types.VARCHAR(length=255),
'artist': types.VARCHAR(length=255),
'song': types.VARCHAR(length=255),
'length': types.Float
}
buffer = [] # 아이템을 저장할 버퍼
buffer_size = 1000 # 버퍼 크기 설정
with open(file_path, 'r') as file:
for item in ijson.items(file, 'item'):
buffer.append(item)
if len(buffer) >= buffer_size:
# DataFrame 생성
df = pd.DataFrame(buffer)
# 일괄 처리를 위한 SQL 문 생성
insert_statement = pd.io.sql.get_insert_statement(table_name='user_song_count', con=database_connection, schema=None, if_exists='append', index=False, dtype=dtype)
# 데이터베이스에 일괄 삽입
try:
database_connection.execute(insert_statement, df.to_dict(orient='records'))
except SQLAlchemyError as e:
print(f"Error occurred: {e}")
buffer = [] # 버퍼 초기화
# 남은 데이터 처리
if buffer:
df = pd.DataFrame(buffer)
insert_statement = pd.io.sql.get_insert_statement(table_name='user_song_count', con=database_connection, schema=None, if_exists='append', index=False, dtype=dtype)
try:
database_connection.execute(insert_statement, df.to_dict(orient='records'))
except SQLAlchemyError as e:
print(f"Error occurred: {e}")
# with open('event.data.json', 'r') as file:
# for item in ijson.items(file, 'item'):
# buffer.append(item)
# if len(buffer) >= buffer_size:
# df = pd.DataFrame(buffer)
# df.to_sql(con=database_connection, name='user_song_count', if_exists='append', index=False)
# buffer = [] # 버퍼 초기화
# if buffer: # 남은 아이템 처리
# df = pd.DataFrame(buffer)
# df.to_sql(con=database_connection, name='user_song_count', if_exists='append', index=False)
# with open('event.data.json', 'r') as file:
# # ijson은 파일을 스트리밍하며 'item' 이벤트를 발생시킵니다.
# for item in ijson.items(file, 'item'):
# # 각 item을 DataFrame으로 변환
# df = pd.DataFrame([item])
# # 데이터베이스에 데이터 저장
# df.to_sql(con=database_connection, name='user_song_count', if_exists='append', index=False)
# # 청크 크기 설정
# chunk_size = 1000 # 적절한 청크 크기 설정
# # JSON 파일을 청크 단위로 읽기
# for chunk in pd.read_json(file_path, lines=True, chunksize=chunk_size):
# # 데이터베이스에 데이터 저장
# try:
# chunk.to_sql(con=database_connection, name=table_name, if_exists='append', index=False, dtype=dtype)
# except Exception as e:
# print(f"Error occurred: {e}")
# # 오류 발생 시 로그를 남기고 다음 청크로 계속 진행
# continue
# # 데이터베이스에 데이터 저장
# chunk_size = 100 # 적절한 청크 크기 설정
# for i in range(0, len(df), chunk_size):
# chunk = df[i:i + chunk_size]
# try:
# chunk.to_sql(con=database_connection, name=table_name, if_exists='append', index=False, dtype=dtype, chunksize=chunk_size)
# except Exception as e:
# print(f"Error occurred with chunk starting at row {i}: {e}")
# # 오류 발생 시 로그를 남기고 다음 청크로 계속 진행
# continue
# 실행 완료 후 경과 시간 출력
end_time = time.time()
print(f"Time taken: {end_time - start_time} seconds")
# 데이터베이스 연결 종료 (선택적)
# database_connection.dispose()
결국에 어떤 방법으로 해결할 수 있을까? 라는 생각을 하였는데,
애시당초 pandas로 굳이 MySQL에 적재를 해야하나 라는 생각이 들었다.
또는 json을 여러개로 쪼개서, 하나씩 넣어주는 방법도 있었을 것 같았다.
결국엔 넣었던 방법은 pySpark로 실행하는 것이였다.
여기서 Spark를 활용하여, 어떤 분산처리를 하여 데이터를 적재하거나, 빠르게 처리하기 위해서 사용한 것은 아니였다.
오로지 Spark가 데이터를 분산해서 Worker에게 쪼개주는 부분만 활용해서 적재할 수 있는 방법이 있어
이 부분을 위해 Spark를 이용하게 된 것이다...
결론적으로는 Python에 Pandas를 써버리면, Killed 당해버리니
Spark를 활용하여 데이터 마이그레이션을 진행하였고, 결과적으로는 성공하였다.
솔직히 조금 이해가 되지 않는 부분들이 있다.
Pandas를 활용해서 데이터를 적재하게 되었을 때, 파이썬의 스펙이 좋지 못해서 killed 당하는 건지,
아니면 Pandas에서 데이터프레임으로 Wrap하는 과정에서 메모리를 낭비하게 되는건지, 하지만
이러한 메모리 낭비 부분을 충분히 청크파일 혹은 ijson으로 커버하였다고 생각했지만 이 방법도 통하지가 않았다.
Spark가 killed 되지 않은 이유는 대량 JSON를 RDD로 분산해서 데이터를 쪼개서 워커에게 나눠줄 수 있어
메모리를 효율적으로 관리할 수 있었고, JVM환경에서 실행되기 때문에 Pandas의 무분별한 메모리 낭비를 안할 수 있어
실행이 되었던 것 같긴한데.. 솔직히 명확하게 잘 모르겠다.
아무튼 데이터 적재하고 카운트를 보니 천만정도 되는 수를 INSERT를 했다.
그리고 7기가 정도 데이터를 MySQL에 적재해보니 세삼 이 정도의 크기를 가지고, 시각화 대시보드를 활용하기가
어려울 것 같고, 집계테이블을 따로 만들어, aggreate 해야하는데 어떤 데이터와 어떤 방식으로 진행할지 고민이 필요할 것 같다.
이 aggreate 작업이 되어야 시각화 대쉬보드가 탄생할 수 있을 것 같다...
'오픈소스' 카테고리의 다른 글
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(4) (0) | 2023.12.07 |
---|---|
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(3) (0) | 2023.12.03 |
Eventsim으로 생성한 로그데이터로 SuperSet으로 시각화하기(1) (0) | 2023.11.19 |
SuperSet 설치하면서 발생한 오류들 (1) | 2023.11.12 |
[Mac] M1 Docker Ubuntu에서 C -m32 실행안될 때 (0) | 2023.11.10 |