2026/4/17 18:18:41
网站建设
项目流程
陕西手机网站建设公司,wordpress 二栏,263企业邮箱怎么改密码,深圳建设 骏域网站建设专家1. 进程池介绍
1.1 核心定义
进程池#xff08;Process Pool#xff09; 是一种预创建复用式的进程管理技术#xff0c;其本质是操作系统中预分配的进程资源容器。它包含两大核心组件#xff1a;
资源进程#xff1a;池中预先创建的空闲进程#xff0c;随时待命执…1. 进程池介绍1.1 核心定义进程池Process Pool是一种预创建复用式的进程管理技术其本质是操作系统中预分配的进程资源容器。它包含两大核心组件资源进程池中预先创建的空闲进程随时待命执行任务。管理进程负责分配任务、回收进程和监控状态的中控系统 。1.2 运作模式图解关键特征进程数量固定如CPU核心数的1-2倍任务与进程解耦任务排队等待空闲进程进程复用任务完成后进程返回池中待命所以进程池中维护了一定数量的进程这些进程在进程池创建时被预先创建并一直存在于系统中等待任务的分配。当有任务提交给进程池时它会从池中取出一个空闲进程来执行该任务任务执行完毕后该进程不会被销毁而是返回进程池等待下一个任务的分配。1.3 为什么需要进程池前提普通进程的致命缺陷代码语言txtAI代码解释- 创建进程需复制页表、文件描述符等资源Linux中fork()耗时可达**数百微秒** - 销毁进程需内核回收内存、发送信号等开销更大进程池优化减少进程创建和销毁的开销创建和销毁进程是耗时的操作涉及操作系统的系统调用和资源分配。使用进程池预先创建一组进程可避免频繁的进程创建和销毁提高程序性能和效率。控制并发进程数量进程池能限制并发执行的进程数防止进程过多导致系统资源耗尽。通过设置进程池大小可根据系统处理能力和任务特性合理分配资源确保进程数量在合理范围内。充分利用多核 CPU进程池可以充分利用多核 CPU 的优势通过并行执行多个进程实现真正的并行处理提高系统的处理能力和任务的执行效率特别适用于 CPU 密集型任务。避免进程资源竞争和冲突过多的进程易导致资源竞争和冲突如共享内存同步问题等。进程池限制了进程数量降低了资源竞争和冲突的概率提高了程序的稳定性和可靠性。提供任务队列和排队机制进程池通常与任务队列结合使用可将任务放入队列由进程池自动取出并处理。这避免了任务丢失和重复执行提供了简单高效的任务调度和排队机制使任务处理更有序可控。2. 基于匿名管道实现进程池下面我们会基于父进程写子进程读的匿名管道来实现进程池注意我们大部分的封装都会在ProcessPool.hpp源文件中实现关于.hpp源文件我们先来介绍一下.hpp 头文件可以同时包含声明和定义。允许的定义类型有代码语言txtAI代码解释- 模板定义 (必须)代码语言txtAI代码解释- 内联函数定义 (应该)代码语言txtAI代码解释- constexpr 函数/变量定义 (应该)代码语言txtAI代码解释- 类/结构体/联合体定义本身 (必须)代码语言txtAI代码解释- 类成员函数的定义 (可以需隐式或显式 inline)代码语言txtAI代码解释- inline 变量定义 (C17, 应该)代码语言txtAI代码解释- 类内静态常量整型/枚举成员的初始化 (不是完整定义C17 前需注意)禁止的定义类型有代码语言txtAI代码解释- 非内联、非模板的普通函数定义代码语言txtAI代码解释- 非内联、非 constexpr、非模板的全局变量或静态变量定义2.1 框架先来把框架给搭好上图展示的是我们大致想要实现的框架具体需求我们后面慢慢补充代码语言javascriptAI代码解释#include iostream #include vector // 先描述 class Channel { public: Channel() {} ~Channel() {} private: int _wfd; pid_t _subid; }; // 再组织 class ChannelManager { public: ChannelManager() {} ~ChannelManager() {} private: std::vectorChannel _channels; }; const int gdefaultnum 5;// 默认创建5个子进程 class ProcessPool { public: ProcessPool(int num gdefaultnum) :_process_num(num) {} ~ProcessPool() {} private: ChannelManager _cm; int _process_num; };后面需要哪个我们再继续完善Channel类将我们打开的管道描述起来父进程可以通过写端文件描述符wfd来管理所以我们肯定需要拿到打开文件的文件描述符fd故我们可以先有一个成员变量_wfd方便后续可以管理打开的管道文件同样打开管道是为了让父子进程通信那我们肯定也需要对子进程进行管理那对子进程管理就需要子进程pid所以还需要定义一个成员变量_subid至于还需要什么我们后面需要的时候再加上。ChannelManager类将我们打开的所有管道统一管理起来进行面向对象式的封装方便后续管理所以我们使用vector来将所有打开的管道进行管理ProcessPool类封装所有创建的进程为进程池类的主体将创建的进程都管理起来达到我们实现进程池的目的。每个进程都需要通过管道来与父进程通信所以我们需要一个ChannelManager类型的成员变量方便我们后续在ProcessPool类主体中将进程管理起来同时还可以使用一个成员变量来记录我们进程池中的进程数量那我们构造函数就可以先定义一个全局变量默认一开始创建5个子进程来初始化_process_num。2.2 启动进程池进程池首先得要有 进程池 所以我们需要做一些准备工作和上篇文章【进程间通信匿名管道】的代码示例一样的流程首先需要创建管道然后创建子进程分别关闭父子进程的读写端不过这里我们需要循环一样的步骤分别创建默认的5个管道和子进程来做到有 进程池 。代码如下代码语言javascriptAI代码解释bool Start() { for (int i 0; i _process_num; i) { // 1. 创建管道 int pipefd[2] {0}; int n pipe(pipefd); if(n 0) return false; // 2. 创建子进程 pid_t subid fork(); if(subid 0) return false; else if(subid 0) { // 子进程 // 3. 关闭不需要的读写端 close(pipefd[1]); Work(pipefd[0]); //子进程需要处理父进程派发的任务 close(pipefd[0]); exit(0); } else { // 父进程 // 3. 关闭不需要的读写端 close(pipefd[0]); _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来 } } return true; }注意父进程完成将创建的子进程和对应的管道插入到数组中管理起来之后不能关闭写端要管理进程池就等于管理数组管理数组的增删查改管理数组我们需要通过父进程先将这些创建的子进程和管道插入到数组中方便后续管理代码语言javascriptAI代码解释void Insert(int wfd, int subid) { _channels.emplace_back(wfd, subid); }关于emplace相关接口可在【C】专栏的【C11右值引用相关文章】详细了解那么channel类的构造函数就需要完善同时为了后续方便我们肉眼能够区分每个子进程和管道再增加一个Channel类的成员变量_name这样后面测试的时候可以打印在屏幕上方便我们查看代码如下代码语言javascriptAI代码解释class Channel { public: Channel(int wfd, pid_t subid) :_wfd(wfd) ,_subid(subid) { _name channel- std::to_string(_wfd) - std::to_string(_subid); } int Fd() { return _wfd; } pid_t Subid() { return _subid; } std::string Name() { return _name; } ~Channel() { } private: int _wfd; pid_t _subid; std::string _name; };方便后续在其他类中要访问Channel类中的私有成员变量我们在实现出能获得私有变量值的公有方法子进程接收任务接下来就来处理子进程子进程通过管道读取父进程派发的任务码然后去执行对应的任务代码如下代码语言javascriptAI代码解释void Work(int rfd) { while (true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if(n 0) { if(n ! sizeof(code)) continue; std::cout 子进程[ getpid() ]收到一个任务码 std::endl; // 根据任务码执行对应任务 // 待实现 } else if(n 0) { std::cout 子进程退出 std::endl; break; } else { std::cout 读取错误 std::endl; break; } } }2.3 运行进程池启动进程池并完成需要的准备工作之后就需要运行进程池让进程池能够去执行父进程分配的任务添加任务所以我们还需要一个任务类封装函数指针这样在后面我们需要执行哪个任务就可以通过函数指针进行回调关于回调函数可以看回调函数详解代码如下代码语言javascriptAI代码解释#include iostream #include vector typedef void (*task_t)(); void PrintLog() { std::cout 我是一个打印日志的任务 std::endl; } void Download() { std::cout 我是一个下载的任务 std::endl; } void Upload() { std::cout 我是一个上传的任务 std::endl; } class TaskManager { public: TaskManager() {} void Register(task_t t) { _tasks.push_back(t); } ~TaskManager() {} private: std::vectortask_t _tasks; };同样这里使用数组vector来管理任务数组中每个元素都是函数指针我们根据任务码来执行对应的任务我们可以将任务码与对应任务进行关联怎么关联呢由于我们使用数组来管理任务所以可以使用下标来进行关联这样就可以通过任务码来获取对应的函数指针然后函数指针回调执行对应的任务所以我们需要一个注册函数将函数指针都插入到数组中那么下面我们还要处理任务码相关工作代码语言javascriptAI代码解释TaskManager() { srand(time(nullptr)); } int Code() { return rand() % _tasks.size(); } void Execute(int code) { if(code 0 code _tasks.size()) { _tasks[code](); } }我们可以使用随机数来随机生成任务码不过任务码需要与下标对应所以需要取模数组的大小然后根据任务码回调具体的任务函数那么我们在ProcessPool类中还需要再添加一个成员变量来管理任务如下代码语言javascriptAI代码解释private: ChannelManager _cm; int _process_num; TaskManager _tm;构造函数中我们需要初始化注册所有任务函数也就是将所有任务函数插入到数组中代码语言javascriptAI代码解释ProcessPool(int num gdefaultnum) : _process_num(num) { _tm.Register(PrintLog); _tm.Register(Download); _tm.Register(Upload); }运行那么接下来就需要运行进程池首先我们获取TaskManager类中随机生成的任务码代码语言javascriptAI代码解释// 1. 获取任务码 int taskcode _tm.Code();接下来我们需要将任务码写入管道然后让子进程从管道中读取任务码子进程根据任务码执行任务那么要写入管道我们需要先选择一个管道来写选哪个管道呢肯定不能随便选也不能乱选不然后果就是进程池中的进程 “闲的闲死忙的忙死” 负载不均衡。那要怎么选择才可以负载均衡这里我们采用轮询的方式来选择管道也就是依次选择管道大家轮着来。那如何轮询呢首先我们在ChannelManager类中添加一个成员变量_next代表接下来轮到谁代码语言javascriptAI代码解释private: std::vectorChannel _channels; int _next 0;同时缺省值为0表示从数组中0号下标开始轮询那么怎么选现在就一目了然了代码如下代码语言javascriptAI代码解释Channel Select() { auto c _channels[_next]; _next % _channels.size(); return c; }管道选好了那么现在就需要把任务码写入管道代码如下代码语言javascriptAI代码解释void Send(int code) { ssize_t n write(_wfd, code, sizeof(code)); if(n ! sizeof(code)) { std::cerr 写入错误 std::endl; } }完整Run()函数代码如下代码语言javascriptAI代码解释void Run() { // 1. 获取任务码 int taskcode _tm.Code(); // 2. 选择一个管道写入让子进程读取然后执行 auto c _cm.Select(); std::cout 选择了一个子进程: c.Name() std::endl; c.Send(taskcode); std::cout 发送了一个任务码: taskcode std::endl; }所以待实现的子进程执行任务代码也可以完善了代码语言javascriptAI代码解释void Work(int rfd) { while (true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if(n 0) { if(n ! sizeof(code)) continue; std::cout 子进程[ getpid() ]收到一个任务码 std::endl; // 根据任务码执行对应任务 _tm.Execute(code); } else if(n 0) { std::cout 子进程退出 std::endl; break; } else { std::cout 读取错误 std::endl; break; } } }2.4 结束进程池最终是需要结束进程池的可能是正常结束也有可能是异常结束但不管是哪种结束我们都需要回收资源避免内存泄漏代码如下代码语言javascriptAI代码解释void Stop() { // 关闭文件描述符回收子进程 _cm.CloseFd(); _cm.WaitSubid(); }注意只需要关闭父进程写端文件描述符写端关闭管道内数据读完后子进程再读read会返回0在Work()接口中就会跳出循环子进程工作完之后就会自己关闭读端关闭文件描述符只需要在ChannelManager类中循环遍历vector调用每个Channel对象自己关闭文件描述符的方法代码如下代码语言javascriptAI代码解释// ChannelManager void CloseFd() { for(auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; } } // Channel void Close() { close(_wfd); }回收子进程和关闭文件描述符一样只需循环遍历数组调用Channel对象自己的回收子进程方法代码语言javascriptAI代码解释// ChannelManager void WaitSubid() { for(auto channel : _channels) { channel.Wait(); std::cout 回收: channel.Name() std::endl; } } // Channel void Wait() { pid_t rid waitpid(_subid, nullptr, 0); (void)rid;//不处理返回值正常情况都会回收成功 }2.5 测试代码在Main.cc中完成对进程池的测试代码如下代码语言javascriptAI代码解释#include ProcessPool.hpp int main() { // 创建进程池 ProcessPool pp; // 启动进程池 pp.Start(); // 自动派发任务 int cnt 10; while (cnt--) { pp.Run(); sleep(1); } // 结束进程池 pp.Stop(); return 0; }这里我们循环运行10次看看效果运行结果代码语言javascriptAI代码解释ltxiv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ ./processpool 选择了一个子进程:channel-4-268693 发送了一个任务码:1 子进程[268693]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-5-268694 发送了一个任务码:0 子进程[268694]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-6-268695 发送了一个任务码:1 子进程[268695]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-7-268696 发送了一个任务码:1 子进程[268696]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-8-268697 发送了一个任务码:0 子进程[268697]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-4-268693 发送了一个任务码:1 子进程[268693]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-5-268694 发送了一个任务码:2 子进程[268694]收到一个任务码 我是一个上传的任务 选择了一个子进程:channel-6-268695 发送了一个任务码:0 子进程[268695]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-7-268696 发送了一个任务码:0 子进程[268696]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-8-268697 发送了一个任务码:2 子进程[268697]收到一个任务码 我是一个上传的任务 关闭:channel-4-268693 关闭:channel-5-268694 关闭:channel-6-268695 关闭:channel-7-268696 关闭:channel-8-268697 子进程退出 子进程退出 子进程退出 子进程退出 子进程退出 回收:channel-4-268693 回收:channel-5-268694 回收:channel-6-268695 回收:channel-7-268696 回收:channel-8-268697通过ps指令查看可以看到我们确实创建了5个子进程进程池结束之后5个子进程也成功被回收。其实我们的进程池还有一个小bug这里我们是先关闭所有子进程文件描述符再回收所有子进程那如果我们一边关闭一边回收呢如下代码代码语言javascriptAI代码解释void CloseAndWait() { for(auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; channel.Wait(); std::cout 回收: channel.Name() std::endl; } }运行结果我们可以看到只有父进程关闭了一个文件描述符然后就阻塞住了这是什么原因呢2.6 关于bug如下图分析由于每次循环创建子进程时都会拷贝父进程的文件描述符所以原来父进程的写端指向的管道也会被拷贝到子进程的文件描述符所以之前的管道也会被后创建的子进程的写端指向。交叉操作边关边等的问题如果改为循环内交替执行channel.Close()和channel.Wait()代码语言javascriptAI代码解释for (auto channel : _channels) { channel.Close(); // 关闭一个写端 channel.Wait(); // 等待对应子进程 }会导致死锁风险代码语言txtAI代码解释- 这里我们有子进程 1、2、3、4、5。代码语言txtAI代码解释- 关闭 父进程 的写端后子进程1 的管道引用计数-1。代码语言txtAI代码解释- 但 子进程2、3、4、5 的写端任然指向子进程1的管道**尚未关闭**写端不写读端进程因无数据可读而进入阻塞状态挂起。代码语言txtAI代码解释- 父进程在关闭 子进程1 的文件描述符后会继续阻塞等待waitpid阻塞等回收子进程1而子进程又因为read读被阻塞挂起导致一直卡在这代码语言txtAI代码解释- **问题**子进程1 的写端还未全部关闭父进程卡在 Wait() 等待 子进程1 退出但 子进程1 仍在阻塞读因为它的写端未被关闭read 不返回。代码语言txtAI代码解释- 父进程和子进程 1 互相等待父进程等 子进程1 退出子进程1 等父进程关闭写端或发送数据 → **死锁**。正确顺序先关fd再wait的工作原理关闭所有写端**CloseFd()**代码语言txtAI代码解释- 父进程关闭所有管道的写端文件描述符_wfd。代码语言txtAI代码解释- 当某个管道的**所有写端都被关闭**时子进程在 read 调用中会收到 EOF返回 n0。代码语言txtAI代码解释- 子进程检测到 n0 后跳出循环执行 close(pipefd[0]) 并 exit(0) 退出。等待子进程**WaitSubid()**代码语言txtAI代码解释- 父进程通过 waitpid 回收**已退出**的子进程。代码语言txtAI代码解释- 由于所有子进程已收到 EOF 并退出waitpid 会立即返回不会阻塞。关键点子进程的退出条件子进程通过read阻塞等待任务。只有父进程关闭写端子进程的read才会返回0从而触发退出。如果父进程不先关闭所有写端部分子进程会永远阻塞在read导致waitpid无限等待。虽然先关闭所有文件描述符再回收所有子进程不会引起这样的问题但是这个bug依然存在并没有解决。那有什么解决方法呢首先我们可以倒着从最后一个管道开始边关闭写端边回收子进程虽然这样不会出现死锁的问题但bug依然没有解决所以我们可以采取在创建子进程时关闭子进程的 哥哥进程 的写端方法来解决这个bug的根本所在代码语言javascriptAI代码解释void CloseAll() { for(int i _channels.size()-1; i 0; i--) { _channels[i].Close(); } }子进程创建时拷贝父进程的文件描述符子进程遍历数组将已经拷贝自父进程的 ”哥哥进程 的写端关闭这里你可能会问父子进程会并发进行创建子进程之后的代码那父进程在把当前子进程插入数组后那当前子进程遍历数组关闭所有写端会不会把父进程新插入指向当前子进程的管道的写端也给关闭了呢答案是不会的因为有写时拷贝(COW)的存在代码语言javascriptAI代码解释bool Start() { for (int i 0; i _process_num; i) { // 1. 创建管道 int pipefd[2] {0}; int n pipe(pipefd); if (n 0) return false; // 2. 创建子进程 pid_t subid fork(); if (subid 0) return false; else if (subid 0) { // 子进程 // 3. 关闭不需要的读写端 close(pipefd[1]); _cm.CloseAll(); // 关闭除了父进程的写端 Work(pipefd[0]); // 子进程需要处理父进程派发的任务 close(pipefd[0]); exit(0); } else { // 父进程 // 3. 关闭不需要的读写端 close(pipefd[0]); _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来make } } return true; }现在我们再运行一下查看效果代码语言javascriptAI代码解释ltxiv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ make g -o processpool Main.cc -stdc11 ltxiv-ye1i2elts0wh2yp1ahah:~/Linux_system/lesson10/Processpool$ ./processpool 选择了一个子进程:channel-4-274187 发送了一个任务码:2 子进程[274187]收到一个任务码 我是一个上传的任务 选择了一个子进程:channel-5-274188 发送了一个任务码:0 子进程[274188]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-6-274189 发送了一个任务码:2 子进程[274189]收到一个任务码 我是一个上传的任务 选择了一个子进程:channel-7-274190 发送了一个任务码:1 子进程[274190]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-8-274191 发送了一个任务码:2 子进程[274191]收到一个任务码 我是一个上传的任务 选择了一个子进程:channel-4-274187 子进程[274187]收到一个任务码 我是一个打印日志的任务 发送了一个任务码:0 选择了一个子进程:channel-5-274188 发送了一个任务码:0 子进程[274188]收到一个任务码 我是一个打印日志的任务 选择了一个子进程:channel-6-274189 子进程[274189]收到一个任务码 我是一个打印日志的任务 发送了一个任务码:0 选择了一个子进程:channel-7-274190 发送了一个任务码:1 子进程[274190]收到一个任务码 我是一个下载的任务 选择了一个子进程:channel-8-274191 发送了一个任务码:1 子进程[274191]收到一个任务码 我是一个下载的任务 关闭:channel-4-274187 子进程退出 回收:channel-4-274187 关闭:channel-5-274188 子进程退出 回收:channel-5-274188 关闭:channel-6-274189 子进程退出 回收:channel-6-274189 关闭:channel-7-274190 子进程退出 回收:channel-7-274190 关闭:channel-8-274191 子进程退出 回收:channel-8-274191成功解决源码Main.cc代码语言javascriptAI代码解释#include ProcessPool.hpp int main() { // 创建进程池 ProcessPool pp; // 启动进程池 pp.Start(); // 自动派发任务 int cnt 10; while (cnt--) { pp.Run(); sleep(1); } // 结束进程池 pp.Stop(); return 0; }ProcessPool.hpp代码语言javascriptAI代码解释#ifndef __PROCESS_POOL_HPP__ #define __PROCESS_POOL_HPP__ #include iostream #include vector #include sys/types.h #include unistd.h #include cstdlib #include cstdio #include sys/wait.h #include Task.hpp // 先描述 class Channel { public: Channel(int wfd, pid_t subid) : _wfd(wfd), _subid(subid) { _name channel- std::to_string(_wfd) - std::to_string(_subid); } void Send(int code) { ssize_t n write(_wfd, code, sizeof(code)); // if(n ! sizeof(code)) // { // std::cerr 写入错误 std::endl; // } (void)n; } void Close() { close(_wfd); } void Wait() { pid_t rid waitpid(_subid, nullptr, 0); (void)rid;//不处理返回值正常情况都会回收成功 } int Fd() { return _wfd; } pid_t Subid() { return _subid; } std::string Name() { return _name; } ~Channel() {} private: int _wfd; pid_t _subid; std::string _name; }; // 再组织 class ChannelManager { public: ChannelManager() {} void Insert(int wfd, int subid) { _channels.emplace_back(wfd, subid); } Channel Select() { auto c _channels[_next]; _next % _channels.size(); return c; } void CloseFd() { for(auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; } } void WaitSubid() { for(auto channel : _channels) { channel.Wait(); std::cout 回收: channel.Name() std::endl; } } void CloseAndWait() { for(auto channel : _channels) { channel.Close(); std::cout 关闭: channel.Name() std::endl; channel.Wait(); std::cout 回收: channel.Name() std::endl; } } void CloseAll() { for(int i _channels.size()-1; i 0; i--) { _channels[i].Close(); } } ~ChannelManager() {} private: std::vectorChannel _channels; int _next 0; }; const int gdefaultnum 5; // 默认创建5个子进程 class ProcessPool { public: ProcessPool(int num gdefaultnum) : _process_num(num) { _tm.Register(PrintLog); _tm.Register(Download); _tm.Register(Upload); } void Work(int rfd) { while (true) { int code 0; ssize_t n read(rfd, code, sizeof(code)); if(n 0) { if(n ! sizeof(code)) continue; std::cout 子进程[ getpid() ]收到一个任务码 std::endl; // 根据任务码执行对应任务 _tm.Execute(code); } else if(n 0) { std::cout 子进程退出 std::endl; break; } else { std::cout 读取错误 std::endl; break; } } } bool Start() { for (int i 0; i _process_num; i) { // 1. 创建管道 int pipefd[2] {0}; int n pipe(pipefd); if (n 0) return false; // 2. 创建子进程 pid_t subid fork(); if (subid 0) return false; else if (subid 0) { // 子进程 // 3. 关闭不需要的读写端 close(pipefd[1]); _cm.CloseAll(); // 关闭除了父进程的写端 Work(pipefd[0]); // 子进程需要处理父进程派发的任务 close(pipefd[0]); exit(0); } else { // 父进程 // 3. 关闭不需要的读写端 close(pipefd[0]); _cm.Insert(pipefd[1], subid); // 父进程将创建的子进程和对应的管道插入到数组中管理起来make } } return true; } void Run() { // 1. 获取任务码 int taskcode _tm.Code(); // 2. 选择一个管道写入让子进程读取然后执行 auto c _cm.Select(); std::cout 选择了一个子进程: c.Name() std::endl; c.Send(taskcode); std::cout 发送了一个任务码: taskcode std::endl; } void Stop() { // 关闭文件描述符回收子进程 // _cm.CloseFd(); // _cm.WaitSubid(); _cm.CloseAndWait(); } ~ProcessPool() {} private: ChannelManager _cm; int _process_num; TaskManager _tm; }; #endifTask.hpp代码语言javascriptAI代码解释#pragma once #include iostream #include vector #include ctime typedef void (*task_t)(); void PrintLog() { std::cout 我是一个打印日志的任务 std::endl; } void Download() { std::cout 我是一个下载的任务 std::endl; } void Upload() { std::cout 我是一个上传的任务 std::endl; } class TaskManager { public: TaskManager() { srand(time(nullptr)); } int Code() { return rand() % _tasks.size(); } void Execute(int code) { if(code 0 code _tasks.size()) { _tasks[code](); } } void Register(task_t t) { _tasks.push_back(t); } ~TaskManager() {} private: std::vectortask_t _tasks; };Makefile代码语言javascriptAI代码解释processpool:Main.cc g -o $ $^ -stdc11 .PHONY:clean clean: rm -f processpool