오픈소스

[Spark] serializer 를 통한 성능개선

JMDev 2024. 3. 29. 13:39

Spark를 최적화 할 수 있는 방법 중에는 직렬화 속성을 명시하면서 7% 성능을 높일 수 있었고, 더 많은 데이터를 처리하게 되면 이상의 성능을 높일 수 있을 것 같습니다.

직렬화는 분산 애플리케이션의 성능에 중요한 역활을 합니다. 객체를 직렬화하는데 시간이 오래 걸리거나 많은 바이트를 소비하는 방식은 퍼포먼스를 늦추게 됩니다.

직렬화는 객체를 바이트 스트림으로 변환하는 과정이며, 역직렬화는 바이트 스트림을 다시 객체로 변환하는 방법입니다.

위에 빨간색으로 강조한 부분이 Spark에서 RDD를 다른 분산 환경으로 네트워크를 통해 셔플링 될 때, 직렬화 된 데이터를 노드에게 전달합니다. 전달받은 노드에서는 직렬화된 데이터를 역직렬화하여 Object로써 활용하게 됩니다.

위에서 설명한 흐름을 이해하였다면, 직렬화, 역직렬화를 방식이 성능에 영향을 끼칠 것이라는 것을 짐작할 수 있을 것 같습니다.

그렇다면 Spark에서 직렬화는 어떠한 방식으로 이뤄질까요?

Spark에서 직렬화의 디폴트는 Java의 내장된 직렬화 기능을 사용합니다. 그리고 성능을 개선시킬 수 있는 Kryo직렬화 방식이 존재합니다.

Java 내부 직렬화와 Kryo 직렬화 기능 두 기능의 차이점에 대해 알아보면 이렇습니다.

 

Java 직렬화

  • Spark의 디폴트는 자바의 ObjectOutputStream 프레임워크를 사용하여 객체를 직렬화
  • 이 방법은 java.io.Serializable 을 구현하는 모든 클래스와 함께 동작
  • java.io.Externalizable 을 확장함으로써 직렬화의 성능을 더 세밀하게 조정 가능
  • Java 직렬화는 유연하지만 성능적으로는 느림
  • 하지만 많은 클래스에 대해 큰 직렬화 포맷을 생성

Kryo 직렬화

  • Spark는 Kryo 라이브러리를 사용하면 객체를 더 빠르게 직렬화할 수도 있음.
  • Kryo는 Java 직렬화보다 훨씬 빠르고 압축률 높음.
  • 하지만 모든 Serializable 타입을 지원하지 않아, 미리 사용할 클래스가 등록되어야 함.

결론적으로 데이터 포맷, 클래스 등록 및 필드 캐싱, 확장성으로 부터 두 직렬화 기능에 성능적인 차이가 발생한다고 볼 수 있을 것 같습니다.

 

Kryo가 오류나는 특정한 상황

코드상으로 Kryo 직렬화를 사용할 때 오류가 발생하는 순간을 확인하고 싶었고, 아래는 그러한 오류가 발생하는 코드입니다.

class NonSerializableClass {
    private int value;

    public NonSerializableClass(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}

public class SerializationErrorExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SerializationErrorExample").setMaster("local");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // 이 클래스를 Kryo에 등록하지 않음
        // conf.registerKryoClasses(new Class[]{NonSerializableClass.class});

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<NonSerializableClass> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new NonSerializableClass(i));
        }

        JavaRDD<NonSerializableClass> rdd = sc.parallelize(list);
        // 액션을 수행하여 직렬화를 강제함
        System.out.println(rdd.count()); // 이 시점에서 직렬화 오류 발생 가능성

        sc.close();
    }
}

위 코드를 오류나지 않게 가능하려면 아래 직렬화 가능한 인터페이스를 implements 를 합니다.

// Serializable 인터페이스를 구현하여 직렬화 가능하게 만듦
class SerializableClass implements Serializable {
    private int value;

    public SerializableClass(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}

public class SerializationFixedExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SerializationFixedExample").setMaster("local");
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // 클래스를 Kryo에 등록
        conf.registerKryoClasses(new Class[]{SerializableClass.class});

        JavaSparkContext sc = new JavaSparkContext(conf);

        List<SerializableClass> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            list.add(new SerializableClass(i));
        }

        JavaRDD<SerializableClass> rdd = sc.parallelize(list);
        System.out.println(rdd.count()); // 직렬화 오류 없이 실행됨

        sc.close();
    }
}

위와 같이 직렬화가 가능하게 Serializable 인터페이스를 사용해야하며, 별도의 관리가 필요한 것을 확인 할 수 있습니다.

위와 같이, python으로 재현하고자 하였는데 python에서는 위와 같은 재현이 어렵습니다.

class NonSerializableClass:
    def __init__(self, value):
        self.value = value

# 사용자 정의 클래스의 인스턴스 생성
non_serializable_objects = [NonSerializableClass(i) for i in range(5)]

# RDD 생성 시도
rdd = spark.sparkContext.parallelize(non_serializable_objects)

# 액션을 통해 직렬화 강제 시도
rdd.collect()

위에는 자바와 비슷한 방식으로 오류가 발생할 것 같아 보이는 형식으로 재현한 코드이지만, 실제로는 정상작동하는 코드입니다. 그러한 이유는 아래와 같습니다.

 

Python 내장 직렬화 방식 (피클링 )

  • Python에서는 기본적으로 직렬화 및 역직렬화가 가능합니다
  • 즉, Java에서 Serializable 를 implements 해야 직렬화 되던 것이, 파이썬에서는 모든 Class에 구현 되어 있다라고 볼 수 있을 것 같음.
  • KryoSerializer를 설정하더라도 크게 오류가 발생하지 않음.
  • 결론은 Python으로 작성한 Spark 코드에서는 큰 오류 없이 사용 가능.

그렇다면 현재 진행 중인 프로젝트에서 kryo 직렬화 방식을 추가해보도록 하겠습니다.

기존코드

def create_spark_session():
    return SparkSession.builder \\
        .appName("DataProcessing") \\
        .config("spark.hadoop.fs.s3a.endpoint", "<https://s3.ap-northeast-2.amazonaws.com>") \\
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \\
        .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \\
        .getOrCreate()

수정코드

def create_spark_session():
    return SparkSession.builder \\
        .appName("DataProcessing") \\
        .config("spark.hadoop.fs.s3a.endpoint", "<https://s3.ap-northeast-2.amazonaws.com>") \\
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \\
        .config("spark.kryo.registrationRequired", "true") \\
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \\
        .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \\
        .getOrCreate()

 

8초가 걸린 프로세스가 수정 전 코드이고, 7.4초가 걸린 코드는 kryo 직렬화를 적용한 코드입니다.

확실히 성능개선이 이뤄진 것을 볼 수 있습니다.

 

참고

https://medium.com/@charchitpatidar/data-serialization-an-optimization-technique-in-apache-spark-22f070628f6c

https://medium.com/@sathamn.n/serialization-and-deserialization-in-spark-a-quick-overview-34ef74b7471e