카테고리 없음

[네이버 클라우드] Local에서 pyspark로 S3 읽고 저장하기

JMDev 2023. 12. 18. 19:50

 

Local에서 pyspark로 CSV를 읽고, 해당 CSV를 parquet으로 쪼개서 저장해야하는 작업을 진행했다.

S3는 AWS가 아닌 Naver Cloud에서 제공해주는 Object Storage 를 활용하였다.

( 참고로 작업환경은 Mac M1 )

 

우선 Naver Cloud에서는 AWS형태의 모듈들을 전체적으로 지원이 되어서, 기존에 사용하던

AWS에 대한 권한 설정 혹은 세팅들을 전부 지원이 되어서 해당 모듈들을 활용하여 

Naver Cloud의 S3를 직접적으로 읽고 쓸 수 있는 편리함을 가지고 있다

 

Spark로 S3 관련 모듈로 통해 AWS S3를 읽고 쓰는 코드들을 대체로 활용하면 Naver Cloud의 S3도 동일한

작업들을 할 수 있었는데, 구글링에 pyspark aws s3 로 검색되는 코드들을 레퍼런스 삼아 작업도 가능하다.

( 네이버 클라우드로는 레퍼런스 코드가 거의 없음 )

 

 

구글링해서 찾은 결과,

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import TimestampType

# AWS 자격 증명 설정
access_key = ''
secret_key = ''
endpoint_url = "https://kr.object.ncloudstorage.com"

# Spark 세션 초기화 및 S3 설정 추가
spark = SparkSession.builder.appName("csvToParquet") \
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", endpoint_url) \
    .getOrCreate()

# S3에서 CSV 파일 읽기
csv_file_path = "s3a://data-lake/bronze/eventsim_big.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# 'ts' 필드를 날짜 형식으로 변환
df = df.withColumn("date", to_date((col("ts") / 1000).cast(TimestampType())))

# 날짜별로 데이터를 분할하고 Parquet으로 저장
output_base_path = "s3a://data-lake/silver/eventsim/"
df.write.partitionBy("date").parquet(output_base_path)

# Spark 세션 종료
spark.stop()

 

위 코드로 실행을 시도하였고, 실행이 되지 않았다.

 

발생한 에러코드로는 아래와 같았다.

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
  File "/Users/parkjeongmin/workspace/event-project/naver_spark_s3.py", line 40, in <module>
    df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
  File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 740, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/Users/parkjeongmin/workspace/event-project/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o40.csv.
: java.lang.NoClassDefFoundError: com/amazonaws/auth/AWSCredentialsProvider
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.hadoop.conf.Configuration.getClassByNameOrNull(Configuration.java:2625)
        at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2590)
        at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
        at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
        at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
        at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 31 more

 

 

위 에러코드로만 추측해보았을 떈, AWSCredentialsProvider를 보고 AWS에 추가적인 권한설정 영역이라고 판단했다.

그리고 구글링으로 서칭해봤을 때, 나와 동일한 오류로 질문한 stack overflow를 보게되었고

 

https://stackoverflow.com/questions/74647110/configuring-pyspark-with-amazon-s3-giving-java-lang-classnotfoundexception-com

 

Configuring Pyspark with Amazon S3 giving java.lang.ClassNotFoundException: com.amazonaws.auth.AWSCredentialsProvider

I'm trying to configure pyspark with Amazon s3 but got the following error: py4j.protocol.Py4JJavaError: An error occurred while calling o66.csv. : java.lang.NoClassDefFoundError: com/amazonaws/auth/

stackoverflow.com

 

거기의 답변으로는 버전에 대한 문제와 설정에 대한 문제라고 지적해주고 있었고,

자연스럽게 포커싱이 이러한 버전,의존성 문제로 삽질을 몇 시간동안 씨름하다가

해당 메세지를 자세히 읽어보면, java.lang.ClassNotFoundException 

클래스 자체를 못찾고 있었고, 상단에 config에 넣어주었던 aws-java-sdk.jar 가 정상적으로 의존되지 못함을 

추측하게 되었다.

config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375")

위에 있는 설정 자체가 에러였음이라고 생각함..

 

 

그래서 이 부분을 직접적으로 jars를 로컬에 받아놓고, 해당 부분을 직접적으로 File path를 잡아주었다.

그러니 실행이 되어버렸다. 몇 시간동안 잘못된 키워드로 삽질했고, 여러 https://hadoop.apache.org/ 도 보면서 

원인이 무엇인지도 모르다가, 뭔지 모르게 허무하게 해결되었다.

 

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date
from pyspark.sql.types import TimestampType

# AWS 자격 증명 설정
access_key = ''
secret_key = ''
endpoint_url = "https://kr.object.ncloudstorage.com"

spark = SparkSession.builder.appName("csvToParquet") \
    .config("spark.hadoop.fs.s3a.access.key", access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", endpoint_url) \
    .config("spark.driver.extraClassPath", "/Users/parkjeongmin/workspace/event-project/aws-java-sdk-bundle-1.12.481.jar") \
    .config("spark.executor.extraClassPath", "/Users/parkjeongmin/workspace/event-project/aws-java-sdk-bundle-1.12.481.jar") \
    .getOrCreate()


# S3에서 CSV 파일 읽기
csv_file_path = "s3a://data-lake/bronze/eventsim.csv"
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)

# 'ts' 필드를 날짜 형식으로 변환
df = df.withColumn("date", to_date((col("ts") / 1000).cast(TimestampType())))

# 날짜별로 데이터를 분할하고 Parquet으로 저장
output_base_path = "s3a://data-lake/silver/eventsim/"
df.write.partitionBy("date").parquet(output_base_path)

# Spark 세션 종료
spark.stop()

 

위에서 작업한 부분은 spark.driver.extraClassPath, spark.executor.extraClassPath 에 

직접 mvn홈페이지에서 aws-java-sdk-bundle.jar를 설치한 경로를 바라보게 만들어주었다.

https://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk-bundle

 

 

일단 작업이 필요한 영역은 구현이 되어서 다행이지만, 

해당 오류를 보고 파악하는 부분에 대해 좀 더 확실하고 꼼꼼한 추론을 하였다면

그렇게 많은 시간을 허비하지 않았을 것 같다. 특히, pyspark으로 s3 를 read하는 레퍼런스들 중에 

참고했을 때, 원하는 결과가 나오지 않았고, 키워드 위주로 검색하여 나온 몇 안되는 답변글에 

집중하게 되니, 실제로 나에게 부합하지 않은 답변글임에도 그에 대한 판단을 못하고 시야가 좁아진 상태에서

서칭을 하다보니 시간이 더욱 길어진 것 같다. 이런 부분을 개선할 수 있는 부분들이 뭐가 있을지, 또 이러한

상황이 발생했을 때 지금 이 순간을 참고해볼 수 있음 좋겠다.