翼度科技»论坛 云主机 LINUX 查看内容

Linux线程间交互

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
前言

上一篇说过,系统会为线程mmap一块内存,每个线程有自己的私有栈,使用局部变量没啥问题。但是实际场景中不可避免的需要线程之间共享数据,这就需要确保每个线程看到的数据是一样的,如果大家都只需要读这块数据没有问题,但是当有了修改共享区域的需求时就会出现数据不一致的问题。甚至线程2的任务在执行到某个地方的时候,需要线程1先做好准备工作,出现顺序依赖的情况。为了解决这些问题,Linux提供了多种API来适用于不同的场景。
互斥量 mutex

排他的访问共享数据,锁竞争激烈的场景使用。锁竞争不激烈的情况可以使用自旋锁(忙等)
当我们用trace -f 去追踪多线程的时候会看到执行加锁解锁的调用是futex,glibc通过futex(fast user space mutex)实现互斥量。通过FUTEX_WAIT_PRIVATE标志的futex调用内核的futex_wait挂起线程,通过FUTEX_WAKE_PRIVATE的futex调用内核的futex_wake来唤醒等待的线程。这之中glibc做了优化:

  • 加锁时,当前mutex没有被加锁,则直接加锁,不做系统调用,自然不需要做上下文切换。如果已经加锁则需要系统调用futex_wait让内核将线程挂起到等待队列
  • 解锁时,没有其他线程在等待该mutex,直接解锁,不做系统调用。如果有其他线程在等待,则通过系统调用futex_wake唤醒等待队列中的一个线程
初始化互斥量
  1. #include <pthread.h>
  2. // 动态初始化并设置互斥量属性,用完需要销毁
  3. int pthread_mutex_init(pthread_mutex_t *restrict mutex,
  4.                        const pthread_mutexattr_t *restrict attr);
  5. // attr 设置mutex的属性,NULL为使用默认属性
  6. // 返回值:成功返回0,失败返回错误编号
  7. // 静态初始化,无需销毁
  8. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
复制代码
销毁互斥量
  1. // 销毁互斥量
  2. int pthread_mutex_destroy(pthread_mutex_t *mutex);
  3. // 返回值:成功返回0,失败返回错误编号。
  4. //                 如果互斥量是锁定状态,或者正在和条件变量共同使用,销毁会返回EBUSY
复制代码
加锁和解锁


  • 使用pthread_mutex_lock加锁
  1. #include <pthread.h>
  2. // 阻塞
  3. int pthread_mutex_lock(pthread_mutex_t *mutex);
  4. // 返回值:成功返回0,失败返回错误编号
  5. // 非阻塞
  6. int pthread_mutex_trylock(pthread_mutex_t *mutex);
  7. // 返回值:加锁成功直接返回0,加锁失败返回EBUSY
  8. int pthread_mutex_unlock(pthread_mutex_t *mutex);
  9. // 返回值:成功返回0,失败返回错误编号
复制代码
调用状态:

  • 调用时互斥量未锁定,该函数所在线程争取到mutex,返回。
  • 调用时已有其他线程对mutex加锁,则阻塞等待mutex被释放后重新尝试加锁
重复调用问题,即本线程已经对mutex加锁,再次调用加锁操作时,根据互斥量的类型不同会有不同表现:

  • PTHREAD_MUTEX_TIMED_NP:重复加锁导致死锁,该调用线程永久阻塞,并且其他线程无法申请到该mutex
  • PTHREAD_MUTEX_ERRORCHECK_NP:内部记录着调用线程,重复加锁返回EDEADLK,如果解锁的线程不是锁记录的线程,返回EPERM
  • PTHREAD_MUTEX_RECURSIVE_NP:允许重复加锁,锁内部维护着引用计数和调用线程。如果解锁的线程不是锁记录的线程,返回EPERM
  • PTHREAD_MUTEX_ADAPTIVE_NP(自适应锁):先自旋一段时间,自旋的时间由__spins和MAX_ADAPTIVE_COUNT共同决定,自动调整__spin的大小但是不会超过MAX_ADAPTIVE_COUNT。超过自旋时间让出CPU等待,比自旋锁温柔,比normal mutex激进。
设置mutex属性
  1. // 设置mutex为ADAPTER模式
  2. pthread_mutexattr_t mutexattr;
  3. pthread_mutexattr_init(&mutexattr);
  4. pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_ADAPTIVE_NP);
  5. // 获取mutex模式
  6. int kind;
  7. pthread_mutexattr_gettype(&mutexattr, &kind);
  8. if (kind == PTHREAD_MUTEX_ADAPTIVE_NP) {
  9. printf("mutex type is %s", "PTHREAD_MUTEX_ADAPTIVE_NP\n");
  10. }
复制代码
带有超时的mutex
  1. int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
  2. // abstime表示在该时间之前阻塞,不是时间间隔
  3. // 成功返回0,失败返回错误编号,超时返回ETIMIEOUT
复制代码
demo

对已经加锁的mutex继续使用timedlock加锁,timedlock超时返回,之后mutex解锁
  1. #define _DEFAULT_SOURCE 1
  2. #include <errno.h>
  3. #include <pthread.h>
  4. #include <stdio.h>
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <sys/time.h>
  8. char* now_time(char buf[]) {
  9.   struct timespec abstime;
  10.   abstime.tv_sec = time(0);
  11.   strftime(buf, 1024, "%r", localtime(&abstime.tv_sec));
  12.   return buf;
  13. }
  14. int main() {
  15.   char buf[1024];
  16.   pthread_mutex_t mutex;
  17.   struct timespec abstime;
  18.   pthread_mutex_init(&mutex, NULL);
  19.   pthread_mutex_lock(&mutex);
  20.   char* now = now_time(buf);
  21.   printf("mutex locked, now: %s\n", buf);
  22.   // 设置超时的绝对时间,不设置tv_nsec会返回22,EINVAL
  23.   abstime.tv_sec = time(0) + 10;
  24.   abstime.tv_nsec = 0;
  25.   int ret = pthread_mutex_timedlock(&mutex, &abstime);
  26.   fprintf(stderr, "error %d\n", ret);
  27.   if (ret == ETIMEDOUT) {
  28.     printf("lock mutex timeout\n");
  29.   } else if (ret == 0) {
  30.     printf("lock mutex successfully\n");
  31.   } else if (ret == EINVAL) {
  32.     printf("timedlock param invalid!\n");
  33.   } else {
  34.     printf("other error\n");
  35.   }
  36.   pthread_mutex_unlock(&mutex);
  37.   memset(buf, '\0', 1024);
  38.   now = now_time(buf);
  39.   printf("mutex unlocked, now: %s\n", buf);
  40.   pthread_mutex_destroy(&mutex);
  41.   return 0;
  42. }
  43. // -----------------------------
  44. root@yielde:~/workspace/code-container/cpp/blog_demo# ./test
  45. mutex locked, now: 08:18:34 PM
  46. error 110
  47. lock mutex timeout
  48. mutex unlocked, now: 08:18:44 PM
复制代码
读写锁

读写锁适用于临界区很大并且在大多数情况下读取共享资源,极少数情况下需要写的场景

  • 未加锁:加读、写锁都可以
  • 加读锁:再次尝试加读锁成功,写锁阻塞
  • 加写锁:再次尝试加读、写锁阻塞
常用接口与mutex类似,用的时候查https://man7.org/linux/man-pages/dir_section_3.html,读写锁有两种策略:
  1. PTHREAD_RWLOCK_PREFER_READER_NP, // 读者优先
  2. PTHREAD_RWLOCK_PREFER_WRITER_NP, // 读者优先
  3. PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP, // 写者优先
  4. PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP
  5. // 通过以下函数设置
  6. int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
  7. int pthread_rwlockattr_getkind_np(const pthread_rwlockattr_t *attr, int *pref);
复制代码
读写锁存在的问题:

  • 如果临界区小,锁内部维护的数据结构多于mutex,性能不如mutex
  • 因为有读优先和写优先的策略,使用不当会出现读或写线程饿死的现象
  • 如果是写策略优先,线程1持有读锁,线程2等待加写锁,线程1再次加读锁,就出现了死锁情况
demo

启动5个线程共同对一个变量累加1,使用读写锁让线程并发,用自适应锁对共享变量加锁。
  1. /*
  2.   5个线程对total加1执行指定次数
  3. */
  4. #define _DEFAULT_SOURCE 1  // 处理vscode 未定义 pthread_rwlock_t
  5. #include <pthread.h>
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include <unistd.h>
  9. #define THREAD_COUNT 5
  10. int total = 0;                                      // 最终和
  11. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;  // 初始化互斥量
  12. pthread_rwlock_t rwlock;                            // 读写锁变量
  13. typedef struct param {                              // 线程参数类型
  14.   int count;
  15.   int id;
  16. } param;
  17. void *handler(void *arg) {
  18.   struct param *pa = (struct param *)arg;
  19.   pthread_rwlock_rdlock(&rwlock);  // 当主线程不unlock写锁时,会阻塞在这里
  20.   for (int i = 0; i < pa->count; ++i) {
  21.     pthread_mutex_lock(&mutex);  // 加互斥锁
  22.     ++total;
  23.     pthread_mutex_unlock(&mutex);
  24.   }
  25.   pthread_rwlock_unlock(&rwlock);
  26.   printf("thread %d complete\n", pa->id);
  27.   return NULL;
  28. }
  29. int main(int argc, char *argv[]) {
  30.   if (argc != 2) {
  31.     printf("usage: %s per_thread_loop_count\n", argv[0]);
  32.     return 1;
  33.   }
  34.   // 设置mutex为ADAPTER模式
  35.   pthread_mutexattr_t mutexattr;
  36.   pthread_mutexattr_init(&mutexattr);
  37.   pthread_mutexattr_settype(&mutexattr, PTHREAD_MUTEX_ADAPTIVE_NP);
  38.   // 给handler传参
  39.   int loop_count = atoi(argv[1]);
  40.   // 存放线程id的数组
  41.   pthread_t tid[THREAD_COUNT];
  42.   param pa[THREAD_COUNT];
  43.   pthread_rwlock_init(&rwlock, NULL);  // 动态初始化读写锁
  44.   pthread_rwlock_wrlock(&rwlock);  // 给写加锁,等所有线程创建好后解锁,线程执行
  45.   for (int i = 0; i < THREAD_COUNT; ++i) {  // 创建5个线程
  46.     pa[i].count = loop_count;
  47.     pa[i].id = i;
  48.     pthread_create(&tid[i], NULL, handler, &pa[i]);
  49.   }
  50.   pthread_rwlock_unlock(&rwlock);
  51.   for (int i = 0; i < THREAD_COUNT; ++i) {
  52.     pthread_join(tid[i], NULL);
  53.   }
  54.   pthread_rwlock_destroy(&rwlock);
  55.   printf("thread count: %d\n", THREAD_COUNT);
  56.   printf("per thread loop count: %d\n", loop_count);
  57.   printf("total except: %d\n", loop_count * 5);
  58.   printf("total result: %d\n", total);
  59.   int kind;
  60.   pthread_mutexattr_gettype(&mutexattr, &kind);
  61.   if (kind == PTHREAD_MUTEX_ADAPTIVE_NP) {
  62.     printf("mutex type is %s", "PTHREAD_MUTEX_ADAPTIVE_NP\n");
  63.   }
  64.   return 0;
  65. }
  66. // --------------------------------
  67. root@yielde:~/workspace/code-container/cpp/blog_demo# ./test 2000
  68. thread 2 complete
  69. thread 1 complete
  70. thread 0 complete
  71. thread 3 complete
  72. thread 4 complete
  73. thread count: 5
  74. per thread loop count: 2000
  75. total except: 10000
  76. total result: 10000
  77. mutex type is PTHREAD_MUTEX_ADAPTIVE_NP
复制代码
自旋锁

等待锁的时候不会通知内个将线程挂起,而是忙等。适用于临界区很小,锁被持有的时间很短的情况,相比于互斥锁,节省了上下文切换的开销
线程同步-屏障

barrier可以同步多个线程,允许任意数量的线程等待,直到所有的线程完成工作,然后继续执行
  1. #include <pthread.h>
  2. int pthread_barrier_destroy(pthread_barrier_t *barrier);
  3. // 返回值:成功返回0,失败返回错误号
  4. int pthread_barrier_init(pthread_barrier_t *restrict barrier,
  5.            const pthread_barrierattr_t *restrict attr, unsigned count);
  6. // count指定有多少个线程到达屏障后再继续执行下去
  7. // 返回值:成功返回0,失败返回错误号
  8. int pthread_barrier_wait(pthread_barrier_t *barrier);
  9. // 成功:给一个线程返回PTHREAD_BARRIER_SERIAL_THREAD,其他线程返回0
  10. // 失败返回错误号
复制代码
demo

使用4个线程,每个线程计算1+1+..+1=10,将结果放入数组的一个位置,完成后到达barrier。主线程创建好线程后到达barrier,等四个线程全部完成后,由主线程合计结果
  1. #define _DEFAULT_SOURCE
  2. #include <pthread.h>
  3. #include <stdio.h>
  4. #include <unistd.h>
  5. #define COUNT 10
  6. #define THR_NUM 4
  7. pthread_barrier_t barrier;
  8. long total_arr[THR_NUM] = {0};
  9. void *handler(void *arg) {
  10.   long idx = (long)arg;
  11.   long tmp = 0;
  12.   for (int i = 0; i < COUNT; ++i) {
  13.     ++tmp;
  14.     sleep(1);
  15.   }
  16.   total_arr[idx] = tmp;
  17.   printf("thread %ld complete, count %ld\n", idx, tmp);
  18.   pthread_barrier_wait(&barrier); // 等待在barrier
  19.   return NULL;
  20. }
  21. int main() {
  22.   pthread_t tids[THR_NUM];
  23.   unsigned long total = 0;
  24.   pthread_barrier_init(&barrier, NULL, THR_NUM + 1);  // 包含主线程
  25.   for (long i = 0; i < THR_NUM; ++i) {
  26.     pthread_create(&tids[i], NULL, handler, (void *)i);
  27.   }
  28.   pthread_barrier_wait(&barrier); // 到达barrier
  29.   for (int i = 0; i < THR_NUM; ++i) {
  30.     total += total_arr[i];
  31.   }
  32.   for (int i = 0; i < THR_NUM; ++i) {
  33.     pthread_join(tids[i], NULL);
  34.   }
  35.   pthread_barrier_destroy(&barrier); // 销毁barrier
  36.   printf("total: %lu\n", total);
  37. }
  38. // ---------------------
  39. root@yielde:~/workspace/code-container/cpp/blog_demo# time ./test
  40. thread 2 complete, count 10
  41. thread 0 complete, count 10
  42. thread 3 complete, count 10
  43. thread 1 complete, count 10
  44. total: 40
  45. real    0m10.027s
  46. user    0m0.005s
  47. sys     0m0.003s
复制代码
线程同步-条件变量

如果条件不满足,线程会等待在条件变量上,并且让出mutex,等待其他线程来执行。其他线程执行到条件满足后会发信号唤醒等待的线程。
  1. // 销毁条件变量
  2. int pthread_cond_destroy(pthread_cond_t *cond);
  3. // 初始化条件变量
  4. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
  5. int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
  6. // 等待条件变量
  7. int pthread_cond_timedwait(pthread_cond_t *restrict cond,
  8.                    pthread_mutex_t *restrict mutex,
  9.                    const struct timespec *restrict abstime);
  10. int pthread_cond_wait(pthread_cond_t *restrict cond,
  11.                    pthread_mutex_t *restrict mutex);
  12. // 通知条件变量满足
  13. int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有线程
  14. int pthread_cond_signal(pthread_cond_t *cond); // 至少唤醒1个线程
  15. //返回值成功返回0,失败返回错误号
复制代码
对于 cond_wait,传递mutex保护条件变量,调用线程将锁住的mutex传给函数,函数将调用线程挂起到等待队列上,解锁互斥量。当函数返回时,互斥量再次被锁住。
demo

handler_hello往buf里输入字符串,由handler_print打印
  1. #include <pthread.h>
  2. #include <stdio.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 初始化互斥量
  6. pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // 初始化条件变量
  7. char buf[8] = {0};
  8. void *handler_hello(void *arg) {
  9.   for (;;) {
  10.     sleep(2);
  11.     pthread_mutex_lock(&mutex);
  12.     sprintf(buf, "%s", "hello !");
  13.     pthread_mutex_unlock(&mutex);
  14.     pthread_cond_signal(&cond); // 唤醒wait的线程
  15.   }
  16.   return NULL;
  17. }
  18. void *handler_print(void *arg) {
  19.   for (;;) {
  20.     pthread_mutex_lock(&mutex);
  21.     while (buf[0] == 0) {
  22.         // 如果buf没有内容就等待,此处将线程挂入队列,然后解锁mutex,等收到handler_hello的signal后返回,加锁mutex
  23.         //
  24.       pthread_cond_wait(&cond, &mutex);
  25.     }
  26.     fprintf(stderr, "%s", buf);
  27.     memset(buf, '\0', 8);
  28.     pthread_mutex_unlock(&mutex);
  29.   }
  30.   return NULL;
  31. }
  32. int main() {
  33.   pthread_t tid1, tid2;
  34.   pthread_create(&tid1, NULL, handler_hello, NULL);
  35.   pthread_create(&tid2, NULL, handler_print, NULL);
  36.   pthread_join(tid1, NULL);
  37.   pthread_join(tid2, NULL);
  38.   printf("%s", buf);
  39.   return 0;
  40. }
  41. // ------------------------
  42. root@yielde:~/workspace/code-container/cpp/blog_demo# ./test
  43. hello !hello !hello !hello !^C
复制代码
学习自:
《UNIX环境高级编程》
《Linux环境编程从应用到内核》高峰 李彬 著

来源:https://www.cnblogs.com/tongh/p/17993658
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具