userver: userver/concurrent/queue.hpp Source File
Loading...
Searching...
No Matches
queue.hpp
1#pragma once
2
3#include <atomic>
4#include <limits>
5#include <memory>
6
7#include <moodycamel/concurrentqueue.h>
8
9#include <userver/concurrent/impl/semaphore_capacity_control.hpp>
10#include <userver/concurrent/queue_helpers.hpp>
11#include <userver/engine/deadline.hpp>
12#include <userver/engine/semaphore.hpp>
13#include <userver/engine/single_consumer_event.hpp>
14#include <userver/engine/task/cancel.hpp>
15#include <userver/utils/assert.hpp>
16#include <userver/utils/atomic.hpp>
17
18USERVER_NAMESPACE_BEGIN
19
20namespace concurrent {
21
22namespace impl {
23
/// Queue policy that accounts every element as exactly one unit of capacity,
/// whatever the element's value is.
template <bool IsMultiProducer, bool IsMultiConsumer>
struct SimpleQueuePolicy {
  static constexpr bool kIsMultipleProducer = IsMultiProducer;
  static constexpr bool kIsMultipleConsumer = IsMultiConsumer;

  /// @returns the capacity taken by a single element, which is always 1
  template <typename Element>
  static constexpr std::size_t GetElementSize(const Element& /*element*/) {
    return std::size_t{1};
  }
};
34
/// Queue policy that accounts an element by the number of items it contains,
/// as reported by `std::size`.
template <bool IsMultiProducer, bool IsMultiConsumer>
struct ContainerQueuePolicy {
  static constexpr bool kIsMultipleProducer = IsMultiProducer;
  static constexpr bool kIsMultipleConsumer = IsMultiConsumer;

  /// @returns `std::size(container)`, i.e. the capacity taken by the element
  template <typename Container>
  static std::size_t GetElementSize(const Container& container) {
    const std::size_t item_count = std::size(container);
    return item_count;
  }
};
45
46} // namespace impl
47
48/// Queue with single and multi producer/consumer options
49///
50/// @see @ref scripts/docs/en/userver/synchronization.md
51template <typename T, typename QueuePolicy>
52class GenericQueue final
53 : public std::enable_shared_from_this<GenericQueue<T, QueuePolicy>> {
54 struct EmplaceEnabler final {
55 // Disable {}-initialization in Queue's constructor
56 explicit EmplaceEnabler() = default;
57 };
58
59 using ProducerToken =
60 std::conditional_t<QueuePolicy::kIsMultipleProducer,
61 moodycamel::ProducerToken, impl::NoToken>;
62 using ConsumerToken =
63 std::conditional_t<QueuePolicy::kIsMultipleProducer,
64 moodycamel::ConsumerToken, impl::NoToken>;
65 using MultiProducerToken = impl::MultiToken;
66 using MultiConsumerToken =
67 std::conditional_t<QueuePolicy::kIsMultipleProducer, impl::MultiToken,
68 impl::NoToken>;
69
70 using SingleProducerToken =
71 std::conditional_t<!QueuePolicy::kIsMultipleProducer,
72 moodycamel::ProducerToken, impl::NoToken>;
73
74 friend class Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
75 friend class Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
76 friend class Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
77 friend class Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
78
79 public:
80 using ValueType = T;
81
82 using Producer =
83 concurrent::Producer<GenericQueue, ProducerToken, EmplaceEnabler>;
84 using Consumer =
85 concurrent::Consumer<GenericQueue, ConsumerToken, EmplaceEnabler>;
86 using MultiProducer =
87 concurrent::Producer<GenericQueue, MultiProducerToken, EmplaceEnabler>;
88 using MultiConsumer =
89 concurrent::Consumer<GenericQueue, MultiConsumerToken, EmplaceEnabler>;
90
91 static constexpr std::size_t kUnbounded =
92 std::numeric_limits<std::size_t>::max() / 4;
93
94 /// @cond
95 // For internal use only
96 explicit GenericQueue(std::size_t max_size, EmplaceEnabler /*unused*/)
97 : queue_(),
98 single_producer_token_(queue_),
99 producer_side_(*this, std::min(max_size, kUnbounded)),
100 consumer_side_(*this) {}
101
102 ~GenericQueue() {
103 UASSERT(consumers_count_ == kCreatedAndDead || !consumers_count_);
104 UASSERT(producers_count_ == kCreatedAndDead || !producers_count_);
105
106 if (producers_count_ == kCreatedAndDead) {
107 // To allow reading the remaining items
108 consumer_side_.ResumeBlockingOnPop();
109 }
110
111 // Clear remaining items in queue
112 T value;
113 ConsumerToken token{queue_};
114 while (consumer_side_.PopNoblock(token, value)) {
115 }
116 }
117
118 GenericQueue(GenericQueue&&) = delete;
119 GenericQueue(const GenericQueue&) = delete;
120 GenericQueue& operator=(GenericQueue&&) = delete;
121 GenericQueue& operator=(const GenericQueue&) = delete;
122 /// @endcond
123
124 /// Create a new queue
125 static std::shared_ptr<GenericQueue> Create(
126 std::size_t max_size = kUnbounded) {
127 return std::make_shared<GenericQueue>(max_size, EmplaceEnabler{});
128 }
129
130 /// Get a `Producer` which makes it possible to push items into the queue.
131 /// Can be called multiple times. The resulting `Producer` is not thread-safe,
132 /// so you have to use multiple Producers of the same queue to simultaneously
133 /// write from multiple coroutines/threads.
134 ///
135 /// @note `Producer` may outlive the queue and consumers.
137 PrepareProducer();
138 return Producer(this->shared_from_this(), EmplaceEnabler{});
139 }
140
141 /// Get a `MultiProducer` which makes it possible to push items into the
142 /// queue. Can be called multiple times. The resulting `MultiProducer` is
143 /// thread-safe, so it can be used simultaneously from multiple
144 /// coroutines/threads.
145 ///
146 /// @note `MultiProducer` may outlive the queue and consumers.
147 ///
148 /// @note Prefer `Producer` tokens when possible, because `MultiProducer`
149 /// token incurs some overhead.
150 MultiProducer GetMultiProducer() {
151 static_assert(QueuePolicy::kIsMultipleProducer,
152 "Trying to obtain MultiProducer for a single-producer queue");
153 PrepareProducer();
154 return MultiProducer(this->shared_from_this(), EmplaceEnabler{});
155 }
156
157 /// Get a `Consumer` which makes it possible to read items from the queue.
158 /// Can be called multiple times. The resulting `Consumer` is not thread-safe,
159 /// so you have to use multiple `Consumer`s of the same queue to
160 /// simultaneously write from multiple coroutines/threads.
161 ///
162 /// @note `Consumer` may outlive the queue and producers.
164 PrepareConsumer();
165 return Consumer(this->shared_from_this(), EmplaceEnabler{});
166 }
167
168 /// Get a `MultiConsumer` which makes it possible to read items from the
169 /// queue. Can be called multiple times. The resulting `MultiConsumer` is
170 /// thread-safe, so it can be used simultaneously from multiple
171 /// coroutines/threads.
172 ///
173 /// @note `MultiConsumer` may outlive the queue and producers.
174 ///
175 /// @note Prefer `Consumer` tokens when possible, because `MultiConsumer`
176 /// token incurs some overhead.
178 static_assert(QueuePolicy::kIsMultipleConsumer,
179 "Trying to obtain MultiConsumer for a single-consumer queue");
180 PrepareConsumer();
181 return MultiConsumer(this->shared_from_this(), EmplaceEnabler{});
182 }
183
184 /// @brief Sets the limit on the queue size, pushes over this limit will block
185 /// @note This is a soft limit and may be slightly overrun under load.
186 void SetSoftMaxSize(std::size_t max_size) {
187 producer_side_.SetSoftMaxSize(std::min(max_size, kUnbounded));
188 }
189
190 /// @brief Gets the limit on the queue size
191 std::size_t GetSoftMaxSize() const { return producer_side_.GetSoftMaxSize(); }
192
193 /// @brief Gets the approximate size of queue
194 std::size_t GetSizeApproximate() const {
195 return producer_side_.GetSizeApproximate();
196 }
197
198 private:
199 class SingleProducerSide;
200 class MultiProducerSide;
201 class SingleConsumerSide;
202 class MultiConsumerSide;
203
204 /// Proxy-class makes synchronization of Push operations in multi or single
205 /// producer cases
206 using ProducerSide =
207 std::conditional_t<QueuePolicy::kIsMultipleProducer, MultiProducerSide,
208 SingleProducerSide>;
209
210 /// Proxy-class makes synchronization of Pop operations in multi or single
211 /// consumer cases
212 using ConsumerSide =
213 std::conditional_t<QueuePolicy::kIsMultipleConsumer, MultiConsumerSide,
214 SingleConsumerSide>;
215
216 template <typename Token>
217 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
218 return producer_side_.Push(token, std::move(value), deadline);
219 }
220
221 template <typename Token>
222 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
223 return producer_side_.PushNoblock(token, std::move(value));
224 }
225
226 template <typename Token>
227 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
228 return consumer_side_.Pop(token, value, deadline);
229 }
230
231 template <typename Token>
232 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
233 return consumer_side_.PopNoblock(token, value);
234 }
235
236 void PrepareProducer() {
237 std::size_t old_producers_count{};
238 utils::AtomicUpdate(producers_count_, [&](auto old_value) {
239 UINVARIANT(QueuePolicy::kIsMultipleProducer || old_value != 1,
240 "Incorrect usage of queue producers");
241 old_producers_count = old_value;
242 return old_value == kCreatedAndDead ? 1 : old_value + 1;
243 });
244
245 if (old_producers_count == kCreatedAndDead) {
246 consumer_side_.ResumeBlockingOnPop();
247 }
248 }
249
250 void PrepareConsumer() {
251 std::size_t old_consumers_count{};
252 utils::AtomicUpdate(consumers_count_, [&](auto old_value) {
253 UINVARIANT(QueuePolicy::kIsMultipleConsumer || old_value != 1,
254 "Incorrect usage of queue consumers");
255 old_consumers_count = old_value;
256 return old_value == kCreatedAndDead ? 1 : old_value + 1;
257 });
258
259 if (old_consumers_count == kCreatedAndDead) {
260 producer_side_.ResumeBlockingOnPush();
261 }
262 }
263
264 void MarkConsumerIsDead() {
265 const auto new_consumers_count =
266 utils::AtomicUpdate(consumers_count_, [](auto old_value) {
267 return old_value == 1 ? kCreatedAndDead : old_value - 1;
268 });
269 if (new_consumers_count == kCreatedAndDead) {
270 producer_side_.StopBlockingOnPush();
271 }
272 }
273
274 void MarkProducerIsDead() {
275 const auto new_producers_count =
276 utils::AtomicUpdate(producers_count_, [](auto old_value) {
277 return old_value == 1 ? kCreatedAndDead : old_value - 1;
278 });
279 if (new_producers_count == kCreatedAndDead) {
280 consumer_side_.StopBlockingOnPop();
281 }
282 }
283
284 public: // TODO
285 /// @cond
286 bool NoMoreConsumers() const { return consumers_count_ == kCreatedAndDead; }
287
288 bool NoMoreProducers() const { return producers_count_ == kCreatedAndDead; }
289 /// @endcond
290
291 private:
292 template <typename Token>
293 void DoPush(Token& token, T&& value) {
294 if constexpr (std::is_same_v<Token, moodycamel::ProducerToken>) {
295 static_assert(QueuePolicy::kIsMultipleProducer);
296 queue_.enqueue(token, std::move(value));
297 } else if constexpr (std::is_same_v<Token, MultiProducerToken>) {
298 static_assert(QueuePolicy::kIsMultipleProducer);
299 queue_.enqueue(std::move(value));
300 } else {
301 static_assert(std::is_same_v<Token, impl::NoToken>);
302 static_assert(!QueuePolicy::kIsMultipleProducer);
303 queue_.enqueue(single_producer_token_, std::move(value));
304 }
305
306 consumer_side_.OnElementPushed();
307 }
308
309 template <typename Token>
310 [[nodiscard]] bool DoPop(Token& token, T& value) {
311 bool success{};
312
313 if constexpr (std::is_same_v<Token, moodycamel::ConsumerToken>) {
314 static_assert(QueuePolicy::kIsMultipleProducer);
315 success = queue_.try_dequeue(token, value);
316 } else if constexpr (std::is_same_v<Token, impl::MultiToken>) {
317 static_assert(QueuePolicy::kIsMultipleProducer);
318 success = queue_.try_dequeue(value);
319 } else {
320 static_assert(std::is_same_v<Token, impl::NoToken>);
321 static_assert(!QueuePolicy::kIsMultipleProducer);
322 success = queue_.try_dequeue_from_producer(single_producer_token_, value);
323 }
324
325 if (success) {
326 producer_side_.OnElementPopped(QueuePolicy::GetElementSize(value));
327 return true;
328 }
329
330 return false;
331 }
332
333 moodycamel::ConcurrentQueue<T> queue_{1};
334 std::atomic<std::size_t> consumers_count_{0};
335 std::atomic<std::size_t> producers_count_{0};
336
337 SingleProducerToken single_producer_token_;
338
339 ProducerSide producer_side_;
340 ConsumerSide consumer_side_;
341
342 static constexpr std::size_t kCreatedAndDead =
343 std::numeric_limits<std::size_t>::max();
344 static constexpr std::size_t kSemaphoreUnlockValue =
345 std::numeric_limits<std::size_t>::max() / 2;
346};
347
348// Single-producer ProducerSide implementation
349template <typename T, typename QueuePolicy>
350class GenericQueue<T, QueuePolicy>::SingleProducerSide final {
351 public:
352 explicit SingleProducerSide(GenericQueue& queue, std::size_t capacity)
353 : queue_(queue), used_capacity_(0), total_capacity_(capacity) {}
354
355 // Blocks if there is a consumer to Pop the current value and task
356 // shouldn't cancel and queue if full
357 template <typename Token>
358 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
359 if (DoPush(token, std::move(value))) {
360 return true;
361 }
362
363 return non_full_event_.WaitForEventUntil(deadline) &&
364 // NOLINTNEXTLINE(bugprone-use-after-move)
365 DoPush(token, std::move(value));
366 }
367
368 template <typename Token>
369 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
370 return DoPush(token, std::move(value));
371 }
372
373 void OnElementPopped(std::size_t released_capacity) {
374 used_capacity_.fetch_sub(released_capacity);
375 non_full_event_.Send();
376 }
377
378 void StopBlockingOnPush() {
379 total_capacity_ += kSemaphoreUnlockValue;
380 non_full_event_.Send();
381 }
382
383 void ResumeBlockingOnPush() { total_capacity_ -= kSemaphoreUnlockValue; }
384
385 void SetSoftMaxSize(std::size_t new_capacity) {
386 const auto old_capacity = total_capacity_.exchange(new_capacity);
387 if (new_capacity > old_capacity) non_full_event_.Send();
388 }
389
390 std::size_t GetSoftMaxSize() const noexcept { return total_capacity_.load(); }
391
392 std::size_t GetSizeApproximate() const noexcept {
393 return used_capacity_.load();
394 }
395
396 private:
397 template <typename Token>
398 [[nodiscard]] bool DoPush(Token& token, T&& value) {
399 const std::size_t value_size = QueuePolicy::GetElementSize(value);
400 if (queue_.NoMoreConsumers() ||
401 used_capacity_.load() + value_size > total_capacity_.load()) {
402 return false;
403 }
404
405 used_capacity_.fetch_add(value_size);
406 queue_.DoPush(token, std::move(value));
407 non_full_event_.Reset();
408 return true;
409 }
410
411 GenericQueue& queue_;
412 engine::SingleConsumerEvent non_full_event_;
413 std::atomic<std::size_t> used_capacity_;
414 std::atomic<std::size_t> total_capacity_;
415};
416
417// Multi producer ProducerSide implementation
418template <typename T, typename QueuePolicy>
419class GenericQueue<T, QueuePolicy>::MultiProducerSide final {
420 public:
421 explicit MultiProducerSide(GenericQueue& queue, std::size_t capacity)
422 : queue_(queue),
423 remaining_capacity_(capacity),
424 remaining_capacity_control_(remaining_capacity_) {}
425
426 // Blocks if there is a consumer to Pop the current value and task
427 // shouldn't cancel and queue if full
428 template <typename Token>
429 [[nodiscard]] bool Push(Token& token, T&& value, engine::Deadline deadline) {
430 const std::size_t value_size = QueuePolicy::GetElementSize(value);
431 return remaining_capacity_.try_lock_shared_until_count(deadline,
432 value_size) &&
433 DoPush(token, std::move(value));
434 }
435
436 template <typename Token>
437 [[nodiscard]] bool PushNoblock(Token& token, T&& value) {
438 const std::size_t value_size = QueuePolicy::GetElementSize(value);
439 return remaining_capacity_.try_lock_shared_count(value_size) &&
440 DoPush(token, std::move(value));
441 }
442
443 void OnElementPopped(std::size_t value_size) {
444 remaining_capacity_.unlock_shared_count(value_size);
445 }
446
447 void StopBlockingOnPush() {
448 remaining_capacity_control_.SetCapacityOverride(0);
449 }
450
451 void ResumeBlockingOnPush() {
452 remaining_capacity_control_.RemoveCapacityOverride();
453 }
454
455 void SetSoftMaxSize(std::size_t count) {
456 remaining_capacity_control_.SetCapacity(count);
457 }
458
459 std::size_t GetSizeApproximate() const noexcept {
460 return remaining_capacity_.UsedApprox();
461 }
462
463 std::size_t GetSoftMaxSize() const noexcept {
464 return remaining_capacity_control_.GetCapacity();
465 }
466
467 private:
468 template <typename Token>
469 [[nodiscard]] bool DoPush(Token& token, T&& value) {
470 const std::size_t value_size = QueuePolicy::GetElementSize(value);
471 UASSERT(value_size > 0);
472 if (queue_.NoMoreConsumers()) {
473 remaining_capacity_.unlock_shared_count(value_size);
474 return false;
475 }
476
477 queue_.DoPush(token, std::move(value));
478 return true;
479 }
480
481 GenericQueue& queue_;
482 engine::CancellableSemaphore remaining_capacity_;
483 concurrent::impl::SemaphoreCapacityControl remaining_capacity_control_;
484};
485
486// Single consumer ConsumerSide implementation
487template <typename T, typename QueuePolicy>
488class GenericQueue<T, QueuePolicy>::SingleConsumerSide final {
489 public:
490 explicit SingleConsumerSide(GenericQueue& queue)
491 : queue_(queue), element_count_(0) {}
492
493 // Blocks only if queue is empty
494 template <typename Token>
495 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
496 while (!DoPop(token, value)) {
497 if (queue_.NoMoreProducers() ||
498 !nonempty_event_.WaitForEventUntil(deadline)) {
499 // Producer might have pushed something in queue between .pop()
500 // and !producer_is_created_and_dead_ check. Check twice to avoid
501 // TOCTOU.
502 return DoPop(token, value);
503 }
504 }
505 return true;
506 }
507
508 template <typename Token>
509 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
510 return DoPop(token, value);
511 }
512
513 void OnElementPushed() {
514 ++element_count_;
515 nonempty_event_.Send();
516 }
517
518 void StopBlockingOnPop() { nonempty_event_.Send(); }
519
520 void ResumeBlockingOnPop() {}
521
522 std::size_t GetElementCount() const { return element_count_; }
523
524 private:
525 template <typename Token>
526 [[nodiscard]] bool DoPop(Token& token, T& value) {
527 if (queue_.DoPop(token, value)) {
528 --element_count_;
529 nonempty_event_.Reset();
530 return true;
531 }
532 return false;
533 }
534
535 GenericQueue& queue_;
536 engine::SingleConsumerEvent nonempty_event_;
537 std::atomic<std::size_t> element_count_;
538};
539
540// Multi consumer ConsumerSide implementation
541template <typename T, typename QueuePolicy>
542class GenericQueue<T, QueuePolicy>::MultiConsumerSide final {
543 public:
544 explicit MultiConsumerSide(GenericQueue& queue)
545 : queue_(queue),
546 element_count_(kUnbounded),
547 element_count_control_(element_count_) {
548 const bool success = element_count_.try_lock_shared_count(kUnbounded);
549 UASSERT(success);
550 }
551
552 ~MultiConsumerSide() { element_count_.unlock_shared_count(kUnbounded); }
553
554 // Blocks only if queue is empty
555 template <typename Token>
556 [[nodiscard]] bool Pop(Token& token, T& value, engine::Deadline deadline) {
557 return element_count_.try_lock_shared_until(deadline) &&
558 DoPop(token, value);
559 }
560
561 template <typename Token>
562 [[nodiscard]] bool PopNoblock(Token& token, T& value) {
563 return element_count_.try_lock_shared() && DoPop(token, value);
564 }
565
566 void OnElementPushed() { element_count_.unlock_shared(); }
567
568 void StopBlockingOnPop() {
569 element_count_control_.SetCapacityOverride(kUnbounded +
570 kSemaphoreUnlockValue);
571 }
572
573 void ResumeBlockingOnPop() {
574 element_count_control_.RemoveCapacityOverride();
575 }
576
577 std::size_t GetElementCount() const {
578 const std::size_t cur_element_count = element_count_.RemainingApprox();
579 if (cur_element_count < kUnbounded) {
580 return cur_element_count;
581 } else if (cur_element_count <= kSemaphoreUnlockValue) {
582 return 0;
583 }
584 return cur_element_count - kSemaphoreUnlockValue;
585 }
586
587 private:
588 template <typename Token>
589 [[nodiscard]] bool DoPop(Token& token, T& value) {
590 while (true) {
591 if (queue_.DoPop(token, value)) {
592 return true;
593 }
594 if (queue_.NoMoreProducers()) {
595 element_count_.unlock_shared();
596 return false;
597 }
598 // We can get here if another consumer steals our element, leaving another
599 // element in a Moodycamel sub-queue that we have already passed.
600 }
601 }
602
603 GenericQueue& queue_;
604 engine::CancellableSemaphore element_count_;
605 concurrent::impl::SemaphoreCapacityControl element_count_control_;
606};
607
608/// @ingroup userver_concurrency
609///
610/// @brief Non FIFO multiple producers multiple consumers queue.
611///
612/// Items from the same producer are always delivered in the production order.
613/// Items from different producers (or when using a `MultiProducer` token) are
614/// delivered in an unspecified order. In other words, FIFO order is maintained
615/// only within producers, but not between them. This may lead to increased peak
616/// latency of item processing.
617///
618/// In exchange for this, the queue has lower contention and increased
619/// throughput compared to a conventional lock-free queue.
620///
621/// @see @ref scripts/docs/en/userver/synchronization.md
622template <typename T>
623using NonFifoMpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, true>>;
624
625/// @ingroup userver_concurrency
626///
627/// @brief Non FIFO multiple producers single consumer queue.
628///
629/// @see concurrent::NonFifoMpmcQueue for the description of what NonFifo means.
630/// @see @ref scripts/docs/en/userver/synchronization.md
631template <typename T>
632using NonFifoMpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<true, false>>;
633
634/// @ingroup userver_concurrency
635///
636/// @brief Single producer multiple consumers queue.
637///
638/// @see @ref scripts/docs/en/userver/synchronization.md
639template <typename T>
640using SpmcQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, true>>;
641
642/// @ingroup userver_concurrency
643///
644/// @brief Single producer single consumer queue.
645///
646/// @see @ref scripts/docs/en/userver/synchronization.md
647template <typename T>
648using SpscQueue = GenericQueue<T, impl::SimpleQueuePolicy<false, false>>;
649
650/// @ingroup userver_concurrency
651///
652/// @brief Single producer single consumer queue of std::string which is bounded
653/// bytes inside.
654///
655/// @see @ref scripts/docs/en/userver/synchronization.md
656using StringStreamQueue =
657 GenericQueue<std::string, impl::ContainerQueuePolicy<false, false>>;
658
659} // namespace concurrent
660
661USERVER_NAMESPACE_END