线程池和线程池版本的服务器代码 Linux网络通信( 二 )

//取任务static void* thrRun(void* arg){ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = new PoolTask();while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&pool->pool_lock);//无任务并且线程池不是要摧毁while(pool->job_num <= 0 && !pool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);}if(pool->job_num){//有任务需要处理taskpos = (pool->job_pop++)%pool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));task->arg = task;pool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&pool->empty_task);//通知生产者}if(pool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&pool->pool_lock);delete(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&pool->pool_lock);task->task_func(task->arg);//执行回调函数}}整体代码:
#include<iostream>#include<string.h>#include<pthread.h>#include<sys/types.h>#include<stdio.h>#include<unistd.h>using namespace std;int beginnum = 1;class PoolTask{public:int tasknum;//模拟任务编号void *arg;//回调函数参数void (*task_func)(void *arg);//任务的回调函数};//任务回调函数void taskRun(void *arg){PoolTask *task = (PoolTask*)arg;int num = task->tasknum;printf("task %d is runing %lu\n",num,pthread_self());sleep(1);printf("task %d is done %lu\n",num,pthread_self());}class ThreadPool{public://构造函数,初始化线程池ThreadPool(int thrnum, int maxtasknum){this->thr_num = thrnum;this->max_job_num = maxtasknum;this->shutdown = 0;//是否摧毁线程池,1代表摧毁this->job_push = 0;//任务队列添加的位置this->job_pop = 0;//任务队列出队的位置this->job_num = 0;//初始化的任务个数为0//申请最大的任务队列this->tasks = new PoolTask[thrnum];//初始化锁和条件变量pthread_mutex_init(&(this->pool_lock),NULL);pthread_cond_init(&(this->empty_task),NULL);pthread_cond_init(&(this->not_empty_task),NULL);int i = 0;this->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间pthread_attr_t attr;pthread_attr_init(&attr);pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);for(i = 0;i < thrnum;i++){pthread_create(&(this->threads[i]),&attr,thrRun,this);//创建多个线程}}static void* thrRun(void* arg){ThreadPool *pool = (ThreadPool*)arg;int taskpos = 0;//任务位置PoolTask *task = new PoolTask();while(1){//获取任务,先要尝试加锁pthread_mutex_lock(&pool->pool_lock);//无任务并且线程池不是要摧毁while(pool->job_num <= 0 && !pool->shutdown ){//如果没有任务,线程会阻塞pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock);}if(pool->job_num){//有任务需要处理taskpos = (pool->job_pop++)%pool->max_job_num;//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());//为什么要拷贝?避免任务被修改,生产者会添加任务memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask));task->arg = task;pool->job_num--;//task = &thrPool->tasks[taskpos];pthread_cond_signal(&pool->empty_task);//通知生产者}if(pool->shutdown){//代表要摧毁线程池,此时线程退出即可//pthread_detach(pthread_self());//临死前分家pthread_mutex_unlock(&pool->pool_lock);delete(task);pthread_exit(NULL);}//释放锁pthread_mutex_unlock(&pool->pool_lock);task->task_func(task->arg);//执行回调函数}}//析构函数,摧毁线程池~ThreadPool(){this->shutdown = 1;//关闭线程池pthread_cond_broadcast(&(this->not_empty_task));//诱杀int i = 0;for(i = 0; i<this->thr_num ; i++){pthread_join(this->threads[i],NULL);}pthread_cond_destroy(&(this->not_empty_task));pthread_cond_destroy(&(this->empty_task));pthread_mutex_destroy(&(this->pool_lock));delete []tasks;tasks = NULL;free(this->threads);}public://添加任务到线程池void addtask(){pthread_mutex_lock(&(this->pool_lock));cout << "当前任务队列中任务的个数是: " <<this-> job_num <<endl;//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)while(this->max_job_num <= this->job_num){pthread_cond_wait(&(this->empty_task),&(this->pool_lock));}int taskpos = (this->job_push++)%this->max_job_num;this->tasks[taskpos].tasknum = beginnum++;this->tasks[taskpos].arg = (void*)&this->tasks[taskpos];this->tasks[taskpos].task_func = taskRun;this->job_num++;pthread_mutex_unlock(&(this->pool_lock));pthread_cond_signal(&(this->not_empty_task));//通知包身工}private://任务队列相关的参数int max_job_num;//最大任务个数int job_num;//实际任务个数PoolTask *tasks;//任务队列数组int job_push;//入队位置int job_pop;// 出队位置//线程相关参数int thr_num;//线程池内线程个数pthread_t *threads;//线程池内线程数组int shutdown;//是否关闭线程池pthread_mutex_t pool_lock;//线程池的锁pthread_cond_t empty_task;//任务队列为空的条件pthread_cond_t not_empty_task;//任务队列不为空的条件};int main(){ThreadPool *m = new ThreadPool(3,20);int j = 0;for(j=0;j<20;j++){m->addtask();}sleep(20);delete m;m = NULL;system("pause");return EXIT_SUCCESS;}

推荐阅读