summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorijon <[email protected]>2023-10-04 15:00:47 +0300
committerijon <[email protected]>2023-10-04 15:19:11 +0300
commitf8c324ac007288b73e563428a50601932dfd092a (patch)
treeddf40c7a010684503cd19baa2afd6ce9ad62d907
parent4ec4afb5d8ecbe817c5329f578618f5eadd489db (diff)
auditlog: add partial tests
Add functional tests for DML audit logging behavior. Include essential fixes in audit logging, DML audit settings support and minor fixes in functional testing framework. KIKIMR-18688
-rw-r--r--ydb/core/audit/audit_log.cpp4
-rw-r--r--ydb/core/audit/audit_log.h16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp3
-rw-r--r--ydb/tests/functional/audit/conftest.py5
-rw-r--r--ydb/tests/functional/audit/test_auditlog.py246
-rw-r--r--ydb/tests/functional/audit/ya.make28
-rw-r--r--ydb/tests/functional/ya.make1
-rw-r--r--ydb/tests/library/harness/kikimr_config.py2
-rw-r--r--ydb/tests/library/harness/ydb_fixtures.py8
9 files changed, 298 insertions, 15 deletions
diff --git a/ydb/core/audit/audit_log.cpp b/ydb/core/audit/audit_log.cpp
index df9e5f65e1e..24cea6f8457 100644
--- a/ydb/core/audit/audit_log.cpp
+++ b/ydb/core/audit/audit_log.cpp
@@ -15,9 +15,9 @@ THolder<NActors::IActor> CreateAuditWriter(TMap<NKikimrConfig::TAuditConfig::EFo
return MakeHolder<TAuditLogActor>(std::move(logBackends));
}
-void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TStringBuf, TString>>& parts)
+void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TString, TString>>&& parts)
{
- auto request = MakeHolder<TEvAuditLog::TEvWriteAuditLog>(Now(), parts);
+ auto request = MakeHolder<TEvAuditLog::TEvWriteAuditLog>(Now(), std::move(parts));
sys->Send(MakeAuditServiceID(), request.Release());
}
diff --git a/ydb/core/audit/audit_log.h b/ydb/core/audit/audit_log.h
index 48b3e0346b1..e17d664a28c 100644
--- a/ydb/core/audit/audit_log.h
+++ b/ydb/core/audit/audit_log.h
@@ -16,10 +16,10 @@
#define AUDIT_LOG_S(sys, expr) \
do { \
- if (::NKikimr::NAudit::AUDIT_LOG_ENABLED.load()) { \
- TVector<std::pair<TStringBuf, TString>> auditParts; \
+ if (::NKikimr::NAudit::AUDIT_LOG_ENABLED.load()) { \
+ TVector<std::pair<TString, TString>> auditParts; \
expr \
- ::NKikimr::NAudit::SendAuditLog(sys, auditParts); \
+ ::NKikimr::NAudit::SendAuditLog(sys, std::move(auditParts)); \
} \
} while (0) /**/
@@ -29,7 +29,7 @@
#define AUDIT_PART_COND(key, value, condition) \
do { \
if (condition && !value.empty()) { \
- auditParts.push_back({key, value}); \
+ auditParts.emplace_back(key, value); \
} \
} while (0);
@@ -63,9 +63,9 @@ struct TEvAuditLog
: public NActors::TEventLocal<TEvWriteAuditLog, EvWriteAuditLog>
{
TInstant Time;
- TVector<std::pair<TStringBuf, TString>> Parts;
+ TVector<std::pair<TString, TString>> Parts;
- TEvWriteAuditLog(TInstant time, TVector<std::pair<TStringBuf, TString>> parts)
+ TEvWriteAuditLog(TInstant time, TVector<std::pair<TString, TString>>&& parts)
: Time(time)
, Parts(std::move(parts))
{}
@@ -100,7 +100,7 @@ private:
const TActorContext& ctx);
static void WriteLog(
- const TString& log,
+ const TString& log,
const TVector<THolder<TLogBackend>>& logBackends);
static TString GetJsonLog(
@@ -114,7 +114,7 @@ private:
////////////////////////////////////////////////////////////////////////////////
-void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TStringBuf, TString>>& parts);
+void SendAuditLog(const NActors::TActorSystem* sys, TVector<std::pair<TString, TString>>&& parts);
inline NActors::TActorId MakeAuditServiceID() {
return NActors::TActorId(0, TStringBuf("YDB_AUDIT"));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
index e0ab1cecde8..dd221a3b0a5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_side_effects.cpp
@@ -453,6 +453,9 @@ void TSideEffects::DoUpdateTenant(TSchemeShard* ss, NTabletFlatExecutor::TTransa
if (subDomain->GetDatabaseQuotas()) {
message->Record.MutableDatabaseQuotas()->CopyFrom(*subDomain->GetDatabaseQuotas());
}
+ if (const auto& auditSettings = subDomain->GetAuditSettings()) {
+ message->Record.MutableAuditSettings()->CopyFrom(*auditSettings);
+ }
hasChanges = true;
}
diff --git a/ydb/tests/functional/audit/conftest.py b/ydb/tests/functional/audit/conftest.py
new file mode 100644
index 00000000000..b6c9a98e3f4
--- /dev/null
+++ b/ydb/tests/functional/audit/conftest.py
@@ -0,0 +1,5 @@
+# XXX: setting of pytest_plugins should work if specified directly in test modules
+# but somehow it does not
+#
+# for ydb_{cluster, database, ...} fixture family
+pytest_plugins = 'ydb.tests.library.harness.ydb_fixtures'
diff --git a/ydb/tests/functional/audit/test_auditlog.py b/ydb/tests/functional/audit/test_auditlog.py
new file mode 100644
index 00000000000..e4541ea593e
--- /dev/null
+++ b/ydb/tests/functional/audit/test_auditlog.py
@@ -0,0 +1,246 @@
+# -*- coding: utf-8 -*-
+import logging
+import os
+import subprocess
+import sys
+
+import pytest
+
+from ydb import Driver, DriverConfig, SessionPool
+from ydb.tests.library.harness.util import LogLevels
+from ydb.tests.library.harness.ydb_fixtures import ydb_database_ctx
+from ydb.tests.oss.ydb_sdk_import import ydb
+
+logger = logging.getLogger(__name__)
+
+
+# local configuration for the ydb cluster (fetched by ydb_cluster_configuration fixture)
+CLUSTER_CONFIG = dict(
+ additional_log_configs={
+ # more logs
+ 'GRPC_PROXY': LogLevels.DEBUG,
+ 'GRPC_SERVER': LogLevels.DEBUG,
+ 'FLAT_TX_SCHEMESHARD': LogLevels.TRACE,
+ # less logs
+ 'KQP_PROXY': LogLevels.DEBUG,
+ 'KQP_GATEWAY': LogLevels.DEBUG,
+ 'KQP_WORKER': LogLevels.ERROR,
+ 'KQP_YQL': LogLevels.ERROR,
+ 'KQP_SESSION': LogLevels.ERROR,
+ 'KQP_COMPILE_ACTOR': LogLevels.ERROR,
+ 'TX_DATASHARD': LogLevels.ERROR,
+ 'HIVE': LogLevels.ERROR,
+ 'CMS_TENANTS': LogLevels.ERROR,
+ 'PERSQUEUE_CLUSTER_TRACKER': LogLevels.CRIT,
+ 'TX_PROXY_SCHEME_CACHE': LogLevels.CRIT,
+ 'TX_PROXY': LogLevels.CRIT,
+ },
+ enable_audit_log=True,
+ # extra_feature_flags=['enable_grpc_audit'],
+)
+
+
+def cluster_endpoint(cluster):
+ return f'{cluster.nodes[1].host}:{cluster.nodes[1].port}'
+
+
+def ydbcli_db_schema_exec(cluster, operation_proto):
+ endpoint = cluster_endpoint(cluster)
+ args = [
+ # cluster.binary_path,
+ cluster.nodes[1].binary_path,
+ f'--server=grpc://{endpoint}',
+ 'db',
+ 'schema',
+ 'exec',
+ operation_proto,
+ ]
+ r = subprocess.run(args, capture_output=True)
+ assert r.returncode == 0, r
+
+
+def alter_database_audit_settings(cluster, database_path, enable_dml_audit, expected_subjects=None):
+ alter_proto = r'''ModifyScheme {
+ OperationType: ESchemeOpAlterExtSubDomain
+ WorkingDir: "%s"
+ SubDomain {
+ Name: "%s"
+ AuditSettings {
+ EnableDmlAudit: %s
+ }
+ }
+ }''' % (
+ os.path.dirname(database_path),
+ os.path.basename(database_path),
+ enable_dml_audit,
+ )
+ ydbcli_db_schema_exec(cluster, alter_proto)
+
+
+class CaptureFileOutput:
+ def __init__(self, filename):
+ self.filename = filename
+
+ def __enter__(self):
+ self.saved_pos = os.path.getsize(self.filename)
+ return self
+
+ def __exit__(self, *exc):
+ with open(self.filename, 'r') as f:
+ f.seek(self.saved_pos)
+ self.captured = f.read()
+
+
[email protected](scope='module')
+def _database(ydb_cluster, ydb_root, request):
+ database_path = os.path.join(ydb_root, request.node.name)
+ with ydb_database_ctx(ydb_cluster, database_path):
+ yield database_path
+
+
[email protected](scope='module')
+def _client_session_pool_with_auth(ydb_cluster, _database):
+ with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token='root@builtin')) as driver:
+ with SessionPool(driver) as pool:
+ yield pool
+
+
[email protected](scope='module')
+def _client_session_pool_no_auth(ydb_cluster, _database):
+ with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token=None)) as driver:
+ with SessionPool(driver) as pool:
+ yield pool
+
+
[email protected](scope='module')
+def _client_session_pool_bad_auth(ydb_cluster, _database):
+ with Driver(DriverConfig(cluster_endpoint(ydb_cluster), _database, auth_token='__bad__@builtin')) as driver:
+ with SessionPool(driver) as pool:
+ yield pool
+
+
+def create_table(pool, table_path):
+ def f(s, table_path):
+ s.execute_scheme(fr'''
+ create table `{table_path}` (
+ id int64,
+ value int64,
+ primary key (id)
+ );
+ ''')
+ pool.retry_operation_sync(f, table_path=table_path, retry_settings=None)
+
+
+def fill_table(pool, table_path):
+ def f(s, table_path):
+ s.transaction().execute(fr'''
+ insert into `{table_path}` (id, value) values (1, 1), (2, 2)
+ ''')
+ pool.retry_operation_sync(f, table_path=table_path, retry_settings=None)
+
+
[email protected](scope='module')
+def prepared_test_env(ydb_cluster, _database, _client_session_pool_no_auth):
+ database_path = _database
+ table_path = os.path.join(database_path, 'test-table')
+ pool = _client_session_pool_no_auth
+
+ create_table(pool, table_path)
+ fill_table(pool, table_path)
+
+ capture_audit = CaptureFileOutput(ydb_cluster.config.audit_file_path)
+ print('AAA', capture_audit.filename, file=sys.stderr)
+ # print('AAA', ydb_cluster.config.binary_path, file=sys.stderr)
+
+ alter_database_audit_settings(ydb_cluster, database_path, enable_dml_audit=True)
+
+ return table_path, capture_audit
+
+
+def execute_data_query(pool, text):
+ pool.retry_operation_sync(lambda s: s.transaction().execute(text, commit_tx=True))
+
+
+QUERIES = [
+ r'''insert into `{table_path}` (id, value) values (100, 100), (101, 101)''',
+ r'''select id from `{table_path}`''',
+ r'''update `{table_path}` set value = 0 where id = 1''',
+ r'''delete from `{table_path}` where id = 2''',
+ r'''replace into `{table_path}` (id, value) values (2, 3), (3, 3)''',
+ r'''upsert into `{table_path}` (id, value) values (4, 4), (5, 5)''',
+]
+
+
[email protected]("query_template", QUERIES, ids=lambda x: x.split(maxsplit=1)[0])
+def test_single_dml_query_logged(query_template, prepared_test_env, _client_session_pool_with_auth):
+ table_path, capture_audit = prepared_test_env
+
+ pool = _client_session_pool_with_auth
+ query_text = query_template.format(table_path=table_path)
+
+ with capture_audit:
+ execute_data_query(pool, query_text)
+
+ print(capture_audit.captured, file=sys.stderr)
+ assert query_text in capture_audit.captured
+
+
+def test_dml_begin_commit_logged(prepared_test_env, _client_session_pool_with_auth):
+ table_path, capture_audit = prepared_test_env
+
+ pool = _client_session_pool_with_auth
+
+ with pool.checkout() as session:
+ with capture_audit:
+ tx = session.transaction().begin()
+ tx.execute(fr'''update `{table_path}` set value = 0 where id = 1''')
+ tx.commit()
+
+ print(capture_audit.captured, file=sys.stderr)
+ assert 'BeginTransaction' in capture_audit.captured
+ assert 'CommitTransaction' in capture_audit.captured
+
+
+# TODO: fix ydbd crash on exit
+# def test_dml_begin_rollback_logged(prepared_test_env, _client_session_pool_with_auth):
+# table_path, capture_audit = prepared_test_env
+#
+# pool = _client_session_pool_with_auth
+#
+# with pool.checkout() as session:
+# with capture_audit:
+# tx = session.transaction().begin()
+# tx.execute(fr'''update `{table_path}` set value = 0 where id = 1''')
+# tx.rollback()
+#
+# print(capture_audit.captured, file=sys.stderr)
+# assert 'BeginTransaction' in capture_audit.captured
+# assert 'RollbackTransaction' in capture_audit.captured
+
+
+def test_dml_requests_arent_logged_when_anonymous(prepared_test_env, _client_session_pool_no_auth):
+ table_path, capture_audit = prepared_test_env
+ pool = _client_session_pool_no_auth
+
+ with capture_audit:
+ for i in QUERIES:
+ query_text = i.format(table_path=table_path)
+ execute_data_query(pool, query_text)
+
+ print(capture_audit.captured, file=sys.stderr)
+ assert len(capture_audit.captured) == 0, capture_audit.captured
+
+
+def test_dml_requests_logged_when_unauthorized(prepared_test_env, _client_session_pool_bad_auth):
+ table_path, capture_audit = prepared_test_env
+ pool = _client_session_pool_bad_auth
+
+ for i in QUERIES:
+ query_text = i.format(table_path=table_path)
+ with pool.checkout() as session:
+ tx = session.transaction()
+ with capture_audit:
+ with pytest.raises(ydb.issues.SchemeError):
+ tx.execute(query_text, commit_tx=True)
+ print(capture_audit.captured, file=sys.stderr)
+ assert query_text in capture_audit.captured
diff --git a/ydb/tests/functional/audit/ya.make b/ydb/tests/functional/audit/ya.make
new file mode 100644
index 00000000000..30a62b1dd30
--- /dev/null
+++ b/ydb/tests/functional/audit/ya.make
@@ -0,0 +1,28 @@
+PY3TEST()
+
+SPLIT_FACTOR(30)
+FORK_SUBTESTS()
+FORK_TEST_FILES()
+TIMEOUT(600)
+SIZE(MEDIUM)
+
+ENV(YDB_DRIVER_BINARY="ydb/apps/ydbd/ydbd")
+
+TEST_SRCS(
+ conftest.py
+ test_auditlog.py
+)
+
+DEPENDS(
+ ydb/apps/ydbd
+)
+
+PEERDIR(
+ ydb/tests/library
+ ydb/tests/oss/ydb_sdk_import
+ ydb/public/sdk/python
+)
+
+REQUIREMENTS(ram:10)
+
+END()
diff --git a/ydb/tests/functional/ya.make b/ydb/tests/functional/ya.make
index 72438e3093d..43651176494 100644
--- a/ydb/tests/functional/ya.make
+++ b/ydb/tests/functional/ya.make
@@ -1,5 +1,6 @@
RECURSE(
api
+ audit
autoconfig
blobstorage
canonical
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index c2de76a6dd5..27b7b10efd1 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -444,7 +444,7 @@ class KikimrConfigGenerator(object):
@property
def audit_file_path(self):
- return self.yaml_config.get('audit_config', {}).get('audit_file_path')
+ return self.yaml_config.get('audit_config', {}).get('file_backend', {}).get('file_path')
@property
def nbs_enable(self):
diff --git a/ydb/tests/library/harness/ydb_fixtures.py b/ydb/tests/library/harness/ydb_fixtures.py
index 766663941be..5a7fcef94d1 100644
--- a/ydb/tests/library/harness/ydb_fixtures.py
+++ b/ydb/tests/library/harness/ydb_fixtures.py
@@ -130,8 +130,8 @@ def ydb_endpoint(ydb_cluster):
@pytest.fixture(scope='function')
def ydb_client(ydb_endpoint, request):
- def _make_driver(database_path):
- driver_config = DriverConfig(ydb_endpoint, database_path)
+ def _make_driver(database_path, **kwargs):
+ driver_config = DriverConfig(ydb_endpoint, database_path, **kwargs)
driver = Driver(driver_config)
def stop_driver():
@@ -144,8 +144,8 @@ def ydb_client(ydb_endpoint, request):
@pytest.fixture(scope='function')
def ydb_client_session(ydb_client, request):
- def _make_pool(database_path):
- driver = ydb_client(database_path)
+ def _make_pool(database_path, **kwargs):
+ driver = ydb_client(database_path, **kwargs)
pool = SessionPool(driver)
def stop_pool():