IT_Programming/Network Programming

[펌] 소켓 입출력 모델 - Completion Port [IOCP] 모델

JJun ™ 2009. 11. 18. 14:04



 
출처

 : http://blog.naver.com/ree31206/46430257



 

소켓 입출력 모델 - Completion Port [IOCP] 모델

 

 

* 입출력 완료 포트(I/O completion port)

 

- 비동기 입출력 결과와 이 결과를 처리할 스레드에 대한 정보를 담고 있는 구조로 

  Overlapped 모델(II)에서 나오는 APC 큐와 비슷한 개념

 

 

 

* 입출력 완료 포트 vs. APC 큐의 차이점

 

- 생성과 파괴

   APC 큐는 각 스레드마다 자동으로 생성되고 파괴. 입출력 완료 포트는 CreateIoCompletionPort()

   함수로 생성하고, CloseHandle() 함수를 호출하여 파괴한다.

 

- 접근 제약

   APC 큐에 저장된 결과는 APC 큐를 소유한 스레드만 확인할 수 있지만 입출력 완료 포트에는

   이러한 제약이 없다. 대게 입출력 완료 포트를 접근하는 작업 스레드를 별도로 도는데

   이상적인 작업자 스레드 개수는 CPU 개수와 같게 하지만 몇가지 이유로 인해 CPU 개수 * n개를

   생성한다. (n 의 최소값은 1)

 

- 비동기 입출력 처리 방법

   APC 큐에 저장된 결과를 처리하려면 해다 스레드는 Alertable wait 상태에 진입해야 한다.

   입출력 완료 포트에 저장된 결과를 처리하려면 작업자 스래드는 GetQueuedCompletionStatus()

   함수를 호출해야 한다.

 

 

[동작 원리]

 

1. 애플리케이션을 구성하는 임의의 스레드에서 비동기 입출력 함수를 호출함으로써 운영체제에서

    입출력 작업을 요청한다.

 

2. 모든 작업자 스레드는 GetQueuedCompletionStatus() 함수를 호출하여 입출력 완료 포트를

    감시한다. 완료한 비동기 입출력 작업이 아직 없다면 모든 작업자 스레드는 대기상태가 된다.

    이 때 대기중인 작업자 스레드 목록은 입출력 완료 포트 내부에 저장.

 

3. 비동기 입출력 작업이 완료되면 운영체제는 입출력 완료 포트에 결과를 저장.

    이 때 저장되는 정보를 입출력 완료 패킷(I/O completion packet) 이라 부른다.

 

4. 운영체제는 입추력 완료 포트에 저장된 작업자 스레드 목록에서 하나를 선택하여 깨운다.

    대기 상태에서 개어난 작업자 스레드는 비동기 입출력 결과를 처리한다. 이 후 작업자 스레드는

    필요에 따라 다시 비동기 입출력 함수를 호출할 수 있다.

 

 

 

[Completion Port 모델을 이용한 소켓 입출력 절차]

 

1. CreateIoCompletionPort() 함수를 호출하여 입출력 완료 포트를 생성한다.

2. CPU 개수에 비례하여 작업자 스레드를 생성. 모든 작업자 스레드는

    GetQueuedCompletionStatus() 함수를 호출하여 대기상태가 됨.

 

3. 비동기 입출력을 지원하는 소켓을 생성. 이 소켓에 대한 비동기 입출력 결과가 입출력 완료 포트에

    저장되려면 CreateIoCompletionPort() 함수를 호출하여 소켓과 입출력 완료 포트를 연결시켜야

    한다.

 

4. 비동기 입출력 함수를 호출한다. 비동기 입출력 작업이 곧바로 완료되지 않으면,

    소켓 함수는 오류를 리턴하고, 오류 코드는 WSA_IO_PENDING 으로 설정된다.

 

5. 비동기 입출력 작업이 완료되면 운영체제는 입출력 완료 포트에 결과를 저장하고 대기중인

    스레드 하나를 깨운다. 대기 상테에서 깨어난 작업자 스레드는 비동기 입출력 결과를 처리한다.

 

6. 새로운 소켓을 생성하면 3 ~ 5 를 그렇지 않으면 4 ~ 5를 반복한다.

 

 

 

 

* 출력 완료 포트 생성하거나 소켓과 입출력 완료 포트를 연결

HANDLE CreateIoCompletionPort(

    HANDLE FileHandle,

    HANDLE ExistingCompletionPort,

    ULONG CompletionKey,

    DWORD NumberOfConcurrentThreads

);    성공 : 입출력 완료 포트 핸들.    실패 : NULL

 

FileHandle : 입출력 완료 포트와 연결한 파일 핸들. 

                   소켓 프로그래밍에서는 소켓 디스크립터를 넣어주면 된다.

                   새로운 입출력 완료 포트를 생성할때는 INVALID_HANDLE_VALUE 값을 사용해도 된다.

 

ExistingCompletionPort : 파일 또는 소켓과 연결할 입출력 완료 포트 핸들.

                                       NULL 이면 새로운 입출력 완료 포트를 생성.

 

CompletionKey : 입출력 완료 패킷에 들어갈 부가적인 정보로 32비트 값을 줄 수 있다.

                          입출력 완료 패킷은 비동기 입출력 작업이 완료할 때마다 생성되어

                          입출력 완료 포트에 저장된다.

 

NumberOfConcurrentThreads : 동시에 실행할 수 있는 작업자 스레드의 개수.

                                                  0 을 사용하면 자동으로 CPU 개수와 같은수로 설정

 

 

* 비동기 입출력 결과 확인

입출력 완료 패킷이 들어올때 까지 대기하다 입출력 완료 패킷이 입출력 완료 포트에 들어오면

운영체제는 실행중인 작업자 쓰레드를 체크하여 CreateCompletionPort() 에서 설정한 값보다 작다면

대기 상태인 작업자 스레드를 깨워서 입출력 완료 패킷을 처리하도록 한다.

 

BOOL GetQueuedCompletionStatus(

    HANDLE CompletionPort,

    LPDWORD lpNumberOfBytes,

    LPDWORD lpCompletionKey,

    LPOVERLAPPED* lpOverlapped,

    DWORD dwMilliseconds

);    성공 : 0 이 아닌 값.    실패 : 0

 

CompletionPort : 입출력 완료포트 핸들

lpNumberOfBytes : 비동기 입출력 작업으로 전송된 바이트 수가 여기에 저장된다.

lpCompletionKey : CreateCompletionPort() 함수 호출시 전달한 세 번째 인자(32비트)가 여기에

                             저장된다.

 

lpOverlapped : 비동기 입출력 함수 호출시 전달한 OVERLAPPED 구조체의 주소값이 여기에

                        저장된다.

 

dwMilliseconds : 작업자 스레드가 대기할 시간을 설정한다. INFINITE 값을 사용하면

                           입출력 완료 패킷이 생성되어 체제가 자신을 깨울 때까지 무한대기.

 

 

 

 

[Completion Port(IOCP) 모델을 이용한 에코서버]

 

#include <winsock2.h>
#include <stdlib.h>
#include <stdio.h>

 

#define BUFSIZE 512

 

// 소켓 정보 저장을 위한 구조체
struct SOCKETINFO
{
    OVERLAPPED overlapped;
    SOCKET sock;
    char buf[BUFSIZE+1];
    int recvbytes;
    int sendbytes;
    WSABUF wsabuf;
};

 

// 소켓 입출력 함수
DWORD WINAPI WorkerThread(LPVOID arg);

// 오류 출력 함수
void err_quit(char *msg);
void err_display(char *msg);

 

int main(int argc, char* argv[])
{
    int retval;
 
    // 윈속 초기화
    WSADATA wsa;
    if(WSAStartup(MAKEWORD(2,2), &wsa) != 0)
        return -1;
 
    // 입출력 완료 포트 생성
    HANDLE hcp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if(hcp == NULL) return -1;
 
    // CPU 개수 확인
    SYSTEM_INFO si;
    GetSystemInfo(&si);
 
    // (CPU 개수 * 2)개의 작업자 스레드 생성
    HANDLE hThread;
    DWORD ThreadId;
    for(int i=0; i<(int)si.dwNumberOfProcessors*2; i++) {
        hThread = CreateThread(NULL, 0, WorkerThread, hcp, 0, &ThreadId);
        if(hThread == NULL) return -1;
        CloseHandle(hThread);
    }
 
    // socket()
    SOCKET listen_sock = socket(AF_INET, SOCK_STREAM, 0);
    if(listen_sock == INVALID_SOCKET) err_quit("socket()");
 

    // bind()
    SOCKADDR_IN serveraddr;
    ZeroMemory(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(9000);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    retval = bind(listen_sock, (SOCKADDR *)&serveraddr, sizeof(serveraddr));
    if(retval == SOCKET_ERROR) err_quit("bind()");
 
    // listen()
    retval = listen(listen_sock, SOMAXCONN);
    if(retval == SOCKET_ERROR) err_quit("listen()");
 
    while(1)

    {
        // accept()
        SOCKADDR_IN clientaddr;
        int addrlen = sizeof(clientaddr);
        SOCKET client_sock = accept(listen_sock, (SOCKADDR *)&clientaddr, &addrlen);
        if(client_sock == INVALID_SOCKET) {
            err_display("accept()");
            continue;
        }
        printf("[TCP 서버] 클라이언트 접속: IP 주소=%s, 포트 번호=%d\n",
                            inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
  
        // 소켓과 입출력 완료 포트 연결
        HANDLE hResult = CreateIoCompletionPort((HANDLE)client_sock, hcp, 

                                                                                 (DWORD)client_sock, 0);
        if(hResult == NULL)

                  return -1;
  
        // 소켓 정보 구조체 할당
        SOCKETINFO *ptr = new SOCKETINFO;
        if(ptr == NULL) {
            printf("[오류] 메모리가 부족합니다!\n");
            break;
        }


        ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
        ptr->sock = client_sock;
        ptr->recvbytes = 0;
        ptr->sendbytes = 0;
        ptr->wsabuf.buf = ptr->buf;
        ptr->wsabuf.len = BUFSIZE;
  

        // 비동기 입출력 시작
        DWORD recvbytes;
        DWORD flags = 0;
        retval = WSARecv(client_sock, &(ptr->wsabuf), 1, &recvbytes, &flags, &(ptr->overlapped),

                                      NULL);


        if(retval == SOCKET_ERROR) {
            if(WSAGetLastError() != ERROR_IO_PENDING) {
                err_display("WSARecv()");
            }
            continue;
        }
    }
    // 윈속 종료
    WSACleanup();
    return 0;
}


DWORD WINAPI WorkerThread(LPVOID arg)
{
    HANDLE hcp = (HANDLE)arg;
    int retval;
 
    while(1)

    {
        // 비동기 입출력 완료 기다리기
        DWORD cbTransferred;
        SOCKET client_sock;
        SOCKETINFO *ptr;
        retval = GetQueuedCompletionStatus(hcp, &cbTransferred,
            (LPDWORD)&client_sock, (LPOVERLAPPED *)&ptr, INFINITE);
  
        // 클라이언트 정보 얻기
        SOCKADDR_IN clientaddr;
        int addrlen = sizeof(clientaddr);
        getpeername(ptr->sock, (SOCKADDR *)&clientaddr, &addrlen);
  
        // 비동기 입출력 결과 확인
        if(retval == 0 || cbTransferred == 0) {
            if(retval == 0) {
                DWORD temp1, temp2;
                WSAGetOverlappedResult(ptr->sock, &(ptr->overlapped), &temp1, FALSE, &temp2);
                err_display("WSAGetOverlappedResult()");
            }
            closesocket(ptr->sock);
            printf("[TCP 서버] 클라이언트 종료: IP 주소=%s, 포트 번호=%d\n",
                inet_ntoa(clientaddr.sin_addr), ntohs(clientaddr.sin_port));
            delete ptr;
            continue;
        }
  
        // 데이터 전송량 갱신
        if(ptr->recvbytes == 0) {
            ptr->recvbytes = cbTransferred;
            ptr->sendbytes = 0;
            // 받은 데이터 출력
            ptr->buf[ptr->recvbytes] = '\0';
            printf("[TCP/%s:%d] %s\n", inet_ntoa(clientaddr.sin_addr),

                                                                   ntohs(clientaddr.sin_port), ptr->buf);
        }
        else {
            ptr->sendbytes += cbTransferred;
        }
  
        if(ptr->recvbytes > ptr->sendbytes) {
            // 데이터 보내기
            ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
            ptr->wsabuf.buf = ptr->buf + ptr->sendbytes;
            ptr->wsabuf.len = ptr->recvbytes - ptr->sendbytes;
   
            DWORD sendbytes;
            retval = WSASend(ptr->sock, &(ptr->wsabuf), 1, &sendbytes, 0, &(ptr->overlapped),

                                              NULL);


            if(retval == SOCKET_ERROR) {
                if(WSAGetLastError() != WSA_IO_PENDING) {
                    err_display("WSASend()");
                }
                continue;
            }   
        }
        else {
            ptr->recvbytes = 0;
   

            // 데이터 받기
            ZeroMemory(&(ptr->overlapped), sizeof(ptr->overlapped));
            ptr->wsabuf.buf = ptr->buf;
            ptr->wsabuf.len = BUFSIZE;
            DWORD recvbytes;
            DWORD flags = 0;
            retval = WSARecv(ptr->sock, &(ptr->wsabuf), 1, &recvbytes, &flags,

                                                 &(ptr->overlapped), NULL);


            if(retval == SOCKET_ERROR) {
                if(WSAGetLastError() != WSA_IO_PENDING) {
                    err_display("WSARecv()");
                }
                continue;
            }
        }
    }
    return 0;
}

 

// 소켓 함수 오류 출력 후 종료
void err_quit(char *msg)
{
    LPVOID lpMsgBuf;
    FormatMessage(
        FORMAT_MESSAGE_ALLOCATE_BUFFER|
        FORMAT_MESSAGE_FROM_SYSTEM,
        NULL, WSAGetLastError(),
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        (LPTSTR)&lpMsgBuf, 0, NULL);
        MessageBox(NULL, (LPCTSTR)lpMsgBuf, msg, MB_ICONERROR);
    LocalFree(lpMsgBuf);
    exit(-1);
}


// 소켓 함수 오류 출력
void err_display(char *msg)
{
    LPVOID lpMsgBuf;
    FormatMessage(
        FORMAT_MESSAGE_ALLOCATE_BUFFER|
        FORMAT_MESSAGE_FROM_SYSTEM,
        NULL, WSAGetLastError(),
        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
        (LPTSTR)&lpMsgBuf, 0, NULL);
    printf("[%s] %s", msg, (LPCTSTR)lpMsgBuf);
    LocalFree(lpMsgBuf);
}



더보기



[socket] IOCP 서버 제작에 있어서의 유의할 점들

IOCP 서버 제작에 있어서의 유의할 점들 
현재 개발중인 게임 서버의 소켓이 4년전에 제작한 비동기 이벤트 셀렉트 방식인 관계로 퍼포먼스를 향상시키고자 IOCP 네트워크 구조를 최근 제작하게 되었다. 실제 게임서버에 적용 가능할지는 좀 더 고려해 보아야 하겠지만, IOCP 서버를 만들면서 겪었던 점들을 공유하고자 한다. 

시중의 책들과 공개된 소스들을 참고해서 IOCP를 구현해 보면, 항상 과부하 테스트시에 문제가 발생했다. 내가 설정한 과부하 테스트는 다음과 같은 상황이다:

1) 클라이언트 측에서 과도할 정도로 Connect를 시도한다.
2) 서버는 Accept 직후 랜덤하게 연결을 끊어버린다.
3) 클라이언트는 Connect된 직후 서버로 데이터를 전송한다.
4) 서버는 클라이언트로부터 데이터가 수신되면 바로 응답메시지를 전송한다. 이때 응답메시지를 전송할 내부 버퍼(소켓 버퍼 아님)가 모자라는 경우 연결을 끊는다. 이 처리와는 별도로 데이터 수신시 랜덤하게 연결을 끊는다.
5) 클라이언트는 서버로부터 데이터를 수신하면 바로 응답메시지를 전송한다. 이때 응답메시지를 전송할 내부 버퍼(소켓 버퍼 아님)가 모자라는 경우 연결을 끊는다. 이 처리와는 별도로 데이터 수신시 랜덤하게 연결을 끊는다.
6) 클라이언트는 연결이 끊어진 커넥션이 발생하면 그에 대응하는 Connect를 시도한다. 

클라이언트는 초기에 몇천개 이상의 Connect를 시도하고 연결된 커넥션들에 대해 각각 위의 규칙대로 처리를 반복하는 상황을 만들어 테스트 해 보았는데, 여러번의 삽질끝에 발견한 문제점들은 다음과 같다.

일단, 시중에 떠도는 IOCP소스들의 대부분은 에코(Echo) 서버들이다. 이 소스들은 항상 데이터를 recv한 다음 send를 하므로 참조 카운트가 1 이상 올라가지 않지만, 게임 서버는 그렇지 않다. 걸어놓은 recv에 대한 응답이 안 온 상태에서 send를 할 수 있으므로 소켓 1개에 대해 2개의 참조 카운트가 발생할 수 있다. GetQueuedCompletionStatus가 FALSE를 리턴한 경우, 대부분의 에코서버 소스에서는 바로 소켓을 close한 다음 커넥션 객체를 삭제해 버리는데, 이것은 참조 카운트가 1이상 올라가지 않기 때문이다. 이런 경우 게임서버에서는 그 소켓에 대한 참조카운트가 2라면, 그냥 소켓을 close한 다음 나머지 작업에 대한 실패 통보가 와서 참조 카운트가 0이 되었을때 커넥션 객체를 삭제해야만 한다. 마찬가지로 WSARecv에 대한 리턴이 왔는데 전송 바이트가 0인 경우에도 무조건 객체를 지워서는 안된다.

IOCP에서 WSASend, WSARecv를 사용할 때 소켓에러 없이(IO_PENDING는 에러가 아니므로 제외) 포스팅된 소켓 연산의 결과는 반드시 각각 GetQueuedCompletionStatus에서 리턴된다. 단, 함수의 반환값은 TRUE일수도 FALSE일수도 있다.

한가지 이상한 현상을 발견했는데, close한 소켓을 WSASend나 WSARecv에 사용했을때 에러가 반환되지 않는 경우가 있다. 예를 들어, 소켓 A에 대해서 WSARecv가 포스팅된 상태에서, 소켓 A를 close했다. 이 때 GetQueue... 에서 WSARecv에 대한 성공 결과가 리턴될 수 있다(close하기 전에 성공한 결과일 수 있으므로), 이때 다시 WSARecv를 걸려고 할 때 close된 소켓이므로 WSARecv가 에러를 발생해야 정상이므로 이 시점에 커넥션을 삭제하고자 했는데, WSARecv가 에러를 발생시키지 않는 거다. 원인은 잘 모르겠지만, 아마도 네트워크 과부하 상황일때 이런 현상이 발생하는게 아닌가 추측하고 있다. 결국 결국 소켓이 닫혔다는 별도의 플래그를 만들어 직접 해결할 수 밖에 없었다. 이런 상황에서 얻은 정보를 써 보자면:

서버에서 소켓을 close하는 것이 closesocket() 호출 이전에 포스팅한 WSASend, WSARecv를 꼭 실패시키지는 않는다. 성공할 수도 있고, 실패할 수도 있다.

서버에서 소켓을 close하면 이전에 포스팅된 연산에 대한 결과는 반드시 GetQueuedCompletionStatus에서 각각 모두 리턴된다. 


이런 것들을 이해하고 나니, 과부하 테스트에서도 정확하게 동작하는 IOCP 네트워크 클래스를 제작할 수 있었다.


출처: https://androbee.tistory.com/20 [멋대로 해버려]




clientVC6.zip
0.11MB
IOCP2.doc
0.08MB
IOCPQueueAppendix.zip
0.12MB
IocpServerVC6.zip
0.15MB
IOCP1.doc
0.09MB
IOCP3.doc
0.12MB
IOCP3-2.doc
0.05MB
IOCP4.doc
0.1MB