Wiki LogoWiki - The Power of Many

Week 16: 进程与线程

掌握进程创建、进程间通信、POSIX 线程与线程同步.

1. 进程基础

1.1 进程概念

进程是程序的运行实例, 包含:

  • 代码段、数据段、堆、栈
  • 进程 ID (PID)
  • 文件描述符表
  • 信号处理表

1.2 创建进程 (fork)

#include <unistd.h>
#include <stdio.h>

int main(void) {
    pid_t pid = fork();
    
    if (pid == -1) {
        perror("fork");
        return 1;
    } else if (pid == 0) {
        // 子进程
        printf("Child: PID = %d, Parent PID = %d\n", 
               getpid(), getppid());
    } else {
        // 父进程
        printf("Parent: PID = %d, Child PID = %d\n", 
               getpid(), pid);
    }
    
    return 0;
}

1.3 exec 系列函数

#include <unistd.h>

// 替换当前进程映像
execl("/bin/ls", "ls", "-la", NULL);
execlp("ls", "ls", "-la", NULL);  // 使用 PATH
execv("/bin/ls", (char *[]){"/bin/ls", "-la", NULL});
execvp("ls", (char *[]){"ls", "-la", NULL});

// exec 成功后不会返回
perror("exec");  // 只有失败才会执行到这里

1.4 等待子进程

#include <sys/wait.h>

pid_t pid = fork();
if (pid == 0) {
    // 子进程
    exit(42);
} else {
    int status;
    waitpid(pid, &status, 0);
    
    if (WIFEXITED(status)) {
        printf("Exit code: %d\n", WEXITSTATUS(status));
    }
}

2. 进程间通信 (IPC)

2.1 管道

#include <unistd.h>

int pipefd[2];
pipe(pipefd);  // pipefd[0] 读, pipefd[1] 写

pid_t pid = fork();
if (pid == 0) {
    // 子进程: 写入
    close(pipefd[0]);
    write(pipefd[1], "Hello", 5);
    close(pipefd[1]);
    exit(0);
} else {
    // 父进程: 读取
    close(pipefd[1]);
    char buf[100];
    ssize_t n = read(pipefd[0], buf, sizeof(buf));
    buf[n] = '\0';
    printf("Received: %s\n", buf);
    close(pipefd[0]);
    wait(NULL);
}

2.2 共享内存

#include <sys/mman.h>
#include <fcntl.h>

// 创建共享内存
int fd = shm_open("/myshm", O_CREAT | O_RDWR, 0666);
ftruncate(fd, 4096);

// 映射
void *addr = mmap(NULL, 4096, PROT_READ | PROT_WRITE, 
                  MAP_SHARED, fd, 0);

// 使用共享内存
strcpy((char *)addr, "Hello from parent");

pid_t pid = fork();
if (pid == 0) {
    printf("Child: %s\n", (char *)addr);
    exit(0);
}

wait(NULL);
munmap(addr, 4096);
shm_unlink("/myshm");

2.3 信号

#include <signal.h>

volatile sig_atomic_t got_signal = 0;

void handler(int sig) {
    got_signal = 1;
}

int main(void) {
    signal(SIGINT, handler);  // 注册处理函数
    
    printf("Waiting for SIGINT...\n");
    while (!got_signal) {
        pause();  // 等待信号
    }
    
    printf("Got SIGINT!\n");
    return 0;
}

2.4 signalfd (信号文件描述符)

将信号转换为文件描述符事件, 可与 select/poll/epoll 集成:

#include <sys/signalfd.h>
#include <signal.h>

int main(void) {
    sigset_t mask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGINT);
    sigaddset(&mask, SIGTERM);

    // 阻塞信号 (必须, 否则信号会被默认处理)
    sigprocmask(SIG_BLOCK, &mask, NULL);

    // 创建 signalfd
    int sfd = signalfd(-1, &mask, SFD_NONBLOCK);
    if (sfd == -1) {
        perror("signalfd");
        return 1;
    }

    // 可用于 epoll
    struct epoll_event ev = {.events = EPOLLIN, .data.fd = sfd};
    epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev);

    // 读取信号信息
    struct signalfd_siginfo info;
    ssize_t n = read(sfd, &info, sizeof(info));
    if (n == sizeof(info)) {
        printf("Received signal %d from PID %d\n",
               info.ssi_signo, info.ssi_pid);
    }

    close(sfd);
    return 0;
}

signalfd 优势:

  • 统一事件循环: 信号和 I/O 使用相同的 epoll 处理
  • 避免信号处理函数的限制 (异步信号安全)
  • 可获取信号的详细信息 (发送者 PID, uid 等)

2.5 eventfd (事件计数器)

轻量级线程/进程间通知机制:

#include <sys/eventfd.h>

int efd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);

// 发送事件 (写入)
uint64_t val = 1;
write(efd, &val, sizeof(val));

// 接收事件 (读取)
uint64_t count;
read(efd, &count, sizeof(count));
// EFD_SEMAPHORE: 每次读取减 1
// 非 SEMAPHORE: 读取后清零

// 生产者-消费者示例
void *producer(void *arg) {
    int efd = *(int *)arg;
    for (int i = 0; i < 10; i++) {
        uint64_t val = 1;
        write(efd, &val, sizeof(val));
    }
    return NULL;
}

void *consumer(void *arg) {
    int efd = *(int *)arg;
    uint64_t count;
    while (read(efd, &count, sizeof(count)) > 0) {
        printf("Received %lu events\n", count);
    }
    return NULL;
}

eventfd vs pipe:

特性eventfdpipe
开销1 个 fd2 个 fd
数据计数器 (8 字节)任意字节流
语义事件通知数据传输
性能更高较低

2.6 timerfd (定时器文件描述符)

#include <sys/timerfd.h>

// 创建定时器
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);

// 设置定时器 (1 秒后首次触发, 之后每 500ms)
struct itimerspec ts = {
    .it_value = {.tv_sec = 1, .tv_nsec = 0},
    .it_interval = {.tv_sec = 0, .tv_nsec = 500000000}
};
timerfd_settime(tfd, 0, &ts, NULL);

// 在 epoll 中等待
struct epoll_event ev = {.events = EPOLLIN, .data.fd = tfd};
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);

// 读取过期次数
uint64_t expirations;
read(tfd, &expirations, sizeof(expirations));
printf("Timer expired %lu times\n", expirations);

fd 家族对比:

fd 类型用途替代方案
signalfd信号处理signal(), sigaction()
eventfd事件通知pipe(), condvar
timerfd定时器setitimer(), timer_create()

优势: 统一在 epoll 事件循环中处理所有事件类型.


3. POSIX 线程

3.1 创建线程

#include <pthread.h>

void *thread_func(void *arg) {
    int id = *(int *)arg;
    printf("Thread %d running\n", id);
    return NULL;
}

int main(void) {
    pthread_t threads[5];
    int ids[5];
    
    for (int i = 0; i < 5; i++) {
        ids[i] = i;
        pthread_create(&threads[i], NULL, thread_func, &ids[i]);
    }
    
    for (int i = 0; i < 5; i++) {
        pthread_join(threads[i], NULL);
    }
    
    return 0;
}
gcc -pthread program.c -o program

3.2 线程属性

pthread_attr_t attr;
pthread_attr_init(&attr);

// 设置分离状态
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

// 设置栈大小
pthread_attr_setstacksize(&attr, 1024 * 1024);

pthread_t thread;
pthread_create(&thread, &attr, thread_func, NULL);

pthread_attr_destroy(&attr);

4. 线程同步

4.1 互斥锁

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int counter = 0;

void *increment(void *arg) {
    for (int i = 0; i < 100000; i++) {
        pthread_mutex_lock(&mutex);
        counter++;
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(void) {
    pthread_t t1, t2;
    pthread_create(&t1, NULL, increment, NULL);
    pthread_create(&t2, NULL, increment, NULL);
    
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    
    printf("Counter: %d\n", counter);  // 200000
    
    pthread_mutex_destroy(&mutex);
    return 0;
}

4.2 条件变量

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int ready = 0;

void *producer(void *arg) {
    pthread_mutex_lock(&mutex);
    ready = 1;
    pthread_cond_signal(&cond);  // 通知
    pthread_mutex_unlock(&mutex);
    return NULL;
}

void *consumer(void *arg) {
    pthread_mutex_lock(&mutex);
    while (!ready) {
        pthread_cond_wait(&cond, &mutex);  // 等待
    }
    printf("Consumer: ready = %d\n", ready);
    pthread_mutex_unlock(&mutex);
    return NULL;
}

4.3 读写锁

pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;

void *reader(void *arg) {
    pthread_rwlock_rdlock(&rwlock);  // 读锁
    // 读取数据
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

void *writer(void *arg) {
    pthread_rwlock_wrlock(&rwlock);  // 写锁
    // 修改数据
    pthread_rwlock_unlock(&rwlock);
    return NULL;
}

5. 线程安全

5.1 可重入函数

// 不可重入
char *strtok(char *s, const char *delim);

// 可重入
char *strtok_r(char *s, const char *delim, char **saveptr);

5.2 线程局部存储

// 全局变量: 所有线程共享
int global = 0;

// 线程局部变量: 每个线程独立副本
_Thread_local int thread_local = 0;  // C11

// pthread 方式
pthread_key_t key;
pthread_key_create(&key, NULL);
pthread_setspecific(key, value);
void *val = pthread_getspecific(key);

5.3 C11 原子操作

无锁编程的基础:

#include <stdatomic.h>

atomic_int counter = ATOMIC_VAR_INIT(0);

// 原子加载/存储
int val = atomic_load(&counter);
atomic_store(&counter, 42);

// 原子加减
atomic_fetch_add(&counter, 1);  // counter++
atomic_fetch_sub(&counter, 1);  // counter--

// 比较并交换 (CAS)
int expected = 10;
int desired = 20;
if (atomic_compare_exchange_strong(&counter, &expected, desired)) {
    // 成功: counter 从 10 变为 20
} else {
    // 失败: expected 被更新为 counter 当前值
}

内存序 (Memory Order):

// 从弱到强
memory_order_relaxed    // 仅保证原子性
memory_order_consume    // 数据依赖序
memory_order_acquire    // 读屏障
memory_order_release    // 写屏障
memory_order_acq_rel    // 读写屏障
memory_order_seq_cst    // 顺序一致性 (默认, 最强)

// 生产者-消费者示例
atomic_int data_ready = ATOMIC_VAR_INIT(0);
int data = 0;

void producer(void) {
    data = 42;  // 普通写
    atomic_store_explicit(&data_ready, 1, memory_order_release);
}

void consumer(void) {
    while (!atomic_load_explicit(&data_ready, memory_order_acquire))
        ;
    printf("data = %d\n", data);  // 保证看到 42
}

无锁栈示例:

struct Node {
    int data;
    struct Node *next;
};

_Atomic(struct Node *) stack_top = NULL;

void push(int val) {
    struct Node *new_node = malloc(sizeof(struct Node));
    new_node->data = val;
    new_node->next = atomic_load(&stack_top);
    while (!atomic_compare_exchange_weak(&stack_top, 
                                         &new_node->next, 
                                         new_node))
        ;  // CAS 失败则重试
}

原子操作 vs 互斥锁:

特性原子操作互斥锁
开销
适用简单共享数据复杂临界区
死锁不会可能
难度高 (需理解内存序)

6. 练习

6.1 多进程计算

使用 fork 并行计算数组元素之和.

6.2 生产者消费者

使用线程和条件变量实现生产者消费者模型.

6.3 线程池

实现一个简单的线程池.


7. 思考题

  1. fork 后父子进程共享什么?
  2. 为什么需要 exec?
  3. 线程和进程的区别?
  4. 死锁的条件是什么?
  5. 条件变量为什么要和互斥锁一起使用?

8. 本周小结

  • 进程: fork, exec, wait.
  • IPC: 管道, 共享内存, 信号.
  • 线程: pthread_create, pthread_join.
  • 同步: 互斥锁, 条件变量, 读写锁.
  • 线程安全: 可重入, 线程局部存储.

进程和线程是并发编程的基础. 正确使用同步原语, 是编写正确并发程序的关键.

On this page