diff options
author | ijon <[email protected]> | 2023-10-04 15:00:47 +0300 |
---|---|---|
committer | ijon <[email protected]> | 2023-10-04 15:19:11 +0300 |
commit | f8c324ac007288b73e563428a50601932dfd092a (patch) | |
tree | ddf40c7a010684503cd19baa2afd6ce9ad62d907 | |
parent | 4ec4afb5d8ecbe817c5329f578618f5eadd489db (diff) |
auditlog: add partial tests
Add functional tests for DML audit logging behavior.
Include essential fixes in audit logging, DML audit settings support
and minor fixes in functional testing framework.
KIKIMR-18688
-rw-r--r-- | ydb/core/audit/audit_log.cpp | 4 | ||||
-rw-r--r-- | ydb/core/audit/audit_log.h | 16 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp | 3 | ||||
-rw-r--r-- | ydb/tests/functional/audit/conftest.py | 5 | ||||
-rw-r--r-- | ydb/tests/functional/audit/test_auditlog.py | 246 | ||||
-rw-r--r-- | ydb/tests/functional/audit/ya.make | 28 | ||||
-rw-r--r-- | ydb/tests/functional/ya.make | 1 | ||||
-rw-r--r-- | ydb/tests/library/harness/kikimr_config.py | 2 | ||||
-rw-r--r-- | ydb/tests/library/harness/ydb_fixtures.py | 8 |
9 files changed, 298 insertions, 15 deletions
diff --git a/ydb/core/audit/audit_log.cpp b/ydb/core/audit/audit_log.cpp index df9e5f65e1e..24cea6f8457 100644 --- a/ydb/core/audit/audit_log.cpp +++ b/ydb/core/audit/audit_log.cpp @@ -15,9 +15,9 @@ THolder<NActors::IActor> CreateAuditWriter(TMap<NKikimrConfig::TAuditConfig::EFo return MakeHolder<TAuditLogActor>(std::move(logBackends)); } -void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TStringBuf, TString>>& parts) +void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TString, TString>>&& parts) { - auto request = MakeHolder<TEvAuditLog::TEvWriteAuditLog>(Now(), parts); + auto request = MakeHolder<TEvAuditLog::TEvWriteAuditLog>(Now(), std::move(parts)); sys->Send(MakeAuditServiceID(), request.Release()); } diff --git a/ydb/core/audit/audit_log.h b/ydb/core/audit/audit_log.h index 48b3e0346b1..e17d664a28c 100644 --- a/ydb/core/audit/audit_log.h +++ b/ydb/core/audit/audit_log.h @@ -16,10 +16,10 @@ #define AUDIT_LOG_S(sys, expr) \ do { \ - if (::NKikimr::NAudit::AUDIT_LOG_ENABLED.load()) { \ - TVector<std::pair<TStringBuf, TString>> auditParts; \ + if (::NKikimr::NAudit::AUDIT_LOG_ENABLED.load()) { \ + TVector<std::pair<TString, TString>> auditParts; \ expr \ - ::NKikimr::NAudit::SendAuditLog(sys, auditParts); \ + ::NKikimr::NAudit::SendAuditLog(sys, std::move(auditParts)); \ } \ } while (0) /**/ @@ -29,7 +29,7 @@ #define AUDIT_PART_COND(key, value, condition) \ do { \ if (condition && !value.empty()) { \ - auditParts.push_back({key, value}); \ + auditParts.emplace_back(key, value); \ } \ } while (0); @@ -63,9 +63,9 @@ struct TEvAuditLog : public NActors::TEventLocal<TEvWriteAuditLog, EvWriteAuditLog> { TInstant Time; - TVector<std::pair<TStringBuf, TString>> Parts; + TVector<std::pair<TString, TString>> Parts; - TEvWriteAuditLog(TInstant time, TVector<std::pair<TStringBuf, TString>> parts) + TEvWriteAuditLog(TInstant time, TVector<std::pair<TString, TString>>&& parts) : Time(time) , Parts(std::move(parts)) {} @@ -100,7 +100,7 @@ private: const TActorContext& ctx); static void WriteLog( - const TString& log, + const TString& log, const TVector<THolder<TLogBackend>>& logBackends); static TString GetJsonLog( @@ -114,7 +114,7 @@ private: //////////////////////////////////////////////////////////////////////////////// -void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TStringBuf, TString>>& parts); +void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TString, TString>>&& parts); inline NActors::TActorId MakeAuditServiceID() { return NActors::TActorId(0, TStringBuf("YDB_AUDIT")); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp index e0ab1cecde8..dd221a3b0a5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp @@ -453,6 +453,9 @@ void TSideEffects::DoUpdateTenant(TSchemeShard* ss, NTabletFlatExecutor::TTransa if (subDomain->GetDatabaseQuotas()) { message->Record.MutableDatabaseQuotas()->CopyFrom(*subDomain->GetDatabaseQuotas()); } + if (const auto& auditSettings = subDomain->GetAuditSettings()) { + message->Record.MutableAuditSettings()->CopyFrom(*auditSettings); + } hasChanges = true; } diff --git a/ydb/tests/functional/audit/conftest.py b/ydb/tests/functional/audit/conftest.py new file mode 100644 index 00000000000..b6c9a98e3f4 --- /dev/null +++ b/ydb/tests/functional/audit/conftest.py @@ -0,0 +1,5 @@ +# XXX: setting of pytest_plugins should work if specified directly in test modules +# but somehow it does not +# +# for ydb_{cluster, database, ...} fixture family +pytest_plugins = 'ydb.tests.library.harness.ydb_fixtures' diff --git a/ydb/tests/functional/audit/test_auditlog.py b/ydb/tests/functional/audit/test_auditlog.py new file mode 100644 index 00000000000..e4541ea593e --- /dev/null +++ b/ydb/tests/functional/audit/test_auditlog.py @@ -0,0 +1,246 @@ +# -*- coding: utf-8 -*- +import logging +import os +import subprocess +import sys + +import pytest + +from ydb import Driver, DriverConfig, SessionPool +from ydb.tests.library.harness.util import LogLevels +from ydb.tests.library.harness.ydb_fixtures import ydb_database_ctx +from ydb.tests.oss.ydb_sdk_import import ydb + +logger = logging.getLogger(__name__) + + +# local configuration for the ydb cluster (fetched by ydb_cluster_configuration fixture) +CLUSTER_CONFIG = dict( + additional_log_configs={ + # more logs + 'GRPC_PROXY': LogLevels.DEBUG, + 'GRPC_SERVER': LogLevels.DEBUG, + 'FLAT_TX_SCHEMESHARD': LogLevels.TRACE, + # less logs + 'KQP_PROXY': LogLevels.DEBUG, + 'KQP_GATEWAY': LogLevels.DEBUG, + 'KQP_WORKER': LogLevels.ERROR, + 'KQP_YQL': LogLevels.ERROR, + 'KQP_SESSION': LogLevels.ERROR, + 'KQP_COMPILE_ACTOR': LogLevels.ERROR, + 'TX_DATASHARD': LogLevels.ERROR, + 'HIVE': LogLevels.ERROR, + 'CMS_TENANTS': LogLevels.ERROR, + 'PERSQUEUE_CLUSTER_TRACKER': LogLevels.CRIT, + 'TX_PROXY_SCHEME_CACHE': LogLevels.CRIT, + 'TX_PROXY': LogLevels.CRIT, + }, + enable_audit_log=True, + # extra_feature_flags=['enable_grpc_audit'], +) + + +def cluster_endpoint(cluster): + return f'{cluster.nodes[1].host}:{cluster.nodes[1].port}' + + +def ydbcli_db_schema_exec(cluster, operation_proto): + endpoint = cluster_endpoint(cluster) + args = [ + # cluster.binary_path, + cluster.nodes[1].binary_path, + f'--server=grpc://{endpoint}', + 'db', + 'schema', + 'exec', + operation_proto, + ] + r = subprocess.run(args, capture_output=True) + assert r.returncode == 0, r + + +def alter_database_audit_settings(cluster, database_path, enable_dml_audit, expected_subjects=None): + alter_proto = r'''ModifyScheme { + OperationType: ESchemeOpAlterExtSubDomain + WorkingDir: "%s" + SubDomain { + Name: "%s" + AuditSettings { + EnableDmlAudit: %s + } + } + }''' % ( + os.path.dirname(database_path), + os.path.basename(database_path), + enable_dml_audit, + ) + ydbcli_db_schema_exec(cluster, alter_proto) + + +class CaptureFileOutput: + def __init__(self, filename): + self.filename = filename + + def __enter__(self): + self.saved_pos = os.path.getsize(self.filename) + return self + + def __exit__(self, *exc): + with open(self.filename, 'r') as f: + f.seek(self.saved_pos) + self.captured = f.read() + + [email protected](scope='module') +def _database(ydb_cluster, ydb_root, request): + database_path = os.path.join(ydb_root, request.node.name) + with ydb_database_ctx(ydb_cluster, database_path): + yield database_path + + [email protected](scope='module') +def _client_session_pool_with_auth(ydb_cluster, _database): + with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token='root@builtin')) as driver: + with SessionPool(driver) as pool: + yield pool + + [email protected](scope='module') +def _client_session_pool_no_auth(ydb_cluster, _database): + with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token=None)) as driver: + with SessionPool(driver) as pool: + yield pool + + [email protected](scope='module') +def _client_session_pool_bad_auth(ydb_cluster, _database): + with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token='__bad__@builtin')) as driver: + with SessionPool(driver) as pool: + yield pool + + +def create_table(pool, table_path): + def f(s, table_path): + s.execute_scheme(fr''' + create table `{table_path}` ( + id int64, + value int64, + primary key (id) + ); + ''') + pool.retry_operation_sync(f, table_path=table_path, retry_settings=None) + + +def fill_table(pool, table_path): + def f(s, table_path): + s.transaction().execute(fr''' + insert into `{table_path}` (id, value) values (1, 1), (2, 2) + ''') + pool.retry_operation_sync(f, table_path=table_path, retry_settings=None) + + [email protected](scope='module') +def prepared_test_env(ydb_cluster, _database, _client_session_pool_no_auth): + database_path = _database + table_path = os.path.join(database_path, 'test-table') + pool = _client_session_pool_no_auth + + create_table(pool, table_path) + fill_table(pool, table_path) + + capture_audit = CaptureFileOutput(ydb_cluster.config.audit_file_path) + print('AAA', capture_audit.filename, file=sys.stderr) + # print('AAA', ydb_cluster.config.binary_path, file=sys.stderr) + + alter_database_audit_settings(ydb_cluster, database_path, enable_dml_audit=True) + + return table_path, capture_audit + + +def execute_data_query(pool, text): + pool.retry_operation_sync(lambda s: s.transaction().execute(text, commit_tx=True)) + + +QUERIES = [ + r'''insert into `{table_path}` (id, value) values (100, 100), (101, 101)''', + r'''select id from `{table_path}`''', + r'''update `{table_path}` set value = 0 where id = 1''', + r'''delete from `{table_path}` where id = 2''', + r'''replace into `{table_path}` (id, value) values (2, 3), (3, 3)''', + r'''upsert into `{table_path}` (id, value) values (4, 4), (5, 5)''', +] + + [email protected]("query_template", QUERIES, ids=lambda x: x.split(maxsplit=1)[0]) +def test_single_dml_query_logged(query_template, prepared_test_env, _client_session_pool_with_auth): + table_path, capture_audit = prepared_test_env + + pool = _client_session_pool_with_auth + query_text = query_template.format(table_path=table_path) + + with capture_audit: + execute_data_query(pool, query_text) + + print(capture_audit.captured, file=sys.stderr) + assert query_text in capture_audit.captured + + +def test_dml_begin_commit_logged(prepared_test_env, _client_session_pool_with_auth): + table_path, capture_audit = prepared_test_env + + pool = _client_session_pool_with_auth + + with pool.checkout() as session: + with capture_audit: + tx = session.transaction().begin() + tx.execute(fr'''update `{table_path}` set value = 0 where id = 1''') + tx.commit() + + print(capture_audit.captured, file=sys.stderr) + assert 'BeginTransaction' in capture_audit.captured + assert 'CommitTransaction' in capture_audit.captured + + +# TODO: fix ydbd crash on exit +# def test_dml_begin_rollback_logged(prepared_test_env, _client_session_pool_with_auth): +# table_path, capture_audit = prepared_test_env +# +# pool = _client_session_pool_with_auth +# +# with pool.checkout() as session: +# with capture_audit: +# tx = session.transaction().begin() +# tx.execute(fr'''update `{table_path}` set value = 0 where id = 1''') +# tx.rollback() +# +# print(capture_audit.captured, file=sys.stderr) +# assert 'BeginTransaction' in capture_audit.captured +# assert 'RollbackTransaction' in capture_audit.captured + + +def test_dml_requests_arent_logged_when_anonymous(prepared_test_env, _client_session_pool_no_auth): + table_path, capture_audit = prepared_test_env + pool = _client_session_pool_no_auth + + with capture_audit: + for i in QUERIES: + query_text = i.format(table_path=table_path) + execute_data_query(pool, query_text) + + print(capture_audit.captured, file=sys.stderr) + assert len(capture_audit.captured) == 0, capture_audit.captured + + +def test_dml_requests_logged_when_unauthorized(prepared_test_env, _client_session_pool_bad_auth): + table_path, capture_audit = prepared_test_env + pool = _client_session_pool_bad_auth + + for i in QUERIES: + query_text = i.format(table_path=table_path) + with pool.checkout() as session: + tx = session.transaction() + with capture_audit: + with pytest.raises(ydb.issues.SchemeError): + tx.execute(query_text, commit_tx=True) + print(capture_audit.captured, file=sys.stderr) + assert query_text in capture_audit.captured diff --git a/ydb/tests/functional/audit/ya.make b/ydb/tests/functional/audit/ya.make new file mode 100644 index 00000000000..30a62b1dd30 --- /dev/null +++ b/ydb/tests/functional/audit/ya.make @@ -0,0 +1,28 @@ +PY3TEST() + +SPLIT_FACTOR(30) +FORK_SUBTESTS() +FORK_TEST_FILES() +TIMEOUT(600) +SIZE(MEDIUM) + +ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd") + +TEST_SRCS( + conftest.py + test_auditlog.py +) + +DEPENDS( + ydb/apps/ydbd +) + +PEERDIR( + ydb/tests/library + ydb/tests/oss/ydb_sdk_import + ydb/public/sdk/python +) + +REQUIREMENTS(ram:10) + +END() diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make index 72438e3093d..43651176494 100644 --- a/ydb/tests/functional/ya.make +++ b/ydb/tests/functional/ya.make @@ -1,5 +1,6 @@ RECURSE( api + audit autoconfig blobstorage canonical diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index c2de76a6dd5..27b7b10efd1 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -444,7 +444,7 @@ class KikimrConfigGenerator(object): @property def audit_file_path(self): - return self.yaml_config.get('audit_config', {}).get('audit_file_path') + return self.yaml_config.get('audit_config', {}).get('file_backend', {}).get('file_path') @property def nbs_enable(self): diff --git a/ydb/tests/library/harness/ydb_fixtures.py b/ydb/tests/library/harness/ydb_fixtures.py index 766663941be..5a7fcef94d1 100644 --- a/ydb/tests/library/harness/ydb_fixtures.py +++ b/ydb/tests/library/harness/ydb_fixtures.py @@ -130,8 +130,8 @@ def ydb_endpoint(ydb_cluster): @pytest.fixture(scope='function') def ydb_client(ydb_endpoint, request): - def _make_driver(database_path): - driver_config = DriverConfig(ydb_endpoint, database_path) + def _make_driver(database_path, **kwargs): + driver_config = DriverConfig(ydb_endpoint, database_path, **kwargs) driver = Driver(driver_config) def stop_driver(): @@ -144,8 +144,8 @@ def ydb_client(ydb_endpoint, request): @pytest.fixture(scope='function') def ydb_client_session(ydb_client, request): - def _make_pool(database_path): - driver = ydb_client(database_path) + def _make_pool(database_path, **kwargs): + driver = ydb_client(database_path, **kwargs) pool = SessionPool(driver) def stop_pool(): |