跳转到内容

C 语言线程池

来自代码酷


C语言线程池是一种并发编程技术,用于管理和复用多个线程,以提高多线程应用程序的性能和资源利用率。它通过预先创建一组线程并维护一个任务队列,避免了频繁创建和销毁线程的开销。

概述[编辑 | 编辑源代码]

线程池的核心思想是“线程复用”。在传统多线程编程中,每次需要执行新任务时都会创建新线程,任务完成后线程被销毁。这种方式在高并发场景下会导致:

  • 频繁的线程创建/销毁开销
  • 系统资源浪费
  • 线程数量不可控

线程池通过以下组件解决这些问题:

  • 固定数量的工作线程:预先创建,长期存在
  • 任务队列:存放待执行的任务
  • 任务调度机制:将任务分配给空闲线程

线程池实现[编辑 | 编辑源代码]

以下是一个基本的C语言线程池实现(需要POSIX线程库支持):

#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>

#define THREAD_NUM 4

typedef struct Task {
    void (*function)(void*);
    void* arg;
    struct Task* next;
} Task;

typedef struct ThreadPool {
    pthread_t threads[THREAD_NUM];
    Task* task_head;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int shutdown;
} ThreadPool;

void* worker(void* arg) {
    ThreadPool* pool = (ThreadPool*)arg;
    while (1) {
        pthread_mutex_lock(&pool->mutex);
        while (pool->task_head == NULL && !pool->shutdown) {
            pthread_cond_wait(&pool->cond, &pool->mutex);
        }
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->mutex);
            pthread_exit(NULL);
        }
        Task* task = pool->task_head;
        pool->task_head = pool->task_head->next;
        pthread_mutex_unlock(&pool->mutex);
        task->function(task->arg);
        free(task);
    }
    return NULL;
}

ThreadPool* thread_pool_create() {
    ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
    pool->task_head = NULL;
    pool->shutdown = 0;
    pthread_mutex_init(&pool->mutex, NULL);
    pthread_cond_init(&pool->cond, NULL);
    
    for (int i = 0; i < THREAD_NUM; ++i) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
    return pool;
}

void thread_pool_add_task(ThreadPool* pool, void (*function)(void*), void* arg) {
    Task* task = (Task*)malloc(sizeof(Task));
    task->function = function;
    task->arg = arg;
    task->next = NULL;
    
    pthread_mutex_lock(&pool->mutex);
    if (pool->task_head == NULL) {
        pool->task_head = task;
    } else {
        Task* current = pool->task_head;
        while (current->next != NULL) {
            current = current->next;
        }
        current->next = task;
    }
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
}

void thread_pool_destroy(ThreadPool* pool) {
    pool->shutdown = 1;
    pthread_cond_broadcast(&pool->cond);
    for (int i = 0; i < THREAD_NUM; ++i) {
        pthread_join(pool->threads[i], NULL);
    }
    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    free(pool);
}

代码说明[编辑 | 编辑源代码]

1. ThreadPool结构体:包含线程数组、任务队列头指针、互斥锁和条件变量 2. worker函数:工作线程的主循环,从队列获取并执行任务 3. thread_pool_create:初始化线程池并创建工作线程 4. thread_pool_add_task:向线程池添加新任务 5. thread_pool_destroy:安全关闭线程池

使用示例[编辑 | 编辑源代码]

以下示例展示如何使用上述线程池计算斐波那契数列:

void fibonacci(void* arg) {
    int n = *(int*)arg;
    if (n <= 1) {
        printf("fib(%d) = %d\n", n, n);
        return;
    }
    
    int a = 0, b = 1, c;
    for (int i = 2; i <= n; ++i) {
        c = a + b;
        a = b;
        b = c;
    }
    printf("fib(%d) = %d\n", n, b);
}

int main() {
    ThreadPool* pool = thread_pool_create();
    
    int nums[] = {10, 20, 30, 40, 50};
    for (int i = 0; i < 5; ++i) {
        int* arg = malloc(sizeof(int));
        *arg = nums[i];
        thread_pool_add_task(pool, fibonacci, arg);
    }
    
    sleep(1); // 等待任务完成
    thread_pool_destroy(pool);
    return 0;
}

输出示例

fib(10) = 55
fib(20) = 6765
fib(30) = 832040
fib(40) = 102334155
fib(50) = 12586269025

线程池架构[编辑 | 编辑源代码]

线程池的工作流程可以用以下mermaid图表示:

graph TD A[主线程] -->|添加任务| B[任务队列] B --> C[工作线程1] B --> D[工作线程2] B --> E[工作线程...] C --> F[执行任务] D --> F E --> F

数学建模[编辑 | 编辑源代码]

线程池的性能可以用排队论模型分析。设:

  • λ:任务到达率(任务/秒)
  • μ:单个线程处理速率(任务/秒)
  • c:线程数量

系统利用率: ρ=λcμ

ρ<1时系统稳定,平均任务等待时间: Wq=ρcμ(1ρ)

实际应用场景[编辑 | 编辑源代码]

1. Web服务器:处理并发HTTP请求 2. 数据库连接池:复用数据库连接 3. 批量数据处理:如图像处理、日志分析 4. 游戏服务器:处理玩家并行请求

高级主题[编辑 | 编辑源代码]

动态线程池[编辑 | 编辑源代码]

可根据负载动态调整线程数量:

  • 核心线程数:常驻线程
  • 最大线程数:允许扩展到的上限
  • 空闲线程超时:回收多余线程

任务优先级[编辑 | 编辑源代码]

实现优先级队列,使高优先级任务先执行:

// 修改Task结构体
typedef struct Task {
    void (*function)(void*);
    void* arg;
    int priority;  // 优先级字段
    struct Task* next;
} Task;

负载均衡[编辑 | 编辑源代码]

使用工作窃取(Work Stealing)算法:

  • 每个线程维护自己的任务队列
  • 空闲线程可以从其他线程队列“窃取”任务

最佳实践[编辑 | 编辑源代码]

1. 合理设置线程数量:通常为CPU核心数的1-2倍 2. 避免任务阻塞:长时间运行的任务应使用回调或异步IO 3. 任务原子性:确保任务可以独立执行 4. 资源清理:正确释放线程池和任务资源

常见问题[编辑 | 编辑源代码]

Q:线程池大小如何确定? A:考虑:

  • CPU密集型任务:Ncpu+1
  • IO密集型任务:2*Ncpu
  • 公式:Nthreads = Ncpu * Ucpu * (1 + W/C)
 其中Ucpu是目标CPU利用率,W/C是等待时间与计算时间比

Q:如何处理任务异常? A:建议: 1. 在任务函数内部捕获异常 2. 记录错误日志 3. 避免异常影响线程池运行

扩展阅读[编辑 | 编辑源代码]

  • POSIX线程(pthread)规范
  • 生产者-消费者模式
  • 无锁队列实现
  • 现代C++中的线程池实现(如C++17的execution)