summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIuliia Sidorina <[email protected]>2024-08-29 15:05:05 +0200
committerGitHub <[email protected]>2024-08-29 13:05:05 +0000
commit9c8c9516f55af2e67efc378aff0a4c1219781e21 (patch)
treec1027a068d776f8e6ca52e17d077dc1ff1c175dc
parent3f33b88e1db09db5ad99304b3379b95f12ccb068 (diff)
logging(ydb): add data integrity logs to grpc and session actor (#6049)
-rw-r--r--ydb/core/data_integrity_trails/data_integrity_trails.h51
-rw-r--r--ydb/core/grpc_services/grpc_integrity_trails.h265
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp6
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_script.cpp8
-rw-r--r--ydb/core/grpc_services/rpc_begin_transaction.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_commit_transaction.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_execute_yql_script.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_rollback_transaction.cpp5
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp6
-rw-r--r--ydb/core/kqp/common/kqp_data_integrity_trails.h102
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp8
-rw-r--r--ydb/core/kqp/gateway/kqp_gateway.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h17
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp34
-rw-r--r--ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp104
-rw-r--r--ydb/core/kqp/ut/data_integrity/ya.make23
-rw-r--r--ydb/core/kqp/ut/ya.make1
-rw-r--r--ydb/library/services/services.proto2
19 files changed, 630 insertions, 19 deletions
diff --git a/ydb/core/data_integrity_trails/data_integrity_trails.h b/ydb/core/data_integrity_trails/data_integrity_trails.h
new file mode 100644
index 00000000000..2a94ec9b3da
--- /dev/null
+++ b/ydb/core/data_integrity_trails/data_integrity_trails.h
@@ -0,0 +1,51 @@
+#pragma once
+
+namespace NKikimr {
+namespace NDataIntegrity {
+
+inline void LogKeyValue(const TString& key, const TString& value, TStringStream& ss, bool last = false) {
+ ss << key << ": " << (value.Empty() ? "Empty" : value) << (last ? "" : ",");
+}
+
+template <class TransactionSettings>
+inline void LogTxSettings(const TransactionSettings& txSettings, TStringStream& ss) {
+ switch (txSettings.tx_mode_case()) {
+ case TransactionSettings::kSerializableReadWrite:
+ LogKeyValue("TxMode", "SerializableReadWrite", ss);
+ break;
+ case TransactionSettings::kOnlineReadOnly:
+ LogKeyValue("TxMode", "OnlineReadOnly", ss);
+ LogKeyValue("AllowInconsistentReads", txSettings.online_read_only().allow_inconsistent_reads() ? "true" : "false", ss);
+ break;
+ case TransactionSettings::kStaleReadOnly:
+ LogKeyValue("TxMode", "StaleReadOnly", ss);
+ break;
+ case TransactionSettings::kSnapshotReadOnly:
+ LogKeyValue("TxMode", "SnapshotReadOnly", ss);
+ break;
+ case TransactionSettings::TX_MODE_NOT_SET:
+ LogKeyValue("TxMode", "Undefined", ss);
+ break;
+ }
+}
+
+template <class TxControl>
+inline void LogTxControl(const TxControl& txControl, TStringStream& ss)
+{
+ switch (txControl.tx_selector_case()) {
+ case TxControl::kTxId:
+ LogKeyValue("TxId", txControl.tx_id(), ss);
+ break;
+ case TxControl::kBeginTx:
+ LogKeyValue("BeginTx", "true", ss);
+ LogTxSettings(txControl.begin_tx(), ss);
+ break;
+ case TxControl::TX_SELECTOR_NOT_SET:
+ break;
+ }
+
+ LogKeyValue("NeedCommitTx", txControl.commit_tx() ? "true" : "false", ss);
+}
+
+}
+}
diff --git a/ydb/core/grpc_services/grpc_integrity_trails.h b/ydb/core/grpc_services/grpc_integrity_trails.h
new file mode 100644
index 00000000000..963ed76348a
--- /dev/null
+++ b/ydb/core/grpc_services/grpc_integrity_trails.h
@@ -0,0 +1,265 @@
+#pragma once
+
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/public/api/protos/ydb_scripting.pb.h>
+#include <ydb/public/api/protos/ydb_query.pb.h>
+#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
+#include <ydb/core/kqp/common/events/events.h>
+
+namespace NKikimr {
+namespace NDataIntegrity {
+
+// ExecuteDataQuery
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", request.session_id(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogTxControl(request.tx_control(), ss);
+ LogKeyValue("Type", "ExecuteDataQueryRequest", ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::ExecuteDataQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request, const auto& response) {
+ auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "ExecuteDataQueryResponse", ss);
+
+ if (request.tx_control().tx_selector_case() == Ydb::Table::TransactionControl::kBeginTx) {
+ LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
+ }
+
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
+}
+
+// BeginTransaction
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", request.session_id(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogTxSettings(request.tx_settings(), ss);
+ LogKeyValue("Type", "BeginTransactionRequest", ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::BeginTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ Y_UNUSED(request);
+
+ auto log = [](const auto& traceId, const auto& response) {
+ auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "BeginTransactionResponse", ss);
+ LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
+}
+
+// CommitTransaction
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", request.session_id(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "CommitTransactionRequest", ss);
+ LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::CommitTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request, const auto& response) {
+ const auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "CommitTransactionResponse", ss);
+ LogKeyValue("TxId", request.tx_id(), ss);
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
+}
+
+// RollbackTransaction
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", request.session_id(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "RollbackTransactionRequest", ss);
+ LogKeyValue("TxId", request.tx_id(), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Table::RollbackTransactionRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ auto log = [](const auto& traceId, const auto& request, const auto& response) {
+ const auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "RollbackTransactionResponse", ss);
+ LogKeyValue("TxId", request.tx_id(), ss);
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
+}
+
+// ExecuteYqlScript/StreamExecuteYqlScript
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, const TActorContext& ctx) {
+ Y_UNUSED(request);
+
+ auto log = [](const auto& traceId) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "[Stream]ExecuteYqlScriptRequest", ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Scripting::ExecuteYqlRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ Y_UNUSED(request);
+
+ auto log = [](const auto& traceId, const auto& response) {
+ const auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "[Stream]ExecuteYqlScriptResponse", ss);
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
+}
+
+// ExecuteQuery
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, const TActorContext& ctx) {
+ if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
+ return;
+ }
+
+ auto log = [](const auto& traceId, const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", request.session_id(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogTxControl(request.tx_control(), ss);
+ LogKeyValue("Type", "ExecuteQueryRequest", ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteQueryRequest& request, NKqp::TEvKqp::TEvQueryResponse::TPtr& response, const TActorContext& ctx) {
+ if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
+ return;
+ }
+
+ auto log = [](const auto& traceId, const auto& request, const auto& response) {
+ const auto& record = response->Get()->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "ExecuteQueryResponse", ss);
+
+ if (request.tx_control().tx_selector_case() == Ydb::Query::TransactionControl::kBeginTx) {
+ LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
+ }
+
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, request, response));
+}
+
+// ExecuteSrcipt
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const TActorContext& ctx) {
+ if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
+ return;
+ }
+
+ auto log = [](const auto& traceId) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "ExecuteSrciptRequest", ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId));
+}
+
+inline void LogIntegrityTrails(const TMaybe<TString>& traceId, const Ydb::Query::ExecuteScriptRequest& request, const NKqp::TEvKqp::TEvScriptResponse::TPtr& response, const TActorContext& ctx) {
+ if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) {
+ return;
+ }
+
+ auto log = [](const auto& traceId, const auto& response) {
+ TStringStream ss;
+ LogKeyValue("Component", "Grpc", ss);
+ LogKeyValue("TraceId", traceId ? *traceId : "Empty", ss);
+ LogKeyValue("Type", "ExecuteSrciptResponse", ss);
+ LogKeyValue("Status", ToString(response->Get()->Status), ss);
+ LogKeyValue("Issues", ToString(response->Get()->Issues), ss, /*last*/ true);
+ return ss.Str();
+ };
+
+ LOG_DEBUG_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
+}
+
+}
+}
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index a885167e906..9645317e718 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/grpc_services/audit_dml_operations.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
@@ -264,6 +265,7 @@ private:
}
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
auto queryType = req->concurrent_result_sets()
? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY
@@ -384,7 +386,9 @@ private:
ctx.Send(channel.ActorId, resp.Release());
}
- void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
+
auto& record = ev->Get()->Record.GetRef();
const auto& issueMessage = record.GetResponse().GetQueryIssues();
diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp
index 1c5efad6709..3c573606a72 100644
--- a/ydb/core/grpc_services/query/rpc_execute_script.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/rpc_kqp_base.h>
#include <ydb/core/grpc_services/audit_dml_operations.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/public/api/protos/ydb_query.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -91,6 +92,7 @@ public:
}
AuditContextAppend(Request_.get(), request);
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), request, TlsActivationContext->AsActorContext());
Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS;
if (auto scriptRequest = MakeScriptRequest(issues, status)) {
@@ -107,10 +109,12 @@ public:
private:
STRICT_STFUNC(StateFunc,
- hFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
+ HFunc(NKqp::TEvKqp::TEvScriptResponse, Handle)
)
- void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev) {
+ void Handle(NKqp::TEvKqp::TEvScriptResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *Request_->GetProtoRequest(), ev, ctx);
+
Ydb::Operations::Operation operation;
operation.set_id(ev->Get()->OperationId);
Ydb::Query::ExecuteScriptMetadata metadata;
diff --git a/ydb/core/grpc_services/rpc_begin_transaction.cpp b/ydb/core/grpc_services/rpc_begin_transaction.cpp
index 2cb6ad32166..cd9306d28bd 100644
--- a/ydb/core/grpc_services/rpc_begin_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_begin_transaction.cpp
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
@@ -49,6 +50,7 @@ private:
const auto traceId = Request_->GetTraceId();
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
TString sessionId;
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -91,6 +93,8 @@ private:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp
index d555933d509..1626b1c6fa6 100644
--- a/ydb/core/grpc_services/rpc_commit_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
@@ -47,6 +48,7 @@ private:
const auto traceId = Request_->GetTraceId();
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
TString sessionId;
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
@@ -79,6 +81,8 @@ private:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index f5b7e87043f..143101b4a28 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include "rpc_kqp_base.h"
#include "rpc_common/rpc_common.h"
#include "service_table.h"
@@ -59,6 +60,7 @@ public:
const auto requestType = Request_->GetRequestType();
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
if (!CheckSession(req->session_id(), Request_.get())) {
return Reply(Ydb::StatusIds::BAD_REQUEST, ctx);
@@ -169,6 +171,8 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
index 72b8350d8b7..18cd27d39e0 100644
--- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
@@ -3,6 +3,7 @@
#include "rpc_common/rpc_common.h"
#include "audit_dml_operations.h"
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/public/api/protos/ydb_scripting.pb.h>
namespace NKikimr {
@@ -48,6 +49,7 @@ public:
const auto traceId = Request_->GetTraceId();
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
auto script = req->script();
@@ -83,6 +85,8 @@ public:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
const auto& record = ev->Get()->Record.GetRef();
SetCost(record.GetConsumedRu());
AddServerHintsIfAny(record);
diff --git a/ydb/core/grpc_services/rpc_rollback_transaction.cpp b/ydb/core/grpc_services/rpc_rollback_transaction.cpp
index 2cc918e0bf0..be4bb89a841 100644
--- a/ydb/core/grpc_services/rpc_rollback_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_rollback_transaction.cpp
@@ -1,5 +1,6 @@
#include "service_table.h"
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
@@ -46,6 +47,8 @@ private:
const auto req = GetProtoRequest();
const auto traceId = Request_->GetTraceId();
+ NDataIntegrity::LogIntegrityTrails(traceId, *req, ctx);
+
TString sessionId;
auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
SetAuthToken(ev, *Request_);
@@ -74,6 +77,8 @@ private:
}
void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
const auto& record = ev->Get()->Record.GetRef();
AddServerHintsIfAny(record);
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index d36c122c2c4..2b80ebc5d9e 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -7,6 +7,7 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/feature_flags.h>
+#include <ydb/core/grpc_services/grpc_integrity_trails.h>
#include <ydb/library/ydb_issue/issue_helpers.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
@@ -156,6 +157,7 @@ private:
const auto traceId = Request_->GetTraceId();
AuditContextAppend(Request_.get(), *req);
+ NDataIntegrity::LogIntegrityTrails(traceId, *GetProtoRequest(), ctx);
auto script = req->script();
@@ -341,7 +343,9 @@ private:
}
// Final response
- void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext&) {
+ void Handle(NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const TActorContext& ctx) {
+ NDataIntegrity::LogIntegrityTrails(Request_->GetTraceId(), *GetProtoRequest(), ev, ctx);
+
auto& record = ev->Get()->Record.GetRef();
NYql::TIssues issues;
diff --git a/ydb/core/kqp/common/kqp_data_integrity_trails.h b/ydb/core/kqp/common/kqp_data_integrity_trails.h
new file mode 100644
index 00000000000..a24fcb4ecab
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_data_integrity_trails.h
@@ -0,0 +1,102 @@
+#pragma once
+
+#include <openssl/sha.h>
+#include <library/cpp/string_utils/base64/base64.h>
+
+#include <ydb/core/data_integrity_trails/data_integrity_trails.h>
+
+namespace NKikimr {
+namespace NDataIntegrity {
+
+inline bool ShouldBeLogged(NKikimrKqp::EQueryAction action, NKikimrKqp::EQueryType type) {
+ switch (type) {
+ case NKikimrKqp::QUERY_TYPE_SQL_DDL:
+ case NKikimrKqp::QUERY_TYPE_SQL_SCAN:
+ case NKikimrKqp::QUERY_TYPE_AST_SCAN:
+ return false;
+ default:
+ break;
+ }
+
+ switch (action) {
+ case NKikimrKqp::QUERY_ACTION_EXECUTE:
+ case NKikimrKqp::QUERY_ACTION_EXECUTE_PREPARED:
+ case NKikimrKqp::QUERY_ACTION_BEGIN_TX:
+ case NKikimrKqp::QUERY_ACTION_COMMIT_TX:
+ case NKikimrKqp::QUERY_ACTION_ROLLBACK_TX:
+ return true;
+ default:
+ return false;
+ }
+}
+
+// SessionActor
+inline void LogIntegrityTrails(const NKqp::TEvKqp::TEvQueryRequest::TPtr& request, const TActorContext& ctx) {
+ if (!ShouldBeLogged(request->Get()->GetAction(), request->Get()->GetType())) {
+ return;
+ }
+
+ auto log = [](const auto& request) {
+ TStringStream ss;
+ LogKeyValue("Component", "SessionActor", ss);
+ LogKeyValue("SessionId", request->Get()->GetSessionId(), ss);
+ LogKeyValue("TraceId", request->Get()->GetTraceId(), ss);
+ LogKeyValue("Type", "Request", ss);
+ LogKeyValue("QueryAction", ToString(request->Get()->GetAction()), ss);
+ LogKeyValue("QueryType", ToString(request->Get()->GetType()), ss);
+
+ if (request->Get()->HasTxControl()) {
+ LogTxControl(request->Get()->GetTxControl(), ss);
+ }
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(request));
+}
+
+inline void LogIntegrityTrails(const TString& traceId, NKikimrKqp::EQueryAction action, NKikimrKqp::EQueryType type, const std::unique_ptr<NKqp::TEvKqp::TEvQueryResponse>& response, const TActorContext& ctx) {
+ if (!ShouldBeLogged(action, type)) {
+ return;
+ }
+
+ auto log = [](const auto& traceId, const auto& response) {
+ auto& record = response->Record.GetRef();
+
+ TStringStream ss;
+ LogKeyValue("Component", "SessionActor", ss);
+ LogKeyValue("SessionId", record.GetResponse().GetSessionId(), ss);
+ LogKeyValue("TraceId", traceId, ss);
+ LogKeyValue("Type", "Response", ss);
+ LogKeyValue("TxId", record.GetResponse().HasTxMeta() ? record.GetResponse().GetTxMeta().id() : "Empty", ss);
+ LogKeyValue("Status", ToString(record.GetYdbStatus()), ss);
+ LogKeyValue("Issues", ToString(record.GetResponse().GetQueryIssues()), ss, /*last*/ true);
+
+ return ss.Str();
+ };
+
+ LOG_INFO_S(ctx, NKikimrServices::DATA_INTEGRITY, log(traceId, response));
+}
+
+// DataExecuter
+inline void LogIntegrityTrails(const TString& txType, const TString& traceId, ui64 txId, TMaybe<ui64> shardId, const TActorContext& ctx) {
+ auto log = [](const auto& type, const auto& traceId, const auto& txId, const auto& shardId) {
+ TStringStream ss;
+ LogKeyValue("Component", "Executer", ss);
+ LogKeyValue("TraceId", traceId, ss);
+ LogKeyValue("PhyTxId", ToString(txId), ss);
+
+ if (shardId) {
+ LogKeyValue("ShardId", ToString(*shardId), ss);
+ }
+
+ LogKeyValue("Type", type, ss, /*last*/ true);
+
+ return ss.Str();
+ };
+
+ LOG_NOTICE_S(ctx, NKikimrServices::DATA_INTEGRITY, log(txType, traceId, txId, shardId));
+}
+
+}
+}
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index d479dbb367d..6f365c44e2d 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -11,6 +11,7 @@
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/library/wilson_ids/wilson.h>
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
+#include <ydb/core/kqp/common/kqp_data_integrity_trails.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
#include <ydb/core/kqp/common/kqp_tx.h>
@@ -991,6 +992,8 @@ private:
transaction.SetFlags(TEvTxProxy::TEvProposeTransaction::FlagVolatile);
}
+ NDataIntegrity::LogIntegrityTrails("PlannedTx", Request.UserTraceId, TxId, {}, TlsActivationContext->AsActorContext());
+
LOG_T("Execute planned transaction, coordinator: " << TxCoordinator);
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), TxCoordinator, /* subscribe */ true));
}
@@ -1689,6 +1692,9 @@ private:
dataTransaction.SerializeAsString(),
flags));
}
+
+ NDataIntegrity::LogIntegrityTrails("DatashardTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+
ResponseEv->Orbit.Fork(evData->Orbit);
ev = std::move(evData);
}
@@ -1722,6 +1728,8 @@ private:
auto traceId = ExecuterSpan.GetTraceId();
+ NDataIntegrity::LogIntegrityTrails("EvWriteTx", Request.UserTraceId, TxId, shardId, TlsActivationContext->AsActorContext());
+
LOG_D("ExecuteEvWriteTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
Send(MakePipePerNodeCacheID(false), new TEvPipeCache::TEvForward(evWriteTransaction.release(), shardId, true), 0, 0, std::move(traceId));
diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h
index 9a46611da4f..c9d7962bf0a 100644
--- a/ydb/core/kqp/gateway/kqp_gateway.h
+++ b/ydb/core/kqp/gateway/kqp_gateway.h
@@ -163,6 +163,7 @@ public:
NLWTrace::TOrbit Orbit;
NWilson::TTraceId TraceId;
+ TString UserTraceId;
NTopic::TTopicOperations TopicOperations;
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 8cf59899139..1637495dd19 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -76,11 +76,14 @@ public:
}
}
+ YQL_ENSURE(RequestEv->HasAction());
+ QueryAction = RequestEv->GetAction();
+ QueryType = RequestEv->GetType();
+
SetQueryDeadlines(tableServiceConfig, queryServiceConfig);
- auto action = GetAction();
KqpSessionSpan = NWilson::TSpan(
TWilsonKqp::KqpSession, std::move(ev->TraceId),
- "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
+ "Session.query." + NKikimrKqp::EQueryAction_Name(QueryAction), NWilson::EFlags::AUTO_END);
if (RequestEv->GetUserRequestContext()) {
UserRequestContext = RequestEv->GetUserRequestContext();
} else {
@@ -109,6 +112,8 @@ public:
TKqpStatsCompile CompileStats;
TIntrusivePtr<TKqpTransactionContext> TxCtx;
TQueryData::TPtr QueryData;
+ NKikimrKqp::EQueryAction QueryAction;
+ NKikimrKqp::EQueryType QueryType;
TActorId RequestActorId;
@@ -161,7 +166,7 @@ public:
TMaybe<TString> CommandTagName;
NKikimrKqp::EQueryAction GetAction() const {
- return RequestEv->GetAction();
+ return QueryAction;
}
bool GetKeepSession() const {
@@ -177,7 +182,7 @@ public:
}
NKikimrKqp::EQueryType GetType() const {
- return RequestEv->GetType();
+ return QueryType;
}
Ydb::Query::Syntax GetSyntax() const {
@@ -196,10 +201,6 @@ public:
return ResultParams;
}
- void EnsureAction() {
- YQL_ENSURE(RequestEv->HasAction());
- }
-
bool GetUsePublicResponseDataFormat() const {
return RequestEv->GetUsePublicResponseDataFormat();
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index e5568554c13..2e29f9b518a 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -3,6 +3,7 @@
#include "kqp_query_state.h"
#include "kqp_query_stats.h"
+#include <ydb/core/kqp/common/kqp_data_integrity_trails.h>
#include <ydb/core/kqp/common/kqp_lwtrace_probes.h>
#include <ydb/core/kqp/common/kqp_ru_calc.h>
#include <ydb/core/kqp/common/kqp_timeouts.h>
@@ -367,6 +368,8 @@ public:
YQL_ENSURE(ev->Get()->GetSessionId() == SessionId,
"Invalid session, expected: " << SessionId << ", got: " << ev->Get()->GetSessionId());
+ NDataIntegrity::LogIntegrityTrails(ev, TlsActivationContext->AsActorContext());
+
if (ev->Get()->HasYdbStatus() && ev->Get()->GetYdbStatus() != Ydb::StatusIds::SUCCESS) {
NYql::TIssues issues;
NYql::IssuesFromMessage(ev->Get()->GetQueryIssues(), issues);
@@ -378,14 +381,14 @@ public:
<< status
<< " msg: "
<< errMsg <<".");
- ReplyProcessError(ev->Sender, proxyRequestId, status, errMsg);
+ ReplyProcessError(ev, status, errMsg);
return;
}
if (ShutdownState && ShutdownState->SoftTimeoutReached()) {
// we reached the soft timeout, so at this point we don't allow to accept new queries for session.
LOG_N("system shutdown requested: soft timeout reached, no queries can be accepted");
- ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown");
+ ReplyProcessError(ev, Ydb::StatusIds::BAD_SESSION, "Session is under shutdown");
CleanupAndPassAway();
return;
}
@@ -395,7 +398,6 @@ public:
YQL_ENSURE(QueryState->GetDatabase() == Settings.Database,
"Wrong database, expected:" << Settings.Database << ", got: " << QueryState->GetDatabase());
- QueryState->EnsureAction();
auto action = QueryState->GetAction();
LWTRACK(KqpSessionQueryRequest,
@@ -969,6 +971,7 @@ public:
if (queryState) {
request.Snapshot = queryState->TxCtx->GetSnapshot();
request.IsolationLevel = *queryState->TxCtx->EffectiveIsolationLevel;
+ request.UserTraceId = queryState->UserRequestContext->TraceId;
} else {
request.IsolationLevel = NKikimrKqp::ISOLATION_LEVEL_SERIALIZABLE;
}
@@ -1863,9 +1866,10 @@ public:
Cleanup(IsFatalError(record->GetYdbStatus()));
}
- void ReplyProcessError(const TActorId& sender, ui64 proxyRequestId, Ydb::StatusIds::StatusCode ydbStatus,
+ void ReplyProcessError(const TEvKqp::TEvQueryRequest::TPtr& request, Ydb::StatusIds::StatusCode ydbStatus,
const TString& message)
{
+ ui64 proxyRequestId = request->Cookie;
LOG_W("Reply query error, msg: " << message << " proxyRequestId: " << proxyRequestId);
auto response = std::make_unique<TEvKqp::TEvQueryResponse>();
response->Record.GetRef().SetYdbStatus(ydbStatus);
@@ -1874,12 +1878,20 @@ public:
issues.AddIssue(issue);
NYql::IssuesToMessage(issues, response->Record.GetRef().MutableResponse()->MutableQueryIssues());
AddTrailingInfo(response->Record.GetRef());
- Send(sender, response.release(), 0, proxyRequestId);
+
+ NDataIntegrity::LogIntegrityTrails(
+ request->Get()->GetTraceId(),
+ request->Get()->GetAction(),
+ request->Get()->GetType(),
+ response,
+ TlsActivationContext->AsActorContext()
+ );
+
+ Send(request->Sender, response.release(), 0, proxyRequestId);
}
void ReplyBusy(TEvKqp::TEvQueryRequest::TPtr& ev) {
- ui64 proxyRequestId = ev->Cookie;
- ReplyProcessError(ev->Sender, proxyRequestId, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion");
+ ReplyProcessError(ev, Ydb::StatusIds::SESSION_BUSY, "Pending previous query completion");
}
static bool IsFatalError(const Ydb::StatusIds::StatusCode status) {
@@ -1927,6 +1939,14 @@ public:
Counters->ReportIssues(Settings.DbCounters, CachedIssueCounters, issue);
}
+ NDataIntegrity::LogIntegrityTrails(
+ QueryState->UserRequestContext->TraceId,
+ QueryState->GetAction(),
+ QueryState->GetType(),
+ QueryResponse,
+ TlsActivationContext->AsActorContext()
+ );
+
Send(QueryState->Sender, QueryResponse.release(), 0, QueryState->ProxyRequestId);
LOG_D("Sent query response back to proxy, proxyRequestId: " << QueryState->ProxyRequestId
<< ", proxyId: " << QueryState->Sender.ToString());
diff --git a/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
new file mode 100644
index 00000000000..e32bb0ad73b
--- /dev/null
+++ b/ydb/core/kqp/ut/data_integrity/kqp_data_integrity_trails_ut.cpp
@@ -0,0 +1,104 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+namespace {
+ ui64 CountSubstr(const TString& str, const TString& substr) {
+ ui64 count = 0;
+ for (auto pos = str.find(substr); pos != TString::npos; pos = str.find(substr, pos + substr.size())) {
+ ++count;
+ }
+ return count;
+ }
+}
+
+Y_UNIT_TEST_SUITE(KqpDataIntegrityTrails) {
+ Y_UNIT_TEST(Upsert) {
+ TKikimrSettings serverSettings;
+ TStringStream ss;
+ serverSettings.LogStream = &ss;
+ TKikimrRunner kikimr(serverSettings);
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ UPSERT INTO `/Root/KeyValue` (Key, Value) VALUES
+ (3u, "Value3"),
+ (101u, "Value101"),
+ (201u, "Value201");
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 1);
+ // check session actor logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 2);
+ // check grpc logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 2);
+ // TODO: check datashard logs
+ }
+
+ Y_UNIT_TEST(Ddl) {
+ TKikimrSettings serverSettings;
+ TStringStream ss;
+ serverSettings.LogStream = &ss;
+ TKikimrRunner kikimr(serverSettings);
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteSchemeQuery(R"(
+ --!syntax_v1
+
+ CREATE TABLE `/Root/Tmp` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ // check executer logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 0);
+ // check session actor logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 0);
+ // check grpc logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 0);
+ // TODO: check datashard logs
+ }
+
+ Y_UNIT_TEST(Select) {
+ TKikimrSettings serverSettings;
+ TStringStream ss;
+ serverSettings.LogStream = &ss;
+ TKikimrRunner kikimr(serverSettings);
+ kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::DATA_INTEGRITY, NLog::PRI_DEBUG);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ auto result = session.ExecuteDataQuery(R"(
+ --!syntax_v1
+
+ SELECT * FROM `/Root/KeyValue`;
+ )", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ // check executer logs (should be empty, because executer logs only modification operations)
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY NOTICE: Component: Executer"), 0);
+ // check session actor logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY INFO: Component: SessionActor"), 2);
+ // check grpc logs
+ UNIT_ASSERT_VALUES_EQUAL(CountSubstr(ss.Str(), "DATA_INTEGRITY DEBUG: Component: Grpc"), 2);
+ // TODO: check datashard logs
+ }
+}
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/data_integrity/ya.make b/ydb/core/kqp/ut/data_integrity/ya.make
new file mode 100644
index 00000000000..f97b50cd3db
--- /dev/null
+++ b/ydb/core/kqp/ut/data_integrity/ya.make
@@ -0,0 +1,23 @@
+UNITTEST_FOR(ydb/core/kqp)
+
+FORK_SUBTESTS()
+SPLIT_FACTOR(50)
+
+IF (SANITIZER_TYPE)
+ REQUIREMENTS(ram:12)
+ENDIF()
+
+SIZE(SMALL)
+
+SRCS(
+ kqp_data_integrity_trails_ut.cpp
+)
+
+PEERDIR(
+ ydb/core/kqp/ut/common
+ ydb/library/yql/sql/pg_dummy
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/ut/ya.make b/ydb/core/kqp/ut/ya.make
index 1cf67f2eaa9..e8b3dbe256f 100644
--- a/ydb/core/kqp/ut/ya.make
+++ b/ydb/core/kqp/ut/ya.make
@@ -2,6 +2,7 @@ RECURSE_FOR_TESTS(
arrow
cost
data
+ data_integrity
effects
federated_query
indexes
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 3fcf304e80f..5106304f97f 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -398,6 +398,8 @@ enum EServiceKikimr {
BS_REQUEST_COST = 2500;
GROUPED_MEMORY_LIMITER = 2700;
+
+ DATA_INTEGRITY = 3000;
};
message TActivity {