userver: userver/drivers/impl/connection_pool_base.hpp Source File
Loading...
Searching...
No Matches
connection_pool_base.hpp
Go to the documentation of this file.
1#pragma once
2
3/// @file userver/drivers/impl/connection_pool_base.hpp
4/// @brief @copybrief drivers::impl::ConnectionPoolBase
5
6#include <atomic>
7#include <chrono>
8#include <memory>
9#include <stdexcept>
10#include <utility>
11#include <vector>
12
13#include <boost/lockfree/queue.hpp>
14
15#include <userver/engine/async.hpp>
16#include <userver/engine/deadline.hpp>
17#include <userver/engine/get_all.hpp>
18#include <userver/engine/semaphore.hpp>
19#include <userver/logging/log.hpp>
20#include <userver/utils/assert.hpp>
21
22USERVER_NAMESPACE_BEGIN
23
24namespace drivers::impl {
25
/// @brief Error reported when the pool cannot hand out (or create) a
/// connection before the caller-supplied deadline expires.
///
/// The pool raises it when one of its limiting semaphores could not be
/// locked in time.
class PoolWaitLimitExceededError : public std::runtime_error {
 public:
  /// @brief Default constructor; only declared in this header.
  PoolWaitLimitExceededError();
};
32
33/// @brief Base connection pool implementation to be derived in different
34/// drivers. Takes care of synchronization, pool limits (min/max, simultaneously
35/// connecting etc.) and provides hooks for metrics.
36template <class Connection, class Derived>
37class ConnectionPoolBase : public std::enable_shared_from_this<Derived> {
38 public:
39 protected:
40 using ConnectionRawPtr = Connection*;
41 using ConnectionUniquePtr = std::unique_ptr<Connection>;
42
43 struct ConnectionHolder final {
44 std::shared_ptr<Derived> pool_ptr;
45 ConnectionUniquePtr connection_ptr;
46 };
47
48 /// @brief Constructor, doesn't create any connections, one should call `Init`
49 /// to initialize the pool.
50 ConnectionPoolBase(std::size_t max_pool_size,
51 std::size_t max_simultaneously_connecting_clients);
52 /// @brief Destructor. One should call `Reset` before the destructor is
53 /// invoked.
54 ~ConnectionPoolBase();
55
56 /// @brief Initializes the pool with given limits.
57 /// Uses some methods that must or may be overridden in derived class, hence
58 /// a separate method and not called from base class constructor.
59 /// Derived class constructor is a good place to call this.
60 void Init(std::size_t initial_size,
61 std::chrono::milliseconds connection_setup_timeout);
62 /// @brief Resets the pool, destroying all managed connections.
63 /// Uses some methods that may be overridden in derived class, hence
64 /// a separate function and not called from base class constructor.
65 /// Derived class destructor is a good place to call this.
66 void Reset();
67
68 /// @brief Acquires a connection from the pool.
69 ConnectionHolder AcquireConnection(engine::Deadline deadline);
70 /// @brief Returns the connection to the pool.
71 /// If `connection_ptr->IsBroken()` is true, the connection is destroyed.
72 void ReleaseConnection(ConnectionUniquePtr connection_ptr);
73
74 /// @brief Pops a connection from the pool, might create a new connection if
75 /// there are no ready connections.
76 ConnectionUniquePtr Pop(engine::Deadline deadline);
77 /// @brief Tries to pop a ready connection from the pool,
78 /// may return `nullptr`.
79 ConnectionUniquePtr TryPop();
80
81 /// @brief Returns the connection to the pool, internal.
82 /// If `connection_ptr->IsBroken()` is true, the connection is destroyed.
83 /// Doesn't affect pool limits, so you shouldn't call this directly,
84 /// unless the connection is acquired from `TryPop` - then it's the only
85 /// correct way to return it back.
86 void DoRelease(ConnectionUniquePtr connection_ptr);
87
88 /// @brief Creates a new connection and tries to push it into the ready
89 /// queue. If the queue is full, the connection is dropped immediately.
90 void PushConnection(engine::Deadline deadline);
91 /// @brief Drops the connections - destroys the object and accounts for that.
92 void Drop(ConnectionRawPtr connection_ptr) noexcept;
93
94 /// @brief Returns the approximate count of alive connections (given away and
95 /// ready to use).
96 std::size_t AliveConnectionsCountApprox() const;
97
98 /// @brief Call this method if for some reason a connection previously
99 /// acquired from the pool won't be returned into it.
100 /// If one fails to do so pool limits might shrink until pool becomes
101 /// unusable.
102 void NotifyConnectionWontBeReleased();
103
104 private:
105 Derived& AsDerived() noexcept;
106 const Derived& AsDerived() const noexcept;
107
108 ConnectionUniquePtr CreateConnection(
109 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline);
110
111 void CleanupQueue();
112
113 void EnsureInitialized() const;
114 void EnsureReset() const;
115
116 engine::Semaphore given_away_semaphore_;
117 engine::Semaphore connecting_semaphore_;
118
119 boost::lockfree::queue<ConnectionRawPtr> queue_;
120 std::atomic<std::size_t> alive_connections_{0};
121
122 bool initialized_{false};
123 bool reset_{false};
124};
125
126template <class Connection, class Derived>
127ConnectionPoolBase<Connection, Derived>::ConnectionPoolBase(
128 std::size_t max_pool_size,
129 std::size_t max_simultaneously_connecting_clients)
130 : given_away_semaphore_{max_pool_size},
131 connecting_semaphore_{max_simultaneously_connecting_clients},
132 queue_{max_pool_size} {}
133
134template <class Connection, class Derived>
135ConnectionPoolBase<Connection, Derived>::~ConnectionPoolBase() {
136 if (initialized_) {
137 // We don't call Reset here, because dropping a connection (when cleaning
138 // up the queue) might invoke virtual methods, and the derived class is
139 // already destroyed, so unexpected things could happen.
140 // However, we assert that derived class performed a cleanup itself.
141 EnsureReset();
142 }
143}
144
145template <class Connection, class Derived>
146void ConnectionPoolBase<Connection, Derived>::Init(
147 std::size_t initial_size,
148 std::chrono::milliseconds connection_setup_timeout) {
149 UASSERT_MSG(!initialized_, "Calling Init multiple times is a API misuse");
150 // We mark the pool as initialized even if this method throws, because
151 // this `initialized_` field is here just to ensure correct API usage,
152 // doesn't have much meaning aside from that.
153 initialized_ = true;
154
155 std::vector<engine::TaskWithResult<void>> init_tasks{};
156 init_tasks.reserve(initial_size);
157
158 for (std::size_t i = 0; i < initial_size; ++i) {
159 init_tasks.push_back(engine::AsyncNoSpan([this, connection_setup_timeout] {
160 PushConnection(engine::Deadline::FromDuration(connection_setup_timeout));
161 }));
162 }
163
164 try {
165 engine::GetAll(init_tasks);
166 } catch (const std::exception& ex) {
167 LOG_WARNING() << "Failed to properly setup connection pool: " << ex;
168 throw;
169 }
170}
171
172template <class Connection, class Derived>
173void ConnectionPoolBase<Connection, Derived>::Reset() {
174 UASSERT_MSG(!reset_, "Calling Reset multiple times is a API misuse");
175 reset_ = true;
176
177 CleanupQueue();
178}
179
180template <class Connection, class Derived>
181typename ConnectionPoolBase<Connection, Derived>::ConnectionHolder
182ConnectionPoolBase<Connection, Derived>::AcquireConnection(
183 engine::Deadline deadline) {
184 EnsureInitialized();
185
186 auto connection_ptr = Pop(deadline);
187 return {this->shared_from_this(), std::move(connection_ptr)};
188}
189
190template <class Connection, class Derived>
191void ConnectionPoolBase<Connection, Derived>::ReleaseConnection(
192 ConnectionUniquePtr connection_ptr) {
193 EnsureInitialized();
194 UASSERT(connection_ptr);
195
196 DoRelease(std::move(connection_ptr));
197
198 given_away_semaphore_.unlock_shared();
199 AsDerived().AccountConnectionReleased();
200}
201
202template <class Connection, class Derived>
203Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() noexcept {
204 return *static_cast<Derived*>(this);
205}
206
207template <class Connection, class Derived>
208const Derived& ConnectionPoolBase<Connection, Derived>::AsDerived() const
209 noexcept {
210 return *static_cast<const Derived*>(this);
211}
212
213template <class Connection, class Derived>
214typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
215ConnectionPoolBase<Connection, Derived>::CreateConnection(
216 const engine::SemaphoreLock& connecting_lock, engine::Deadline deadline) {
217 EnsureInitialized();
218
219 UASSERT(connecting_lock.OwnsLock());
220 auto connection_ptr = AsDerived().DoCreateConnection(deadline);
221
222 alive_connections_.fetch_add(1);
223 AsDerived().AccountConnectionCreated();
224
225 return connection_ptr;
226}
227
228template <class Connection, class Derived>
229typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
230ConnectionPoolBase<Connection, Derived>::Pop(engine::Deadline deadline) {
231 EnsureInitialized();
232
233 engine::SemaphoreLock given_away_lock{given_away_semaphore_, deadline};
234 if (!given_away_lock.OwnsLock()) {
235 AsDerived().AccountOverload();
236 throw PoolWaitLimitExceededError{};
237 }
238
239 auto connection_ptr = TryPop();
240 if (!connection_ptr) {
241 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
242
243 connection_ptr = TryPop();
244 if (!connection_ptr) {
245 if (!connecting_lock.OwnsLock()) {
246 AsDerived().AccountOverload();
247 throw PoolWaitLimitExceededError{};
248 }
249 connection_ptr = CreateConnection(connecting_lock, deadline);
250 }
251 }
252
253 UASSERT(connection_ptr);
254
255 given_away_lock.Release();
256 AsDerived().AccountConnectionAcquired();
257
258 return connection_ptr;
259}
260
261template <class Connection, class Derived>
262typename ConnectionPoolBase<Connection, Derived>::ConnectionUniquePtr
263ConnectionPoolBase<Connection, Derived>::TryPop() {
264 EnsureInitialized();
265
266 ConnectionRawPtr connection_ptr{nullptr};
267 if (!queue_.pop(connection_ptr)) {
268 return nullptr;
269 }
270
271 return ConnectionUniquePtr{connection_ptr};
272}
273
274template <class Connection, class Derived>
275void ConnectionPoolBase<Connection, Derived>::DoRelease(
276 ConnectionUniquePtr connection_ptr) {
277 EnsureInitialized();
278 UASSERT(connection_ptr);
279
280 const auto is_broken = connection_ptr->IsBroken();
281 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
282 if (is_broken || !queue_.bounded_push(connection_raw_ptr)) {
283 Drop(connection_raw_ptr);
284 }
285}
286
287template <class Connection, class Derived>
288void ConnectionPoolBase<Connection, Derived>::PushConnection(
289 engine::Deadline deadline) {
290 EnsureInitialized();
291
292 engine::SemaphoreLock connecting_lock{connecting_semaphore_, deadline};
293 if (!connecting_lock.OwnsLock()) {
294 throw PoolWaitLimitExceededError{};
295 }
296
297 ConnectionUniquePtr connection_ptr =
298 CreateConnection(connecting_lock, deadline);
299 connecting_lock.Unlock();
300
301 ConnectionRawPtr connection_raw_ptr = connection_ptr.release();
302 if (!queue_.bounded_push(connection_raw_ptr)) {
303 Drop(connection_raw_ptr);
304 }
305}
306
307template <class Connection, class Derived>
308void ConnectionPoolBase<Connection, Derived>::Drop(
309 ConnectionRawPtr connection_ptr) noexcept {
310 EnsureInitialized();
311 std::default_delete<Connection>{}(connection_ptr);
312
313 alive_connections_.fetch_sub(1);
314
315 static_assert(noexcept(AsDerived().AccountConnectionDestroyed()),
316 "Please make AccountConnectionDestroyed() noexcept, "
317 "because it might get called in pool destructor and is "
318 "expected to be noexcept");
319 AsDerived().AccountConnectionDestroyed();
320}
321
322template <class Connection, class Derived>
323std::size_t
324ConnectionPoolBase<Connection, Derived>::AliveConnectionsCountApprox() const {
325 EnsureInitialized();
326
327 return alive_connections_.load();
328}
329
330template <class Connection, class Derived>
331void ConnectionPoolBase<Connection, Derived>::NotifyConnectionWontBeReleased() {
332 EnsureInitialized();
333
334 given_away_semaphore_.unlock_shared();
335 alive_connections_.fetch_sub(1);
336}
337
338template <class Connection, class Derived>
339void ConnectionPoolBase<Connection, Derived>::CleanupQueue() {
340 EnsureInitialized();
341
342 ConnectionRawPtr connection_ptr{nullptr};
343
344 while (queue_.pop(connection_ptr)) {
345 Drop(connection_ptr);
346 }
347}
348
349template <class Connection, class Derived>
350void ConnectionPoolBase<Connection, Derived>::EnsureInitialized() const {
352 initialized_,
353 "Please call Init before invoking any other methods on connection pool.");
354}
355
356template <class Connection, class Derived>
357void ConnectionPoolBase<Connection, Derived>::EnsureReset() const {
358 UASSERT_MSG(reset_,
359 "Please call Reset before base class is destroyed, otherwise no "
360 "cleanup is performed.");
361}
362
363} // namespace drivers::impl
364
365USERVER_NAMESPACE_END