프로그래밍 언어/Python

Thread와 Queue를 활용한 파이프라인 구현하기( feat. 콘웨이 생명게임 )

JMDev 2023. 9. 19. 19:52

 

 

 

더보기

콘웨이의 생명 게임(Conway's Game of Life)은 1970년에 수학자 존 콘웨이(John Conway)에 의해 고안된 세포 자동자(cellular automaton)입니다. 이 게임은 "게임"이라는 이름을 가지고 있지만, 플레이어의 참여 없이 자동으로 진행되는 시뮬레이션입니다.

게임의 규칙은 다음과 같습니다:

  1. 생존: 어떤 세포에 인접한 8개의 세포 중 정확히 2개 또는 3개가 살아 있으면, 다음 세대에서 그 세포는 살아 있습니다.
  2. 사망: 어떤 세포에 인접한 8개의 세포 중 살아 있는 세포가 2개 미만이면, 그 세포는 고립되어 사망합니다. 또한, 살아 있는 세포가 3개 초과로 인접해 있으면, 그 세포는 과밀로 인해 사망합니다.
  3. 번식: 어떤 세포에 인접한 8개의 세포 중 정확히 3개가 살아 있으면, 다음 세대에서 그 세포는 "번식"하여 살아납니다.

 

위 설명과 약간 로직은 다를 수 있겠지만, 콘웨이 생명게임을 Thread와 Queue를 활용하여
살아있는 세포를 계산하고( count_neighbors 

살아있는 세포 수에 따른 세포의 상태를 결정( game_logic ) 과정에서

두 계산을 맞물리도록 파이프라인을 작성해볼 예정입니다.

아래의 코드들을 자세히 볼 필요는 없으며,

계산하고 결정하는 로직들이 어떻게 Queue와 Thread를 활용했는지를 살펴보고, 

순서가 조금 햇갈린다면 순서도를 발로 그려놓았으니 확인해보면 감을 잡을 수 있을 것 입니다.

from threading import Thread
from queue import Queue


ALIVE = '*'
EMPTY = '-'

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue, **kwargs):
        super().__init__(**kwargs)
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)



def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

def count_neighbors_thread(item):
    y, x, state, get = item
    try:
        neighbors = count_neighbors(y, x, get)
    except Exception as e:
        neighbors = e
    return (y, x, state, neighbors)
    

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state


def game_logic_thread(item):
    y, x, state, neighbors = item
    if isinstance(neighbors, Exception):
        next_state = neighbors
    else:
        try:
            next_state = game_logic(state, neighbors)
        except Exception as e:
            next_state = e
    return (y, x, next_state)

from threading import Lock


class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

class LockingGrid(Grid):
    def __init__(self, height, width):
        super().__init__(height, width)
        self.lock = Lock()

    def __str__(self):
        with self.lock:
            return super().__str__()

    def get(self, y, x):
        with self.lock:
            return super().get(y, x)

    def set(self, y, x, state):
        with self.lock:
            return super().set(y, x, state)


# Example 8
in_queue = ClosableQueue()
logic_queue = ClosableQueue()
out_queue = ClosableQueue()

threads = []

for _ in range(5):
    thread = StoppableWorker(
        count_neighbors_thread, in_queue, logic_queue)
    thread.start()
    threads.append(thread)

for _ in range(5):
    thread = StoppableWorker(
        game_logic_thread, logic_queue, out_queue)
    thread.start()
    threads.append(thread)

# print( len(in_queue) , len(logic_queue),  len(out_queue))

class SimulationError(Exception):
    pass

# Example 9
def simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue):
    for y in range(grid.height):
        for x in range(grid.width):
            state = grid.get(y, x)
            item = (y, x, state, grid.get)
            in_queue.put(item)          # Fan out

    in_queue.join()
    logic_queue.join()                  # Pipeline sequencing
    out_queue.close()

    next_grid = LockingGrid(grid.height, grid.width)
    for item in out_queue:              # Fan in
        y, x, next_state = item
        if isinstance(next_state, Exception):
            raise SimulationError(y, x) from next_state
        next_grid.set(y, x, next_state)

    return next_grid


# Example 10
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    # print(in_queue.qsize())
    grid = simulate_phased_pipeline(
        grid, in_queue, logic_queue, out_queue)
    # print(in_queue.qsize())

print(columns)

for thread in threads:
    in_queue.close()
for thread in threads:
    logic_queue.close()
for thread in threads:
    thread.join()

 

아래의 그림에 대해 보충설명을 드리면

StoppableWorker(Thread) 가 먼저 실행됩니다. 하지만 in_queue에 아이템이 없기에 해당 스레드는 

블로킹이 걸려 다음으로 넘어가게 되죠

넘어가면서 simulate_phased_pipeline 함수를 타게되고, in_queue에 데이터가 쌓이게 됩니다.

그리고 in_queue.join()를 함으로써 블로킹되었던 시점으로 돌아가 in_queue에 있던 데이터를 fn 으로 계산하고

out_queue에 쌓이게 됩니다.

in_queue ->(fn)-> logic_queue 로 데이터가 쌓이고,

쌓인 데이터로 인해

logic_queue.get()으로 다시 접근할 수 있게 되면서 Thread 작업들이 맞물려 동작되고

-> (fn) -> out_queue 

최종적으로는 out_queue에 계산된 작업들이 쌓일 수 있게 되는 것이죠!

 

동시성을 활용할 때, 각 셀에 대해 스레드를 생성하는 대신에 미리 정의된 작업자 스레드를 사용하는 것이 좋으며

이렇게 하면 자원 사용을 효율적으로 제어하고, 스레드 생성에 따른 추가 비용을 절약할 수 있습니다.

그러나, Queue를 사용하는 접근법에는 몇 가지 문제점이 있죠:

  1. 복잡도: 기존의 스레드 방식에 비해 파이프라인 함수가 더 복잡하다. 추가로 CloseableQueue와 StoppableWorker와 같은 지원 클래스가 필요하며, 이로 인해 코드의 복잡도가 증가한다.
  2. 확장성: 작업자 스레드의 수는 고정되어 있으므로, 시스템의 규모를 자동으로 확장하는 것은 어렵다. 이는 미리 예상된 부하에 따라 병렬 수준을 설정해야 함을 의미한다.
  3. 디버깅: 예외 처리가 복잡해진다. 작업 스레드에서 발생한 예외를 수동으로 잡아 Queue를 통해 주 스레드로 전달해야 한다.
  4. 요구 사항 변경: 프로그램의 요구 사항이 변경될 경우, Queue 를 사용하도록 코드를 리팩토링하는 것은 상당한 작업이 필요하다. 특히, 여러 단계의 파이프라인이 필요한 경우 더욱 그렇다.
  5. 제한된 병렬성: Queue는 프로그램이 활용할 수 있는 전체 I/O 병렬성을 제한한다. 다른 파이썬 기능이나 모듈에 비해 이는 큰 단점으로 작용할 수 있다.

결론적으로, 동시성을 활용하면서 Queue 를 사용하는 방식은 특정 상황에서 유용할 수 있지만, 위와 같은 제약 사항과 문제점을 고려하여 적절한 방법을 선택해야 합니다.