프로그래밍 언어/Python

asyncio를 활용해 간단하게 서버 구현한 코드 리뷰하기

JMDev 2023. 9. 22. 15:50

 

asyncio로 서버를 구성하는 코드예제를 보고,

그냥 넘어가는 것보단 정리하면서 개념도 같이 정리해보면 좋을 것 같아서 코드리뷰 느낌으로 글을 작성해보았습니다!

전체코드를 보고 싶으신 분들은 아래 깃헙주소를 참고해주시면 좋을 것 같아요

https://github.com/bslatkin/effectivepython/blob/master/example_code/item_61.py#L278

 

또한 해당 코드에서의 목적은 클라이언트와 서버와의 숫자맞추기 게임로직을 구현한 것인데, 게임로직 함수 구현내용은

거의 생략을 했음을 참고해주시면서 보면 좋을 것 같습니다. 코드는 최대한 발생순서대로 정리하였습니다

async def main_async():
    address = ('127.0.0.1', 4321)

    server = run_async_server(address) # 1번
    asyncio.create_task(server) 

    results = await run_async_client(address) # 2번
    for number, outcome in results:
        print(f'Client: {number} is {outcome}')

위 코드는 메인으로 실행되는 코드입니다. `run_async_server` 함수는 구현된 서버 코루틴 객체를 반환합니다. 

이 코루틴 객체는 `asyncio.create_task`에 의해 이벤트 루프에서 비동기적으로 실행되는 태스크로 변환되며, 

이 태스크는 백그라운드에서 실행됩니다. 

이렇게 하면, 서버가 시작되는 동시에 메인 플로우는 다음 작업, 즉 클라이언트의 실행으로 진행할 수 있습니다.

async def run_async_server(address):
    server = await asyncio.start_server(
        handle_async_connection, *address) 

    print(server) 
    # <Server sockets=(<asyncio.TransportSocket fd=6, family=AddressFamily.AF_INET,
    #  type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 4321)>,)>
    
    async with server:
        await server.serve_forever()

async def handle_async_connection(reader, writer):
    session = AsyncSession(reader, writer)
    try:
        await session.loop()
    except EOFError:
        pass

asyncio.start_server는 클라이언트가 연결을 시도할 때마다 handle_async_connection 코루틴을 호출합니다. 이때, handle_async_connection 코루틴에는 두 개의 파라미터, readerwriter가 전달됩니다.

  • reader: 클라이언트로부터의 데이터를 읽기 위한 StreamReader 객체입니다.
  • writer: 클라이언트에게 데이터를 쓰기 위한 StreamWriter 객체입니다.

즉, 클라이언트가 send() #drain 를 호출하면 대기하고 있던 handle_async_connection 함수가 실행되는 원리인 것이죠

async def run_async_client(address):
    # Wait for the server to listen before trying to connect
    await asyncio.sleep(0.1)

    streams = await asyncio.open_connection(*address)   # New
    client = AsyncClient(*streams)                      # New

    async with client.session(1, 5, 3):
        results = [(x, await client.report_outcome(x))
                   async for x in client.request_numbers(5)]

    async with client.session(10, 15, 12):
        async for number in client.request_numbers(5):
            outcome = await client.report_outcome(number)
            results.append((number, outcome))

    _, writer = streams                                 # New
    writer.close()                                      # New
    await writer.wait_closed()                          # New

    return results

그 다음으로 클라이언트 코드를 구현한 부분을 확인해보겠습니다.

await asyncio.sleep(0.1) 주석으로는 서버가 연결 하기전에 서버가 잘 연결되었는지 기다린다 라고

적혀있는데, 여기서 의아했던 것은 서버 측에서 실행이 완료 된 상태에서 굳이 서버의 정상작동을

고려해야 하는지, 앞의 추측으로 해당코드는 불필요한 코드가 아닌지에 대해 생각해보았습니다.

결론적으로는 해당코드는 실제 서비스코드에서는 불필요할 것이라고 추측됩니다. 하지만 지금 저희는 테스트코드를

작성하고 있기 때문에 서버를 실행하는 코드 중 백그라운드 처리된 작업들이 전부 완성이 되지 않을

가능성이 있는 것이죠. 그렇기에 커넥션을 시도했을 때, 서버의 백그라운드 작업이 전부 완성되지 않는다면

예상치 못한 오류가 발생할 것이고 그러한 오류를 방지차원에서 sleep를 했다고 볼 수 있는 것입니다

그 이후의 코드들은 AsyncClient의 구현체에 따라 게임로직을 위해 파라미터들을 던지고

서버와 상호작용하는 코드들임을 볼 수 있습니다. 그 중에 조금 독특한 문법인

async with 와 async for은 사실 이전에 자주 보았던 with, for 앞에 async 라는 용어가 추가된

비동기적인 문법임을 알 수 있습니다. 실제로도 기능적인 면에서도 유사하다고 볼 수 있습니다

class AsyncConnectionBase:
    def __init__(self, reader, writer):             # Changed
        self.reader = reader                        # Changed
        self.writer = writer                        # Changed

    async def send(self, command):
        line = command + '\\n'
        data = line.encode()
        self.writer.write(data)                     # Changed
        await self.writer.drain()                   # Changed

    async def receive(self):
        line = await self.reader.readline()         # Changed
        if not line:
            raise EOFError('Connection closed')
        return line[:-1].decode()

# Example 15
class AsyncSession(AsyncConnectionBase):            # Changed
    def __init__(self, *args):
        super().__init__(*args)
        self._clear_values(None, None)

    def _clear_values(self, lower, upper):
        self.lower = lower
        self.upper = upper
        self.secret = None
        self.guesses = []

# Example 16
    async def loop(self):                           # Changed
        while command := await self.receive():      # Changed
            parts = command.split(' ')
            if parts[0] == 'PARAMS':
                self.set_params(parts)
            elif parts[0] == 'NUMBER':
                await self.send_number()            # Changed
            elif parts[0] == 'REPORT':
                self.receive_report(parts)
            else:
                raise UnknownCommandError(command)

# Example 17
    def set_params(self, parts):

# Example 18
    def next_guess(self):
        return guess

    async def send_number(self):                    # Changed
        await self.send(format(guess))              # Changed

# Example 19
    def receive_report(self, parts):
        print(f'Server: {last} is {decision}')

# Example 20
class AsyncClient(AsyncConnectionBase):             # Changed
    def __init__(self, *args):
        super().__init__(*args)
        self._clear_state()

    def _clear_state(self):
        self.secret = None
        self.last_distance = None

# Example 21
    @contextlib.asynccontextmanager                 # Changed
    async def session(self, lower, upper, secret):  # Changed
        await self.send(f'PARAMS {lower} {upper}')  # Changed
        try:
            yield
        finally:
            self._clear_state()
            await self.send('PARAMS 0 -1')          # Changed

# Example 22
    async def request_numbers(self, count):         # Changed
        for _ in range(count):
            await self.send('NUMBER')               # Changed
            data = await self.receive()             # Changed
            yield int(data)
            if self.last_distance == 0:
                return

# Example 23
    async def report_outcome(self, number):         # Changed
        await self.send(f'REPORT {decision}')       # Changed
        # Make it so the output printing is in
        # the same order as the threaded version.
        await asyncio.sleep(0.01)
        return decision

위 코드들이 뭔가 복잡해보이지만, 클라이언트 측에서 서버측으로 보낼 때 게임로직으로 인한

데이터 변환과정, 그 반대로도 거의 비슷한 과정을 갖고 있기에 위 코드들은 그냥 그렇게 쓰는구나

정도까지로 인식하면 좋을 것 같습니다

하지만 그중에 위 코드에서 한가지 좀 독특한 부분이 있는데요.

소켓을 활용한 코드와는 다르게 reader, writer 를 구분짓고 사용한다는 부분이였습니다.

소켓같은 경우에는 소켓객체를 활용해 보내진 메세지들을 읽고, 쓰기까지 동시에 가능했는데요

그에 반해 비동기 서버객체에서는 쓰기와 읽기를 따로 객체로 분리함을 볼 수 있습니다

왜 그럴까요?

asyncio에서 reader와 writer를 분리하여 사용하는 이유는 비동기 프로그래밍의 특성과 관련이 있습니다. asyncio의 StreamReader와 StreamWriter 객체는 비동기 I/O 작업을 수행할 때, 읽기와 쓰기 작업을 명확하게 분리하여 코드의 가독성과 유지 보수성을 향상시키기 위해 디자인되었습니다.

1. 명확한 역할 분리:

  • StreamReader는 데이터를 읽는 작업에만 집중하며, StreamWriter는 데이터를 쓰는 작업에만 집중합니다. 이로 인해 각 객체의 역할이 명확해지고, 코드가 더욱 직관적이며 이해하기 쉬워집니다.

2. 비동기 작업의 효율성:

  • asyncio를 사용하면 여러 비동기 작업을 동시에 수행할 수 있습니다. reader와 writer를 분리하면, 읽기와 쓰기 작업을 동시에 수행할 수 있어 효율성이 향상됩니다.

3. 유연성과 확장성:

  • reader와 writer의 분리는 더욱 유연한 코드 작성을 가능하게 합니다. 예를 들어, 특정 상황에서는 reader만 필요하고 writer는 필요하지 않을 수 있습니다. 이 경우, 필요한 객체만 생성하여 리소스를 효율적으로 사용할 수 있습니다.

 

아래는 그닥 도움이 되지 않겠지만, 대략적으로 위 코드의 형태를 그려보았습니다.

이 코드의 대략적인 구조( 명칭 다 다름 주의 )