并发控制

1 c++中的锁和条件变量实现状态同步

#include <queue>
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>

using std::vector;
using std::string;
using std::queue;
using std::mutex;
using std::thread;

class Message {
 public:

     void Put(const string& msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         msgs_.push(msg);
     }

     bool Get(string& out_msg) {
         std::lock_guard<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
};

    
Message g_message_mgr;

void produce() {
    for(;;) {
        string msg = "some rand msg";
        g_message_mgr.Put(msg);
        std::this_thread::sleep_for(std::chrono::seconds(2));
    }
}

void cosumer() {
    for(;;) {
        string msg;
        if (g_message_mgr.Get(msg)) {
            // do sth with msg
            std::cout << msg << std::endl;
        }
    }
}

int main() {
    thread t1(produce);
    thread t2(cosumer);
    t1.join();
    t2.join();

    return 0;
}

  • 以上程序是一个生产者和一个消费者典型例子, 运行之后发现程序cpu高达99%.
  • 改善Message类
class Message {
 public:

     void Put(const string& msg) {
         std::unique_lock<std::mutex> guard(msgs_lock_);

         bool need_notify = false;
         if (msgs_.empty()) {
             need_notify = true;
         }

         msgs_.push(msg);

         if (need_notify) {
             cv_.notify_all();
         }
     }

     bool Get(string& out_msg) {
          std::unique_lock<std::mutex> guard(msgs_lock_);
         if (!msgs_.empty()) {
             out_msg = msgs_.front();
             msgs_.pop();
             return true;
         } else {
             cv_.wait(guard, [this]{return msgs_.empty() == false;});
         }
         return false;
     }

 private:
     queue<string> msgs_;
     mutex msgs_lock_;
     condition_variable cv_;
};
  • 改善之后cpu使用率降低为0%
  • 由上可以看出:
    • mutex只是起到了互斥作用, 并不能起到事件通知作用, 造成了在没消息的时候去死循环, 浪费cpu
    • condition_variable条件变量有事件通知以及挂起线程的效果, 生产者在空到非空的事件触发的时候可以同步给消费者, 避免消费者自己死循环的去查找而浪费精力.

2 信号量的同步

从第一节看出是由锁和条件变量来实现状态同步的, 而信号量也可以实现同样的功能, 参考伪代码:

produce:
    P(emptyCount)
    P(useQueue)
    putItemIntoQueue(item)
    V(useQueue)
    V(fullCount)
consume:
    P(fullCount)
    P(useQueue)
    item ← getItemFromQueue()
    V(useQueue)
    V(emptyCount)
  • emptyCount 初始资源大小为 N, fullCount 初始化大小是 0, 并且useQueue is 初始化大小是1.

  • P代表减1, V代表加1

  • 通过P(emptyCount)来检查共享资源, 当资源不够时候线程被挂起, 直到V(emptyCount)释放了资源. 同理fullCount

    以上P和V都是原子操作, 提供了检查资源, 挂起, 通知等功能. 简而言之是一种共享锁(相对互斥锁来说). 当资源总共为1的时候, 同互斥锁.
    信号量也可以实现类似wait功能, 但是参考golang里面的waitgroup, 还是有一些不足, 例如在通知的时候信号量如何做到广播通知所有wait的线程.

3 锁与信号量

锁和信号量都能实现类似的功能, 在一些复杂的场景, 例如多种相互依赖的状态需要同步的时候通过锁和条件变量更方便实现, 但是要注意锁的串行特性, 不要锁住耗时多的代码.


发表评论

电子邮件地址不会被公开。 必填项已用*标注