diff options
author | ijon <ijon@ydb.tech> | 2023-09-27 18:23:26 +0300 |
---|---|---|
committer | ijon <ijon@ydb.tech> | 2023-09-27 18:40:58 +0300 |
commit | 21d4c5e1cd3a6d7aaf8b2b0163c05526dfe39012 (patch) | |
tree | 87bc776f75d116b1e5b9fb264b6b0e88a879eac0 | |
parent | 91b5ea9f2c65f39eb5e3122fce44e9ae71502c1d (diff) | |
download | ydb-21d4c5e1cd3a6d7aaf8b2b0163c05526dfe39012.tar.gz |
auditlog: add logging of Execute{Query,Script}, (Stream)ExecuteYqlScript
KIKIMR-18688
-rw-r--r-- | ydb/core/grpc_services/audit_dml_operations.cpp | 80 | ||||
-rw-r--r-- | ydb/core/grpc_services/audit_dml_operations.h | 36 | ||||
-rw-r--r-- | ydb/core/grpc_services/base/base.h | 24 | ||||
-rw-r--r-- | ydb/core/grpc_services/grpc_request_check_actor.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/local_rpc/local_rpc.h | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/query/rpc_attach_session.cpp | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_query.cpp | 7 | ||||
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_script.cpp | 5 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_execute_yql_script.cpp | 5 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_read_table.cpp | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp | 2 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp | 7 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_query.cpp | 4 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_scripting.cpp | 4 |
14 files changed, 147 insertions, 35 deletions
diff --git a/ydb/core/grpc_services/audit_dml_operations.cpp b/ydb/core/grpc_services/audit_dml_operations.cpp index 1365430d177..76fe9b97de3 100644 --- a/ydb/core/grpc_services/audit_dml_operations.cpp +++ b/ydb/core/grpc_services/audit_dml_operations.cpp @@ -1,4 +1,6 @@ #include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/public/api/protos/ydb_scripting.pb.h> +#include <ydb/public/api/protos/ydb_query.pb.h> #include "base/base.h" @@ -18,6 +20,22 @@ namespace { } return CollapseInPlace(StripInPlace(text), MAX_QUERY_TEXT_LEN); } + + template <class TxControl> + void AddAuditLogTxControlPart(NKikimr::NGRpcService::IRequestCtx* ctx, const TxControl& tx_control) + { + switch (tx_control.tx_selector_case()) { + case TxControl::kTxId: + ctx->AddAuditLogPart("tx_id", tx_control.tx_id()); + break; + case TxControl::kBeginTx: + ctx->AddAuditLogPart("begin_tx", "1"); + break; + case TxControl::TX_SELECTOR_NOT_SET: + break; + } + ctx->AddAuditLogPart("commit_tx", ToString(tx_control.commit_tx())); + } } namespace NKikimr::NGRpcService { @@ -38,7 +56,7 @@ void AuditContextEnd(IRequestCtxBase* ctx) { // template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::ExecuteDataQueryRequest& request) { - // yql_text or prepared_query_id + // query_text or prepared_query_id { auto query = request.query(); if (query.has_yql_text()) { @@ -49,20 +67,7 @@ void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::ExecuteDataQueryRequ } // tx_id, explicit // begin_tx, commit_tx flags - { - auto tx_control = request.tx_control(); - switch (tx_control.tx_selector_case()) { - case Ydb::Table::TransactionControl::kTxId: - ctx->AddAuditLogPart("tx_id", tx_control.tx_id()); - break; - case Ydb::Table::TransactionControl::kBeginTx: - ctx->AddAuditLogPart("begin_tx", "1"); - break; - case Ydb::Table::TransactionControl::TX_SELECTOR_NOT_SET: - break; - } - ctx->AddAuditLogPart("commit_tx", ToString(tx_control.commit_tx())); - } + AddAuditLogTxControlPart(ctx, request.tx_control()); } template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::ExecuteDataQueryRequest& request, const Ydb::Table::ExecuteQueryResult& result) { @@ -70,7 +75,7 @@ void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::ExecuteDataQueryRequ if (request.tx_control().tx_selector_case() == Ydb::Table::TransactionControl::kBeginTx) { ctx->AddAuditLogPart("tx_id", result.tx_meta().id()); } - // log updated_row_count from ExecuteQueryResult.query_stats? + // log updated_row_count collected from ExecuteQueryResult.query_stats? } // PrepareDataQuery @@ -99,7 +104,7 @@ template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::CommitTransactionRequest& request) { ctx->AddAuditLogPart("tx_id", request.tx_id()); } -// log updated_row_count by CommitTransactionResult.query_stats? +// log updated_row_count collected from CommitTransactionResult.query_stats? // RollbackTransaction // @@ -119,4 +124,45 @@ void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::BulkUpsertRequest& r ctx->AddAuditLogPart("row_count", ToString(request.rows().value().items_size())); } +// ExecuteYqlScript, StreamExecuteYqlScript +// +template <> +void AuditContextAppend(IRequestCtx* ctx, const Ydb::Scripting::ExecuteYqlRequest& request) { + ctx->AddAuditLogPart("query_text", PrepareText(request.script())); +} +// log updated_row_count collected from ExecuteYqlResult.query_stats? + +// ExecuteQuery +// +template <> +void AuditContextAppend(IRequestCtx* ctx, const Ydb::Query::ExecuteQueryRequest& request) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + // query_text + { + switch(request.query_case()) { + case Ydb::Query::ExecuteQueryRequest::kQueryContent: + ctx->AddAuditLogPart("query_text", PrepareText(request.query_content().text())); + break; + case Ydb::Query::ExecuteQueryRequest::QUERY_NOT_SET: + break; + } + } + // tx_id + // begin_tx, commit_tx flags + AddAuditLogTxControlPart(ctx, request.tx_control()); +} +// log updated_row_count collected from ExecuteQueryResponsePart.exec_stats? + +// ExecuteSrcipt +template <> +void AuditContextAppend(IRequestCtx* ctx, const Ydb::Query::ExecuteScriptRequest& request) { + if (request.exec_mode() != Ydb::Query::EXEC_MODE_EXECUTE) { + return; + } + ctx->AddAuditLogPart("query_text", PrepareText(request.script_content().text())); +} +// log updated_row_count collected from ExecuteScriptMetadata.exec_stats? + } // namespace NKikimr::NGRpcService
\ No newline at end of file diff --git a/ydb/core/grpc_services/audit_dml_operations.h b/ydb/core/grpc_services/audit_dml_operations.h index ab3b4b6c1c5..875623e8776 100644 --- a/ydb/core/grpc_services/audit_dml_operations.h +++ b/ydb/core/grpc_services/audit_dml_operations.h @@ -1,7 +1,32 @@ #pragma once #include "defs.h" -#include <ydb/public/api/protos/ydb_table.pb.h> +namespace Ydb::Table { + +class ExecuteDataQueryRequest; +class ExecuteQueryResult; +class PrepareDataQueryRequest; +class PrepareQueryResult; +class BeginTransactionRequest; +class BeginTransactionResult; +class CommitTransactionRequest; +class RollbackTransactionRequest; +class BulkUpsertRequest; + +} + +namespace Ydb::Scripting { + +class ExecuteYqlRequest; + +} + +namespace Ydb::Query { + +class ExecuteQueryRequest; +class ExecuteScriptRequest; + +} namespace NKikimr::NGRpcService { @@ -47,4 +72,13 @@ template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::Rollback // BulkUpsert template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Table::BulkUpsertRequest& request); +// ExecuteYqlScript, StreamExecuteYqlScript +template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Scripting::ExecuteYqlRequest& request); + +// ExecuteQuery +template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Query::ExecuteQueryRequest& request); + +// ExecuteSrcipt +template <> void AuditContextAppend(IRequestCtx* ctx, const Ydb::Query::ExecuteScriptRequest& request); + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index ec4929b6561..4ed03d2dfef 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -344,7 +344,7 @@ public: struct TRequestAuxSettings { TRateLimiterMode RlMode = TRateLimiterMode::Off; void (*CustomAttributeProcessor)(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, ICheckerIface*) = nullptr; - TAuditMode AuditMode = TAuditMode::Off; + TAuditMode AuditMode = TAuditMode::Off; }; // grpc_request_proxy part @@ -407,7 +407,7 @@ public: virtual void SetCostInfo(float consumed_units) = 0; virtual void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) = 0; - virtual void FinishStream() = 0; + virtual void FinishStream(ui32 status) = 0; virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0; @@ -896,7 +896,7 @@ public: } auto resp = self->CreateResponseMessage(); resp->mutable_operation()->CopyFrom(operation); - self->Ctx_->Reply(resp, operation.status()); + self->Reply(resp, operation.status()); } void SendResult(Ydb::StatusIds::StatusCode status, @@ -1209,7 +1209,9 @@ public: return Ctx_->IsClientLost(); } - void FinishStream() override { + void FinishStream(ui32 status) override { + // End Of Request for streaming requests + AuditLogRequestEnd(status); Ctx_->FinishStreamingOk(); } @@ -1270,8 +1272,9 @@ public: private: void Reply(NProtoBuf::Message *resp, ui32 status) override { - if (RequestFinished && AuditLogHook) { - AuditLogHook(status, GetAuditLogParts()); + // End Of Request for non streaming requests + if (RequestFinished) { + AuditLogRequestEnd(status); } if (RespHook) { TRespHook hook = std::move(RespHook); @@ -1280,6 +1283,15 @@ private: return Ctx_->Reply(resp, status); } + void AuditLogRequestEnd(ui32 status) { + if (AuditLogHook) { + AuditLogHook(status, GetAuditLogParts()); + // Drop hook to avoid double logging in case when operation implemention + // invokes both FinishRequest() (indirectly) and FinishStream() + AuditLogHook = nullptr; + } + } + TResponse* CreateResponseMessage() { return google::protobuf::Arena::CreateMessage<TResponse>(Ctx_->GetArena()); } diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index 558627f4e12..9ede1a768de 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -359,7 +359,7 @@ private: void AuditRequest(IRequestProxyCtx* requestBaseCtx, const TString& databaseName, const TString& userSID) const { const bool dmlAuditEnabled = requestBaseCtx->IsAuditable() && IsAuditEnabledFor(userSID); - + if (dmlAuditEnabled) { AuditContextStart(requestBaseCtx, databaseName, userSID); requestBaseCtx->SetAuditLogHook([requestBaseCtx](ui32 status, const TAuditLogParts& parts) { diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index e421f2ced27..f025c505b9f 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -179,7 +179,7 @@ public: Y_FAIL("Unimplemented for local rpc"); } - void FinishStream() override { + void FinishStream(ui32) override { Y_FAIL("Unimplemented for local rpc"); } diff --git a/ydb/core/grpc_services/query/rpc_attach_session.cpp b/ydb/core/grpc_services/query/rpc_attach_session.cpp index 5ffcff1f86d..13dff91caef 100644 --- a/ydb/core/grpc_services/query/rpc_attach_session.cpp +++ b/ydb/core/grpc_services/query/rpc_attach_session.cpp @@ -166,7 +166,7 @@ private: void ReplyFinishStream(Ydb::StatusIds::StatusCode status) { Request->ReplyWithYdbStatus(status); - Request->FinishStream(); + Request->FinishStream(status); this->PassAway(); } diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 89c05777088..c0de2d55994 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -5,6 +5,7 @@ #include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/rpc_kqp_base.h> +#include <ydb/core/grpc_services/audit_dml_operations.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> #include <ydb/public/api/protos/ydb_query.pb.h> @@ -242,6 +243,8 @@ private: } } + AuditContextAppend(Request_.get(), *req); + auto queryType = req->concurrent_result_sets() ? NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY : NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY; @@ -361,6 +364,8 @@ private: auto& kqpResponse = record.GetResponse(); FillQueryStats(*response.mutable_exec_stats(), kqpResponse); + AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), response); + TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); Request_->SendSerializedResult(std::move(out), record.GetYdbStatus()); @@ -425,7 +430,7 @@ private: Request_->SendSerializedResult(std::move(out), status); } - Request_->FinishStream(); + Request_->FinishStream(status); this->PassAway(); } diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 1e1c32bcd2e..55bdf55ce58 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -4,6 +4,7 @@ #include <ydb/library/ydb_issue/issue_helpers.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/rpc_kqp_base.h> +#include <ydb/core/grpc_services/audit_dml_operations.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/public/api/protos/ydb_query.pb.h> #include <ydb/public/lib/operation_id/operation_id.h> @@ -92,6 +93,8 @@ public: return Reply(Ydb::StatusIds::BAD_REQUEST, issues); } + AuditContextAppend(Request_.get(), request); + Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS; if (auto scriptRequest = MakeScriptRequest(issues, status)) { if (Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), scriptRequest.Release())) { @@ -162,6 +165,8 @@ private: result.set_status(status); + AuditContextAppend(Request_.get(), *Request_->GetProtoRequest(), result); + TString serializedResult; Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult); diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 96d9d0a7965..49db992aed7 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -1,6 +1,7 @@ #include "service_yql_scripting.h" #include "rpc_kqp_base.h" #include "rpc_common/rpc_common.h" +#include "audit_dml_operations.h" #include <ydb/public/api/protos/ydb_scripting.pb.h> @@ -46,6 +47,8 @@ public: const auto req = GetProtoRequest(); const auto traceId = Request_->GetTraceId(); + AuditContextAppend(Request_.get(), *req); + auto script = req->script(); NYql::TIssues issues; @@ -104,6 +107,8 @@ public: queryResult->mutable_query_stats()->set_query_plan(kqpResponse.GetQueryPlan()); } + AuditContextAppend(Request_.get(), *GetProtoRequest(), *queryResult); + ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, *queryResult, ctx); } }; diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp index 781f8baeafe..c684bb0a014 100644 --- a/ydb/core/grpc_services/rpc_read_table.cpp +++ b/ydb/core/grpc_services/rpc_read_table.cpp @@ -544,7 +544,7 @@ private: NullSerializeReadTableResponse(message, status, &out); Request_->SendSerializedResult(std::move(out), status); } - Request_->FinishStream(); + Request_->FinishStream(status); LOG_NOTICE_S(ctx, NKikimrServices::READ_TABLE_API, SelfId() << " Finish grpc stream, status: " << (int)status); diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 1b6893df7a0..736c1e7c25f 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -475,7 +475,7 @@ private: Request_->SendSerializedResult(std::move(out), status); } - Request_->FinishStream(); + Request_->FinishStream(status); this->PassAway(); } diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index a7837f41c78..9e1afb6bd3f 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -1,5 +1,6 @@ #include "service_yql_scripting.h" #include "rpc_kqp_base.h" +#include "audit_dml_operations.h" #include <ydb/public/api/protos/ydb_scripting.pb.h> @@ -152,6 +153,8 @@ private: auto req = GetProtoRequest(); const auto traceId = Request_->GetTraceId(); + AuditContextAppend(Request_.get(), *req); + auto script = req->script(); NYql::TIssues issues; @@ -352,6 +355,8 @@ private: response.mutable_result()->mutable_query_stats()->set_query_plan(kqpResponse.GetQueryPlan()); } + AuditContextAppend(Request_.get(), *GetProtoRequest(), response); + Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); RequestPtr()->SendSerializedResult(std::move(out), record.GetYdbStatus()); } @@ -444,7 +449,7 @@ private: RequestPtr()->SendSerializedResult(std::move(out), status); } - RequestPtr()->FinishStream(); + RequestPtr()->FinishStream(status); this->PassAway(); } diff --git a/ydb/services/ydb/ydb_query.cpp b/ydb/services/ydb/ydb_query.cpp index 69a39d69fb3..e6e5c9cf130 100644 --- a/ydb/services/ydb/ydb_query.cpp +++ b/ydb/services/ydb/ydb_query.cpp @@ -27,13 +27,13 @@ void TGRpcYdbQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(ExecuteQuery, ExecuteQueryRequest, ExecuteQueryResponsePart, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteQueryRequest, ExecuteQueryResponsePart> - (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + (ctx, &DoExecuteQuery, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); }) ADD_REQUEST(ExecuteScript, ExecuteScriptRequest, Ydb::Operations::Operation, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteScriptRequest, Ydb::Operations::Operation> - (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + (ctx, &DoExecuteScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); }) ADD_REQUEST(FetchScriptResults, FetchScriptResultsRequest, FetchScriptResultsResponse, { diff --git a/ydb/services/ydb/ydb_scripting.cpp b/ydb/services/ydb/ydb_scripting.cpp index 7562d31cc16..f1559768923 100644 --- a/ydb/services/ydb/ydb_scripting.cpp +++ b/ydb/services/ydb/ydb_scripting.cpp @@ -31,13 +31,13 @@ void TGRpcYdbScriptingService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(ExecuteYql, ExecuteYqlRequest, ExecuteYqlResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestOperationCall<ExecuteYqlRequest, ExecuteYqlResponse> - (ctx, &DoExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Ru), nullptr})); + (ctx, &DoExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Ru), nullptr, TAuditMode::Auditable})); }) ADD_REQUEST(StreamExecuteYql, ExecuteYqlRequest, ExecuteYqlPartialResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new TGrpcRequestNoOperationCall<ExecuteYqlRequest, ExecuteYqlPartialResponse> - (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); + (ctx, &DoStreamExecuteYqlScript, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr, TAuditMode::Auditable})); }) ADD_REQUEST(ExplainYql, ExplainYqlRequest, ExplainYqlResponse, { |