Photon 1.0.0
Loading...
Searching...
No Matches
mpmc_blocking_q.h
Go to the documentation of this file.
1// Copyright(c) 2015-present, Gabi Melman & spdlog contributors.
2// Distributed under the MIT License (http://opensource.org/licenses/MIT)
3
4#pragma once
5
6// multi producer-multi consumer blocking queue.
7// enqueue(..) - will block until room found to put the new message.
8// enqueue_nowait(..) - will return immediately, overrunning the oldest message
9// in the queue if no room is left.
10// dequeue_for(..) - will block until the queue is not empty or the timeout has
11// passed.
12
14
15#include <condition_variable>
16#include <mutex>
17
18namespace spdlog
19{
20 namespace details
21 {
22
23 template <typename T>
25 {
26 public:
27 using item_type = T;
28 explicit mpmc_blocking_queue(size_t max_items)
29 : q_(max_items)
30 {
31 }
32
33#ifndef __MINGW32__
34 // try to enqueue and block if no room left
35 void enqueue(T&& item)
36 {
37 {
38 std::unique_lock<std::mutex> lock(queue_mutex_);
39 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
40 q_.push_back(std::move(item));
41 }
42 push_cv_.notify_one();
43 }
44
45 // enqueue immediately. overrun oldest message in the queue if no room left.
46 void enqueue_nowait(T&& item)
47 {
48 {
49 std::unique_lock<std::mutex> lock(queue_mutex_);
50 q_.push_back(std::move(item));
51 }
52 push_cv_.notify_one();
53 }
54
55 // dequeue with a timeout.
56 // Return true, if succeeded dequeue item, false otherwise
57 bool dequeue_for(T& popped_item, std::chrono::milliseconds wait_duration)
58 {
59 {
60 std::unique_lock<std::mutex> lock(queue_mutex_);
61 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
62 {
63 return false;
64 }
65 popped_item = std::move(q_.front());
66 q_.pop_front();
67 }
68 pop_cv_.notify_one();
69 return true;
70 }
71
72 // blocking dequeue without a timeout.
73 void dequeue(T& popped_item)
74 {
75 {
76 std::unique_lock<std::mutex> lock(queue_mutex_);
77 push_cv_.wait(lock, [this] { return !this->q_.empty(); });
78 popped_item = std::move(q_.front());
79 q_.pop_front();
80 }
81 pop_cv_.notify_one();
82 }
83
84#else
85 // apparently mingw deadlocks if the mutex is released before cv.notify_one(),
86 // so release the mutex at the very end of each function.
87
88 // try to enqueue and block if no room left
89 void enqueue(T&& item)
90 {
91 std::unique_lock<std::mutex> lock(queue_mutex_);
92 pop_cv_.wait(lock, [this] { return !this->q_.full(); });
93 q_.push_back(std::move(item));
94 push_cv_.notify_one();
95 }
96
97 // enqueue immediately. overrun oldest message in the queue if no room left.
98 void enqueue_nowait(T&& item)
99 {
100 std::unique_lock<std::mutex> lock(queue_mutex_);
101 q_.push_back(std::move(item));
102 push_cv_.notify_one();
103 }
104
105 // dequeue with a timeout.
106 // Return true, if succeeded dequeue item, false otherwise
107 bool dequeue_for(T& popped_item, std::chrono::milliseconds wait_duration)
108 {
109 std::unique_lock<std::mutex> lock(queue_mutex_);
110 if (!push_cv_.wait_for(lock, wait_duration, [this] { return !this->q_.empty(); }))
111 {
112 return false;
113 }
114 popped_item = std::move(q_.front());
115 q_.pop_front();
116 pop_cv_.notify_one();
117 return true;
118 }
119
120 // blocking dequeue without a timeout.
121 void dequeue(T& popped_item)
122 {
123 std::unique_lock<std::mutex> lock(queue_mutex_);
124 push_cv_.wait(lock, [this] { return !this->q_.empty(); });
125 popped_item = std::move(q_.front());
126 q_.pop_front();
127 pop_cv_.notify_one();
128 }
129
130#endif
131
133 {
134 std::unique_lock<std::mutex> lock(queue_mutex_);
135 return q_.overrun_counter();
136 }
137
138 size_t size()
139 {
140 std::unique_lock<std::mutex> lock(queue_mutex_);
141 return q_.size();
142 }
143
145 {
146 std::unique_lock<std::mutex> lock(queue_mutex_);
147 q_.reset_overrun_counter();
148 }
149
150 private:
151 std::mutex queue_mutex_;
152 std::condition_variable push_cv_;
153 std::condition_variable pop_cv_;
155 };
156 } // namespace details
157} // namespace spdlog
Definition circular_q.h:16
Definition mpmc_blocking_q.h:25
void enqueue_nowait(T &&item)
Definition mpmc_blocking_q.h:46
size_t overrun_counter()
Definition mpmc_blocking_q.h:132
std::condition_variable push_cv_
Definition mpmc_blocking_q.h:152
size_t size()
Definition mpmc_blocking_q.h:138
void enqueue(T &&item)
Definition mpmc_blocking_q.h:35
void dequeue(T &popped_item)
Definition mpmc_blocking_q.h:73
void reset_overrun_counter()
Definition mpmc_blocking_q.h:144
std::condition_variable pop_cv_
Definition mpmc_blocking_q.h:153
std::mutex queue_mutex_
Definition mpmc_blocking_q.h:151
T item_type
Definition mpmc_blocking_q.h:27
mpmc_blocking_queue(size_t max_items)
Definition mpmc_blocking_q.h:28
spdlog::details::circular_q< T > q_
Definition mpmc_blocking_q.h:154
bool dequeue_for(T &popped_item, std::chrono::milliseconds wait_duration)
Definition mpmc_blocking_q.h:57
Definition async.h:26
annotation details
Definition tag_strings.h:125