userver: userver/rcu/rcu.hpp Source File
Loading...
Searching...
No Matches
rcu.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/rcu/rcu.hpp
4/// @brief @copybrief rcu::Variable
5
6#include <atomic>
7#include <cstdlib>
8#include <memory>
9#include <optional>
10#include <utility>
11
12// TODO remove extra includes
13#include <list>
14#include <unordered_set>
15#include <userver/compiler/thread_local.hpp>
16
17#include <userver/concurrent/impl/asymmetric_fence.hpp>
18#include <userver/concurrent/impl/intrusive_hooks.hpp>
19#include <userver/concurrent/impl/intrusive_stack.hpp>
20#include <userver/concurrent/impl/striped_read_indicator.hpp>
21#include <userver/engine/async.hpp>
22#include <userver/engine/mutex.hpp>
23#include <userver/logging/log.hpp>
24#include <userver/rcu/fwd.hpp>
25#include <userver/utils/assert.hpp>
26#include <userver/utils/impl/wait_token_storage.hpp>
27
28USERVER_NAMESPACE_BEGIN
29
30/// @brief Read-Copy-Update
31///
32/// @see Based on ideas from
33/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
34/// with modified API
35namespace rcu {
36
37namespace impl {
38
39template <typename T>
40struct SnapshotRecord final {
41 std::optional<T> data;
42 concurrent::impl::StripedReadIndicator indicator;
43 concurrent::impl::SinglyLinkedHook<SnapshotRecord> free_list_hook;
44 SnapshotRecord* next_retired{nullptr};
45};
46
47// Used instead of concurrent::impl::MemberHook to avoid instantiating
48// SnapshotRecord<T> ahead of time.
49template <typename T>
50struct FreeListHookGetter {
51 auto& operator()(SnapshotRecord<T>& node) const noexcept {
52 return node.free_list_hook;
53 }
54};
55
56template <typename T>
57using SnapshotRecordFreeList =
58 concurrent::impl::IntrusiveStack<SnapshotRecord<T>, FreeListHookGetter<T>>;
59
60template <typename T>
61class SnapshotRecordRetiredList final {
62 public:
63 SnapshotRecordRetiredList() = default;
64
65 bool IsEmpty() const noexcept { return head_ == nullptr; }
66
67 void Push(SnapshotRecord<T>& record) noexcept {
68 record.next_retired = head_;
69 head_ = &record;
70 }
71
72 template <typename Predicate, typename Disposer>
73 void RemoveAndDisposeIf(Predicate predicate, Disposer disposer) {
74 SnapshotRecord<T>** ptr_to_current = &head_;
75
76 while (*ptr_to_current != nullptr) {
77 SnapshotRecord<T>* const current = *ptr_to_current;
78
79 if (predicate(*current)) {
80 *ptr_to_current = std::exchange(current->next_retired, nullptr);
81 disposer(*current);
82 } else {
83 ptr_to_current = &current->next_retired;
84 }
85 }
86 }
87
88 private:
89 SnapshotRecord<T>* head_{nullptr};
90};
91
92} // namespace impl
93
94/// Default Rcu traits.
95/// - `MutexType` is a writer's mutex type that has to be used to protect
96/// structure on update
97template <typename T>
98struct DefaultRcuTraits {
99 using MutexType = engine::Mutex;
100};
101
102/// Reader smart pointer for rcu::Variable<T>. You may use operator*() or
103/// operator->() to do something with the stored value. Once created,
104/// ReadablePtr references the same immutable value: if Variable's value is
105/// changed during ReadablePtr lifetime, it will not affect value referenced by
106/// ReadablePtr.
107template <typename T, typename RcuTraits>
108class [[nodiscard]] ReadablePtr final {
109 public:
110 explicit ReadablePtr(const Variable<T, RcuTraits>& ptr) {
111 auto* record = ptr.current_.load();
112
113 while (true) {
114 // Lock 'record', which may or may not be 'current_' by the time we got
115 // there.
116 lock_ = record->indicator.Lock();
117
118 // seq_cst is required for indicator.Lock in the following case.
119 //
120 // Reader thread point-of-view:
121 // 1. [reader] load current_
122 // 2. [reader] indicator.Lock
123 // 3. [reader] load current_
124 // 4. [writer] store current_
125 // 5. [writer] indicator.IsFree
126 //
127 // Given seq_cst only on (3), (4), and (5), the writer can see
128 // (2) after (5). In this case the reader will think that it has
129 // successfully taken the lock, which is false.
130 //
131 // So we need seq_cst on all of (2), (3), (4), (5). Making (2) seq_cst is
132 // somewhat expensive, but this is a well-known cost of hazard pointers.
133 //
134 // The seq_cst cost can be mitigated by utilizing asymmetric fences.
135 // This asymmetric fence effectively grants std::memory_order_seq_cst
136 // to indicator.Lock when applied together with AsymmetricThreadFenceHeavy
137 // in (5). The technique is taken from Folly HazPtr.
138 concurrent::impl::AsymmetricThreadFenceLight();
139
140 // Is the record we locked 'current_'? If so, congratulations, we are
141 // holding a lock to 'current_'.
142 auto* new_current = ptr.current_.load(std::memory_order_seq_cst);
143 if (new_current == record) break;
144
145 // 'current_' changed, try again
146 record = new_current;
147 }
148
149 ptr_ = &*record->data;
150 }
151
152 ReadablePtr(ReadablePtr&& other) noexcept = default;
153 ReadablePtr& operator=(ReadablePtr&& other) noexcept = default;
154 ReadablePtr(const ReadablePtr& other) = default;
155 ReadablePtr& operator=(const ReadablePtr& other) = default;
156 ~ReadablePtr() = default;
157
158 const T* Get() const& {
159 UASSERT(ptr_);
160 return ptr_;
161 }
162
163 const T* Get() && { return GetOnRvalue(); }
164
165 const T* operator->() const& { return Get(); }
166 const T* operator->() && { return GetOnRvalue(); }
167
168 const T& operator*() const& { return *Get(); }
169 const T& operator*() && { return *GetOnRvalue(); }
170
171 private:
172 const T* GetOnRvalue() {
173 static_assert(!sizeof(T),
174 "Don't use temporary ReadablePtr, store it to a variable");
175 std::abort();
176 }
177
178 const T* ptr_;
179 concurrent::impl::StripedReadIndicatorLock lock_;
180};
181
182/// Smart pointer for rcu::Variable<T> for changing RCU value. It stores a
183/// reference to a to-be-changed value and allows one to mutate the value (e.g.
184/// add items to std::unordered_map). Changed value is not visible to readers
185/// until explicit store by Commit. Only a single writer may own a WritablePtr
186/// associated with the same Variable, so WritablePtr creates a critical
187/// section. This critical section doesn't affect readers, so a slow writer
188/// doesn't block readers.
189/// @note you may not pass WritablePtr between coroutines as it owns
190/// engine::Mutex, which must be unlocked in the same coroutine that was used to
191/// lock the mutex.
192template <typename T, typename RcuTraits>
193class [[nodiscard]] WritablePtr final {
194 public:
195 /// @cond
196 // For internal use only. Use `var.StartWrite()` instead
197 explicit WritablePtr(Variable<T, RcuTraits>& var)
198 : var_(var),
199 lock_(var.mutex_),
200 record_(&var.EmplaceSnapshot(*var.current_.load()->data)) {}
201
202 // For internal use only. Use `var.Emplace(args...)` instead
203 template <typename... Args>
204 WritablePtr(Variable<T, RcuTraits>& var, std::in_place_t,
205 Args&&... initial_value_args)
206 : var_(var),
207 lock_(var.mutex_),
208 record_(
209 &var.EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
210 /// @endcond
211
212 WritablePtr(WritablePtr&& other) noexcept
213 : var_(other.var_),
214 lock_(std::move(other.lock_)),
215 record_(std::exchange(other.record_, nullptr)) {}
216
217 ~WritablePtr() {
218 if (record_) {
219 // TODO should DeleteSnapshot instead?
220 var_.DeleteSnapshotSync(*record_);
221 }
222 }
223
224 /// Store the changed value in Variable. After Commit() the value becomes
225 /// visible to new readers (IOW, Variable::Read() returns ReadablePtr
226 /// referencing the stored value, not an old value).
227 void Commit() {
228 UASSERT(record_ != nullptr);
229 var_.DoAssign(*std::exchange(record_, nullptr), lock_);
230 lock_.unlock();
231 }
232
233 T* Get() & {
234 UASSERT(record_ != nullptr);
235 return &*record_->data;
236 }
237
238 T* Get() && { return GetOnRvalue(); }
239
240 T* operator->() & { return Get(); }
241 T* operator->() && { return GetOnRvalue(); }
242
243 T& operator*() & { return *Get(); }
244 T& operator*() && { return *GetOnRvalue(); }
245
246 private:
247 [[noreturn]] static T* GetOnRvalue() {
248 static_assert(!sizeof(T),
249 "Don't use temporary WritablePtr, store it to a variable");
250 std::abort();
251 }
252
253 Variable<T, RcuTraits>& var_;
254 std::unique_lock<typename RcuTraits::MutexType> lock_;
255 impl::SnapshotRecord<T>* record_;
256};
257
/// @brief Selects how `rcu::Variable` destroys old values. Can be passed to
/// `rcu::Variable` as the first constructor argument.
enum class DestructionType {
  /// Old values are destroyed in place, by the code that reclaims them.
  kSync,
  /// Old values are destroyed in a detached background task.
  kAsync,
};
261
262/// @ingroup userver_concurrency userver_containers
263///
264/// @brief Read-Copy-Update variable
265///
266/// @see Based on ideas from
267/// http://www.drdobbs.com/lock-free-data-structures-with-hazard-po/184401890
268/// with modified API.
269///
270/// A variable with MT-access pattern "very often reads, seldom writes". It is
271/// specially optimized for reads. On read, one obtains a ReaderPtr<T> from it
272/// and uses the obtained value as long as it wants to. On write, one obtains a
273/// WritablePtr<T> with a copy of the last version of the value, makes some
274/// changes to it, and commits the result to update current variable value (does
275/// Read-Copy-Update). Old version of the value is not freed on update, it will
276/// be eventually freed when a subsequent writer identifies that nobody works
277/// with this version.
278///
279/// @note There is no way to create a "null" `Variable`.
280///
281/// ## Example usage:
282///
283/// @snippet rcu/rcu_test.cpp Sample rcu::Variable usage
284///
285/// @see @ref scripts/docs/en/userver/synchronization.md
286template <typename T, typename RcuTraits>
287class Variable final {
288 public:
289 using MutexType = typename RcuTraits::MutexType;
290
291 /// Create a new `Variable` with an in-place constructed initial value.
292 /// Asynchronous destruction is enabled by default.
293 /// @param initial_value_args arguments passed to the constructor of the
294 /// initial value
295 template <typename... Args>
296 // TODO make explicit
297 Variable(Args&&... initial_value_args)
298 : destruction_type_(std::is_trivially_destructible_v<T> ||
299 std::is_same_v<T, std::string> ||
300 !std::is_same_v<MutexType, engine::Mutex>
301 ? DestructionType::kSync
302 : DestructionType::kAsync),
303 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
304
305 /// Create a new `Variable` with an in-place constructed initial value.
306 /// @param destruction_type controls whether destruction of old values should
307 /// be performed asynchronously
308 /// @param initial_value_args arguments passed to the constructor of the
309 /// initial value
310 template <typename... Args>
311 explicit Variable(DestructionType destruction_type,
312 Args&&... initial_value_args)
313 : destruction_type_(destruction_type),
314 current_(&EmplaceSnapshot(std::forward<Args>(initial_value_args)...)) {}
315
316 Variable(const Variable&) = delete;
317 Variable(Variable&&) = delete;
318 Variable& operator=(const Variable&) = delete;
319 Variable& operator=(Variable&&) = delete;
320
321 ~Variable() {
322 {
323 auto* record = current_.load();
324 UASSERT_MSG(record->indicator.IsFree(),
325 "RCU variable is destroyed while being used");
326 delete record;
327 }
328
329 retired_list_.RemoveAndDisposeIf(
330 [](impl::SnapshotRecord<T>&) { return true; },
331 [](impl::SnapshotRecord<T>& record) {
332 UASSERT_MSG(record.indicator.IsFree(),
333 "RCU variable is destroyed while being used");
334 delete &record;
335 });
336
337 if (destruction_type_ == DestructionType::kAsync) {
338 wait_token_storage_.WaitForAllTokens();
339 }
340
341 free_list_.DisposeUnsafe(
342 [](impl::SnapshotRecord<T>& record) { delete &record; });
343 }
344
345 /// Obtain a smart pointer which can be used to read the current value.
346 ReadablePtr<T, RcuTraits> Read() const {
347 return ReadablePtr<T, RcuTraits>(*this);
348 }
349
350 /// Obtain a copy of contained value.
351 T ReadCopy() const {
352 auto ptr = Read();
353 return *ptr;
354 }
355
356 /// Obtain a smart pointer that will *copy* the current value. The pointer can
357 /// be used to make changes to the value and to set the `Variable` to the
358 /// changed value.
359 WritablePtr<T, RcuTraits> StartWrite() {
360 return WritablePtr<T, RcuTraits>(*this);
361 }
362
363 /// Obtain a smart pointer to a newly in-place constructed value, but does
364 /// not replace the current one yet (in contrast with regular `Emplace`).
365 template <typename... Args>
366 WritablePtr<T, RcuTraits> StartWriteEmplace(Args&&... args) {
367 return WritablePtr<T, RcuTraits>(*this, std::in_place,
368 std::forward<Args>(args)...);
369 }
370
371 /// Replaces the `Variable`'s value with the provided one.
372 void Assign(T new_value) {
373 WritablePtr<T, RcuTraits>(*this, std::in_place, std::move(new_value))
374 .Commit();
375 }
376
377 /// Replaces the `Variable`'s value with an in-place constructed one.
378 template <typename... Args>
379 void Emplace(Args&&... args) {
380 WritablePtr<T, RcuTraits>(*this, std::in_place, std::forward<Args>(args)...)
381 .Commit();
382 }
383
384 void Cleanup() {
385 std::unique_lock lock(mutex_, std::try_to_lock);
386 if (!lock.owns_lock()) {
387 // Someone is already assigning to the RCU. They will call ScanRetireList
388 // in the process.
389 return;
390 }
391 ScanRetiredList(lock);
392 }
393
394 private:
395 friend class ReadablePtr<T, RcuTraits>;
396 friend class WritablePtr<T, RcuTraits>;
397
398 void DoAssign(impl::SnapshotRecord<T>& new_snapshot,
399 std::unique_lock<MutexType>& lock) {
400 UASSERT(lock.owns_lock());
401
402 // Note: exchange RMW operation would not give any benefits here.
403 auto* const old_snapshot = current_.load();
404 current_.store(&new_snapshot, std::memory_order_seq_cst);
405
406 UASSERT(old_snapshot);
407 retired_list_.Push(*old_snapshot);
408 ScanRetiredList(lock);
409 }
410
411 template <typename... Args>
412 [[nodiscard]] impl::SnapshotRecord<T>& EmplaceSnapshot(Args&&... args) {
413 auto* const free_list_record = free_list_.TryPop();
414 auto& record =
415 free_list_record ? *free_list_record : *new impl::SnapshotRecord<T>{};
416 UASSERT(!record.data);
417
418 try {
419 record.data.emplace(std::forward<Args>(args)...);
420 } catch (...) {
421 if (free_list_record) {
422 free_list_.Push(record);
423 } else {
424 // Important to delete this way in the rcu::Variable's constructor,
425 // otherwise we'd have to clean up free_list_ there.
426 delete &record;
427 }
428 throw;
429 }
430
431 return record;
432 }
433
434 void ScanRetiredList(std::unique_lock<MutexType>& lock) noexcept {
435 UASSERT(lock.owns_lock());
436 if (retired_list_.IsEmpty()) return;
437
438 concurrent::impl::AsymmetricThreadFenceHeavy();
439
440 retired_list_.RemoveAndDisposeIf(
441 [](impl::SnapshotRecord<T>& record) {
442 return record.indicator.IsFree();
443 },
444 [&](impl::SnapshotRecord<T>& record) { DeleteSnapshot(record); });
445 }
446
447 void DeleteSnapshot(impl::SnapshotRecord<T>& record) {
448 switch (destruction_type_) {
449 case DestructionType::kSync:
450 DeleteSnapshotSync(record);
451 break;
452 case DestructionType::kAsync:
453 engine::CriticalAsyncNoSpan([this, &record,
454 token = wait_token_storage_.GetToken()] {
455 DeleteSnapshotSync(record);
456 }).Detach();
457 break;
458 }
459 }
460
461 void DeleteSnapshotSync(impl::SnapshotRecord<T>& record) noexcept {
462 record.data.reset();
463 free_list_.Push(record);
464 }
465
466 const DestructionType destruction_type_;
467 // Covers current_ writes, free_list_.Pop, retired_list_
468 MutexType mutex_;
469 impl::SnapshotRecordFreeList<T> free_list_;
470 impl::SnapshotRecordRetiredList<T> retired_list_;
471 std::atomic<impl::SnapshotRecord<T>*> current_;
472 utils::impl::WaitTokenStorage wait_token_storage_;
473};
474
475} // namespace rcu
476
477USERVER_NAMESPACE_END