翼度科技»论坛 云主机 LINUX 查看内容

从read 系统调用到 C10M 问题

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
一.前言

从上个世纪到现在,工程师们在优化服务器性能的过程中,提出了各种不同的io模型,比如非阻塞io,io复用,信号驱动式io,异步io。具体io模型在不同平台上的实现也不一样,比如io复用在bsd上可以由kqueue实现,在solaris系统上可以由/dev/poll实现。为了实现系统的可移植性,POSIX 确保 select和poll在 unix-like系统上得到广泛的支持。
在上个世纪,Dan Kegel 提出了C10K的设想,现在C10K 已经不是什么问题,比如nginx就可以做到百万级别的qps。于是又有人提出来了C10M的设想,Robert David Graham 从unix的最初设计初衷给出了自己的解决方案。
二.常见io模型

1.阻塞io

常见的read系统调用,是最常见的阻塞io:

2.非阻塞式io

非阻塞io的典型使用方式如下,设置非阻塞标志,并且常与io复用一起使用,使用起来比较复杂。
  1. val = Fcntl(sockfd, F_GETFL, 0);
  2. Fcntl(sockfd, F_SETFL, val | O_NONBLOCK); /* O_NONBLOCK 标志非阻塞 */
复制代码
 

3.io 复用 (select/poll)

io复用在处理数量庞大的fd时非常有效,我们以select为例,select的核心api是select函数:
  1. int select(int nfds, fd_set *_Nullable restrict readfds,
  2.                   fd_set *_Nullable restrict writefds,
  3.                   fd_set *_Nullable restrict exceptfds,
  4.                   struct timeval *_Nullable restrict timeout);
复制代码
看一个例子:
  1. #include        "unp.h"
  2. void
  3. str_cli(FILE *fp, int sockfd)
  4. {
  5.         int                        maxfdp1;
  6.         fd_set                rset;
  7.         char                sendline[MAXLINE], recvline[MAXLINE];
  8.         FD_ZERO(&rset);
  9.         for ( ; ; ) {
  10.                 FD_SET(fileno(fp), &rset); /* 设置要监听的socket fd */
  11.                 FD_SET(sockfd, &rset); /* 设置要监听的file fd */
  12.                 maxfdp1 = max(fileno(fp), sockfd) + 1;
  13.                 Select(maxfdp1, &rset, NULL, NULL, NULL); /* select 调用 */
  14.                 if (FD_ISSET(sockfd, &rset)) {        /* socket 可读 */
  15.                         if (Readline(sockfd, recvline, MAXLINE) == 0)
  16.                                 err_quit("str_cli: server terminated prematurely");
  17.                         Fputs(recvline, stdout);
  18.                 }
  19.                 if (FD_ISSET(fileno(fp), &rset)) {  /* input 可读 */
  20.                         if (Fgets(sendline, MAXLINE, fp) == NULL)
  21.                                 return;                /* all done */
  22.                         Writen(sockfd, sendline, strlen(sendline));
  23.                 }
  24.         }
  25. }
复制代码
 
4.信号驱动式io

但凡涉及到信号的程序都比较复杂。要使用信号驱动式io,先开启socket的信号驱动式io功能,并通过sigaction 系统调用安装一个信号处理函数:
  1. void
  2. dg_echo(int sockfd_arg, SA *pcliaddr, socklen_t clilen_arg)
  3. {
  4.         int                        i;
  5.         const int        on = 1;
  6.         sigset_t        zeromask, newmask, oldmask;
  7.         sockfd = sockfd_arg;
  8.         clilen = clilen_arg;
  9.         for (i = 0; i < QSIZE; i++) {        /* init queue of buffers */
  10.                 dg[i].dg_data = Malloc(MAXDG);
  11.                 dg[i].dg_sa = Malloc(clilen);
  12.                 dg[i].dg_salen = clilen;
  13.         }
  14.         iget = iput = nqueue = 0;
  15.         Signal(SIGHUP, sig_hup); /* 安装信号处理函数 */
  16.         Signal(SIGIO, sig_io);
  17.         Fcntl(sockfd, F_SETOWN, getpid()); /* 设置属主 */
  18.         Ioctl(sockfd, FIOASYNC, &on); /* 开启信号驱动式io */
  19.         Ioctl(sockfd, FIONBIO, &on); /* non-bloking */
  20.         Sigemptyset(&zeromask);                /* init three signal sets */
  21.         Sigemptyset(&oldmask);
  22.         Sigemptyset(&newmask);
  23.         Sigaddset(&newmask, SIGIO);        /* signal we want to block */
  24.         Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
  25.         for ( ; ; ) {
  26.                 while (nqueue == 0)
  27.                         sigsuspend(&zeromask);        /* wait for datagram to process */
  28.                         /* 4unblock SIGIO */
  29.                 Sigprocmask(SIG_SETMASK, &oldmask, NULL);
  30.                 Sendto(sockfd, dg[iget].dg_data, dg[iget].dg_len, 0,
  31.                            dg[iget].dg_sa, dg[iget].dg_salen);
  32.                 if (++iget >= QSIZE)
  33.                         iget = 0;
  34.                         /* 4block SIGIO */
  35.                 Sigprocmask(SIG_BLOCK, &newmask, &oldmask);
  36.                 nqueue--;
  37.         }
  38. }
复制代码
 
5.异步io

我们来看一个aio的例子(由于aio的例子过于复杂,我们这里只截取部分关键代码):
  1. for (i = 0; i < NBUF; i++) {
  2.     switch (bufs[i].op) {
  3.         case UNUSED:
  4.             /*
  5.                                  * Read from the input file if more data
  6.                                  * remains unread.
  7.                                  */
  8.             if (off < sbuf.st_size) {
  9.                 bufs[i].op = READ_PENDING;
  10.                 bufs[i].aiocb.aio_fildes = ifd;
  11.                 bufs[i].aiocb.aio_offset = off;
  12.                 off += BSZ;
  13.                 if (off >= sbuf.st_size)
  14.                     bufs[i].last = 1;
  15.                 bufs[i].aiocb.aio_nbytes = BSZ;
  16.                 if (aio_read(&bufs[i].aiocb) < 0) /* aio_read */
  17.                     err_sys("aio_read failed");
  18.                 aiolist[i] = &bufs[i].aiocb;
  19.                 numop++;
  20.             }
  21.             break;
  22.         case READ_PENDING:
  23.             if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
  24.                 continue;
  25.             if (err != 0) {
  26.                 if (err == -1)
  27.                     err_sys("aio_error failed");
  28.                 else
  29.                     err_exit(err, "read failed");
  30.             }
  31.             /*
  32.                                  * A read is complete; translate the buffer
  33.                                  * and write it.
  34.                                  */
  35.             if ((n = aio_return(&bufs[i].aiocb)) < 0) /* 调用aio_return成功则 说明数据已经返回 */
  36.                 err_sys("aio_return failed");
  37.             if (n != BSZ && !bufs[i].last)
  38.                 err_quit("short read (%d/%d)", n, BSZ);
  39.             for (j = 0; j < n; j++)
  40.                 bufs[i].data[j] = translate(bufs[i].data[j]);
  41.             bufs[i].op = WRITE_PENDING;
  42.             bufs[i].aiocb.aio_fildes = ofd;
  43.             bufs[i].aiocb.aio_nbytes = n;
  44.             if (aio_write(&bufs[i].aiocb) < 0) /* aio_write */
  45.                 err_sys("aio_write failed");
  46.             /* retain our spot in aiolist */
  47.             break;
  48.         case WRITE_PENDING:
  49.             if ((err = aio_error(&bufs[i].aiocb)) == EINPROGRESS) /* aio_error */
  50.                 continue;
  51.             if (err != 0) {
  52.                 if (err == -1)
  53.                     err_sys("aio_error failed");
  54.                 else
  55.                     err_exit(err, "write failed");
  56.             }
  57.             /*
  58.                                  * A write is complete; mark the buffer as unused.
  59.                                  */
  60.             if ((n = aio_return(&bufs[i].aiocb)) < 0)
  61.                 err_sys("aio_return failed");
  62.             if (n != bufs[i].aiocb.aio_nbytes)
  63.                 err_quit("short write (%d/%d)", n, BSZ);
  64.             aiolist[i] = NULL;
  65.             bufs[i].op = UNUSED;
  66.             numop--;
  67.             break;
  68.     }
  69. }
复制代码
 

6.同步和异步的分类

网络上对io同步和异步的争论很多,这里给出Stevens的分类标准: 
同步阻塞io,非阻塞io,io复用,信号驱动式io
异步异步io
 
三.C10K io策略

在上个世纪,Dan Kegel 提出了C10K的设想,即单机实现10k的并发量,主要提出了以下四种类型的解决方法:
服务器范式例子备注软件实现
Serve many clients with each thread, and use nonblocking I/O(level-triggered)select, poll(posix), /dev/poll(solaris), kqueue(bsd)轮询 
Serve many clients with each thread, and use nonblocking I/O (readiness change)kqueue(bsd), epoll(linux), Realtime Signals(linux)事件通知nginx, redis
Serve many clients with each server thread, and use asynchronous I/Oaio异步,没有得到广泛支持 
Serve one client with each server threadLinuxThreads, Java threading support in JDK 1.3.x and earlier 
早期的java使用绿色线程
 


  • 在实现的过程中有诸多限制,比如打开fd的限制,创建thread数量的限制,根据不同内核而异。
  • 32 位系统,用户态的虚拟空间只有3G,如果创建线程时分配的栈空间是10M,那么一个进程最多只能创建300 个左右的线程。 64 位系统,用户态的虚拟空间大到有128T,理论上不会受虚拟内存大小的限制(10M个线程),而会受系统的参数或性能限制(线程上下文切换)。
四.C10M

Robert David Graham认为如果要解决C10M的问题,必须对unix内核进行改造。当下的unix系统的设计目标是为了满足非常广泛的需求,于是加上了许多通用的功能,比如进程管理,内存管理等等。C10M的问题不是通用的问题,需要自己处理数据控制,而不是依赖unix内核,而且需要做到packet scalability, multi-core scalability, memory scalability。
专项问题,需要特殊的解决方案。
五.总结

本文从常见io模型出发,梳理了高并发服务器可能涉及到的io模型,这些经典io模型在过去十年基本没有发生变化。了解这些底层技术对我们了解深入理解服务器是非常有必要的。
六.参考

《unix网络编程》
http://www.kegel.com/c10k.html#threads.java
http://highscalability.com/blog/2013/5/13/the-secret-to-10-million-concurrent-connections-the-kernel-i.html 
https://man7.org/linux/man-pages/man2/select.2.html                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

来源:https://www.cnblogs.com/darcy-yuan/p/17603735.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具