콘웨이의 생명 게임(Conway's Game of Life)은 1970년에 수학자 존 콘웨이(John Conway)에 의해 고안된 세포 자동자(cellular automaton)입니다. 이 게임은 "게임"이라는 이름을 가지고 있지만, 플레이어의 참여 없이 자동으로 진행되는 시뮬레이션입니다.
게임의 규칙은 다음과 같습니다:
- 생존: 어떤 세포에 인접한 8개의 세포 중 정확히 2개 또는 3개가 살아 있으면, 다음 세대에서 그 세포는 살아 있습니다.
- 사망: 어떤 세포에 인접한 8개의 세포 중 살아 있는 세포가 2개 미만이면, 그 세포는 고립되어 사망합니다. 또한, 살아 있는 세포가 3개 초과로 인접해 있으면, 그 세포는 과밀로 인해 사망합니다.
- 번식: 어떤 세포에 인접한 8개의 세포 중 정확히 3개가 살아 있으면, 다음 세대에서 그 세포는 "번식"하여 살아납니다.
위 설명과 약간 로직은 다를 수 있겠지만, 콘웨이 생명게임을 Thread와 Queue를 활용하여
살아있는 세포를 계산하고( count_neighbors )
살아있는 세포 수에 따른 세포의 상태를 결정( game_logic ) 과정에서
두 계산을 맞물리도록 파이프라인을 작성해볼 예정입니다.
아래의 코드들을 자세히 볼 필요는 없으며,
계산하고 결정하는 로직들이 어떻게 Queue와 Thread를 활용했는지를 살펴보고,
순서가 조금 햇갈린다면 순서도를 발로 그려놓았으니 확인해보면 감을 잡을 수 있을 것 입니다.
from threading import Thread
from queue import Queue
ALIVE = '*'
EMPTY = '-'
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # Cause the thread to exit
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue, **kwargs):
super().__init__(**kwargs)
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
class ColumnPrinter:
def __init__(self):
self.columns = []
def append(self, data):
self.columns.append(data)
def __str__(self):
row_count = 1
for data in self.columns:
row_count = max(
row_count, len(data.splitlines()) + 1)
rows = [''] * row_count
for j in range(row_count):
for i, data in enumerate(self.columns):
line = data.splitlines()[max(0, j - 1)]
if j == 0:
padding = ' ' * (len(line) // 2)
rows[j] += padding + str(i) + padding
else:
rows[j] += line
if (i + 1) < len(self.columns):
rows[j] += ' | '
return '\n'.join(rows)
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
def count_neighbors_thread(item):
y, x, state, get = item
try:
neighbors = count_neighbors(y, x, get)
except Exception as e:
neighbors = e
return (y, x, state, neighbors)
def game_logic(state, neighbors):
if state == ALIVE:
if neighbors < 2:
return EMPTY # Die: Too few
elif neighbors > 3:
return EMPTY # Die: Too many
else:
if neighbors == 3:
return ALIVE # Regenerate
return state
def game_logic_thread(item):
y, x, state, neighbors = item
if isinstance(neighbors, Exception):
next_state = neighbors
else:
try:
next_state = game_logic(state, neighbors)
except Exception as e:
next_state = e
return (y, x, next_state)
from threading import Lock
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)
# Example 8
in_queue = ClosableQueue()
logic_queue = ClosableQueue()
out_queue = ClosableQueue()
threads = []
for _ in range(5):
thread = StoppableWorker(
count_neighbors_thread, in_queue, logic_queue)
thread.start()
threads.append(thread)
for _ in range(5):
thread = StoppableWorker(
game_logic_thread, logic_queue, out_queue)
thread.start()
threads.append(thread)
# print( len(in_queue) , len(logic_queue), len(out_queue))
class SimulationError(Exception):
pass
# Example 9
def simulate_phased_pipeline(
grid, in_queue, logic_queue, out_queue):
for y in range(grid.height):
for x in range(grid.width):
state = grid.get(y, x)
item = (y, x, state, grid.get)
in_queue.put(item) # Fan out
in_queue.join()
logic_queue.join() # Pipeline sequencing
out_queue.close()
next_grid = LockingGrid(grid.height, grid.width)
for item in out_queue: # Fan in
y, x, next_state = item
if isinstance(next_state, Exception):
raise SimulationError(y, x) from next_state
next_grid.set(y, x, next_state)
return next_grid
# Example 10
grid = LockingGrid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
columns = ColumnPrinter()
for i in range(5):
columns.append(str(grid))
# print(in_queue.qsize())
grid = simulate_phased_pipeline(
grid, in_queue, logic_queue, out_queue)
# print(in_queue.qsize())
print(columns)
for thread in threads:
in_queue.close()
for thread in threads:
logic_queue.close()
for thread in threads:
thread.join()
아래의 그림에 대해 보충설명을 드리면
StoppableWorker(Thread) 가 먼저 실행됩니다. 하지만 in_queue에 아이템이 없기에 해당 스레드는
블로킹이 걸려 다음으로 넘어가게 되죠
넘어가면서 simulate_phased_pipeline 함수를 타게되고, in_queue에 데이터가 쌓이게 됩니다.
그리고 in_queue.join()를 함으로써 블로킹되었던 시점으로 돌아가 in_queue에 있던 데이터를 fn 으로 계산하고
out_queue에 쌓이게 됩니다.
in_queue ->(fn)-> logic_queue 로 데이터가 쌓이고,
쌓인 데이터로 인해
logic_queue.get()으로 다시 접근할 수 있게 되면서 Thread 작업들이 맞물려 동작되고
-> (fn) -> out_queue
최종적으로는 out_queue에 계산된 작업들이 쌓일 수 있게 되는 것이죠!

동시성을 활용할 때, 각 셀에 대해 스레드를 생성하는 대신에 미리 정의된 작업자 스레드를 사용하는 것이 좋으며
이렇게 하면 자원 사용을 효율적으로 제어하고, 스레드 생성에 따른 추가 비용을 절약할 수 있습니다.
그러나, Queue를 사용하는 접근법에는 몇 가지 문제점이 있죠:
- 복잡도: 기존의 스레드 방식에 비해 파이프라인 함수가 더 복잡하다. 추가로 CloseableQueue와 StoppableWorker와 같은 지원 클래스가 필요하며, 이로 인해 코드의 복잡도가 증가한다.
- 확장성: 작업자 스레드의 수는 고정되어 있으므로, 시스템의 규모를 자동으로 확장하는 것은 어렵다. 이는 미리 예상된 부하에 따라 병렬 수준을 설정해야 함을 의미한다.
- 디버깅: 예외 처리가 복잡해진다. 작업 스레드에서 발생한 예외를 수동으로 잡아 Queue를 통해 주 스레드로 전달해야 한다.
- 요구 사항 변경: 프로그램의 요구 사항이 변경될 경우, Queue 를 사용하도록 코드를 리팩토링하는 것은 상당한 작업이 필요하다. 특히, 여러 단계의 파이프라인이 필요한 경우 더욱 그렇다.
- 제한된 병렬성: Queue는 프로그램이 활용할 수 있는 전체 I/O 병렬성을 제한한다. 다른 파이썬 기능이나 모듈에 비해 이는 큰 단점으로 작용할 수 있다.
결론적으로, 동시성을 활용하면서 Queue 를 사용하는 방식은 특정 상황에서 유용할 수 있지만, 위와 같은 제약 사항과 문제점을 고려하여 적절한 방법을 선택해야 합니다.
'프로그래밍 언어 > Python' 카테고리의 다른 글
| with와 @contextlib.contextmanager (0) | 2023.09.20 |
|---|---|
| 컨텍스트 스위칭으로부터의 예상치 못한 결과 ( feat.GIL ) (0) | 2023.09.20 |
| 파이썬에서 외부 터미널 명령어 사용하기 ( subprocess ) (0) | 2023.09.19 |
| GIL를 무조건 신뢰하면 안되는 이유( feat.스레드 감수성 ) (0) | 2023.09.15 |
| 두 코드의 차이점을 설명하시요( feat. Python,쓰레드, GIL, 병렬 I/O) (0) | 2023.09.15 |