翼度科技»论坛 云主机 LINUX 查看内容

线程池的原理与C语言实现

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
V1.0 2024年6月11日 发布于博客园
目录


目录

线程池原理

线程池是什么

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。


线程池模型(同进程池):

多个子线程处理同一个客户连接上的不同任务

使用线程池可以带来一系列好处:

  • 降低资源消耗(系统资源):通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高线程的可管理性(系统资源):线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 提高响应速度(任务响应):任务到达时,无需等待线程创建即可立即执行。
  • 提供更多更强大的功能(功能扩展):线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池解决的问题

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  • 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  • 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  • 系统无法合理管理内部的资源分布,会降低系统的稳定性。
动态创建子线程的缺点

通过动态创建子进程(或子线程)来实现并发服务器,这样做有如下缺点:

  • 动态创建进程(或线程)是比较耗费时间的,这将导致较慢的客户响应。
  • 动态创建的子进程(或子线程)通常只用来为一个客户服务(除非我们做特殊的处理),这将导致系统上产生大量的细微进程(或线程)。进程(或线程)间的切换将消耗大量CPU时间。
  • 动态创建的子进程是当前进程的完整映像。当前进程必须谨慎地管理其分配的文件描述符和堆内存等系统资源,否则子进程可能复制这些资源,从而使系统的可用资源急剧下降,进而影响服务器的性能。
线程池相关接口

线程池相关结构体

struct task 任务节点

  1. // 任务结点  单向链表的节点,类型
  2. struct task
  3. {
  4.     void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的
  5.     void *arg;                                         // 需要传递给任务的参数,如果不需要,则NULL
  6.     struct task *next; // 指向下一个任务结点的指针
  7. };
复制代码
线程池接口

init_pool() 线程池初始化

  1. // 初始化线程池 pool线程池指针  threads_number 初始化线程的个数
  2. bool init_pool(thread_pool *pool, unsigned int threads_number)
  3. {
  4.     // 初始化互斥锁
  5.     pthread_mutex_init(&pool->lock, NULL);
  6.     // 初始化条件量
  7.     pthread_cond_init(&pool->cond, NULL);
  8.     // 销毁标志 设置线程池为未关闭状态
  9.     pool->shutdown = false; // 不销毁
  10.     // 给任务链表的节点申请堆内存
  11.     pool->task_list = malloc(sizeof(struct task));
  12.     // 申请堆内存,用于存储创建出来的线程的ID
  13.     pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);
  14.     // 错误处理,对malloc进行错误处理
  15.     if (pool->task_list == NULL || pool->tids == NULL)
  16.     {
  17.         perror("分配内存错误");
  18.         return false;
  19.     }
  20.     // 对任务链表中的节点的指针域进行初始化
  21.     pool->task_list->next = NULL;
  22.     // 设置线程池中处于等待状态的任务数量最大值
  23.     pool->max_waiting_tasks = MAX_WAITING_TASKS;
  24.     // 设置等待线程处理的任务的数量为0,说明现在没有任务
  25.     pool->waiting_tasks = 0;
  26.     // 设置线程池中活跃的线程的数量
  27.     pool->active_threads = threads_number;
  28.     int i;
  29.     // 循环创建活跃线程
  30.     for (i = 0; i < pool->active_threads; i++)
  31.     {
  32.         // 创建线程  把线程的ID存储在申请的堆内存
  33.         if (pthread_create(&((pool->tids)[i]), NULL,
  34.                            routine, (void *)pool) != 0)
  35.         {
  36.             perror("创建线程错误");
  37.             return false;
  38.         }
  39.     }
  40.     return true;
  41. }
复制代码
线程池初始化流程图

mermaid
graph TD    A[初始化线程池] --> B[初始化互斥锁]    B --> C[初始化条件变量]    C --> D[分配任务链表内存]    D --> E[分配线程ID数组内存]    E --> F{内存分配是否成功?}    F -- 否 --> G[打印错误信息]    F -- 是 --> H[设置初始值]    H --> I[创建指定数量线程]    I --> J[线程池初始化完成]add_task() 向线程池添加任务

  1. // 向线程池的任务链表中添加任务
  2. bool add_task(thread_pool *pool,
  3.               void *(*do_task)(void *arg), void *arg)
  4. {
  5.     // 给任务链表节点申请内存
  6.     struct task *new_task = malloc(sizeof(struct task));
  7.     if (new_task == NULL) // 检查内存分配是否成功
  8.     {
  9.         perror("申请内存错误");
  10.         return false;
  11.     }
  12.     new_task->do_task = do_task; // 设置任务函数指针
  13.     new_task->arg = arg;                 // 设置任务参数
  14.     new_task->next = NULL;                 // 指针域设置为NULL  初始化任务的下一个指针
  15.     //============ LOCK =============//
  16.     pthread_mutex_lock(&pool->lock); // 加锁,保护共享资源
  17.     //===============================//
  18.     // 说明要处理的任务的数量大于能处理的任务数量
  19.     if (pool->waiting_tasks >= MAX_WAITING_TASKS) // 检查等待任务是否超过最大值
  20.     {
  21.         pthread_mutex_unlock(&pool->lock); // 解锁
  22.         fprintf(stderr, "任务太多.\n"); // 打印错误信息
  23.         free(new_task);                                        // 释放新任务内存
  24.         return false;
  25.     }
  26.     struct task *tmp = pool->task_list; // 获取任务链表头
  27.     // 遍历链表,找到单向链表的尾节点
  28.     while (tmp->next != NULL)
  29.         tmp = tmp->next;
  30.     // 把新的要处理的任务插入到链表的尾部  尾插
  31.     tmp->next = new_task;
  32.     // 要处理的任务的数量+1 (等待任务数量+1)
  33.     pool->waiting_tasks++;
  34.     //=========== UNLOCK ============//
  35.     pthread_mutex_unlock(&pool->lock); // 解锁
  36.     //===============================//
  37.     // 唤醒第一个处于阻塞队列中的线程
  38.     pthread_cond_signal(&pool->cond);
  39.     return true;
  40. }
复制代码
add_thread() 增加活跃线程

  1. // 向线程池加入新线程
  2. int add_thread(thread_pool *pool, unsigned additional_threads)
  3. {
  4.     // 判断需要添加的新线程的数量是否为0  如果没有要添加的线程,直接返回
  5.     if (additional_threads == 0)
  6.         return 0;
  7.     // 计算线程池中总线程的数量
  8.     unsigned total_threads =
  9.         pool->active_threads + additional_threads;
  10.     int i, actual_increment = 0; // 初始化计数器
  11.     // 循环创建新线程
  12.     for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
  13.     {
  14.         // 创建新线程
  15.         if (pthread_create(&((pool->tids)[i]),
  16.                            NULL, routine, (void *)pool) != 0)
  17.         {
  18.             perror("增加活跃线程错误"); // 打印错误信息
  19.             // 如果没有成功创建任何线程,返回错误
  20.             if (actual_increment == 0)
  21.                 return -1;
  22.             break; // 退出循环
  23.         }
  24.         actual_increment++; // 增加计数器
  25.     }
  26.     // 记录此时线程池中活跃线程的总数
  27.     pool->active_threads += actual_increment; // 更新活跃线程数
  28.     return actual_increment;                                  // 返回实际增加的线程数
  29. }
复制代码
remove_thread()删除活跃线程

  1. // 从线程池中删除线程
  2. int remove_thread(thread_pool *pool, unsigned int removing_threads)
  3. {
  4.     if (removing_threads == 0)
  5.         return pool->active_threads; // 如果没有要删除的线程,直接返回
  6.     int remaining_threads = pool->active_threads - removing_threads;   // 计算剩余线程数
  7.     remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 确保至少有一个线程
  8.     int i;
  9.     for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 循环取消线程
  10.     {
  11.         errno = pthread_cancel(pool->tids[i]); // 取消线程
  12.         if (errno != 0) // 检查取消是否成功
  13.             break;
  14.     }
  15.     if (i == pool->active_threads - 1) // 如果没有成功取消任何线程,返回错误
  16.         return -1;
  17.     else
  18.     {
  19.         pool->active_threads = i + 1; // 更新活跃线程数
  20.         return i + 1;                                  // 返回剩余线程数
  21.     }
  22. }
复制代码
destroy_pool()销毁线程池

  1. // 销毁线程池
  2. bool destroy_pool(thread_pool *pool)
  3. {
  4.     // 1,激活所有线程 设置关闭标志
  5.     pool->shutdown = true;
  6.     pthread_cond_broadcast(&pool->cond); // 唤醒所有等待中的线程
  7.     // 2, 等待线程们执行完毕
  8.     int i;
  9.     for (i = 0; i < pool->active_threads; i++) // 循环等待所有线程退出
  10.     {
  11.         /**
  12.                  * pthread_join(pool->tids[i], NULL) 的作用是等待线程池中第 i 个线程终止,并清理其相关资源。通过这种方式,可以确保在销毁线程池时,所有线程都已经安全地终止。
  13.                  * pthread_join 是 POSIX 线程库中的一个函数,用于等待一个线程的终止。它的功能类似于进程中的 wait 系统调用。
  14.                  */
  15.         errno = pthread_join(pool->tids[i], NULL); // 等待线程退出
  16.         if (errno != 0)                                                           // 检查等待是否成功
  17.         {
  18.             printf("join tids[%d] error: %s\n",
  19.                    i, strerror(errno)); // 打印错误信息
  20.         }
  21.         else
  22.             printf("[%u] is joined\n", (unsigned)pool->tids[i]); // 打印线程退出信息
  23.     }
  24.     // 3, 销毁线程池
  25.     free(pool->task_list); // 释放任务链表内存
  26.     free(pool->tids);           // 释放线程ID数组内存
  27.     free(pool);                           // 释放线程池结构体内存
  28.     return true;
  29. }
复制代码
线程池实例

main.c
  1. #include "thread_pool.h" // 包含线程池头文件
  2. // 任务函数, 打印一次线程任务信息,并等待n秒,模拟真正的线程任务
  3. void *mytask(void *arg)
  4. {
  5.     int n = (int)arg; // 要执行的秒数 将参数转换为整数, 强制转换才能使用
  6.     /**
  7.          * %u:无符号整数(unsigned int)
  8.          * pthread_self():这是一个 POSIX 线程库函数,返回调用它的线程的线程 ID。
  9.          * __FUNCTION__:这是一个预定义的宏,扩展为当前函数的名称。它在调试和日志记录时非常有用,可以显示当前正在执行的函数名。
  10.          */
  11.     printf("[%u][%s] ==>工作将会在这里被执行 %d 秒...\n",
  12.            (unsigned)pthread_self(), __FUNCTION__, n); // 打印任务开始信息
  13.     sleep(n);
  14.     printf("[%u][%s] ==> 工作完毕!\n",
  15.            (unsigned)pthread_self(), __FUNCTION__); // 打印任务完成信息
  16.     return NULL;
  17. }
  18. // 计时函数
  19. void *count_time(void *arg)
  20. {
  21.     int i = 0; // 初始化计数器
  22.     while (1)
  23.     {
  24.         sleep(1);
  25.         printf("sec: %d\n", ++i); // 打印经过的秒数
  26.     }
  27. }
  28. int main(void)
  29. {
  30.     pthread_t a;                                                                // 定义一个线程ID
  31.     pthread_create(&a, NULL, count_time, NULL); // 创建计时线程
  32.     // 1, 初始化线程池
  33.     thread_pool *pool = malloc(sizeof(thread_pool)); // 分配内存给 线程池管理结构体
  34.     init_pool(pool, 2);                                                                 // 初始化线程池,创建2个线程
  35.     // 2, 添加任务
  36.     printf("向线程池中投送3个任务...\n");
  37.     /**
  38.          * rand() 是 C 标准库函数,定义在 <stdlib.h> 头文件中。它返回一个伪随机数,
  39.          *                  范围在 0 到 RAND_MAX 之间,RAND_MAX 是一个宏,通常定义为 32767。
  40.          *
  41.          * rand() % 10 的结果是 rand() 产生的随机数对 10 取模的结果,也就是说,它会返回一个 0 到 9 之间的整数(包括 0 和 9)
  42.          *
  43.          * 线程函数和任务函数通常需要一个 void * 类型的参数,以便能够传递任意类型的数据。在这种情况下,任务函数 mytask 需要一个 void * 类型的参数。
  44.          */
  45.     add_task(pool, mytask, (void *)(rand() % 10));
  46.     add_task(pool, mytask, (void *)(rand() % 10));
  47.     add_task(pool, mytask, (void *)(rand() % 10));
  48.     // 3, 检查活跃线程数量
  49.     printf("当前活跃的线程数量: %d\n",
  50.            remove_thread(pool, 0)); // 打印当前活跃线程数
  51.     sleep(9);                                                // 等待9秒
  52.     // 4, 添加更多任务
  53.     printf("向线程池中投送2个任务...\n"); // 打印信息
  54.     add_task(pool, mytask, (void *)(rand() % 10));
  55.     add_task(pool, mytask, (void *)(rand() % 10));
  56.     // 5, 添加线程
  57.     add_thread(pool, 2); // 添加2个线程
  58.     sleep(5); // 等待5秒
  59.     // 6, 删除线程
  60.     printf("从线程池中删除3个活跃线程, "
  61.            "当前线程数量: %d\n",
  62.            remove_thread(pool, 3));
  63.     // 7, 销毁线程池
  64.     destroy_pool(pool); // 销毁线程池
  65.     return 0;                        // 程序正常结束
  66. }
复制代码
实际使用时, 只需要将上述代码中的 mytask 函数修改为我们需要实现的功能函数即可
主函数流程图

graph TD    A[主函数开始] --> B[定义线程ID]    B --> C[创建计时线程]    C --> D[初始化线程池]    D --> E[分配内存给线程池]    E --> F[初始化线程池,创建2个线程]    F --> G[添加任务]    G --> H[打印信息: throwing 3 tasks...]    H --> I[添加任务1]    I --> J[添加任务2]    J --> K[添加任务3]    K --> L[检查活跃线程数量]    L --> M[打印当前活跃线程数]    M --> N[等待9秒]    N --> O[添加更多任务]    O --> P[打印信息: throwing another 2 tasks...]    P --> Q[添加任务4]    Q --> R[添加任务5]    R --> S[添加线程]    S --> T[添加2个线程]    T --> U[等待5秒]    U --> V[删除线程]    V --> W[打印信息: remove 3 threads...]    W --> X[删除3个线程]    X --> Y[销毁线程池]    Y --> Z[销毁线程池并释放资源]    Z --> AA[主函数结束]thread_pool.h
  1. #ifndef _THREAD_POOL_H_#define _THREAD_POOL_H_#include          // 标准输入输出库#include  // 布尔类型库#include          // UNIX 标准库,包含 sleep 函数#include          // 标准库,包含 malloc 和 free 函数#include          // 字符串处理库#include  // 字符串处理库#include          // 错误号库#include  // POSIX 线程库#define MAX_WAITING_TASKS 1000 // 处于等待状态的任务数量最大为1000#define MAX_ACTIVE_THREADS 20  // 活跃线程的最大数量, 但该数量最佳应该==CPU一次性可执行的线程数量, 例如6核12线程, 则为12/*************第一步: 构建任务结构体******************/// 任务结点  单向链表的节点,类型
  2. struct task
  3. {
  4.     void *(*do_task)(void *arg); // 任务函数指针  指向线程要执行的任务  格式是固定的
  5.     void *arg;                                         // 需要传递给任务的参数,如果不需要,则NULL
  6.     struct task *next; // 指向下一个任务结点的指针
  7. };// 线程池的管理结构体typedef struct thread_pool{    pthread_mutex_t lock; // 互斥锁, 用于保护任务队列    pthread_cond_t cond;  // 条件量, 代表任务队列中任务个数的变化---如果主线程向队列投放任务, 则可以通过条件变量来唤醒哪些睡着了的线程    bool shutdown; // 是否需要销毁线程池, 控制线程退出, 进而销毁整个线程池    struct task *task_list; // 用于存储任务的链表, 任务队列刚开始没有任何任务, 是一个具有头节点的空链队列    pthread_t *tids; // 用于记录线程池中线程的ID    unsigned max_waiting_tasks; // 线程池中处于等待状态的任务数量最大值    unsigned waiting_tasks;                // 处于等待状态的线程数量    unsigned active_threads;        // 正在活跃的线程数量} thread_pool;// 初始化线程池bool init_pool(thread_pool *pool, unsigned int threads_number);// 向线程池中添加任务bool add_task(thread_pool *pool, void *(*do_task)(void *arg), void *task);// 先线程池中添加线程int add_thread(thread_pool *pool, unsigned int additional_threads_number);// 从线程池中删除线程int remove_thread(thread_pool *pool, unsigned int removing_threads_number);// 销毁线程池bool destroy_pool(thread_pool *pool);// 任务函数 线程例程void *routine(void *arg);#endif
复制代码
thread_pool.c
  1. #include "thread_pool.h" // 包含线程池头文件// 线程取消处理函数,确保线程取消时解锁互斥锁void handler(void *arg){    printf("[%u] 结束了.\n",           (unsigned)pthread_self()); // 打印线程结束信息    pthread_mutex_unlock((pthread_mutex_t *)arg); // 解锁互斥锁}// 线程执行的任务函数void *routine(void *arg){    // 调试    #ifdef DEBUG    printf("[%u] is started.\n",           (unsigned)pthread_self()); // 打印线程开始信息    #endif    // 把需要传递给线程任务的参数进行备份    thread_pool *pool = (thread_pool *)arg; // 将传入的参数转换为线程池指针    struct task *p;                                                        // 定义一个任务指针    while (1) // 无限循环,持续处理任务    {        /*                ** push a cleanup functon handler(), make sure that                ** the calling thread will release the mutex properly                ** even if it is cancelled during holding the mutex.                **                ** NOTE:                ** pthread_cleanup_push() is a macro which includes a                ** loop in it, so if the specified field of codes that                ** paired within pthread_cleanup_push() and pthread_                ** cleanup_pop() use 'break' may NOT break out of the                ** truely loop but break out of these two macros.                ** see line 61 below.                */        /*                 * 注意:                 * 推送一个清理函数handler(),确保调用线程将正确释放互斥量,即使它在持有互斥量期间被取消。                 *                 * pthread_cleanup_push()是一个宏,其中包含一个循环,                 * 所以如果在pthread_cleanup_push()和pthread_ cleanup_pop()中配对的代码的指定字段使用` break `可能不会跳出真正的循环,                 * 而是跳出这两个宏。参见下面的第61行。                 */        //================================================//        pthread_cleanup_push(handler, (void *)&pool->lock); // 注册取消处理函数        pthread_mutex_lock(&pool->lock);                                        // 加锁,保护共享资源        //================================================//        // 1,如果没有任务且线程池未关闭,则等待        while (pool->waiting_tasks == 0 && !pool->shutdown)        {            pthread_cond_wait(&pool->cond, &pool->lock); // 等待条件变量        }        // 2,  如果没有任务且线程池已关闭,则退出        if (pool->waiting_tasks == 0 && pool->shutdown == true)        {            pthread_mutex_unlock(&pool->lock); // 解锁            pthread_exit(NULL);                                   // CANNOT use 'break';  退出线程        }        // 3,    有任务则取出任务        p = pool->task_list->next;                 // 获取第一个任务        pool->task_list->next = p->next; // 将任务从链表中移除        pool->waiting_tasks--;                         // 减少等待任务计数        //================================================//        pthread_mutex_unlock(&pool->lock); // 解锁        pthread_cleanup_pop(0);                           // 取消注册的取消处理函数        //================================================//        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); // 禁止线程取消        (p->do_task)(p->arg);                                                                  // 执行任务        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);  // 允许线程取消        free(p); // 释放任务内存    }    pthread_exit(NULL); // 退出线程}// 初始化线程池 pool线程池指针  threads_number 初始化线程的个数bool init_pool(thread_pool *pool, unsigned int threads_number){    // 初始化互斥锁    pthread_mutex_init(&pool->lock, NULL);    // 初始化条件量    pthread_cond_init(&pool->cond, NULL);    // 销毁标志 设置线程池为未关闭状态    pool->shutdown = false; // 不销毁    // 给任务链表的节点申请堆内存    pool->task_list = malloc(sizeof(struct task));    // 申请堆内存,用于存储创建出来的线程的ID    pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);    // 错误处理,对malloc进行错误处理    if (pool->task_list == NULL || pool->tids == NULL)    {        perror("分配内存错误");        return false;    }    // 对任务链表中的节点的指针域进行初始化    pool->task_list->next = NULL;    // 设置线程池中处于等待状态的任务数量最大值    pool->max_waiting_tasks = MAX_WAITING_TASKS;    // 设置等待线程处理的任务的数量为0,说明现在没有任务    pool->waiting_tasks = 0;    // 设置线程池中活跃的线程的数量    pool->active_threads = threads_number;    int i;    // 循环创建活跃线程    for (i = 0; i < pool->active_threads; i++)    {        // 创建线程  把线程的ID存储在申请的堆内存        if (pthread_create(&((pool->tids)[i]), NULL,                           routine, (void *)pool) != 0)        {            perror("创建线程错误");            return false;        }        // 用于调试        #ifdef DEBUG        printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",               (unsigned)pthread_self(), __FUNCTION__,               i, (unsigned)pool->tids[i]); // 打印线程创建信息        #endif    }    return true;}// 向线程池的任务链表中添加任务bool add_task(thread_pool *pool,              void *(*do_task)(void *arg), void *arg){    // 给任务链表节点申请内存    struct task *new_task = malloc(sizeof(struct task));    if (new_task == NULL) // 检查内存分配是否成功    {        perror("申请内存错误");        return false;    }    new_task->do_task = do_task; // 设置任务函数指针    new_task->arg = arg;                 // 设置任务参数    new_task->next = NULL;                 // 指针域设置为NULL  初始化任务的下一个指针    //============ LOCK =============//    pthread_mutex_lock(&pool->lock); // 加锁,保护共享资源    //===============================//    // 说明要处理的任务的数量大于能处理的任务数量    if (pool->waiting_tasks >= MAX_WAITING_TASKS) // 检查等待任务是否超过最大值    {        pthread_mutex_unlock(&pool->lock); // 解锁        fprintf(stderr, "任务太多.\n"); // 打印错误信息        free(new_task);                                        // 释放新任务内存        return false;    }    struct task *tmp = pool->task_list; // 获取任务链表头    // 遍历链表,找到单向链表的尾节点    while (tmp->next != NULL)        tmp = tmp->next;    // 把新的要处理的任务插入到链表的尾部  尾插    tmp->next = new_task;    // 要处理的任务的数量+1 (等待任务数量+1)    pool->waiting_tasks++;    //=========== UNLOCK ============//    pthread_mutex_unlock(&pool->lock); // 解锁    //===============================//    // 调试    #ifdef DEBUG    printf("[%u][%s] ==> a new task has been added.\n",           (unsigned)pthread_self(), __FUNCTION__); // 打印任务添加信息    #endif    // 唤醒第一个处于阻塞队列中的线程    pthread_cond_signal(&pool->cond);    return true;}// 向线程池加入新线程int add_thread(thread_pool *pool, unsigned additional_threads){    // 判断需要添加的新线程的数量是否为0  如果没有要添加的线程,直接返回    if (additional_threads == 0)        return 0;    // 计算线程池中总线程的数量    unsigned total_threads =        pool->active_threads + additional_threads;    int i, actual_increment = 0; // 初始化计数器    // 循环创建新线程    for (i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)    {        // 创建新线程        if (pthread_create(&((pool->tids)[i]),                           NULL, routine, (void *)pool) != 0)        {            perror("增加活跃线程错误"); // 打印错误信息            // 如果没有成功创建任何线程,返回错误            if (actual_increment == 0)                return -1;            break; // 退出循环        }        actual_increment++; // 增加计数器        #ifdef DEBUG        printf("[%u]:[%s] ==> tids[%d]: [%u] is created.\n",               (unsigned)pthread_self(), __FUNCTION__,               i, (unsigned)pool->tids[i]); // 打印线程创建信息        #endif    }    // 记录此时线程池中活跃线程的总数    pool->active_threads += actual_increment; // 更新活跃线程数    return actual_increment;                                  // 返回实际增加的线程数}// 从线程池中删除线程int remove_thread(thread_pool *pool, unsigned int removing_threads){    if (removing_threads == 0)        return pool->active_threads; // 如果没有要删除的线程,直接返回    int remaining_threads = pool->active_threads - removing_threads;   // 计算剩余线程数    remaining_threads = remaining_threads > 0 ? remaining_threads : 1; // 确保至少有一个线程    int i;    for (i = pool->active_threads - 1; i > remaining_threads - 1; i--) // 循环取消线程    {        errno = pthread_cancel(pool->tids[i]); // 取消线程        if (errno != 0) // 检查取消是否成功            break;        #ifdef DEBUG        printf("[%u]:[%s] ==> cancelling tids[%d]: [%u]...\n",               (unsigned)pthread_self(), __FUNCTION__,               i, (unsigned)pool->tids[i]); // 打印线程取消信息        #endif    }    if (i == pool->active_threads - 1) // 如果没有成功取消任何线程,返回错误        return -1;    else    {        pool->active_threads = i + 1; // 更新活跃线程数        return i + 1;                                  // 返回剩余线程数    }}// 销毁线程池
  2. bool destroy_pool(thread_pool *pool)
  3. {
  4.     // 1,激活所有线程 设置关闭标志
  5.     pool->shutdown = true;
  6.     pthread_cond_broadcast(&pool->cond); // 唤醒所有等待中的线程
  7.     // 2, 等待线程们执行完毕
  8.     int i;
  9.     for (i = 0; i < pool->active_threads; i++) // 循环等待所有线程退出
  10.     {
  11.         /**
  12.                  * pthread_join(pool->tids[i], NULL) 的作用是等待线程池中第 i 个线程终止,并清理其相关资源。通过这种方式,可以确保在销毁线程池时,所有线程都已经安全地终止。
  13.                  * pthread_join 是 POSIX 线程库中的一个函数,用于等待一个线程的终止。它的功能类似于进程中的 wait 系统调用。
  14.                  */
  15.         errno = pthread_join(pool->tids[i], NULL); // 等待线程退出
  16.         if (errno != 0)                                                           // 检查等待是否成功
  17.         {
  18.             printf("join tids[%d] error: %s\n",
  19.                    i, strerror(errno)); // 打印错误信息
  20.         }
  21.         else
  22.             printf("[%u] is joined\n", (unsigned)pool->tids[i]); // 打印线程退出信息
  23.     }
  24.     // 3, 销毁线程池
  25.     free(pool->task_list); // 释放任务链表内存
  26.     free(pool->tids);           // 释放线程ID数组内存
  27.     free(pool);                           // 释放线程池结构体内存
  28.     return true;
  29. }
复制代码
线程执行的任务函数流程图

void *routine(void *arg)
mermaid
graph TD    A[线程执行的任务函数开始] --> B[注册取消处理函数]    B --> C[加锁]    C --> D{是否有任务 且 线程池未关闭?}    D -- 否 --> E[等待条件变量]    D -- 是 --> F{是否没有任务 且 线程池已关闭?}    F -- 是 --> G[解锁并退出线程]    F -- 否 --> H[取出任务]    H --> I[从链表中移除任务]    I --> J[减少等待任务计数]    J --> K[解锁]    K --> L[取消注册的取消处理函数]    L --> M[禁止线程取消]    M --> N[执行任务]    N --> O[允许线程取消]    O --> P[释放任务内存]    P --> A销毁线程池流程图

mermaid
graph TD    A[销毁线程池] --> B[设置关闭标志]    B --> C[唤醒所有等待线程]    C --> D[等待所有线程终止]    D --> E[释放任务链表内存]    E --> F[释放线程ID数组内存]    F --> G[释放线程池结构体内存]    G --> H[线程池销毁完成]参考


来源:https://www.cnblogs.com/zqingyang/p/18241374
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具