C++11实现线程安全的队列
使用C++11提供的互斥锁 std::mutex 和条件变量 std::condition_variable 完成线程安全的队列。
后面进行了测试,启动3个线程从global的队列里读取数据并输出。
见代码。
#include <mutex>
#include <queue>
#include <thread>
#include <iostream>
#include <stdlib.h>
#include <time.h>
template<typename T>
class thread_safe_queue {
private:
mutable std::mutex mtx;
mutable std::condition_variable data_cond;
using queue_type = std::queue<T>;
queue_type data_queue;
public:
using value_type = typename queue_type::value_type;
using container_type = typename queue_type::container_type;
// 默认构造函数
thread_safe_queue() = default;
// 使用容器为参数的构造函数
explicit thread_safe_queue(const container_type& c) : data_queue(c) {}
template<typename _InputIterator>
thread_safe_queue(_InputIterator first, _InputIterator last)
{
for (auto it = first; it != last; ++it) {
data_queue.push(*it);
}
}
// 使用初始化列表为参数的构造函数
thread_safe_queue(std::initializer_list<value_type> list) : thread_safe_queue(list.begin(), list.end()) {}
// 入队列
void push(const value_type& value)
{
std::lock_guard<std::mutex> lk(mtx);
data_queue.push(std::move(value));
data_cond.notify_one();
}
// 出队列
value_type wait_and_pop()
{
std::unique_lock<std::mutex> lk(mtx);
data_cond.wait(lk, [this]{return !this->data_queue.empty();});
auto value = std::move(data_queue.front());
data_queue.pop();
return value;
}
bool try_pop(value_type& value)
{
std::lock_guard<std::mutex> lk(mtx);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}
// 返回队列是否为空
auto empty() const->decltype(data_queue.empty())
{
std::lock_guard<std::mutex> lk(mtx);
return data_queue.empty();
}
// 返回队列元素个数
auto size() const->decltype(data_queue.size())
{
std::lock_guard<std::mutex> lk(mtx);
return data_queue.size();
}
};
const int CONSUMERS_NUMS = 3;
std::thread* consumers[CONSUMERS_NUMS];
//const int PRODUCERS_NUMS = 10;
//std::thread* producers[PRODUCERS_NUMS];
thread_safe_queue<int> numbers;
void produce(int value)
{
numbers.push(value);
}
void init_threads(std::mutex& output_mutex)
{
for (int i = 0; i < CONSUMERS_NUMS; ++i) {
consumers[i] = new std::thread([&output_mutex]() {
while (1) {
int value = numbers.wait_and_pop();
std::lock_guard<std::mutex> lk(output_mutex);
std::cout << "thread " << std::this_thread::get_id() << ", value = " << value << ", value * value = " << value * value << std::endl;
//std::this_thread::sleep_for(std::chrono::microseconds(500));
}
});
consumers[i]->detach();
}
}
int main ()
{
std::mutex output_mutex;
init_threads(output_mutex);
//std::srand(std::time(nullptr));
const int NUM = 10;
std::vector<int> nums;
for (int i = 0; i < NUM; ++i) {
//nums.push_back(std::rand() % 999);
nums.push_back(i);
}
for (int value : nums) {
//while (1) {
//int value;
//std::cin >> value;
produce(value);
std::this_thread::sleep_for(std::chrono::microseconds(500));
}
return 0;
}
/*
output:
thread 0x7000088a8000, value = 0, value * value = 0
thread 0x70000892b000, value = 1, value * value = 1
thread 0x7000089ae000, value = 2, value * value = 4
thread 0x7000088a8000, value = 3, value * value = 9
thread 0x70000892b000, value = 4, value * value = 16
thread 0x7000089ae000, value = 5, value * value = 25
thread 0x7000088a8000, value = 6, value * value = 36
thread 0x70000892b000, value = 7, value * value = 49
thread 0x7000089ae000, value = 8, value * value = 64
thread 0x7000088a8000, value = 9, value * value = 81
*/
goudan-er SHARE · CPP
C++