Server 在 listenfd(3) 上收到一个客户端 Client_1 发来的连接请求,accept() 后得到 connfd(4)。
Server 调用 fork() 创建一个子进程 Child_1,由后者向 Client_1 提供服务。
Child_1 关闭 listenfd(3),Server 关闭 connfd(4)。
Server 在 listenfd(3) 上收到客户端 Client_2 发来的连接请求,accept() 后得到 connfd(5)。
Server 调用 fork() 创建一个子进程 Child_2,由后者向 Client_2 提供服务。
Child_2 关闭 listenfd(3),Server 关闭 connfd(5)。
echoserverp.c
#include "csapp.h"
void echo(int connect_fd);
void sigchld_handler(int sig) {
while (waitpid(-1, 0, WNOHANG) > 0)
;
return;
}
/* Process-based concurrent echo server: the parent accepts connections and
 * forks one child per client; SIGCHLD is handled so children get reaped.
 * Usage: <prog> <port>. */
int main(int argc, char **argv) {
    struct sockaddr_storage peer;
    socklen_t peer_len;
    int lfd;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    Signal(SIGCHLD, sigchld_handler);
    lfd = Open_listenfd(argv[1]);

    for (;;) {
        int cfd;

        peer_len = sizeof peer;
        cfd = Accept(lfd, (SA *)&peer, &peer_len);

        if (Fork() != 0) {
            /* Parent: drop its copy of the connected socket. Without this the
             * descriptor leaks and the connection is never fully closed. */
            Close(cfd);
            continue;
        }

        /* Child: it only talks to this one client, so it has no use for the
         * listening socket. */
        Close(lfd);
        echo(cfd);
        Close(cfd);
        exit(0);
    }
}
各进程有独立的虚拟内存空间,既是优点,也是缺点:
select()
在上述方案中,服务端(主进程)不能并发地处理连接请求与键盘输入:
【解决方案】用 select()
实现读写复用 (I/O multiplexing)。
此函数令内核暂停当前进程,直到读取集 (read set) fdset 中至少一个(文件或套接字)描述符进入可用 (ready) 状态(即对其执行读取操作不会阻塞)这一事件 (event) 发生;随后将传入的读取集 fdset 修改为可用集 (ready set),并返回可用描述符的数量。
#include <sys/select.h>
/* Block until at least one descriptor in the read set `fdset` is ready for
 * reading. Only descriptors 0 .. n-1 are examined. On return `fdset` has
 * been overwritten with the ready set, and the result is the number of
 * ready descriptors.
 * The examples here always pass NULL for writefds, exceptfds and timeout:
 * no write/exception sets, and wait indefinitely. */
int select(int n /* set size: highest descriptor + 1 */, fd_set *fdset,
           fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

/* Descriptor-set operations (possibly implemented as macros):
 *   FD_ZERO(fd_set *fdset);           Clear all bits in `fdset`
 *   FD_CLR(int fd, fd_set *fdset);    Clear bit `fd` in `fdset`
 *   FD_SET(int fd, fd_set *fdset);    Turn on bit `fd` in `fdset`
 *   FD_ISSET(int fd, fd_set *fdset);  Is bit `fd` in `fdset` on?
 * Behavior is undefined unless 0 <= fd < FD_SETSIZE. */
服务端主程序维护一个描述符集合 fdset,其元素为 stdin、listen_fd 及现有连接的各 connect_fd:
- 调用 select(),该函数发现有可用 fd 时返回。
- 若 stdin 可用,以 command() 处理之。
- 若 listen_fd 可用,为其创建新的 connect_fd 并加入上述集合。
- 若 connect_fd 可用,读取信息并响应之。
⚠️ 在以下简化的示例中,创建完连接即响应,响应完即关闭连接,故相当于将后两种情形合二为一。将后两种情形分开处理的示例参见 echoservers.c。
#include "csapp.h"
void echo(int connect_fd);
/* Handle one line of keyboard input. End of input (Fgets returning NULL)
 * terminates the server; otherwise the line is simply echoed to stdout as a
 * stand-in for real command processing. */
void command(void) {
    char line[MAXLINE];

    if (Fgets(line, MAXLINE, stdin) == NULL)
        exit(0); /* EOF */

    printf("%s", line);
}
/* Iterative echo server that also reacts to keyboard input: select() waits
 * on stdin and the listening socket at the same time. Usage: <prog> <port>. */
int main(int argc, char **argv) {
    int listen_fd;
    fd_set read_set;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listen_fd = Open_listenfd(argv[1]);

    /* Descriptors to watch: the keyboard and the listening socket. */
    FD_ZERO(&read_set);
    FD_SET(STDIN_FILENO, &read_set);
    FD_SET(listen_fd, &read_set);

    for (;;) {
        /* select() overwrites its set argument, so hand it a fresh copy. */
        fd_set ready_set = read_set;

        /* stdin (0) < listen_fd, so listen_fd + 1 covers both descriptors. */
        Select(listen_fd + 1, &ready_set, NULL, NULL, NULL);

        if (FD_ISSET(STDIN_FILENO, &ready_set))
            command(); /* keyboard input is ready */

        if (FD_ISSET(listen_fd, &ready_set)) {
            /* A connection request is pending: serve it to completion. */
            struct sockaddr_storage client_addr;
            socklen_t client_len = sizeof(struct sockaddr_storage);
            int connect_fd = Accept(listen_fd, (SA *)&client_addr, &client_len);

            echo(connect_fd); /* could be refined to echo at most one line */
            Close(connect_fd);
        }
    }
}
echoservers.c
状态机 (state machine):服务端由 accept() 收到客户端 Client_k 发来的请求后,得到新的描述符 d_k,通过 add_client() 将其加入 pool_t 型容器。对每个 d_k:
- 状态 (state):等待 d_k 可用。
- 输入事件 (input event):select() 检测到 d_k 可用。
- 转移 (transition):从 d_k 读取一行,通过 check_clients() 实现。
pool_t
/* Pool of connected descriptors for the select()-based echo server.
 * Slot i is free when client_fd[i] == -1 (init_pool sets every slot to -1);
 * client_rio[i] is the buffered reader attached to client_fd[i]. */
typedef struct { /* Represents a pool of connected descriptors */
    int max_fd;        /* Largest descriptor in read_set; main passes max_fd+1
                          to select. Only ever raised, never lowered when a
                          client leaves. */
    fd_set read_set;   /* Set of all active descriptors */
    fd_set ready_set;  /* Subset of descriptors ready for reading */
    int n_ready;       /* Number of ready descriptors from select */
    int max_i;         /* Highwater index into client array (-1 until the
                          first slot is used) */
    int client_fd[FD_SETSIZE];    /* Set of active descriptors */
    rio_t client_rio[FD_SETSIZE]; /* Set of active read buffers */
} pool_t;
/* Reset the pool: every client slot is marked free (-1), no slot has been
 * used yet, and the listening socket is the only descriptor watched. */
void init_pool(int listen_fd, pool_t *p) {
    int slot;

    for (slot = 0; slot < FD_SETSIZE; slot++)
        p->client_fd[slot] = -1;
    p->max_i = -1;

    FD_ZERO(&p->read_set);
    FD_SET(listen_fd, &p->read_set);
    p->max_fd = listen_fd;
}
main()
int byte_count = 0; /* Counts total bytes received by server */

/* add_client() and check_clients() are defined after main(). Declare them
 * first so every call below has a prototype: implicit function
 * declarations are invalid since C99. */
void add_client(int connect_fd, pool_t *p);
void check_clients(pool_t *p);

/* Event-driven echo server: a single thread uses select() to multiplex the
 * listening socket and every connected client kept in `pool`.
 * Usage: <prog> <port>. */
int main(int argc, char **argv) {
    int listen_fd, connect_fd;
    socklen_t client_len;
    struct sockaddr_storage client_addr;
    /* NOTE(review): presumably static because pool_t embeds FD_SETSIZE
     * rio_t buffers and is large. */
    static pool_t pool;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }
    listen_fd = Open_listenfd(argv[1]);
    init_pool(listen_fd, &pool);

    while (1) {
        /* Wait for listening/connected descriptor(s) to become ready */
        pool.ready_set = pool.read_set;
        pool.n_ready = Select(pool.max_fd+1, &pool.ready_set, NULL, NULL, NULL);

        /* If listening descriptor ready, add new client to pool */
        if (FD_ISSET(listen_fd, &pool.ready_set)) {
            client_len = sizeof(struct sockaddr_storage);
            connect_fd = Accept(listen_fd, (SA *)&client_addr, &client_len);
            add_client(connect_fd, &pool);
        }

        /* Echo a text line from each ready connected descriptor */
        check_clients(&pool);
    }
}
add_client()
/* Register a newly accepted connection in the pool: occupy the first free
 * slot, attach a RIO read buffer to it and add it to read_set so select()
 * watches it. It also consumes one unit of p->n_ready, which accounts for
 * the ready listening descriptor that produced connect_fd.
 * Aborts via app_error() if the descriptor cannot be tracked. */
void add_client(int connect_fd, pool_t *p) {
    int i;

    p->n_ready--;

    /* FD_SET/FD_ISSET on a descriptor outside [0, FD_SETSIZE) is undefined
     * behavior (it writes past the fd_set). Descriptor numbers can reach
     * FD_SETSIZE before every slot is used, because lower numbers (stdin,
     * listen_fd, ...) are already taken, so the slot check alone is not
     * enough. */
    if (connect_fd < 0 || connect_fd >= FD_SETSIZE)
        app_error("add_client error: descriptor out of range for select");

    for (i = 0; i < FD_SETSIZE; i++) {
        if (p->client_fd[i] < 0) { /* Find an available slot */
            /* Add connect_fd to the pool */
            p->client_fd[i] = connect_fd;
            Rio_readinitb(&p->client_rio[i], connect_fd);
            FD_SET(connect_fd, &p->read_set); /* Add connect_fd to read_set */

            /* Update max_fd and max_i */
            if (connect_fd > p->max_fd)
                p->max_fd = connect_fd;
            if (i > p->max_i)
                p->max_i = i;
            break;
        }
    }
    if (i == FD_SETSIZE) /* Couldn't find an empty slot */
        app_error("add_client error: Too many clients");
}
check_clients()
/* Serve every connected descriptor that select() reported ready: read one
 * text line and echo it back. On EOF, close the connection and free its
 * slot. Stops early once all p->n_ready ready descriptors are handled. */
void check_clients(pool_t *p) {
    int i, connect_fd, n;
    char buf[MAXLINE];
    rio_t *rio;

    for (i = 0; (i <= p->max_i) && (p->n_ready > 0); i++) {
        connect_fd = p->client_fd[i];
        /* Use the pool's own RIO buffer, not a copy. Reading through a
         * by-value copy discards whatever the read buffered beyond the
         * first line, so later lines from the same client would be lost. */
        rio = &p->client_rio[i];

        /* Free slots hold -1 (see add_client's `< 0` test), so any
         * non-negative value, including 0, is a live descriptor. */
        if ((connect_fd >= 0) && (FD_ISSET(connect_fd, &p->ready_set))) {
            p->n_ready--;
            if ((n = Rio_readlineb(rio, buf, MAXLINE)) != 0) {
                byte_count += n;
                printf("Server received %d (%d total) bytes on fd %d\n",
                       n, byte_count, connect_fd);
                Rio_writen(connect_fd, buf, n);
            }
            else { /* EOF detected, remove descriptor from pool */
                Close(connect_fd);
                FD_CLR(connect_fd, &p->read_set);
                p->client_fd[i] = -1;
            }
            /* NOTE(review): lines already sitting in *rio are invisible to
             * select(); they are served the next time this descriptor is
             * reported ready. */
        }
    }
}
线程 (thread):运行在某个进程中的一条逻辑控制流。
线程执行模型与进程执行模型类似:
进程上下文切换 | 线程上下文切换 |
---|---|
但有以下区别:
pthread
#include "csapp.h"
/* Thread routine: print a greeting. The argument is ignored and the
 * routine always returns NULL. */
void *f(void *vargp) {
    (void)vargp;
    printf("Hello, world!\n");
    return NULL;
}
int main() {
pthread_t tid;
Pthread_create(&tid, NULL, thread, NULL); /* 创建同伴线程,在其中运行 f() */
Pthread_join(tid, NULL); /* 直到同伴线程结束后才返回 */
exit(0);
}
f() 只能接收与返回一个 void* 型的值,若要传入或返回多个参数,需借助 struct。在 f() 中调用的函数必须是线程安全的。
#include <pthread.h>
/* Type of a thread routine: takes one generic pointer, returns one. */
typedef void *(func)(void *);
/* Create a peer thread that runs f(arg) and store its ID in *tid.
 * Returns 0 on success, or an error number on failure. */
int pthread_create(pthread_t *tid, const pthread_attr_t *attr /* NULL for default attributes */,
                   func *f, void *arg /* argument passed to f() */);
pthread_t pthread_self(void); /* Returns the TID of the calling thread */
结束线程的几种方式:
- 隐式结束:pthread_create() 的 f() 运行完毕并返回。
- 显式结束:调用 pthread_exit() 结束当前线程。
- 被其他线程调用 pthread_cancel() 而结束。
- 任一线程调用 exit() 结束整个进程。
#include <pthread.h>
void pthread_exit(void *thread_return);
int pthread_cancel(pthread_t tid);
pthread_join()
#include <pthread.h>
int pthread_join(pthread_t tid, void **thread_return);
与收割子进程的 waitpid()
类似,但 pthread_join()
只能收割给定的线程。
pthread_detach()
任何线程总是处于以下两种状态之一:
- joinable(默认):可以被其他线程收割或杀死 (pthread_cancel()),其内存资源(如栈)只有在被其他线程调用 pthread_join() 收割时才被释放。
- detached:不能被其他线程收割,其内存资源在其结束时由系统自动释放。
为避免内存泄漏,任何 joinable 线程都应当被其他线程调用 pthread_join() 显式收割(仅被 pthread_cancel() 杀死并不会释放其资源),或调用以下函数使之转为 detached 状态:
#include <pthread.h>
int pthread_detach(pthread_t tid);
/* 常用:分离当前线程 */
pthread_detach(pthread_self());
pthread_once()
#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control,
void (*init_routine)(void));
首次以某个 once_control 调用 pthread_once() 时,它会运行 init_routine() 以初始化全局变量;此后以同一 once_control 再次调用 pthread_once() 不会做任何事。
echoservert.c
#include "csapp.h"
void echo(int connect_fd);
/* Thread routine: serve one client whose connected descriptor arrives in a
 * heap cell allocated by the main thread. This thread owns that cell and
 * frees it; it also detaches itself, so nobody has to join it. */
void *f(void *vargp) {
    int *fd_cell = vargp;
    int connect_fd = *fd_cell;

    Pthread_detach(pthread_self());
    Free(fd_cell);

    echo(connect_fd);
    Close(connect_fd);
    return NULL;
}
/* Thread-based concurrent echo server: one detached peer thread per client.
 * Usage: <prog> <port>. */
int main(int argc, char **argv) {
    int listen_fd;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listen_fd = Open_listenfd(argv[1]);

    for (;;) {
        struct sockaddr_storage client_addr;
        socklen_t client_len = sizeof(struct sockaddr_storage);
        pthread_t tid;
        /* Each connection gets its own heap cell. Handing every thread the
         * address of one shared local would race with the next Accept(). */
        int *connect_fd_ptr = Malloc(sizeof(int));

        *connect_fd_ptr = Accept(listen_fd, (SA *)&client_addr, &client_len);
        Pthread_create(&tid, NULL, f, connect_fd_ptr);
    }
}
共享变量 (shared variable):被多个线程(直接或间接)访问的变量。
各线程在概念上 (conceptually) 独享其栈区,但所有线程的栈区都位于同一进程的虚拟地址空间中,故实际上 (operationally) 可以被其他线程通过指针访问,即被共享。
#include <stdint.h> /* intptr_t: pass small integers through void * */
#include "csapp.h"
char **ptr; /* Global variable (read/write data area): shared directly */

/* Print the message selected by the thread index passed in vargp, together
 * with a call counter that all threads share.
 * The index travels inside the void * argument. Round-tripping it through
 * intptr_t avoids the pointer/int size mismatch of a plain (int) cast on
 * 64-bit platforms. */
void *thread(void *vargp) {
    int i = (int)(intptr_t)vargp; /* Local automatic variable on this thread's stack: not shared */
    static int count = 0;         /* Local static variable (read/write data area): shared directly */

    /* ++count is deliberately unsynchronized in this sharing demo, so
     * concurrent threads race on it. */
    printf("msgs[%d]: %s (count=%d)\n", i, ptr[i], ++count);
    return NULL;
}
/* Start two peer threads that read messages living on main's stack through
 * the global ptr (indirect sharing), and wait for both. */
int main() {
    int i;
    pthread_t tid[2];
    char *msgs[2] = { /* Local automatic variable on main's stack: shared indirectly via ptr */
        "Hello from foo", "Hello from bar"
    };

    ptr = msgs;
    for (i = 0; i < 2; i++)
        Pthread_create(&tid[i], NULL, thread, (void *)(intptr_t)i);

    /* Join before main's frame goes away. POSIX leaves access to a
     * terminated thread's automatic variables undefined, so the peers must
     * finish reading msgs while main is still alive. */
    for (i = 0; i < 2; i++)
        Pthread_join(tid[i], NULL);
    Pthread_exit(NULL);
}
一般而言,无法预知各线程被操作系统选中的执行顺序。
假设 cnt
为一内存变量(与整个生命期在寄存器中度过的寄存器变量相对):
进展图 (progress graph):
进展图有助于理解以下概念:
信号量 (semaphore):用于同步 (synchronize) 并发程序的非负整型全局变量 s,只能被以下两种操作修改(由🇳🇱计算机科学家 Dijkstra 发明,故以🇳🇱语命名):
- 【P(&s)】🇳🇱proberen🇨🇳检测:若 s != 0,则 return --s,此过程不会被打断(具有原子性);若 s == 0,则暂停当前线程,直到被 V(&s) 重启,再 return --s。
- 【V(&s)】🇳🇱verhogen🇨🇳增加:读取 s、增加 s、存储 s,此过程不会被打断(具有原子性);若有线程在 P(&s) 中等待,则重启其中任意一个。
前 P 后 V:
- 将关键段置于 P(&s) 与 V(&s) 之间。
- 信号量不变量 (semaphore invariant):若初值 s >= 0,则该条件始终成立。
POSIX semaphores(不属于 POSIX threads):
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared/* 通常为 0 */, unsigned int v/* 通常为 1 */);
int sem_wait(sem_t *s); /* P(&s) */
int sem_post(sem_t *s); /* V(&s) */
本课程封装的接口:
#include "csapp.h"
void P(sem_t *s); /* Wrapper function for sem_wait */
void V(sem_t *s); /* Wrapper function for sem_post */
二项信号量 (binary semaphore):为每个共享变量关联一个初值为 1 的信号量 s,用 P(&s) 及 V(&s) 包围关键段。
- 进入关键段前调用 P(&s),以阻止其他线程访问(上锁)。
- 离开关键段后调用 V(&s),以允许其他线程访问(开锁)。
- 禁止区域 (forbidden region):进展图中 s < 0 的区域,略大于不安全区域。
#include "csapp.h"
volatile long cnt = 0; /* global counter */
sem_t mutex;           /* semaphore that protects `cnt` */

/* Peer thread: increment the shared counter *(long *)vargp times, holding
 * the binary semaphore around every single increment. */
void *thread(void *vargp) {
    const long n_iters = *(const long *)vargp;
    long done = 0;

    while (done < n_iters) {
        P(&mutex); /* lock */
        cnt++;
        V(&mutex); /* unlock */
        done++;
    }
    return NULL;
}
/* Driver: two peer threads each add n_iters to cnt under the mutex; then
 * check that no increment was lost. Usage: <prog> <n_iters>. */
int main(int argc, char **argv) {
    long n_iters;
    char *end;
    pthread_t tid1, tid2;

    /* Check input argument */
    if (argc != 2) {
        printf("usage: %s <n_iters>\n", argv[0]);
        exit(0);
    }
    /* strtol rather than atoi: atoi has undefined behavior on out-of-range
     * input, silently turns garbage into 0, and yields an int although
     * n_iters is a long. */
    errno = 0;
    n_iters = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE || n_iters < 0) {
        printf("usage: %s <n_iters>\n", argv[0]);
        exit(0);
    }

    Sem_init(&mutex, 0, 1);

    /* Create threads and wait for them to finish */
    Pthread_create(&tid1, NULL, thread, &n_iters);
    Pthread_create(&tid2, NULL, thread, &n_iters);
    Pthread_join(tid1, NULL);
    Pthread_join(tid2, NULL);

    /* Check result */
    if (cnt != (2 * n_iters))
        printf("BOOM! cnt=%ld\n", cnt);
    else
        printf("OK cnt=%ld\n", cnt);
    exit(0);
}
计数信号量 (counting semaphore):其值表示当前可用资源个数的信号量。
有界缓冲区 (bounded buffer):容量有限、含有若干项目 (item) 的共享数据结构。
/* Bounded FIFO of ints shared between producer and consumer threads.
 * sbuf_init() starts with front == rear, which means the queue is empty. */
typedef struct {
    /* a FIFO queue */
    int *buf;    /* Buffer array */
    int n;       /* Maximum number of slots */
    int front;   /* buf[(front+1)%n] is first item */
    int rear;    /* buf[rear%n] is last item */
    /* mutex and counting semaphores */
    sem_t mutex; /* Protects accesses to the queue */
    sem_t slots; /* Counts available slots */
    sem_t items; /* Counts available items */
} sbuf_t;
/* Create an empty, bounded, shared FIFO buffer with room for n items:
 * zero-filled storage, empty indices, an unlocked mutex, n free slots and
 * no available items. */
void sbuf_init(sbuf_t *sp, int n) {
    sp->n = n;
    sp->buf = Calloc(n, sizeof(int));
    sp->rear = 0;  /* front == rear <=> empty */
    sp->front = 0;
    Sem_init(&sp->mutex, 0, 1); /* 1: buffer starts unlocked */
    Sem_init(&sp->slots, 0, n); /* all n slots are free */
    Sem_init(&sp->items, 0, 0); /* nothing to consume yet */
}
/* Clean up buffer sp: release the item array and the three semaphores that
 * sbuf_init() created. Must not be called while any thread may still use
 * sp; destroying a semaphore on which threads are blocked is undefined. */
void sbuf_deinit(sbuf_t *sp) {
    Free(sp->buf);
    sp->buf = NULL; /* guard against use-after-free / double free */
    (void)sem_destroy(&sp->mutex);
    (void)sem_destroy(&sp->slots);
    (void)sem_destroy(&sp->items);
}
/* Insert item onto the rear of shared buffer sp.
 * Blocks while the buffer is full (waits on slots), inserts under
 * sp->mutex, then announces the new item via items. */
void sbuf_insert(sbuf_t *sp, int item) {
    P(&sp->slots);                     /* Wait for available slot */
    P(&sp->mutex);                     /* Lock the buffer */
    /* Keep rear in [0, n). An ever-growing ++rear eventually overflows int,
     * which is undefined behavior. The slot chosen, (rear + 1) % n, is the
     * same one the unbounded counter would have selected. */
    sp->rear = (sp->rear + 1) % sp->n;
    sp->buf[sp->rear] = item;          /* Insert the item */
    V(&sp->mutex);                     /* Unlock the buffer */
    V(&sp->items);                     /* Announce available item */
}
/* Remove and return the first item from buffer sp.
 * Blocks while the buffer is empty (waits on items), removes under
 * sp->mutex, then announces the freed slot via slots. */
int sbuf_remove(sbuf_t *sp) {
    int item;

    P(&sp->items);                       /* Wait for available item */
    P(&sp->mutex);                       /* Lock the buffer */
    /* Keep front in [0, n) instead of incrementing it forever, which would
     * eventually overflow int (undefined behavior). (front + 1) % n selects
     * the same slot as before. */
    sp->front = (sp->front + 1) % sp->n;
    item = sp->buf[sp->front];           /* Remove the item */
    V(&sp->mutex);                       /* Unlock the buffer */
    V(&sp->slots);                       /* Announce available slot */
    return item;
}
/* Solution to the first readers-writers problem */
int readcnt; /* Number of readers currently inside; initially 0 */
sem_t mutex_readcnt, mutex_write; /* Both initially 1 */

/* Reader loop. mutex_readcnt guards readcnt. The first reader in takes
 * mutex_write and the last reader out releases it. While at least one
 * reader is inside, new readers enter without waiting on mutex_write, so a
 * steady stream of readers can starve writers. */
void reader(void) {
    while (1) {
        P(&mutex_readcnt);
        readcnt++;
        if (readcnt == 1)
            P(&mutex_write); /* The first reader locks out writers */
        V(&mutex_readcnt);
        /* Critical section guarded by mutex_write: many readers may read concurrently */
        P(&mutex_readcnt);
        readcnt--;
        if (readcnt == 0)
            V(&mutex_write); /* The last reader lets writers back in */
        V(&mutex_readcnt);
    }
}
/* Writer loop: every pass holds mutex_write exclusively, shutting out both
 * other writers and all readers. */
void writer(void) {
    for (;;) {
        P(&mutex_write);
        /* Critical section guarded by mutex_write: at most one writer writes */
        V(&mutex_write);
    }
}
POSIX threads 提供了读写锁机制。
echoservert-pre.c
echoservert.c 每收到一个客户端请求,就创建一个新线程。为减少频繁创建线程的开销,可采用以下预先创建线程 (prethreaded) 方案:
#include "csapp.h"
#include "sbuf.h"
#define NTHREADS 4
#define SBUFSIZE 16
void echo_cnt(int connfd);
void *thread(void *vargp);
sbuf_t sbuf; /* Shared buffer of connected descriptors */

/* Prethreaded echo server. The main thread is the producer: it accepts
 * connections and queues their descriptors in sbuf. NTHREADS worker
 * threads, created up front, consume them. Usage: <prog> <port>. */
int main(int argc, char **argv) {
    int listen_fd;
    int k;

    if (argc != 2) {
        fprintf(stderr, "usage: %s <port>\n", argv[0]);
        exit(0);
    }

    listen_fd = Open_listenfd(argv[1]);
    sbuf_init(&sbuf, SBUFSIZE);

    /* Start the fixed pool of workers before serving anyone. */
    for (k = 0; k < NTHREADS; k++) {
        pthread_t tid;
        Pthread_create(&tid, NULL, thread, NULL);
    }

    for (;;) {
        struct sockaddr_storage client_addr;
        socklen_t client_len = sizeof(struct sockaddr_storage);
        int connect_fd = Accept(listen_fd, (SA *)&client_addr, &client_len);

        sbuf_insert(&sbuf, connect_fd); /* hand the descriptor to a worker */
    }
}
/* Worker thread (consumer): detach itself, then forever take the next
 * connected descriptor from sbuf, serve that client and close the
 * connection. Never returns. */
void *thread(void *vargp) {
    (void)vargp;
    Pthread_detach(pthread_self());

    for (;;) {
        int fd = sbuf_remove(&sbuf); /* blocks until main queues a descriptor */
        echo_cnt(fd);
        Close(fd);
    }
}
echo_cnt.c
echo_cnt() 用到了 pthread_once():
#include "csapp.h"
static int byte_cnt;  /* Bytes received from all clients so far ... */
static sem_t mutex;   /* ... and the mutex that protects the counter */

/* One-time initializer, run through pthread_once() from echo_cnt(): zero
 * the counter and create its mutex in the unlocked state. */
static void init_echo_cnt(void) {
    byte_cnt = 0;
    Sem_init(&mutex, 0, 1);
}
/* Echo every line from connect_fd back to the client until EOF, adding
 * each line's length to the shared byte_cnt under its mutex. The shared
 * state is initialized exactly once via pthread_once(). */
void echo_cnt(int connect_fd) {
    static pthread_once_t once = PTHREAD_ONCE_INIT;
    char line[MAXLINE];
    rio_t rio;
    int len;

    Pthread_once(&once, init_echo_cnt); /* initialize, runs only once */
    Rio_readinitb(&rio, connect_fd);

    for (;;) {
        len = Rio_readlineb(&rio, line, MAXLINE);
        if (len == 0)
            break; /* EOF */

        P(&mutex);
        byte_cnt += len;
        printf("server received %d (%d total) bytes on fd %d\n",
               len, byte_cnt, connect_fd);
        V(&mutex);

        Rio_writen(connect_fd, line, len);
    }
}
pthread_mutex_t
详见 4.9 Mutexes in Shared-memory programming with Pthreads。
#include <pthread.h>
// Without static initialization: the mutex is set up lazily, exactly once,
// through pthread_once().
static pthread_once_t foo_once = PTHREAD_ONCE_INIT;
static pthread_mutex_t foo_mutex;

/* One-time initializer: create foo_mutex with default attributes. */
void foo_init(void) {
    pthread_mutex_init(&foo_mutex, NULL);
}

/* Run the critical section under foo_mutex, initializing it on first use.
 * `(void)` gives a real prototype; `()` is an obsolescent non-prototype
 * declaration. */
void foo(void) {
    pthread_once(&foo_once, foo_init);
    pthread_mutex_lock(&foo_mutex);
    /* critical section */
    pthread_mutex_unlock(&foo_mutex);
}
// With static initialization, the same routine could be coded as follows;
// no pthread_once() is needed.
static pthread_mutex_t foo_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Run the critical section under foo_mutex. `(void)` gives a real
 * prototype; `()` is an obsolescent non-prototype declaration. */
void foo(void) {
    pthread_mutex_lock(&foo_mutex);
    /* critical section */
    pthread_mutex_unlock(&foo_mutex);
}
int main() {
    /* ... calls to foo() go here ... */

    /* Tear the mutex down once no thread can use it any more. */
    (void)pthread_mutex_destroy(&foo_mutex);
    return 0;
}
pthread_rwlock_t
详见 4.9 Read–write locks in Shared-memory programming with Pthreads。
#include <pthread.h>
// With static initialization:
static pthread_rwlock_t foo_rwlock = PTHREAD_RWLOCK_INITIALIZER;

/* Demonstrates both lock modes of a read-write lock: shared for reading,
 * exclusive for writing. Either mode is released with
 * pthread_rwlock_unlock(). `(void)` gives a real prototype. */
void foo(void) {
    pthread_rwlock_rdlock(&foo_rwlock);
    /* critical section for reading: other readers may hold the lock too */
    pthread_rwlock_unlock(&foo_rwlock);
    /* ... */
    pthread_rwlock_wrlock(&foo_rwlock);
    /* critical section for writing: no other reader or writer holds the lock */
    pthread_rwlock_unlock(&foo_rwlock);
}
int main() {
    /* ... calls to foo() go here ... */

    /* Release the lock once no thread can use it any more. */
    (void)pthread_rwlock_destroy(&foo_rwlock);
    return 0;
}
pthread_cond_t
详见 4.6.3 Condition variables in Shared-memory programming with Pthreads。
#include <pthread.h>
static int counter = 0;                 /* Threads that have reached the barrier */
static unsigned long barrier_epoch = 0; /* Incremented each time the barrier opens */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond_var = PTHREAD_COND_INITIALIZER;

/* Thread routine containing a reusable barrier: no thread gets past the
 * barrier until thread_count threads have reached it.
 * NOTE(review): thread_count is not defined in this snippet; presumably it
 * is a global holding the number of threads running this routine. */
void *thread() {
    /* ... */

    /* Barrier */
    pthread_mutex_lock(&mutex);
    unsigned long my_epoch = barrier_epoch;
    counter++;
    if (counter == thread_count) {
        /* Last arrival: reset the count for the next use, open the barrier
         * and wake every waiter. */
        counter = 0;
        barrier_epoch++;
        pthread_cond_broadcast(&cond_var);
        /* Alternatively call pthread_cond_signal() (thread_count - 1) times */
    } else {
        /* pthread_cond_wait() returns 0 both on a genuine wakeup and on a
         * spurious one; a nonzero return is an error code, not a wakeup
         * reason. So loop on a predicate instead of on the return value.
         * The epoch changes only when the last thread arrives. */
        while (my_epoch == barrier_epoch)
            pthread_cond_wait(&cond_var, &mutex);
    }
    pthread_mutex_unlock(&mutex);

    /* ... */
    return NULL;
}
并行程序 (parallel program):运行在多核处理器 (multi-core processor) 上的并发程序。
【通用技巧】将原问题划分为若干子问题,各线程依据其 tid
计算相应的子问题。
psum-mutex.c
原始思路:
#include "csapp.h"
#define MAXTHREADS 32
long gsum = 0;          /* Global sum */
long nelems_per_thread; /* Number of elements to sum */
sem_t mutex;            /* Mutex to protect global sum */

/* Peer routine: add the indices of this thread's contiguous slice
 * [myid * nelems_per_thread, (myid + 1) * nelems_per_thread) into gsum. */
void *sum_mutex(void *vargp) {
    const long myid = *(const long *)vargp;
    long i = myid * nelems_per_thread;
    const long stop = i + nelems_per_thread;

    while (i < stop) {
        /* WARNING: one lock/unlock per element, i.e. far too much
         * synchronization. */
        P(&mutex);
        gsum += i;
        V(&mutex);
        i++;
    }
    return NULL;
}
int main(int argc, char **argv) {
long i, nelems, log_nelems, nthreads, myid[MAXTHREADS];
pthread_t tid[MAXTHREADS];
nelems_per_thread = /* ... */
sem_init(&mutex, 0, 1);
/* Create peer threads and wait for them to finish */
for (i = 0; i < nthreads; i++) {
myid[i] = i;
Pthread_create(&tid[i], NULL, sum_mutex, &myid[i]);
}
for (i = 0; i < nthreads; i++)
Pthread_join(tid[i], NULL);
/* ... */
}
【重要结论】同步开销很昂贵,应当尽量避免;若不能避免,则单次同步应当分摊 (amortize) 尽可能多的计算量。
psum-array.c
初步改进的思路:
#include "csapp.h"
#define MAXTHREADS 32
long psum[MAXTHREADS];  /* Partial sum computed by each thread */
long nelems_per_thread; /* Number of elements summed by each thread */

/* Peer routine: accumulate the indices of this thread's contiguous slice
 * directly into its own psum[] slot. No lock is needed because each thread
 * has a private slot, but every element costs a memory update. */
void *sum_array(void *vargp) {
    long myid = *(long *)vargp;
    long *slot = &psum[myid];

    for (long i = myid * nelems_per_thread, end = i + nelems_per_thread; i < end; i++)
        *slot += i;
    return NULL;
}
int main(int argc, char **argv) {
long i, nelems, log_nelems, nthreads, myid[MAXTHREADS];
pthread_t tid[MAXTHREADS];
nelems_per_thread = /* ... */
/* Create peer threads and wait for them to finish */
for (i = 0; i < nthreads; i++) {
myid[i] = i;
Pthread_create(&tid[i], NULL, sum_array, &myid[i]);
}
for (i = 0; i < nthreads; i++)
Pthread_join(tid[i], NULL);
/* Add up the partial sums computed by each thread */
for (i = 0; i < nthreads; i++)
result += psum[i];
/* ... */
}
psum-local.c
进一步改进的思路:
/* 同 psum-array.c */
void *sum_array(void *vargp) {
/* 同 psum-array.c */
long sum = 0; /* 寄存器变量 */
for (long i = start; i < end; i++)
sum += i; /* 不必同步,不必访存 */
psum[myid] = sum; /* 只访存一次 */
return NULL;
}
/* 同 psum-array.c */
【实验现象】线程数量 n_threads 等于核心数量 n_cores 时,加速效果最好。
性能指标
可扩展性
线程安全 (thread-safe):被多个并发线程反复调用时,总是给出正确结果的函数。
以下函数不是线程安全的:
- 返回指向静态变量的指针的函数,如 ctime(), gethostbyname()。【解决方案】加锁复制 (lock-and-copy):将不安全函数封装在锁内,在其中将结果 deep copy 到私有内存。
/* Demo of lock-and-copy: a thread-safe wrapper of
 *     char *ctime(const time_t *timep)
 *
 * ctime() returns a pointer to a static buffer shared by all callers, so
 * the call and the copy out of that buffer both happen while holding
 * `mutex`. privatep must point to caller-owned storage of at least 26 bytes
 * (the size POSIX requires for ctime_r()).
 * Returns privatep, or NULL (leaving privatep untouched) if ctime() fails. */
char *ctime_ts(const time_t *timep, char *privatep) {
    char *sharedp;
    char *result = NULL;

    P(&mutex);
    sharedp = ctime(timep);
    if (sharedp != NULL) { /* strcpy from NULL would be undefined behavior */
        strcpy(privatep, sharedp);
        result = privatep;
    }
    V(&mutex);
    return result;
}
再入函数 (reentrant function):被多个线程调用时,不访问共享变量。
/* rand_r - return a pseudorandom integer in [0, 32768).
 * All generator state lives in *nextp, which the caller owns, so concurrent
 * callers with distinct state words do not interfere (reentrant). */
int rand_r(unsigned int *nextp) {
    unsigned int state = *nextp * 1103515245 + 12345; /* LCG step, mod 2^32 */

    *nextp = state;
    return (unsigned int)(state / 65536) % 32768;     /* bits 16..30 of the new state */
}
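大多数 C 标准库函数是线程安全的。部分不安全的有再入版本(以 _r 为后缀):
- rand() → rand_r()
- strtok() → strtok_r()
- asctime(), ctime(), localtime() → asctime_r(), ctime_r(), localtime_r()
- gethostbyaddr(), gethostbyname() → gethostbyaddr_r(), gethostbyname_r()(新代码宜改用 getnameinfo()/getaddrinfo())
- inet_ntoa() ⚠️ 无再入版本(可改用 inet_ntop())
竞争 (race):多个线程同一段时间内访问同一块内存,且至少一个线程在改写其中的内容。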
死锁 (deadlock):某些被暂停的线程等待着不可能发生的事件。
现代处理器利用高速缓存来缓解处理器与存储器之间的速度差异。
多线程共享同一块主存,但不同处理核心有各自独立的缓存。
若线程 A 向某地址写出数据(写入缓存 A),硬件会将其他核心缓存(如缓存 B)中该地址的副本标记为失效;此后线程 B 从该地址读取数据时会发生缓存缺失,从而被强制重新获取最新数据,而不会读到缓存 B 中的旧值。
缓存一致性以缓存线 (cache line) 为单位。