userver: /data/code/userver/testsuite/pytest_plugins/pytest_userver/plugins/ydb/ydbsupport.py Source File
Loading...
Searching...
No Matches
ydbsupport.py
1# pylint: disable=redefined-outer-name
2import concurrent.futures
3import contextlib
4import os
5import pathlib
6import subprocess
7from typing import List
8from typing import Optional
9
10import pytest
11import yaml
12
13from testsuite.environment import shell
14
15from pytest_userver import sql
16from . import client
17from . import discover
18from . import service
19
# Use yaml.CLoader when this PyYAML build exposes it, otherwise fall back
# to the always-available yaml.Loader.
_YamlLoader = getattr(yaml, 'CLoader', yaml.Loader)  # type: ignore

# Name of the config hook fixture defined in this module
# (presumably collected by the pytest_userver config machinery).
USERVER_CONFIG_HOOKS = ['userver_config_ydb']
26
27
@pytest.fixture
def ydb(_ydb_client, _ydb_init):
    """
    YDB client for use in tests.

    Hands out the session-wide client. `_ydb_init` is requested only for
    its side effects, so that per-test initialization has already run when
    the test receives the client.
    """
    return _ydb_client
31
32
@pytest.fixture(scope='session')
def _ydb_client(_ydb_client_pool):
    """
    Session-scoped YDB client borrowed from the client pool.

    The client stays checked out for the whole session and is handed back
    to the pool on fixture teardown.
    """
    with _ydb_client_pool() as session_client:
        yield session_client
37
38
@pytest.fixture(scope='session')
def _ydb_client_pool(_ydb_service, ydb_service_settings):
    """
    Returns a context-manager factory that lends out YDB clients.

    Entering the context takes an idle client if one is available and
    creates a new `client.YdbClient` otherwise. Leaving the context puts
    the client back into the idle list; clients are not closed here.
    """
    endpoint = f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
    idle_clients = []

    @contextlib.contextmanager
    def borrow_client():
        # pop() + IndexError instead of a length check: the pool is also
        # used from worker threads, so check-then-pop would be racy.
        try:
            borrowed = idle_clients.pop()
        except IndexError:
            borrowed = client.YdbClient(
                endpoint, ydb_service_settings.database,
            )
        try:
            yield borrowed
        finally:
            idle_clients.append(borrowed)

    return borrow_client
60
61
def pytest_service_register(register_service):
    """
    Registers the factory for the `ydb` service.

    Called with a `register_service` callback; the service is registered
    under the name `ydb` with `service.create_ydb_service` as its factory.
    """
    service_name = 'ydb'
    register_service(service_name, service.create_ydb_service)
64
65
@pytest.fixture(scope='session')
def _ydb_service(pytestconfig, ensure_service_started, ydb_service_settings):
    """
    Makes sure a YDB instance is available for the session.

    Nothing is started when an instance is supplied from outside, i.e. the
    `YDB_ENDPOINT` environment variable or the `ydb_host` pytest option is
    set. Otherwise the `ydb` service is started with the session settings.
    """
    uses_external_instance = (
        os.environ.get('YDB_ENDPOINT') or pytestconfig.option.ydb_host
    )
    if not uses_external_instance:
        ensure_service_started('ydb', settings=ydb_service_settings)
71
72
@pytest.fixture(scope='session')
def ydb_service_settings(pytestconfig) -> service.ServiceSettings:
    """
    Resolves connection settings of the YDB instance used by the tests.

    Sources, in order of precedence:

    1. `YDB_ENDPOINT` environment variable in `host:port` form. The port is
       kept as the string taken from the variable; monitoring and
       interconnect ports are unknown and set to None.
    2. `ydb_host` pytest option together with the `ydb_*_port` options.
    3. `service.get_service_settings()`.

    For the first two sources the database name comes from `YDB_DATABASE`
    (default: `local`).
    """
    database = os.environ.get('YDB_DATABASE', 'local')
    env_endpoint = os.environ.get('YDB_ENDPOINT')

    if not env_endpoint:
        options = pytestconfig.option
        if not options.ydb_host:
            return service.get_service_settings()
        return service.ServiceSettings(
            host=options.ydb_host,
            grpc_port=options.ydb_grpc_port,
            mon_port=options.ydb_mon_port,
            ic_port=options.ydb_ic_port,
            database=database,
        )

    # Split on the first colon only; raises ValueError when there is none.
    host, grpc_port = env_endpoint.split(':', 1)
    return service.ServiceSettings(
        host=host,
        grpc_port=grpc_port,
        mon_port=None,
        ic_port=None,
        database=database,
    )
97
98
@pytest.fixture(scope='session')
def _ydb_service_schemas(service_source_dir):
    """
    Schema files discovered under `<service_source_dir>/ydb/schemas`.

    Returns whatever `discover.find_schemas` reports for that directory.
    """
    schema_dirs = [service_source_dir / 'ydb' / 'schemas']
    return discover.find_schemas(schema_dirs)
103
104
@pytest.fixture(scope='session')
def ydb_settings_substitute(ydb_service_settings):
    """
    Returns substitution callbacks for YDB connection settings.

    The single `ydb_settings` callback ignores its arguments and returns a
    dict with the `endpoint` (`host:grpc_port`), the `database` path and an
    empty `token` of the test YDB instance.

    The database path starts with `/`. A name that already starts with a
    slash (e.g. `YDB_DATABASE=/local`) is used as is instead of getting a
    second one, matching what `userver_config_ydb` does.
    """

    def secdist_settings(*args, **kwargs):
        database = ydb_service_settings.database
        if not database.startswith('/'):
            database = '/' + database
        endpoint = (
            f'{ydb_service_settings.host}:{ydb_service_settings.grpc_port}'
        )
        return {
            'endpoint': endpoint,
            'database': database,
            'token': '',
        }

    return {'ydb_settings': secdist_settings}
117
118
@pytest.fixture(scope='session')
def _ydb_state():
    """
    Mutable session-wide bookkeeping shared by the YDB fixtures.
    """

    class State:
        def __init__(self):
            # Set to True by `_ydb_prepare` once preparation has finished.
            self.init: bool = False
            # Paths of the tables created from schema files.
            self.tables: List[str] = []

    state = State()
    return state
127
128
@pytest.fixture(scope='session')
def ydb_migrate_dir(service_source_dir) -> pathlib.Path:
    """
    Directory with YDB migrations: `<service_source_dir>/ydb/migrations`.

    The path is returned whether or not the directory exists.
    """
    ydb_dir = service_source_dir / 'ydb'
    return ydb_dir / 'migrations'
132
133
134def _ydb_migrate(ydb_service_settings, ydb_migrate_dir):
135 if not ydb_migrate_dir.exists():
136 return
137 if not list(ydb_migrate_dir.iterdir()):
138 return
139
140 if not _get_goose():
141 return
142
143 host = ydb_service_settings.host
144 port = ydb_service_settings.grpc_port
145
146 command = [
147 str(_get_goose()),
148 '-dir',
149 str(ydb_migrate_dir),
150 'ydb',
151 (
152 f'grpc://{host}:{port}/local?go_query_mode=scripting&'
153 'go_fake_tx=scripting&go_query_bind=declare,numeric'
154 ),
155 'up',
156 ]
157 try:
158 shell.execute(command, verbose=True, command_alias='ydb/migrations')
159 except shell.SubprocessFailed as exc:
160 raise Exception(f'YDB run migration failed:\n\n{exc}')
161
162
163def _get_goose() -> Optional[pathlib.Path]:
164 try:
165 import yatest
166
167 return yatest.common.runtime.binary_path(
168 'contrib/go/patched/goose/cmd/goose/goose',
169 )
170 except ImportError:
171 return None
172
173
174def _ydb_fetch_table_names(ydb_service_settings) -> List[str]:
175 try:
176 import yatest
177
178 host = ydb_service_settings.host
179 port = ydb_service_settings.grpc_port
180 output = subprocess.check_output(
181 [
182 yatest.common.runtime.binary_path('contrib/ydb/apps/ydb/ydb'),
183 '-e',
184 f'grpc://{host}:{port}',
185 '-d',
186 '/local',
187 'scheme',
188 'ls',
189 '-lR',
190 ],
191 encoding='utf-8',
192 )
193 tables = []
194
195 for line in output.split('\n'):
196 if ' table ' not in line:
197 continue
198 if '.sys' in line:
199 continue
200 path = line.split('│')[6].strip()
201 tables.append(path)
202 return tables
203 except ImportError:
204 return []
205
206
@pytest.fixture(scope='session')
def _ydb_prepare(
    _ydb_client,
    _ydb_service_schemas,
    ydb_service_settings,
    _ydb_state,
    ydb_migrate_dir,
):
    """
    Prepares the database schema once per session.

    Two mutually exclusive mechanisms are supported:

    - legacy YAML schema files: every listed table is dropped, re-created
      and recorded in `_ydb_state.tables`;
    - goose migrations from `ydb_migrate_dir`.

    `_ydb_state.init` is set to True at the end.

    Raises:
        Exception: schema files were found and the migrations directory
            exists at the same time.
    """
    if _ydb_service_schemas and ydb_migrate_dir.exists():
        raise Exception(
            'Both ydb/schemas and ydb/migrations exist, '
            'which are mutually exclusive',
        )

    # testsuite legacy
    for schema_path in _ydb_service_schemas:
        # Explicit encoding: do not depend on the locale of the test host.
        with open(schema_path, encoding='utf-8') as fp:
            tables_schemas = yaml.load(fp.read(), Loader=_YamlLoader)
        # An empty YAML document loads as None; treat it as "no tables".
        for table_schema in tables_schemas or ():
            client.drop_table(_ydb_client, table_schema['path'])
            client.create_table(_ydb_client, table_schema)
            _ydb_state.tables.append(table_schema['path'])

    # goose
    _ydb_migrate(ydb_service_settings, ydb_migrate_dir)

    _ydb_state.init = True
234
235
@pytest.fixture(scope='session')
def _ydb_tables(_ydb_state, _ydb_prepare, ydb_service_settings):
    """
    Sorted tuple of unique table paths known to the session.

    Combines the tables recorded in `_ydb_state.tables` with the ones
    reported by `_ydb_fetch_table_names`. `_ydb_prepare` is requested so
    that schema preparation has run before the tables are collected.
    """
    table_names = set(_ydb_state.tables)
    table_names.update(_ydb_fetch_table_names(ydb_service_settings))
    return tuple(sorted(table_names))
243
244
@pytest.fixture
def _ydb_init(
    request,
    _ydb_client,
    _ydb_state,
    ydb_service_settings,
    _ydb_prepare,
    _ydb_tables,
    _ydb_client_pool,
    load,
):
    """
    Per-test database initialization.

    1. Deletes all rows from every table in `_ydb_tables`, one worker
       thread per table, each with its own client from the pool.
    2. Executes the queries requested by the test's `ydb` markers:
       contents of `files` (read with `load`) first, then `queries`.

    A failure to clear a table is raised here instead of being dropped,
    so a test does not silently run against leftover data.
    """

    def ydb_mark_queries(files=(), queries=()):
        return [*(load(path) for path in files), *queries]

    def clear_table(table):
        # Rows are deleted; the table itself is kept.
        with _ydb_client_pool() as ydb_client:
            ydb_client.execute('DELETE FROM `{}`'.format(table))

    if _ydb_tables:
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=len(_ydb_tables),
        ) as executor:
            # Executor.map() re-raises a worker's exception only when its
            # result is retrieved, so the iterator has to be drained.
            for _ in executor.map(clear_table, _ydb_tables):
                pass

    for mark in request.node.iter_markers('ydb'):
        for query in ydb_mark_queries(**mark.kwargs):
            _ydb_client.execute(query)
277
278
@pytest.fixture
def userver_ydb_trx(testpoint) -> sql.RegisteredTrx:
    """
    Keeps the fault injection state of YDB transactions in a RegisteredTrx
    object and returns it.

    A handler for the `ydb_trx_commit` testpoint is installed; it replies
    with `trx_should_fail` according to whether failure is enabled for the
    transaction named in the testpoint data.

    @see pytest_userver.sql.RegisteredTrx

    @snippet integration_tests/tests/test_trx_failure.py fault injection

    @ingroup userver_testsuite_fixtures
    """

    trx_registry = sql.RegisteredTrx()

    @testpoint('ydb_trx_commit')
    def _ydb_trx_tp(data):
        trx_name = data['trx_name']
        return {'trx_should_fail': trx_registry.is_failure_enabled(trx_name)}

    return trx_registry
300
301
@pytest.fixture(scope='session')
def userver_config_ydb(ydb_service_settings):
    """
    Returns a function that adjusts the static configuration file for
    testsuite.

    Every entry of `ydb.databases` gets its `endpoint` and `database`
    pointed at the local test YDB instance.

    @ingroup userver_testsuite_fixtures
    """

    endpoint = '{}:{}'.format(
        ydb_service_settings.host, ydb_service_settings.grpc_port,
    )
    # Prepend a slash only when the database name lacks one.
    database = ydb_service_settings.database
    if not database.startswith('/'):
        database = '/' + database

    def patch_config(config, config_vars):
        components = config['components_manager']['components']
        ydb_component = components['ydb']
        if isinstance(ydb_component, str):
            # A string value is treated as a reference into config_vars:
            # its first character (presumably the `$` sigil) is dropped.
            ydb_component = config_vars[ydb_component[1:]]
        for dbconfig in ydb_component['databases'].values():
            dbconfig['endpoint'] = endpoint
            dbconfig['database'] = database

    return patch_config
323 for dbname, dbconfig in databases.items():
324 dbconfig['endpoint'] = endpoint
325 dbconfig['database'] = database
326
327 return patch_config