userver: userver/concurrent/queue_helpers.hpp Source File
Loading...
Searching...
No Matches
queue_helpers.hpp
1#pragma once
2
#include <memory>
#include <type_traits>
#include <utility>

#include <userver/engine/deadline.hpp>
#include <userver/utils/assert.hpp>
6
7USERVER_NAMESPACE_BEGIN
8
9namespace concurrent {
10
11namespace impl {
12
/// Stateless token type: it can be constructed from a reference to any queue
/// type and keeps nothing from that queue.
struct NoToken final {
  /// The queue argument is accepted only to match the construction signature
  /// of other token types; it is never read.
  template <class Queue>
  explicit NoToken(Queue&) {}
};
17
/// Stateless token type distinct from NoToken: it can be constructed from a
/// reference to any queue type and keeps nothing from that queue.
// NOTE(review): presumably used as a tag to select the multi-producer /
// multi-consumer code path in the queue — confirm against the queue
// implementation.
struct MultiToken final {
  /// The queue argument is accepted only to match the construction signature
  /// of other token types; it is never read.
  template <class Queue>
  explicit MultiToken(Queue&) {}
};
22
23} // namespace impl
24
25/// @warning A single Producer must not be used from multiple threads
26/// concurrently
27template <typename QueueType, typename ProducerToken,
28 typename EmplaceEnablerType>
29class Producer final {
30 static_assert(
31 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
32 "Do not instantiate Producer on your own. Use Producer type alias "
33 "from queue");
34
35 using ValueType = typename QueueType::ValueType;
36
37 public:
38 Producer(const Producer&) = delete;
39 Producer(Producer&&) noexcept = default;
40 Producer& operator=(const Producer&) = delete;
41 Producer& operator=(Producer&& other) noexcept {
42 queue_.swap(other.queue_);
43 std::swap(token_, other.token_);
44 return *this;
45 }
46
47 ~Producer() {
48 if (queue_) queue_->MarkProducerIsDead();
49 }
50
51 /// Push element into queue. May wait asynchronously if the queue is full.
52 /// Leaves the `value` unmodified if the operation does not succeed.
53 /// @returns whether push succeeded before the deadline and before the task
54 /// was canceled.
55 [[nodiscard]] bool Push(ValueType&& value,
56 engine::Deadline deadline = {}) const {
57 UASSERT(queue_);
58 return queue_->Push(token_, std::move(value), deadline);
59 }
60
61 /// Try to push element into queue without blocking. May be used in
62 /// non-coroutine environment. Leaves the `value` unmodified if the operation
63 /// does not succeed.
64 /// @returns whether push succeeded.
65 [[nodiscard]] bool PushNoblock(ValueType&& value) const {
66 UASSERT(queue_);
67 return queue_->PushNoblock(token_, std::move(value));
68 }
69
70 void Reset() && {
71 if (queue_) queue_->MarkProducerIsDead();
72 queue_.reset();
73 [[maybe_unused]] ProducerToken for_destruction = std::move(token_);
74 }
75
76 /// Const access to source queue.
77 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
78 return {queue_};
79 }
80
81 /// @cond
82 // For internal use only
83 Producer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
84 : queue_(std::move(queue)), token_(queue_->queue_) {}
85 /// @endcond
86
87 private:
88 std::shared_ptr<QueueType> queue_;
89 mutable ProducerToken token_;
90};
91
92/// @warning A single Consumer must not be used from multiple threads
93/// concurrently
94template <typename QueueType, typename ConsumerToken,
95 typename EmplaceEnablerType>
96class Consumer final {
97 static_assert(
98 std::is_same_v<EmplaceEnablerType, typename QueueType::EmplaceEnabler>,
99 "Do not instantiate Consumer on your own. Use Consumer type alias "
100 "from queue");
101
102 using ValueType = typename QueueType::ValueType;
103
104 public:
105 Consumer(const Consumer&) = delete;
106 Consumer(Consumer&&) noexcept = default;
107 Consumer& operator=(const Consumer&) = delete;
108 Consumer& operator=(Consumer&& other) noexcept {
109 queue_.swap(other.queue_);
110 std::swap(token_, other.token_);
111 return *this;
112 }
113
114 ~Consumer() {
115 if (queue_) queue_->MarkConsumerIsDead();
116 }
117
118 /// Pop element from queue. May wait asynchronously if the queue is empty,
119 /// but the producer is alive.
120 /// @returns whether something was popped before the deadline.
121 /// @note `false` can be returned before the deadline
122 /// when the producer is no longer alive.
123 [[nodiscard]] bool Pop(ValueType& value,
124 engine::Deadline deadline = {}) const {
125 return queue_->Pop(token_, value, deadline);
126 }
127
128 /// Try to pop element from queue without blocking. May be used in
129 /// non-coroutine environment
130 /// @return whether something was popped.
131 [[nodiscard]] bool PopNoblock(ValueType& value) const {
132 return queue_->PopNoblock(token_, value);
133 }
134
135 void Reset() && {
136 if (queue_) queue_->MarkConsumerIsDead();
137 queue_.reset();
138 [[maybe_unused]] ConsumerToken for_destruction = std::move(token_);
139 }
140
141 /// Const access to source queue.
142 [[nodiscard]] std::shared_ptr<const QueueType> Queue() const {
143 return {queue_};
144 }
145
146 /// @cond
147 // For internal use only
148 Consumer(std::shared_ptr<QueueType> queue, EmplaceEnablerType /*unused*/)
149 : queue_(std::move(queue)), token_(queue_->queue_) {}
150 /// @endcond
151
152 private:
153 std::shared_ptr<QueueType> queue_{};
154 mutable ConsumerToken token_;
155};
156
157} // namespace concurrent
158
159USERVER_NAMESPACE_END