您现在的位置是:首页 >其他 >【网络进阶】五种IO网络模型(一)网站首页其他

【网络进阶】五种IO网络模型(一)

Ricky_0528 2023-06-12 08:00:03
简介【网络进阶】五种IO网络模型(一)

1. 阻塞IO

在Linux中,默认情况下,所有的套接字(socket)都是阻塞的。典型的读取操作流程如下:
在这里插入图片描述

当用户进程调用read系统调用时,内核开始执行I/O的第一个阶段,即准备数据。对于网络I/O来说,通常数据在一开始尚未到达(例如,尚未接收到一个完整的数据包),这时内核需要等待足够的数据到来。在用户进程这边,整个进程将被阻塞。当内核等到数据准备就绪,它会将数据从内核空间拷贝到用户空间,然后内核返回结果,用户进程才解除阻塞状态,重新运行起来。

因此,阻塞I/O的特点是在I/O执行的两个阶段(等待数据和拷贝数据)都被阻塞。

大多数程序员接触网络编程时,首先了解的是listen()、send()、recv()等阻塞型接口。使用这些接口可以方便地构建服务器/客户端模型。以下是一个简单的“一问一答”服务器示例:

在这里插入图片描述

绝大多数套接字接口都是阻塞型的。所谓阻塞型接口是指系统调用(通常是I/O接口)在没有返回调用结果的情况下让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

实际上,除非特别指定,几乎所有的I/O接口(包括套接字接口)都是阻塞型的。这给网络编程带来了一个重大问题,即在调用send()的同时,线程将被阻塞,在此期间,线程将无法执行任何计算或响应任何网络请求。

一种简单的改进方案是在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他连接。具体使用多进程还是多线程,并没有一个特定的模式。传统上,进程的开销远大于线程,因此,如果需要同时为大量客户端提供服务,不推荐使用多进程;如果单个服务执行体需要消耗较多的CPU资源,例如需要进行大规模或长时间的数据运算或文件访问,那么进程较为安全。通常,可以使用pthread_create()创建新线程,或使用fork()创建新进程。

假设对上述服务器/客户端模型提出更高要求,即让服务器同时为多个客户端提供一问一答服务,于是有了如下模型:

在这里插入图片描述

在上述线程/时间示例中,主线程持续等待客户端的连接请求。如果有连接,则创建新线程,并在新线程中提供与前例相同的问答服务。

许多初学者可能不明白为什么一个套接字可以多次accept。实际上,套接字的设计者可能为多客户端场景特意设计了这个功能,使得accept()能够返回一个新的套接字。以下是accept接口的原型:

int accept(int s, struct sockaddr *addr, socklen_t *addrlen);

输入参数s是从socket()、bind()和listen()中继承下来的套接字句柄值。执行完bind()和listen()后,操作系统已经开始在指定的端口监听所有连接请求。如果有请求,则将该连接请求加入请求队列。调用accept()接口正是从套接字s的请求队列中提取第一个连接信息,创建一个与s同类型的新套接字并返回句柄。新的套接字句柄将作为后续read()和recv()的输入参数。如果请求队列当前没有请求,则accept()将进入阻塞状态,直到有请求进入队列。

上述多线程服务器模型似乎完美地解决了为多个客户端提供问答服务的要求,但其实并非如此。如果要同时响应数百或数千个连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而线程与进程本身也更容易进入假死状态。

许多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接,减少创建和关闭连接的频率。这两种技术都可以很好地降低系统开销,并广泛应用于许多大型系统,如Websphere、Tomcat和各种数据库等。但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用I/O接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。因此,在使用“池”时,必须考虑其面临的响应规模,并根据响应规模调整“池”。

针对上例中可能同时出现的数千甚至数万次客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但不能解决所有问题。总之,多线程模型可以方便高效地解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈。这时,可以尝试使用非阻塞接口来解决这个问题。

非阻塞I/O(Non-blocking I/O)或异步I/O(Asynchronous I/O)是另一种处理大量并发连接的方法。在这种模式下,I/O操作不会阻塞当前线程,而是立即返回。如果I/O操作尚未完成,系统调用将返回一个特殊的错误码,通知调用者稍后再试。这使得服务器能够在等待某个I/O操作完成的同时,处理其他客户端的请求。

实现非阻塞I/O的一种方法是使用事件驱动编程(Event-driven programming)。在这种模式下,服务器维护一个事件循环(Event loop),用于监听各种I/O事件,如新连接请求、数据到达等。当事件发生时,事件循环将调用相应的回调函数来处理事件。这样,服务器可以在一个单线程中处理多个客户端的请求,避免了多线程或多进程带来的开销。

在Linux中,有多种实现事件驱动编程的机制,如select、poll和epoll。这些机制允许程序员在一个线程中监视多个文件描述符(如套接字)的状态,并在某个文件描述符就绪时(如可读、可写)进行处理。这些机制的主要区别在于性能和可扩展性,其中epoll在处理大量并发连接时具有较好的性能。

总之,对于大规模并发连接的场景,非阻塞I/O和事件驱动编程提供了一种有效的解决方案。通过使用select、poll或epoll等机制,可以实现高性能、可扩展的服务器,避免了多线程或多进程带来的资源开销和管理复杂性。


C语言socket例子:

该例子是一个TCP服务器,它接受客户端的连接,接收客户端发送的消息,然后将消息原样返回给客户端。

server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUF_SIZE 1024
#define SERVER_PORT 8080

int main() {
    int server_socket, client_socket;
    struct sockaddr_in server_addr, client_addr;
    char buffer[BUF_SIZE];
    socklen_t client_addr_size;

    // 创建服务器socket
    server_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (server_socket == -1) {
        perror("socket creation failed");
        exit(1);
    }

    // 设置服务器地址结构体
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(SERVER_PORT);

    // 绑定socket到服务器地址
    if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind failed");
        exit(1);
    }

    // 监听socket
    if (listen(server_socket, 5) == -1) {
        perror("listen failed");
        exit(1);
    }

    client_addr_size = sizeof(client_addr);
    while (1) {
        // 接受客户端连接(阻塞式)
        client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &client_addr_size);
        if (client_socket == -1) {
            perror("accept failed");
            exit(1);
        }

        // 接收客户端消息并将其返回(阻塞式)
        int read_size;
        while ((read_size = read(client_socket, buffer, BUF_SIZE)) != 0) {
            printf("从客户端收到的消息:%s
", buffer);
            write(client_socket, buffer, read_size);
            memset(buffer, 0, sizeof(buffer));
        }

        close(client_socket);
    }

    close(server_socket);
    return 0;
}

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define BUF_SIZE 1024
#define SERVER_PORT 8080

int main() {
    int client_socket;
    struct sockaddr_in server_addr;
    char buffer[BUF_SIZE];

    // 创建客户端socket
    client_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (client_socket == -1) {
        perror("socket creation failed");
        exit(1);
    }

    // 设置服务器地址结构体
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    server_addr.sin_port = htons(SERVER_PORT);

    // 连接到服务器(阻塞式)
    if (connect(client_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("connect failed");
        exit(1);
    }

    while (1) {
        printf("请输入消息:");
        fgets(buffer, BUF_SIZE, stdin);
        buffer[strlen(buffer) - 1] = '';
        // 向服务器发送消息
        write(client_socket, buffer, strlen(buffer));

        // 接收服务器响应(阻塞式)
        int read_size = read(client_socket, buffer, BUF_SIZE - 1);
        if (read_size == -1) {
            perror("read failed");
            exit(1);
        }

        buffer[read_size] = '';
        printf("从服务器接收到的消息:%s
", buffer);
    }

    close(client_socket);
    return 0;
}

在这里插入图片描述

2. 非阻塞IO

在Linux下,可以通过设置套接字使其变为非阻塞。对于一个非阻塞套接字进行读操作时,流程如下:

在这里插入图片描述

从上述过程中可以看出,当用户进程发起read操作时,如果内核中的数据尚未准备好,它不会阻塞用户进程,而是立即返回一个错误。从用户进程的角度看,发起一个read操作后,不需要等待,而是马上得到一个结果。用户进程判断结果为错误时,知道数据还没有准备好,于是可以再次发起read操作。一旦内核中的数据准备好了,并再次收到用户进程的系统调用,它会立即将数据拷贝到用户内存,然后返回。因此,在非阻塞I/O中,用户进程实际上需要不断主动询问内核数据是否准备好。

在非阻塞状态下,recv()接口被调用后立即返回,返回值表示不同的含义。例如,在本例中:

  • recv()返回值大于0,表示接收数据完毕,返回值即为接收到的字节数
  • recv()返回值为0,表示连接已经正常断开
  • recv()返回值为-1且errno等于EAGAIN,表示recv操作尚未完成
  • recv()返回值为-1且errno不等于EAGAIN,表示recv操作遇到系统错误errno。

非阻塞接口与阻塞接口的显著差异在于,调用后立即返回。可以使用以下函数将某个文件描述符fd设置为非阻塞状态:

fcntl(fd, F_SETFL, O_NONBLOCK);

下面将展示一个仅使用一个线程,但能够同时检测多个连接中的数据是否到达,并接收数据的模型:

在这里插入图片描述

可以看到,服务器线程可以通过循环调用recv()接口,在单个线程内实现对所有连接的数据接收工作。但是,这种模型并不推荐使用。因为循环调用recv()会大幅度提高CPU占用率;此外,在这个方案中,recv()更多地起到检测“操作是否完成”的作用,实际上操作系统提供了更为高效的检测“操作是否完成”的接口,如select()多路复用模式,可以一次检测多个连接是否活跃。使用select()等多路复用技术,能够更高效地处理并发连接,减少CPU占用,提高服务器性能。


C语言socket例子:

可以使用fcntl()函数将socket设置为非阻塞模式,并使用select()函数在读/写操作上实现异步。下面同样是一个TCP服务器,它接受客户端的连接,接收客户端发送的消息,然后将消息原样返回给客户端。

server.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/select.h>

#define BUF_SIZE 1024
#define SERVER_PORT 8080

int main() {
    int server_socket, client_socket;
    struct sockaddr_in server_addr, client_addr;
    char buffer[BUF_SIZE];
    socklen_t client_addr_size;
    fd_set read_fds, temp_fds;
    int max_fd;

    // 创建服务器socket
    server_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (server_socket == -1) {
        perror("socket creation failed");
        exit(1);
    }

    // 设置服务器地址结构体
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    server_addr.sin_port = htons(SERVER_PORT);

    // 绑定socket到服务器地址
    if (bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("bind failed");
        exit(1);
    }

    // 监听socket
    if (listen(server_socket, 5) == -1) {
        perror("listen failed");
        exit(1);
    }

    // 设置服务器socket为非阻塞模式
    int flags = fcntl(server_socket, F_GETFL, 0);
    fcntl(server_socket, F_SETFL, flags | O_NONBLOCK);

    FD_ZERO(&read_fds);
    FD_SET(server_socket, &read_fds);
    max_fd = server_socket;

    client_addr_size = sizeof(client_addr);
    while (1) {
        temp_fds = read_fds;

        // 使用select()函数监控文件描述符集合
        if (select(max_fd + 1, &temp_fds, NULL, NULL, NULL) == -1) {
            perror("select failed");
            exit(1);
        }

        for (int i = 0; i <= max_fd; i++) {
            if (FD_ISSET(i, &temp_fds)) {
                if (i == server_socket) {
                    // 接受客户端连接(非阻塞式)
                    client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &client_addr_size);
                    if (client_socket == -1) {
                        perror("accept failed");
                        exit(1);
                    }

                    // 将客户端socket添加到文件描述符集合
                    FD_SET(client_socket, &read_fds);
                    if (client_socket > max_fd) {
                        max_fd = client_socket;
                    }
                } else {
                    // 接收客户端消息并将其返回(非阻塞式)
                    int read_size = read(i, buffer, BUF_SIZE - 1);
                    if (read_size <= 0) {
                        close(i);
                        FD_CLR(i, &read_fds);
                    } else {
                        write(i, buffer, read_size);
                    }
                }
            }
        }
    }

    close(server_socket);
    return 0;
}

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/select.h>

#define BUF_SIZE 1024
#define SERVER_PORT 8080

int main() {
    int client_socket;
    struct sockaddr_in server_addr;
    char buffer[BUF_SIZE];
    fd_set read_fds, temp_fds;
    int max_fd;

    // 创建客户端socket
    client_socket = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (client_socket == -1) {
        perror("socket creation failed");
        exit(1);
    }

    // 设置服务器地址结构体
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    server_addr.sin_port = htons(SERVER_PORT);

    // 连接到服务器(阻塞式)
    if (connect(client_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {
        perror("connect failed");
        exit(1);
    }

    // 设置客户端socket为非阻塞模式
    int flags = fcntl(client_socket, F_GETFL, 0);
    fcntl(client_socket, F_SETFL, flags | O_NONBLOCK);

    FD_ZERO(&read_fds);
    FD_SET(client_socket, &read_fds);
    FD_SET(STDIN_FILENO, &read_fds);
    max_fd = client_socket;

    while (1) {
        temp_fds = read_fds;

        // 使用select()函数监控文件描述符集合
        if (select(max_fd + 1, &temp_fds, NULL, NULL, NULL) == -1) {
            perror("select failed");
            exit(1);
        }

        if (FD_ISSET(STDIN_FILENO, &temp_fds)) {
            printf("请输入消息:");
            fgets(buffer, BUF_SIZE, stdin);
            buffer[strlen(buffer) - 1] = '';

            // 向服务器发送消息
            write(client_socket, buffer, strlen(buffer));
        }

        if (FD_ISSET(client_socket, &temp_fds)) {
            // 接收服务器响应(非阻塞式)
            int read_size = read(client_socket, buffer, BUF_SIZE - 1);
            if (read_size <= 0) {
                close(client_socket);
                exit(1);
            }

            buffer[read_size] = '';
            printf("从服务器接收到的消息:%s
", buffer);
        }
    }

    close(client_socket);
    return 0;
}

在这里插入图片描述

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。