Local에서 pyspark로 CSV를 읽고, 해당 CSV를 parquet으로 쪼개서 저장해야하는 작업을 진행했다.
S3는 AWS가 아닌 Naver Cloud에서 제공해주는 Object Storage 를 활용하였다.
( 참고로 작업환경은 Mac M1 )
우선 Naver Cloud에서는 AWS형태의 모듈들을 전체적으로 지원이 되어서, 기존에 사용하던
AWS에 대한 권한 설정 혹은 세팅들을 전부 지원이 되어서 해당 모듈들을 활용하여
Naver Cloud의 S3를 직접적으로 읽고 쓸 수 있는 편리함을 가지고 있다
Spark로 S3 관련 모듈로 통해 AWS S3를 읽고 쓰는 코드들을 대체로 활용하면 Naver Cloud의 S3도 동일한
작업들을 할 수 있었는데, 구글링에 pyspark aws s3 로 검색되는 코드들을 레퍼런스 삼아 작업도 가능하다.
( 네이버 클라우드로는 레퍼런스 코드가 거의 없음 )
구글링해서 찾은 결과,
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import TimestampType
# AWS 자격 증명 설정
access_key = ''
secret_key = ''
endpoint_url = "https://kr.object.ncloudstorage.com"
# Spark 세션 초기화 및 S3 설정 추가
spark = SparkSession.builder.appName("csvToParquet") \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375") \
.config("spark.hadoop.fs.s3a.access.key", access_key) \
.config("spark.hadoop.fs.s3a.secret.key", secret_key) \
.config("spark.hadoop.fs.s3a.endpoint", endpoint_url) \
.getOrCreate()
# S3에서 CSV 파일 읽기
csv_file_path = "s3a://data-lake/bronze/eventsim_big.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
# 'ts' 필드를 날짜 형식으로 변환
df = df.withColumn("date", to_date((col("ts") / 1000).cast(TimestampType())))
# 날짜별로 데이터를 분할하고 Parquet으로 저장
output_base_path = "s3a://data-lake/silver/eventsim/"
df.write.partitionBy("date").parquet(output_base_path)
# Spark 세션 종료
spark.stop()
위 코드로 실행을 시도하였고, 실행이 되지 않았다.
발생한 에러코드로는 아래와 같았다.
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "/Users/parkjeongmin/workspace/event-project/naver_spark_s3.py", line 40, in <module>
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 740, in csv
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
return_value = get_return_value(
File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o40.csv.
: java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 31 more
위 에러코드로만 추측해보았을 떈, AWSCredentialsProvider를 보고 AWS에 추가적인 권한설정 영역이라고 판단했다.
그리고 구글링으로 서칭해봤을 때, 나와 동일한 오류로 질문한 stack overflow를 보게되었고
Configuring Pyspark with Amazon S3 giving java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
I'm trying to configure pyspark with Amazon s3 but got the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o66.csv. : java.lang.NoClassDefFoundError: com/amazonaws/auth/
stackoverflow.com
거기의 답변으로는 버전에 대한 문제와 설정에 대한 문제라고 지적해주고 있었고,
자연스럽게 포커싱이 이러한 버전,의존성 문제로 삽질을 몇 시간동안 씨름하다가
해당 메세지를 자세히 읽어보면, java.lang.ClassNotFoundException
클래스 자체를 못찾고 있었고, 상단에 config에 넣어주었던 aws-java-sdk.jar 가 정상적으로 의존되지 못함을
추측하게 되었다.
config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375")
위에 있는 설정 자체가 에러였음이라고 생각함..
그래서 이 부분을 직접적으로 jars를 로컬에 받아놓고, 해당 부분을 직접적으로 File path를 잡아주었다.
그러니 실행이 되어버렸다. 몇 시간동안 잘못된 키워드로 삽질했고, 여러 https://hadoop.apache.org/ 도 보면서
원인이 무엇인지도 모르다가, 뭔지 모르게 허무하게 해결되었다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import TimestampType
# AWS 자격 증명 설정
access_key = ''
secret_key = ''
endpoint_url = "https://kr.object.ncloudstorage.com"
spark = SparkSession.builder.appName("csvToParquet") \
.config("spark.hadoop.fs.s3a.access.key", access_key) \
.config("spark.hadoop.fs.s3a.secret.key", secret_key) \
.config("spark.hadoop.fs.s3a.endpoint", endpoint_url) \
.config("spark.driver.extraClassPath", "/Users/parkjeongmin/workspace/event-project/aws-java-sdk-bundle-1.12.481.jar") \
.config("spark.executor.extraClassPath", "/Users/parkjeongmin/workspace/event-project/aws-java-sdk-bundle-1.12.481.jar") \
.getOrCreate()
# S3에서 CSV 파일 읽기
csv_file_path = "s3a://data-lake/bronze/eventsim.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
# 'ts' 필드를 날짜 형식으로 변환
df = df.withColumn("date", to_date((col("ts") / 1000).cast(TimestampType())))
# 날짜별로 데이터를 분할하고 Parquet으로 저장
output_base_path = "s3a://data-lake/silver/eventsim/"
df.write.partitionBy("date").parquet(output_base_path)
# Spark 세션 종료
spark.stop()
위에서 작업한 부분은 spark.driver.extraClassPath, spark.executor.extraClassPath 에
직접 mvn홈페이지에서 aws-java-sdk-bundle.jar를 설치한 경로를 바라보게 만들어주었다.
https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle
일단 작업이 필요한 영역은 구현이 되어서 다행이지만,
해당 오류를 보고 파악하는 부분에 대해 좀 더 확실하고 꼼꼼한 추론을 하였다면
그렇게 많은 시간을 허비하지 않았을 것 같다. 특히, pyspark으로 s3 를 read하는 레퍼런스들 중에
참고했을 때, 원하는 결과가 나오지 않았고, 키워드 위주로 검색하여 나온 몇 안되는 답변글에
집중하게 되니, 실제로 나에게 부합하지 않은 답변글임에도 그에 대한 판단을 못하고 시야가 좁아진 상태에서
서칭을 하다보니 시간이 더욱 길어진 것 같다. 이런 부분을 개선할 수 있는 부분들이 뭐가 있을지, 또 이러한
상황이 발생했을 때 지금 이 순간을 참고해볼 수 있음 좋겠다.