본문 바로가기
System

I/O Multiplexing 톺아보기 (2부)

by 라바킴 2021. 2. 16.

이전 1부에서 Blocking/Non-Blocking, Synchronous/Asynchronous 기본 개념과 Linux 환경에서의 기법들을 살펴보았다.

I/O 동작의 가장 기본 형태인 Synchronous Blocking 방식은 굉장히 직관적이긴 하지만 2개 이상의 파일을 처리할 때는 multi-process 또는 multi-thread 기법으로 동작해야했다. Non-Blocking이라 한들 지속적인 Context Switching이 일어나기도 한다. 즉 Synchronous 방식에서 성능을 고려하면 multi-processing 또는 multi-threading 을 구현하지 않을 수 없다는 것이다.

하지만 multi-processing 환경에선 IPC(프로세스간 통신)나 동기화(semaphore, mutex 등)를 고려해야만 하기 때문에 복잡한 이슈가 생길 수 있어 Multiplexing 기법이 각광 받았다고 하였다.

 

이번 포스팅에선 linux 외 환경에서의 I/O multiplexing 기법에 대해 알아보자.

 

 

1. kpueue - FreeBSD

 

k-que.... 아니 kqueue는 FressBSD 환경에서 사용하는 I/O Multiplexing 기법이다. kernel에 event를 저장할 queue를 생성하면, I/O event가 queue에 쌓이고 사용자가 이를 직접 polling하는 방식이기 때문에 select, poll처럼 event가 발생한 FD를 찾기 위한 추가적인 작업이 필요없는 기법이다. FreeBSD 환경에서의 epoll이라 할 수 있겠다. 함수 원형은 아래와 같다.

 

     #include <sys/event.h>

     int kqueue(void);

     int kevent(int kq, const struct kevent *changelist, int nchanges,
	 struct kevent *eventlist, int nevents,
	 const struct timespec *timeout);

 

kqueue() 함수는 event를 저장할 새로운 queue를 kernel에 요청하는 함수이다. 해당 queue의 descripter를 반환받는다. 

kevent() 함수는 kqueue에 특정 event를 등록하고 보류중인 event를 등록하거나 사용자에게 반환하는 system call 함수이다. 즉 kqueue()로 할당받은 공간을 kevenet()로 관리하는 셈이다.

 

  • changelist : <sys / event.h>에 정의 된 kevent 구조의 배열에 대한 pointer
    • 변경 목록에 포함 된 모든 변경 사항은 큐에서 보류중인 이벤트를 읽기 전에 적용
  • nchanges : 변경 목록의 크기
    • eventlist : kevent 구조체 배열에 대한 pointer
  • nevents : eventlist의 크기
    • 0 : 즉시 반환
  • timeout : null이면 kevent()가 무한 대기하는 것
    • user process의 polling을 위해선 값이 0 인 timespec 구조를 가리키게 구현해야 함

 

kqueue() 함수에 대한 모든 작업은 같은 이름의 kevent 라는 구조체에 의해 이루어진다. kevent는 ident, filter라는 인자를 하나의 key로 삼아 식별되며, kqueue 내에는 독립적인 kevent만 존재하게 된다. 식별자인 ident 외 filter까지 하나의 key로 보는 이유는 filter가 기존 저장된 event가 존재하는지 여부를 판단하는 역할을 하기 때문인데,

 

filter는 kevent의 초기 등록시 실행되며, I/O event는 발생할 때마다 filter가 확인을 하게되고, 신규 event로 판단하면 해당 kevent는 kqueue에 배치된다. 사용자가 kqueue에서 kevent를 검색하려고 할 때도 실행되는데 만약 event 발생 조건에 부합되지 않는다면 해당 kevent는 kqueue에서 제거되고 return 되지 않는다. 이렇게 선분류를 해주는 filter 덕분에 kqueue에는 kevent가 최소한으로 배치된다.

 

각 filed들에 대한 부연 설명과 몇 가지 예시와 함께 kevent 구조체를 살펴보자. 

 

	struct kevent {
	     uintptr_t       ident;           /* identifier for this event */
	     int16_t         filter;           /* filter for event */
	     uint16_t        flags;          /* general flags */
	     uint32_t        fflags;          /* filter-specific flags */
	     intptr_t        data;            /* filter-specific data */
	     void            *udata;         /* opaque user data identifier */
	     uint64_t        ext[4];	     /*	extensions */

 	};

 

  • ident : event 식별자
  • filter : event 선처리할 때 사용되는 filter
    • EVFILT_READ : FD를 ident로 지정 -> 읽을 data가 있을 때마다 반환
    • EVFILT_WRITE : FD를 ident로 지정 -> 쓸 data가 있을 때마다 반환 
    • EVFILT_EMPTY : FD를 ident로 지정 -> 쓸 data가 없을 때마다 반환 
    • EVFILT_VNODE : FD 또는 fflags에서 지정한 event를 ident로 지정 -> event 발생 시 반환
    • EVFILT_PROC : 감시할 pid 또는 fflags에서 지정한 event 를 ident로 지정 -> event 발생 시 반환
    • EVFILT_SIGNAL : signal number를 ident로 지정 -> signal 발생 시 반환
    • EVFILT_TIMER : 임의의 timer를 ident로 지정 -> 주기마다 반환
  • flags : event에 수행할 작업
    • EV_ADD : kqueue에 이벤트를 추가
      • 있는 event를 또 추가하면 인자가 update되어 중복 방지
      • 추가된 event는 EV_DISABLE 플래그로 재정의되지 않는 한 자동으로 활성화
    • EV_ENABLE : kevent() 호출 시 event 반환을 허용
    • EV_DISABLE : 이벤트를 비활성화하여 kevent ()가 반환하지 않도록 함. 필터 자체는 비활성화되지 않음
    • EV_DISPATCH 이벤트 전달 직후 EV_DISABLE 설정
    • EV_DELETE : kqueue에서 이벤트를 제거
    • EV_RECEIPT : kqueue 대량 변경 시 유용 (보류중인 event는 배제)
    • EV_ONESHOT : event 감지로 인한 첫 번째 filter 실행만 반환
      • 이후 사용자가 kqueue에서 event를 검색하면 삭제됨
    • EV_CLEAR : 사용자가 event 검색 후 상태 재설정
      • 재 상태 대신 상태 변화를 보고하는 필터에 유용
    • EV_ERROR : 각종 error 
  • fflgas : filter 별 flag
  • data : filter 별 data 값
  • udata : 명확하지 않은 user data
  • ext[4] : kernel과 주고받는 확장 data
    • ext[0], ext[1] : filter에 의해 정의
    • ext[2], ext[3] : context

kevent 구조체를 쉽게 초기화하기 위한 EV_SET() 함수도 제공된다.

     EV_SET(kev, ident,	filter,	flags, fflags, data, udata);

 

아래는 간단한 I/O 작업을 확인할 수 있는 예제 C 코드이다. 이는 BSD 공식 홈페이지에서 제공되고 있으며 kqueue() 함수를 통해 queue에 대한 descriptor를 얻고 kevent() 함수를 통해 event를 처리하는 부분에서 위에서 설명한 핵심 로직을 확인할 수 있다.

 

     #include <sys/event.h>
     #include <err.h>
     #include <fcntl.h>
     #include <stdio.h>
     #include <stdlib.h>
     #include <string.h>

     int main(int argc, char **argv)
     {
	 struct	kevent event;	 /* Event we want to monitor */
	 struct	kevent tevent;	 /* Event triggered */
	 int kq, fd, ret;

	 if (argc != 2)
	     err(EXIT_FAILURE, "Usage: %s path\n", argv[0]);
	 fd = open(argv[1], O_RDONLY);
	 if (fd	== -1)
	     err(EXIT_FAILURE, "Failed to open '%s'", argv[1]);

	 /* Create kqueue. */
	 kq = kqueue();
	 if (kq	== -1)
	     err(EXIT_FAILURE, "kqueue() failed");

	 /* Initialize kevent structure. */
	 EV_SET(&event,	fd, EVFILT_VNODE, EV_ADD | EV_CLEAR, NOTE_WRITE,
	     0,	NULL);
	 /* Attach event to the	kqueue.	*/
	 ret = kevent(kq, &event, 1, NULL, 0, NULL);
	 if (ret == -1)
	     err(EXIT_FAILURE, "kevent register");
	 if (event.flags & EV_ERROR)
	     errx(EXIT_FAILURE,	"Event error: %s", strerror(event.data));

	 for (;;) {
	     /*	Sleep until something happens. */
	     ret = kevent(kq, NULL, 0, &tevent,	1, NULL);
	     if	(ret ==	-1) {
		 err(EXIT_FAILURE, "kevent wait");
	     } else if (ret > 0) {
		 printf("Something was written in '%s'\n", argv[1]);
	     }
	 }
     }

 

2. /dev/poll - Solaris

 

solaris 계열에서 사용하는 polling 방식의 다중화 기법이다. oracle에서는 /dev/poll을 '많은 수의 File Descriptor를 효율적으로 polling 할 수 있는 driver'라고 정의하고 있다. 동작 방식은 poll 기법과 매우 유사하지만 epoll이나 kqueue와 마찬가지로 FD의 개수가 아닌 event가 발생한 FD 자체를 반환해준다. 아예 driver 형태로 구현되었기 때문에 kernel과 user space 영역 간 data 전달이 자유롭다는 장점이 있다.

 

구현을 위해선 FD와 event를 담고있는 pollfd 라는 구조체 배열을 /dev/poll driver에 기록해두면 되는데 이는 open(), write(), ioctl() 세 가지 system call에 의해 구현된다.

#include <sys/devpoll.h>
int fd = open("/dev/poll", O_RDWR);
ssize_t n = write(int fd, struct pollfd buf[], int bufsize);
int n = ioctl(int fd, DP_POLL, struct dvpoll* arg);
int n = ioctl(int fd, DP_ISPOLLED, struct pollfd* pfd);

open() 함수 호출로 /dev/poll driver를 열어두고 모니터링할 FD들을 pollfd라는 구조체에 저장하여 등록해준다. 해당 작업은 write()로 수행하며, 이 함수는 성공 시 기록한 bytes 크기를 반환하기 때문에 모니터링 대상인 pollfd 배열의 size와 같게 된다.

	struct pollfd {
	    int   fd;
	    short events; 
	    short revents;
	 }

 

  • fd : polling 되는 file descriptor
  • events : 모니터링 할 event
    • 만약 pollfd 배열에서 FD가 중복되는 경우 events는 OR로 처리
    • POLLREMOVE : 감지되면 해당 FD 삭제
  • revents : 사용되지 않음

 

ioctl() 함수는 발생하는 event에 대한 관리 기능을 제공하며 DP_POLL 방식과 DP_ISPOLLED 방식으로 나뉜다.

DP_POLL 방식은 대상 FD에서 발생한 event를 검색하는 작업을 수행한다. 인자로 dvpoll 구조체의 pointer가 들어올 경우 DP_POLL, pollfd 구조체 pointer가 직접 들어올 경우 DP_ISPOLLED 방식으로 동작한다.

 struct dvpoll {
     struct pollfd* dp_fds;
     int dp_nfds;
     int dp_timeout;
 }

 

  • dp_fds : 관심 대상 pollfd 배열의 pointer
  • dp_nfds : event가 감지된 pollfd 구조체에 대한 buffer 크기 지정 = 한 번에 polling 할 수 있은 FD 수의 최댓값
  • dp_timeout : millisecond 단위
    • 0 : 즉시 반환
    • -1 : 무한 대기

DP_POLL 방식의 ioctl() 호출에선 실패 시 -1, timeout 발생 시 0을 반환하며 그 외 호출 성공 시엔 dp_fds가 가리키는 배열에서 event가 발생한 pollfd 항목 수를 반환한다. 이때 pollfd의 revents에는 발생한 event가 저장되며 호출이 실패한 경우엔 –1로 설된다.

 

DP_ISPOLLED 방식의 ioctl()는 해당 FD가 모니터링 되고있는지 유무를 확인한다. 해당 FD를 가지는 pollfd가 있으면 ioctl()은 1이 반환되고 event 필드에 0, revent 필드엔 polling 된 evnet가 채워진다. 없으면 pollfd 구조체를 구정하지 않고 단순 0만 반환한다.

 

 

/dev/poll 기법에는 이 정도의 기본 기능만 구현되어있다. 더 깊이 들어가볼 수도 있겠지만 Solaris에서는 /dev/poll가 아닌 다른 poll 방식의 다중화 기법을 모색 중이라고 한다. fade-out 중이라는 의미로 언젠간 소리소문없이 Solaris Release에서 사라질 수 있다. 그러니 이런 모델도 있었다는 것만 기억하고 이쯤에서 /dev/poll은 보내주기로 하자. 


 

#TMI
IOCP에서 자주 언급할 thread의 개념을 간단하게 짚고 가도록 한다.

process는 생성 작업부터 memory 영역을 전부 복사하기 때문에 운영체제 입장에서는 꽤 부담되는 대상이다. 또한 process 마다 독립된 memory 공간을 유지하기에 process 간 통신을 위해선 별도 IPC 기법이 필요하다. 뿐만 아니라 program 실행을 위해 process 정보가 main memory에 올라오게 되는데, multi-process 환경이라 한다면 이전 process를 memory에서 내려서 disk로 옮기고 다음 process를 올리는 context switching이 매우 빈번하게 발생시킬 수 있는 비교적 무거운 단위라고 볼 수 있다. 

결국 IT 산업 발전에 따라 복잡해지는 사용자 요구에 맞추어 multi-processing 특징을 유지하면서 보다 경량화 시킨 thread라는 개념이 등장한다. 이는 process 보다 가볍고 빠르게 동작하며 data 교환에도 별도 기법이 필요하지 않은 형태이다. process의 memory 구조는 크게 전역 변수가 할당되는 'data 영역', malloc() 함수 등에 의해 동적 할당이 이뤄지는 'heap 영역', 함수 실행에 사용되는 'stack 영역'으로 이루어 지는데, thread는 stack 영역만 독립적으로 유지하며 data, heap 영역은 공유하는 구조로 설계되었다. 이에 상대적으로 process보다 효율적인 thread를 접목시키는 사례가 많아지게 된다.

 

3. IOCP(Input Output Completion Port) - Windows 

 

socket이라는 개념은 TCP/IP를 unix 개발자들이 사용하기 편하게 라이브러리 형태로 제공하면서 개발되었던 netwrok interface이다. 이를 windows 환경에서 사용할 수 있게 만든 것을 windows socket API, 줄여서 winsock이라 부른다. version 1에서는 unix socket과 호환성을 제공하면서 TCP/IP를 지원하는 것이 주 목적이었다면 version 2에서는 보다 다양한 protocol을 지원하도록 확장되며 multi-thread를 접목시켜 I/O 성능을 대폭 향상시켰다. 지금까지도 많은 개발자들의 관심을 받는 Windows I/O의 선두자인 IOCP 모델에 대해 알아보자.

 

Input Output Completion Port의 약자로 Windows에서 지원하는 I/O 다중화 모델이다. 여러 socket을 하나의 IOCP 객체로 처리하며 해당 객체 하위에서 돌아가는 thread도 여러 개가 동시 대기하는 구조이다. IOCP는 kernel로 부터 반환받는 결과에 따라 미리 생성돼있던 thread를 깨우거나 대기 상태로 유지시킨다. Asynchronos 한 모델이라 볼 수 있다.

IOCP가 등장하기 전에도 Asynchronos 특징을 가진 WSAEventSelect, Overlapped I/O 등의 모델들이 있었다. WSAEventSelect 모델은 실제 I/O 동작이 아닌 '알림'이 비동기로 처리되는 방식이며, windows 95 이후 발표된 winsock 2에서 최초로 등장한 모델이라 거의 대부분의 windows 플랫폼에서 지원하는 모델이기에 호환성이 높다는 장점은 있었으나, I/O 처리에 대한 낮은 성능과 socket 하나하나마다 I/O 처리 결과를 관리해줘야 해서 비용이 비싼 편이었다.

 

Overlapped I/O 모델은 I/O 작업 자체를 비동기로 '중첩시켜' 처리하는 특징으로 1부에서 언급된 AIO와 흡사한 특징으로 기존 방식에 비해 성능이 우수하지만, 여전히 완료 통지를 event로 전달받고 있었기에 각 socket에 대한 event 처리를 일정 단위의 thread로 처리해야 했으며, 이에 연결 요청 처리와 I/O 처리의 반복 호출로 인한 context switching이 빈번하다는 문제점을 가지고 있었다. 

 

IOCP는 이 단점을 해결하기 위해 Completion Routine(완료 루틴)으로 I/O 완료 통지를 받아보게 설계되었다. Completion Routine은 흔히 말하는 callback 함수의 역할로 WSASend와 WSARecv 함수를 호출하여 I/O 작업이 완료된 시점에 호출된다. 이로써 IOCP는 무한한 socket을 수용할 수 있는 완성도 높은 windows I/O 모델로 인정받게 된다. socket 수와 무관한 고정된 수의 thread와 IOCP 객체로(대부분 하나) 동작하여 접속 량이 많아지는 경우에도 성능이 크게 떨어지지 않는다. 

 

즉 여기서 명명하는 Overlapped I/O 모델과 IOCP는 둘 다 Overlapped 즉 Asynchronos 한 I/O 모델이며 기본 동작 방식은 비슷하나 I/O 작업이 완료된 상태에서의 확인 방법에서 차이가 있다고 볼 수 있다.

 

IOCP의 구현을 위한 필수 함수들을 살펴보자.

 

1) CreateIoCompletionPort()

IOCP 사용을 위해선 우선 사용자 요청을 입력 받을 창구 역할을 해줄 IOCP object를 생성해야 한다. 그리고 실제 사용자의 요청이 해당 object로 잘 전달될 수 있게 생성된 IOCP object에 socket을 연결해둬야 한다. 이 두 역할은 모두 CreateIoCompletionPort() 함수가 담당한다. (프로투잡러)

HANDLE WINAPI CreateIoCompletionPort(
  _In_     HANDLE    FileHandle,
  _In_opt_ HANDLE    ExistingCompletionPort,
  _In_     ULONG_PTR CompletionKey,
  _In_     DWORD     NumberOfConcurrentThreads
);

 

  • FileHandle : Completion Port에 연결하고자 하는 socket 
    • 새로운 IOCP object 생성 시엔 INVALID_HANDLE_VALUE
  • ExistingCompletionPort : IOCP object 
    • 새로운 IOCP object 생성 시엔 NULL
  • CompletionKey : 완료된 I/O 관련 정보 전달을 위한 인자
    • I/O 완료 대기 시 user process가 호출하는 GetQueuedCompletionStatus에 전달되는 값
      • socket과 IOCP object 연결 시에만 유효
      • 새로운 IOCP object 생성 시엔 0
  • NumberOfConcurrentThread : 동시에 실행 가능한 thread 수
    • 0이면 해당 시스템의 최대 CPU core 수 만큼 지정
      • 새로운 IOCP object 생성 시에만 유효

새로운 IOCP object 생성시엔 위 옵션 별로 지정된 값을 지니며, 아니라면 FileHandle로 전달된 socket과 ExistingCompletionPort 필드로 전달된 IOCP object를 연결하게 된다.

 

socket과 IOCP object '연결' 시엔 첫 번째 인자에 socket, 두 번째 인자에 IOCP object 값을 넣어주면 된다고 하였다. 이로인한 일반적인 구현 형태는 아래와 같다.

if (CreateIoCompletionPort (tmp_sock, cp_object, (DWORD), 0) != cp_object)
    return 0; // error

 

IOCP object를 처음 '생성'할 때 넣어줄 인자는 마지막으로 존재하는 NumberOfConcurrentThreads 한 가지 뿐이다. 미리 thread 수를 한정해두는 역할이기에 처음 만들어질 때만 의미가 있는 것이다. 가상 이상적인 그림은 실행되는 thread가 CPU core 개수와 동일한 경우이므로 일반적으로 0으로 설정하게 된다. 결론적으로 IOCP object 생성 시엔 대부분 아래 형태의 코드로 구현된다.

cp_object = CreateIoCompletionPort (INVALID_HANDLE_VALUE , NULL, 0, 0)
if (cp_object == NULL)
    return 0; // error

 

다만 일반적으로 실제로 만드는 thread 개수와 앞서 선언한 NumberOfConcurrentThreads와는 차이가 있다. 보통 대기용 thread 개수까지 포함하여 총 CPU core의 2배 개수 정도로 산정하기 때문이다. 이 범위 만큼의 thread 들은 대기 상태에서 I/O 완료 응답이 올때마다 하나씩 깨어나는 것이고 동시에 깨어날 수 있는 한계가 NumberOfConcurrentThreads 만큼인 셈이다.

 

이처럼 굳이 동시 실행되지 않는 thread까지 사전에 고려하는 목적은 성능 향상에 있다. 깨어난 thread 중 하나가 sleep 또는 wait 부류의 함수를 호출하여 대기 상태로 들어가는 경우엔 CPU를 점유하지 않기에 자원 활용 효율이 떨어지는데 이때 열심히 일할 다른 thread를 깨워 그 자리를 대치시킨다. 이 부분에서 IOCP의 철학은 context switching을 최소화하여 I/O 작업 처리에 집중함과 동시에 깨어있는 대기 thread가 없도록 자원을 최대한 활용하고자 함이라는 것을 알 수 있다.

 

다음으로 요청한 I/O 작업이 완료 될까지 대기하는 함수를 살펴보자.

 

2) GetQueuedCompletionStatus()

IOCP에서는 I/O 완료 결과를 completion queue에 저장하여 처리한다. 해당 GetQueuedCompletionStatus() 함수는 해당 completion queue에 완료 결과가 존재하는지 여부를 확인하는 용도이다. 

BOOL GetQueuedCompletionStatus(
  HANDLE       CompletionPort,
  LPDWORD      lpNumberOfBytesTransferred,
  PULONG_PTR   lpCompletionKey,
  LPOVERLAPPED *lpOverlapped,
  DWORD        dwMilliseconds
);

 

  • CompletionPort : IOCP object
  • lpNumberOfBytesTransferred : I/O 처리 시 송수신 된 data 크기를 저장하는 변수의 주소
  • lpCompletionKey : CreateIoCompletionPort 함수의 세 번째 인자 주소
  • lpOverlapped : WSASend, WSARecv 함수 호출 시 전달되는 OVERLAPPED 구조체 변수 주소
  • dwMilliseconds : timout
    • INFINITE : 무한 대기

 

lpCompletionKey 인자는 CreateIoCompletionPort() 함수를 호출하여 Completion Port에 socket을 연결할 때 입력한 값이었다. I/O 작업이 발생되었을 때 socket을 구별하거나 계속해서 I/O를 요청하기 위한 사용자 정의 값을 전달 받게 된다.

lpOverlapped에는 WSASend, WSARecv와 같은 함수를 호출할 때 입력했던 OVERLAPPED 구조체 주소가 반환되는데, OVERLAPPED 구조체는 I/O 작업의 상태 정보를 담고 있다. 이 구조체로 인해 어느 시점에 I/O 작업이 완료되어 user process로 통보해 줄 지를 알 수 있게 된다. 

 

typedef struct _OVERLAPPED {
  ULONG_PTR Internal;
  ULONG_PTR InternalHigh;
  union {
    struct {
      DWORD Offset;
      DWORD OffsetHigh;
    } DUMMYSTRUCTNAME;
    PVOID Pointer;
  } DUMMYUNIONNAME;
  HANDLE    hEvent;
} OVERLAPPED, *LPOVERLAPPED;

 

  • Internal : I/O request 처리 상태
  • InternalHigh : 전송된 data bytes 수
  • Offset & OffsetHigh : 64bit file offset 하위/상위 32bit
  • hEvent : Overlapped I/O에서 작업 완료를 전달하는 event handle

 

3) PostQueuedCompletionStatus()

PostQueuedCompletionStatus() 함수는 인자로 받은 I/O 처리 결과를 completion queue에 추가하는 역할을 수행한다. 처리 결과가 queue에 등록되면 앞선 GetQueuedCompletionStatus() 호출에서 완료 여부를 감지할 수 있게 된다. user가 직접 completion queue에 결과를 넣을 수도 있기에 대기중인 thread를 깨워 정상적으로 종료하게끔 유도할 때 많이 사용된다. 한 번 호출할 때마다 하나의 결과만 queue에 등록되므로 대기하는 thread가 여러 개면 그 만큼 여러 번 호출해야 한다. 함수 원형을 살펴보자.

 

BOOL WINAPI PostQueuedCompletionStatus(
  _In_     HANDLE       CompletionPort,
  _In_     DWORD        dwNumberOfBytesTransferred,
  _In_     ULONG_PTR    dwCompletionKey,
  _In_opt_ LPOVERLAPPED lpOverlapped
);

 

  • CompletionPort : IOCP object
  • dwNumberOfBytesTransferred : 완료된 I/O bytes
    • GetQueuedCompletionStatus() 함수의 2번째 인자로 반환됨
  • dwCompletionKey : CreateIoCompletionPort 함수의 세 번째 인자
    • GetQueuedCompletionStatus() 함수의 3번째 인자로 반환됨
  • lpOverlapped : WSASend, WSARecv 함수 호출 시 전달되는 LOVERLAPPED 구조체
    • GetQueuedCompletionStatus() 함수의 4번째 인자로 반환됨

 

 

여기까지 IOCP 구현에 가장 필수적인 함수 몇 가지를 살펴보았다. 이젠 예제를 통해 전체적인 흐름을 이해해보자. 설명의 편의성을 위해 하나의 코드를 부분부분 쪼개어 확인해보았다. 아래 code는 microsoft 사의 공식 git에서 발췌한 C++ 예제이다. (링크)

 

#pragma warning (disable:4127)

#ifdef _IA64_
	#pragma warning(disable:4267)
#endif 

#ifndef WIN32_LEAN_AND_MEAN
	#define WIN32_LEAN_AND_MEAN
#endif

#define xmalloc(s) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,(s))
#define xfree(p)   HeapFree(GetProcessHeap(),0,(p))

#include <winsock2.h>
#include <Ws2tcpip.h>
#include <stdio.h>
#include <stdlib.h>
#include <strsafe.h>

#include "iocpserver.h"

char *g_Port = DEFAULT_PORT;
BOOL g_bEndServer = FALSE;			// set to TRUE on CTRL-C
BOOL g_bRestart = TRUE;				// set to TRUE to CTRL-BRK
BOOL g_bVerbose = FALSE;
DWORD g_dwThreadCount = 0;		//worker thread count
HANDLE g_hIOCP = INVALID_HANDLE_VALUE;
SOCKET g_sdListen = INVALID_SOCKET;
HANDLE g_ThreadHandles[MAX_WORKER_THREAD];
PPER_SOCKET_CONTEXT g_pCtxtList = NULL;		

CRITICAL_SECTION g_CriticalSection;		// guard access to the global context list

int myprintf(const char *lpFormat, ...);

 

필요한 라이브러리와 전역 변수를 선언해준다. 보다보니 IOCP object와 server socket 변수 및 worker thread에 대한 handle이 눈에 들어온다. 앞으로 이 변수들이 언제 할당되고 전달되는지를 유심히 관찰해야 할 것 같다. 참고로 여기서 MAX_WORKER_THREAD는 <iocpserver.h> 에 정의된 상수인 16이다. 

 

// ...
void __cdecl main (int argc, char *argv[]) {

	SYSTEM_INFO systemInfo;
	WSADATA wsaData;
	SOCKET sdAccept = INVALID_SOCKET;
	PPER_SOCKET_CONTEXT lpPerSocketContext = NULL;
	DWORD dwRecvNumBytes = 0;     
	DWORD dwFlags = 0;            
	int nRet = 0;

	for( int i = 0; i < MAX_WORKER_THREAD; i++ ) {
		g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
	}

	if( !ValidOptions(argc, argv) )
		return;

	if( !SetConsoleCtrlHandler(CtrlHandler, TRUE) ) {
		myprintf("SetConsoleCtrlHandler() failed to install console handler: %d\n", 
				 GetLastError());
		return;
	}

	GetSystemInfo(&systemInfo);
	g_dwThreadCount = systemInfo.dwNumberOfProcessors * 2;

	if( (nRet = WSAStartup(MAKEWORD(2,2), &wsaData)) != 0 ) {
		myprintf("WSAStartup() failed: %d\n",nRet);
		SetConsoleCtrlHandler(CtrlHandler, FALSE);
		return;
	}

	__try
    {
        InitializeCriticalSection(&g_CriticalSection);
    }
    __except(EXCEPTION_EXECUTE_HANDLER)
    {
        myprintf("InitializeCriticalSection raised exception.\n");
        return;
    }

 

실제 CPU core 수를 알아내기 전 임의의 상수인 MAX_WORKER_THREAD 만큼 ThreadHandle 들을 초기화 해준다. 콘솔에서 인입되는 option이나 signal을 처리해두고 multi-threading을 위한 임계 영역 초기화 하는 구문도 보이지만 여기서 중요한 부분은 windows system 정보에서 CPU core 수를 알아오는 부분이다. 미리 thread를 생성해두기 위함이다. Windows.h에서 제공하는 GetSystemInfo 함수에 SYSTEM_INFO 구조체 주소를 전달하여 system 정보를 받아온 뒤, dwNumberOfProcessors 값을 통해 CPU core 수를 알아오는 모습이 보인다. 대기용 thread까지 고려하여 잠시 후 만들 thread 개수(g_dwThreadCount)를 core 수의 2배로 산정해두자.

 

// ...
	while( g_bRestart ) {
		g_bRestart = FALSE;
		g_bEndServer = FALSE;

		__try {
			g_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
			if( g_hIOCP == NULL ) {
				myprintf("CreateIoCompletionPort() failed to create I/O completion port: %d\n", 
						GetLastError());
				__leave;
			}

			for( DWORD dwCPU = 0; dwCPU < g_dwThreadCount; dwCPU++ ) {

				HANDLE hThread = INVALID_HANDLE_VALUE;
				DWORD dwThreadId = 0;

				hThread = CreateThread(NULL, 0, WorkerThread, g_hIOCP, 0, &dwThreadId);
				if( hThread == NULL ) {
					myprintf("CreateThread() failed to create worker thread: %d\n", 
							GetLastError());
					__leave;
				}
				g_ThreadHandles[dwCPU] = hThread;
				hThread = INVALID_HANDLE_VALUE;
			}

			if( !CreateListenSocket() )
				__leave;

 

해당 블록에서 가장 먼저 눈에 들어오는 함수가 있다.(아닐수도 있다) IOCP object를 생성하는 CreateIoCompletionPort() 함수이다. 이 함수로 IOCP object를 생성하여 전역 변수로 선언되었던 g_hIOCP에 넣어주었다.
이때 1,2,3번 째 인자는 정해진 생성 규칙대로 INVALID_HANDLE_VALUE, NULL, 0이 들어가는게 확인된다. 여기선 마지막 전달 인자를 0으로 주었으니 CPU core 수만큼 thread가 할당될 수 있겠다.


이제 아까 산정해둔 core 수의 2배(=g_dwThreadCount) 만큼 thread를 생성해준다. 이 때 IOCP object handle인 g_hIOCP를 전달하여 thread 들이 IOCP object에 할당되게 해준다. 그리고 socket 생성, bind, listen 하는 일련의 과정을 구현한 CreateListenSocket()를 호출하여 server socket을 준비한다. 

 

이때 kernel 함수인 CreateThread() 에 의해 실행되는 WorkerThread() 함수는 worker thread 들의 역할을 구현해둔 사용자 정의 함수인데 main 로직을 다 살펴 본 뒤에 짚어보도록 하겠다.

 

// ...
			while( TRUE ) {

				sdAccept = WSAAccept(g_sdListen, NULL, NULL, NULL, 0);
				if( sdAccept == SOCKET_ERROR ) {
					myprintf("WSAAccept() failed: %d\n", WSAGetLastError());
					__leave;
				}
				
				lpPerSocketContext = UpdateCompletionPort(sdAccept, ClientIoRead, TRUE);
				if( lpPerSocketContext == NULL )
					__leave;

				//
				// if a CTRL-C was pressed "after" WSAAccept returns, the CTRL-C handler
				// will have set this flag and we can break out of the loop here before
				// we go ahead and post another read (but after we have added it to the 
				// list of sockets to close).
				//
				if( g_bEndServer )
					break;

                //
				// post initial receive on this socket
				//
				nRet = WSARecv(sdAccept, &(lpPerSocketContext->pIOContext->wsabuf), 
							   1, &dwRecvNumBytes, &dwFlags,
							   &(lpPerSocketContext->pIOContext->Overlapped), NULL);
				if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
					myprintf("WSARecv() Failed: %d\n", WSAGetLastError());
					CloseClient(lpPerSocketContext, FALSE);
				}
			} //while
		}
		

 

server 통신을 설정한 후엔 loop를 돌며 client의 연결을 연속적으로 받아들인다. 그리고 UpdateComletionPort() 함수를 통해 accept 이후 반환되는 client socket 들을 iocp에 추가해주는데 중요한 부분이므로 짚고 가보면,

PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo,
										 BOOL bAddToList) {

	PPER_SOCKET_CONTEXT lpPerSocketContext;

	lpPerSocketContext = CtxtAllocate(sd, ClientIo);
	if( lpPerSocketContext == NULL )
		return(NULL);

	g_hIOCP = CreateIoCompletionPort((HANDLE)sd, g_hIOCP, (DWORD_PTR)lpPerSocketContext, 0);
	if( g_hIOCP == NULL ) {
		myprintf("CreateIoCompletionPort() failed: %d\n", GetLastError());
		if( lpPerSocketContext->pIOContext )
			xfree(lpPerSocketContext->pIOContext);
		xfree(lpPerSocketContext);
		return(NULL);
	}

	if( bAddToList ) CtxtListAddTo(lpPerSocketContext);

	if( g_bVerbose )
		myprintf("UpdateCompletionPort: Socket(%d) added to IOCP\n", lpPerSocketContext->Socket);

	return(lpPerSocketContext);
}

PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {

	PPER_SOCKET_CONTEXT lpPerSocketContext;

	__try
    {
        EnterCriticalSection(&g_CriticalSection);
    }
    __except(EXCEPTION_EXECUTE_HANDLER)
    {
        myprintf("EnterCriticalSection raised an exception.\n");
        return NULL;
    }

	lpPerSocketContext = (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));
	if( lpPerSocketContext ) {
		lpPerSocketContext->pIOContext = (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));
		if( lpPerSocketContext->pIOContext ) {
			lpPerSocketContext->Socket = sd;
			lpPerSocketContext->pCtxtBack = NULL;
			lpPerSocketContext->pCtxtForward = NULL;

			lpPerSocketContext->pIOContext->Overlapped.Internal = 0;
			lpPerSocketContext->pIOContext->Overlapped.InternalHigh = 0;
			lpPerSocketContext->pIOContext->Overlapped.Offset = 0;
			lpPerSocketContext->pIOContext->Overlapped.OffsetHigh = 0;
			lpPerSocketContext->pIOContext->Overlapped.hEvent = NULL;
			lpPerSocketContext->pIOContext->IOOperation = ClientIO;
			lpPerSocketContext->pIOContext->pIOContextForward = NULL;
			lpPerSocketContext->pIOContext->nTotalBytes = 0;
			lpPerSocketContext->pIOContext->nSentBytes  = 0;
			lpPerSocketContext->pIOContext->wsabuf.buf  = lpPerSocketContext->pIOContext->Buffer;
			lpPerSocketContext->pIOContext->wsabuf.len  = sizeof(lpPerSocketContext->pIOContext->Buffer);

			ZeroMemory(lpPerSocketContext->pIOContext->wsabuf.buf, lpPerSocketContext->pIOContext->wsabuf.len);
		} else {
			xfree(lpPerSocketContext);
			myprintf("HeapAlloc() PER_IO_CONTEXT failed: %d\n", GetLastError());
		}

	} else {
		myprintf("HeapAlloc() PER_SOCKET_CONTEXT failed: %d\n", GetLastError());
	}

	LeaveCriticalSection(&g_CriticalSection);

	return(lpPerSocketContext);
} 

새로운 client socket과 I/O에 필요한 buffer에 대한 메모리를 할당해주고 Overlapped 구조체 설정하는 로직이 보인다. 이어서 client가 무슨 요청을 하는지 읽기 위해 전달된 ClientIoRead 인자를 통해 읽기모드로 지정 하는 등 send/recv에 필요한 만반의 준비를 해두고 있다.

그리고 이제는 익숙해진 CreateCompletionPort() 투잡함수를 호출하는데 이번에는 IOCP object를 생성하는 역할이 아닌 socket과 연결해주기 위해 1번째 인자에는 client socket을, 2번째 인자에는 앞서 생성한 IOCP object handle을 넣어주었다. 

 

자 이로써 socket 뿐만 아니라 I/O에 필요한 buffer 및 Overlapped 구조체까지 준비되었다. 이젠 client들이 무슨 요청을 했는지 확인하기 위해 WSARecv() 함수를 호출해준다. WSARecv() 함수 6번째 인자로 Overlapped 구조체 변수 주소 값을 넣어주었는데, 이 구조체엔 나중에 I/O 작업이 완료되고 GetQueuedCompletionStatus() 함수가 반환될 때 유효한 I/O 작업 결과 값이 채워져 있을 것이라 기대해본다.

// ...
		__finally   {

			g_bEndServer = TRUE;

            //
			// Cause worker threads to exit
			//
			if( g_hIOCP ) {
				for( DWORD i = 0; i < g_dwThreadCount; i++ )
					PostQueuedCompletionStatus(g_hIOCP, 0, 0, NULL);
			}
            
			//
			//Make sure worker threads exits.
            //
			if( WAIT_OBJECT_0 != WaitForMultipleObjects( g_dwThreadCount,  g_ThreadHandles, TRUE, 1000) )
				myprintf("WaitForMultipleObjects() failed: %d\n", GetLastError());
			else
				for( DWORD i = 0; i < g_dwThreadCount; i++ ) {
					if( g_ThreadHandles[i] != INVALID_HANDLE_VALUE ) CloseHandle(g_ThreadHandles[i]);
					g_ThreadHandles[i] = INVALID_HANDLE_VALUE;
				}

			CtxtListFree();

			if( g_hIOCP ) {
				CloseHandle(g_hIOCP);
				g_hIOCP = NULL;
			}

			if( g_sdListen != INVALID_SOCKET ) {
				closesocket(g_sdListen); 
				g_sdListen = INVALID_SOCKET;
			}

			if( sdAccept != INVALID_SOCKET ) {
				closesocket(sdAccept); 
				sdAccept = INVALID_SOCKET;
			}

		} //finally

		if( g_bRestart ) {
			myprintf("\niocpserver is restarting...\n");
		} else
			myprintf("\niocpserver is exiting...\n");

	} //while (g_bRestart)

	DeleteCriticalSection(&g_CriticalSection);
	WSACleanup();
	SetConsoleCtrlHandler(CtrlHandler, FALSE);
} //main   

main의 마지막 구문이다. PostQueuedCompletionStatus() 함수를 호출하여 대기중인 thread들을 종료시키며 handle을 반환한다. socket까지 깔끔하게 정리한 후 종료된다.

 

마지막으로 아까 CreateThread()에 의해 호출되었던 WorkerThread() 함수를 통해 해당 예제에서의 worker thread 동작을 살펴보자.

DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {

	HANDLE hIOCP = (HANDLE)WorkThreadContext;
	BOOL bSuccess = FALSE;
	int nRet = 0;
	LPWSAOVERLAPPED lpOverlapped = NULL;
	PPER_SOCKET_CONTEXT lpPerSocketContext = NULL;
	PPER_IO_CONTEXT lpIOContext = NULL; 
	WSABUF buffRecv;
	WSABUF buffSend;
	DWORD dwRecvNumBytes = 0;
	DWORD dwSendNumBytes = 0;
	DWORD dwFlags = 0;
	DWORD dwIoSize = 0;

	while( TRUE ) {
		bSuccess = GetQueuedCompletionStatus(hIOCP, &dwIoSize,
											 (PDWORD_PTR)&lpPerSocketContext,
											 (LPOVERLAPPED *)&lpOverlapped, 
											 INFINITE);
		if( !bSuccess )
			myprintf("GetQueuedCompletionStatus() failed: %d\n", GetLastError());

		if( lpPerSocketContext == NULL ) {
			return(0);
		}

		if( g_bEndServer ) {
			return(0);
		}

		if( !bSuccess || (bSuccess && (dwIoSize == 0)) ) {
			CloseClient(lpPerSocketContext, FALSE); 
			continue;
		}

		lpIOContext = (PPER_IO_CONTEXT)lpOverlapped;
		switch( lpIOContext->IOOperation ) {
		case ClientIoRead:
			
			lpIOContext->IOOperation = ClientIoWrite;
			lpIOContext->nTotalBytes = dwIoSize;
			lpIOContext->nSentBytes  = 0;
			lpIOContext->wsabuf.len  = dwIoSize;
			dwFlags = 0;
			nRet = WSASend(lpPerSocketContext->Socket, &lpIOContext->wsabuf, 1, 
						   &dwSendNumBytes, dwFlags, &(lpIOContext->Overlapped), NULL);
			if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
				myprintf("WSASend() failed: %d\n", WSAGetLastError());
				CloseClient(lpPerSocketContext, FALSE);
			} else if( g_bVerbose ) {
				myprintf("WorkerThread %d: Socket(%d) Recv completed (%d bytes), Send posted\n", 
					   GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
			}
			break;

		case ClientIoWrite:
			
			lpIOContext->IOOperation = ClientIoWrite;
			lpIOContext->nSentBytes  += dwIoSize;
			dwFlags = 0;
			if( lpIOContext->nSentBytes < lpIOContext->nTotalBytes ) {

				buffSend.buf = lpIOContext->Buffer + lpIOContext->nSentBytes;
				buffSend.len = lpIOContext->nTotalBytes - lpIOContext->nSentBytes;
				nRet = WSASend (lpPerSocketContext->Socket, &buffSend, 1, 
								&dwSendNumBytes, dwFlags, &(lpIOContext->Overlapped), NULL);
				if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
					myprintf("WSASend() failed: %d\n", WSAGetLastError());
					CloseClient(lpPerSocketContext, FALSE);
				} else if( g_bVerbose ) {
					myprintf("WorkerThread %d: Socket(%d) Send partially completed (%d bytes), Recv posted\n", 
						   GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
				}
			} else {

				lpIOContext->IOOperation = ClientIoRead; 
				dwRecvNumBytes = 0;
				dwFlags = 0;
				buffRecv.buf = lpIOContext->Buffer,
				buffRecv.len = MAX_BUFF_SIZE;
				nRet = WSARecv(lpPerSocketContext->Socket, &buffRecv, 1, 
							   &dwRecvNumBytes, &dwFlags, &lpIOContext->Overlapped, NULL);
				if( nRet == SOCKET_ERROR && (ERROR_IO_PENDING != WSAGetLastError()) ) {
					myprintf("WSARecv() failed: %d\n", WSAGetLastError());
					CloseClient(lpPerSocketContext, FALSE);
				} else if( g_bVerbose ) {
					myprintf("WorkerThread %d: Socket(%d) Send completed (%d bytes), Recv posted\n", 
						   GetCurrentThreadId(), lpPerSocketContext->Socket, dwIoSize);
				}
			}
			break;

		} //switch
	} //while
	return(0);
} 

 

여기선 GetQueuedCompletionStatus() 함수가 가장 주요한 함수겠다. loop를 돌며 I/O 작업 완료를 대기하고 있는 모습이다. 마지막 인자가 INFINITE이니 I/O 작업이 완료되고 completion queue에 등록될 때 까지 무한히 대기한다. 이 함수가 반환되면 client socket과 I/O 결과 정보를 받아볼 수 있을 것이다.

 

완료된 I/O packet을 확인하게 된다면 이 작업이 읽기 작업이었는지 쓰기 작업이었는지에 따라 후속 작업이 달라진다.

operation이 read, 즉 client 요청 등을 정상적으로 읽어들인 경우, 이 예제는 echo server 이므로 operation mode를 write로 바꾼 뒤 동일한 data buffer를 활용하여 data를 그대로 client에 echo 응답을 보내는 쓰기 작업을 수행하는 모습을 보여준다.

operation이 write, 즉 client한테 응답한 쓰기 작업이 완료된 경우라고 한다면 전송하려는 모든 data가 전송되었는지 sent bytes 수와 total bytes 수를 비교/확인한다. 남은 data가 있다면 send buffer를 채우며 계속 쓰기 작업을 수행하고, 모든 data가 전송되었다면 send buffer는 total bytes 수만큼의 data가 쌓여있게되고 그때 operation mode를 read로 바꾸어 다음 요청이 올 때까지 대기하도록 설정하는 일련의 과정을 보여준다.


 

지금까지 select, poll과 같은 I/O 모델의 한계 극복을 위해 kernel level에서 성능을 향상시킨 운영체제 별 I/O 모델을 살펴보았다. linux의 epoll, BSD의 kqueue, solaris의 /dev/poll, windows의 IOCP가 대표적인 예였다. 한 가지 유념할 것은 이들 성능은 하드웨어, 네트워크 환경, 동접자 수 등 정말 다양한 이유로 좌우된다. 각 모델 별 특성과 부각되는 장점은 있겠지만 모든 경우의 수에서 그 특징이 나타나는 것도 아니기에 성능에 대한 절대적인 기준을 매기긴 어렵다는 것이다. 판단은 개인의 몫으로 남겨두도록 하고 I/O Multiplexing 톺아보기 시리즈는 여기서 줄이도록 한다.

 

 

 

 

ps. 원글 링크

추가되는 내용은 이 블로그에 업데이트 됨

'System' 카테고리의 다른 글

I/O Multiplexing 톺아보기 (1부)  (0) 2020.11.25
core dump 분석을 위한 gdb 사용법 간단 정리  (0) 2020.05.15

댓글