Week 16: 进程与线程
掌握进程创建、进程间通信、POSIX 线程与线程同步.
1. 进程基础
1.1 进程概念
进程是程序的运行实例, 包含:
- 代码段、数据段、堆、栈
- 进程 ID (PID)
- 文件描述符表
- 信号处理表
1.2 创建进程 (fork)
#include <unistd.h>
#include <stdio.h>
int main(void) {
pid_t pid = fork();
if (pid == -1) {
perror("fork");
return 1;
} else if (pid == 0) {
// 子进程
printf("Child: PID = %d, Parent PID = %d\n",
getpid(), getppid());
} else {
// 父进程
printf("Parent: PID = %d, Child PID = %d\n",
getpid(), pid);
}
return 0;
}1.3 exec 系列函数
#include <unistd.h>
// 替换当前进程映像
execl("/bin/ls", "ls", "-la", NULL);
execlp("ls", "ls", "-la", NULL); // 使用 PATH
execv("/bin/ls", (char *[]){"/bin/ls", "-la", NULL});
execvp("ls", (char *[]){"ls", "-la", NULL});
// exec 成功后不会返回
perror("exec"); // 只有失败才会执行到这里1.4 等待子进程
#include <sys/wait.h>
pid_t pid = fork();
if (pid == 0) {
// 子进程
exit(42);
} else {
int status;
waitpid(pid, &status, 0);
if (WIFEXITED(status)) {
printf("Exit code: %d\n", WEXITSTATUS(status));
}
}2. 进程间通信 (IPC)
2.1 管道
#include <unistd.h>
int pipefd[2];
pipe(pipefd); // pipefd[0] 读, pipefd[1] 写
pid_t pid = fork();
if (pid == 0) {
// 子进程: 写入
close(pipefd[0]);
write(pipefd[1], "Hello", 5);
close(pipefd[1]);
exit(0);
} else {
// 父进程: 读取
close(pipefd[1]);
char buf[100];
ssize_t n = read(pipefd[0], buf, sizeof(buf));
buf[n] = '\0';
printf("Received: %s\n", buf);
close(pipefd[0]);
wait(NULL);
}2.2 共享内存
#include <sys/mman.h>
#include <fcntl.h>
// 创建共享内存
int fd = shm_open("/myshm", O_CREAT | O_RDWR, 0666);
ftruncate(fd, 4096);
// 映射
void *addr = mmap(NULL, 4096, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
// 使用共享内存
strcpy((char *)addr, "Hello from parent");
pid_t pid = fork();
if (pid == 0) {
printf("Child: %s\n", (char *)addr);
exit(0);
}
wait(NULL);
munmap(addr, 4096);
shm_unlink("/myshm");2.3 信号
#include <signal.h>
volatile sig_atomic_t got_signal = 0;
void handler(int sig) {
got_signal = 1;
}
int main(void) {
signal(SIGINT, handler); // 注册处理函数
printf("Waiting for SIGINT...\n");
while (!got_signal) {
pause(); // 等待信号
}
printf("Got SIGINT!\n");
return 0;
}2.4 signalfd (信号文件描述符)
将信号转换为文件描述符事件, 可与 select/poll/epoll 集成:
#include <sys/signalfd.h>
#include <signal.h>
int main(void) {
sigset_t mask;
sigemptyset(&mask);
sigaddset(&mask, SIGINT);
sigaddset(&mask, SIGTERM);
// 阻塞信号 (必须, 否则信号会被默认处理)
sigprocmask(SIG_BLOCK, &mask, NULL);
// 创建 signalfd
int sfd = signalfd(-1, &mask, SFD_NONBLOCK);
if (sfd == -1) {
perror("signalfd");
return 1;
}
// 可用于 epoll
struct epoll_event ev = {.events = EPOLLIN, .data.fd = sfd};
epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ev);
// 读取信号信息
struct signalfd_siginfo info;
ssize_t n = read(sfd, &info, sizeof(info));
if (n == sizeof(info)) {
printf("Received signal %d from PID %d\n",
info.ssi_signo, info.ssi_pid);
}
close(sfd);
return 0;
}signalfd 优势:
- 统一事件循环: 信号和 I/O 使用相同的 epoll 处理
- 避免信号处理函数的限制 (异步信号安全)
- 可获取信号的详细信息 (发送者 PID, uid 等)
2.5 eventfd (事件计数器)
轻量级线程/进程间通知机制:
#include <sys/eventfd.h>
int efd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
// 发送事件 (写入)
uint64_t val = 1;
write(efd, &val, sizeof(val));
// 接收事件 (读取)
uint64_t count;
read(efd, &count, sizeof(count));
// EFD_SEMAPHORE: 每次读取减 1
// 非 SEMAPHORE: 读取后清零
// 生产者-消费者示例
void *producer(void *arg) {
int efd = *(int *)arg;
for (int i = 0; i < 10; i++) {
uint64_t val = 1;
write(efd, &val, sizeof(val));
}
return NULL;
}
void *consumer(void *arg) {
int efd = *(int *)arg;
uint64_t count;
while (read(efd, &count, sizeof(count)) > 0) {
printf("Received %lu events\n", count);
}
return NULL;
}eventfd vs pipe:
| 特性 | eventfd | pipe |
|---|---|---|
| 开销 | 1 个 fd | 2 个 fd |
| 数据 | 计数器 (8 字节) | 任意字节流 |
| 语义 | 事件通知 | 数据传输 |
| 性能 | 更高 | 较低 |
2.6 timerfd (定时器文件描述符)
#include <sys/timerfd.h>
// 创建定时器
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
// 设置定时器 (1 秒后首次触发, 之后每 500ms)
struct itimerspec ts = {
.it_value = {.tv_sec = 1, .tv_nsec = 0},
.it_interval = {.tv_sec = 0, .tv_nsec = 500000000}
};
timerfd_settime(tfd, 0, &ts, NULL);
// 在 epoll 中等待
struct epoll_event ev = {.events = EPOLLIN, .data.fd = tfd};
epoll_ctl(epfd, EPOLL_CTL_ADD, tfd, &ev);
// 读取过期次数
uint64_t expirations;
read(tfd, &expirations, sizeof(expirations));
printf("Timer expired %lu times\n", expirations);fd 家族对比:
| fd 类型 | 用途 | 替代方案 |
|---|---|---|
| signalfd | 信号处理 | signal(), sigaction() |
| eventfd | 事件通知 | pipe(), condvar |
| timerfd | 定时器 | setitimer(), timer_create() |
优势: 统一在 epoll 事件循环中处理所有事件类型.
3. POSIX 线程
3.1 创建线程
#include <pthread.h>
void *thread_func(void *arg) {
int id = *(int *)arg;
printf("Thread %d running\n", id);
return NULL;
}
int main(void) {
pthread_t threads[5];
int ids[5];
for (int i = 0; i < 5; i++) {
ids[i] = i;
pthread_create(&threads[i], NULL, thread_func, &ids[i]);
}
for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}
return 0;
}gcc -pthread program.c -o program3.2 线程属性
pthread_attr_t attr;
pthread_attr_init(&attr);
// 设置分离状态
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
// 设置栈大小
pthread_attr_setstacksize(&attr, 1024 * 1024);
pthread_t thread;
pthread_create(&thread, &attr, thread_func, NULL);
pthread_attr_destroy(&attr);4. 线程同步
4.1 互斥锁
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int counter = 0;
void *increment(void *arg) {
for (int i = 0; i < 100000; i++) {
pthread_mutex_lock(&mutex);
counter++;
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main(void) {
pthread_t t1, t2;
pthread_create(&t1, NULL, increment, NULL);
pthread_create(&t2, NULL, increment, NULL);
pthread_join(t1, NULL);
pthread_join(t2, NULL);
printf("Counter: %d\n", counter); // 200000
pthread_mutex_destroy(&mutex);
return 0;
}4.2 条件变量
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int ready = 0;
void *producer(void *arg) {
pthread_mutex_lock(&mutex);
ready = 1;
pthread_cond_signal(&cond); // 通知
pthread_mutex_unlock(&mutex);
return NULL;
}
void *consumer(void *arg) {
pthread_mutex_lock(&mutex);
while (!ready) {
pthread_cond_wait(&cond, &mutex); // 等待
}
printf("Consumer: ready = %d\n", ready);
pthread_mutex_unlock(&mutex);
return NULL;
}4.3 读写锁
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
void *reader(void *arg) {
pthread_rwlock_rdlock(&rwlock); // 读锁
// 读取数据
pthread_rwlock_unlock(&rwlock);
return NULL;
}
void *writer(void *arg) {
pthread_rwlock_wrlock(&rwlock); // 写锁
// 修改数据
pthread_rwlock_unlock(&rwlock);
return NULL;
}5. 线程安全
5.1 可重入函数
// 不可重入
char *strtok(char *s, const char *delim);
// 可重入
char *strtok_r(char *s, const char *delim, char **saveptr);5.2 线程局部存储
// 全局变量: 所有线程共享
int global = 0;
// 线程局部变量: 每个线程独立副本
_Thread_local int thread_local = 0; // C11
// pthread 方式
pthread_key_t key;
pthread_key_create(&key, NULL);
pthread_setspecific(key, value);
void *val = pthread_getspecific(key);5.3 C11 原子操作
无锁编程的基础:
#include <stdatomic.h>
atomic_int counter = ATOMIC_VAR_INIT(0);
// 原子加载/存储
int val = atomic_load(&counter);
atomic_store(&counter, 42);
// 原子加减
atomic_fetch_add(&counter, 1); // counter++
atomic_fetch_sub(&counter, 1); // counter--
// 比较并交换 (CAS)
int expected = 10;
int desired = 20;
if (atomic_compare_exchange_strong(&counter, &expected, desired)) {
// 成功: counter 从 10 变为 20
} else {
// 失败: expected 被更新为 counter 当前值
}内存序 (Memory Order):
// 从弱到强
memory_order_relaxed // 仅保证原子性
memory_order_consume // 数据依赖序
memory_order_acquire // 读屏障
memory_order_release // 写屏障
memory_order_acq_rel // 读写屏障
memory_order_seq_cst // 顺序一致性 (默认, 最强)
// 生产者-消费者示例
atomic_int data_ready = ATOMIC_VAR_INIT(0);
int data = 0;
void producer(void) {
data = 42; // 普通写
atomic_store_explicit(&data_ready, 1, memory_order_release);
}
void consumer(void) {
while (!atomic_load_explicit(&data_ready, memory_order_acquire))
;
printf("data = %d\n", data); // 保证看到 42
}无锁栈示例:
struct Node {
int data;
struct Node *next;
};
_Atomic(struct Node *) stack_top = NULL;
void push(int val) {
struct Node *new_node = malloc(sizeof(struct Node));
new_node->data = val;
new_node->next = atomic_load(&stack_top);
while (!atomic_compare_exchange_weak(&stack_top,
&new_node->next,
new_node))
; // CAS 失败则重试
}原子操作 vs 互斥锁:
| 特性 | 原子操作 | 互斥锁 |
|---|---|---|
| 开销 | 低 | 高 |
| 适用 | 简单共享数据 | 复杂临界区 |
| 死锁 | 不会 | 可能 |
| 难度 | 高 (需理解内存序) | 中 |
6. 练习
6.1 多进程计算
使用 fork 并行计算数组元素之和.
6.2 生产者消费者
使用线程和条件变量实现生产者消费者模型.
6.3 线程池
实现一个简单的线程池.
7. 思考题
- fork 后父子进程共享什么?
- 为什么需要 exec?
- 线程和进程的区别?
- 死锁的条件是什么?
- 条件变量为什么要和互斥锁一起使用?
8. 本周小结
- 进程: fork, exec, wait.
- IPC: 管道, 共享内存, 信号.
- 线程: pthread_create, pthread_join.
- 同步: 互斥锁, 条件变量, 读写锁.
- 线程安全: 可重入, 线程局部存储.
进程和线程是并发编程的基础. 正确使用同步原语, 是编写正确并发程序的关键.