diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-24 10:37:19 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2023-03-24 10:37:19 +0300 |
commit | baf4ccf438a3abeb6530ad6d10b1dd3468784c89 (patch) | |
tree | 83abb37af056a1f217bbdab1c6f79fcea6cce0a6 | |
parent | 7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb (diff) | |
download | ydb-baf4ccf438a3abeb6530ad6d10b1dd3468784c89.tar.gz |
Move TopicOperations outside of SessionActor
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.cpp | 102 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_query_state.h | 7 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 109 |
3 files changed, 116 insertions, 102 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index 523613690b4..4220ba7cee5 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -1,5 +1,7 @@ #include "kqp_query_state.h" +#include <ydb/library/persqueue/topic_parser/topic_parser.h> + namespace NKikimr::NKqp { using namespace NSchemeCache; @@ -191,4 +193,104 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit)); } +void TKqpQueryState::AddOffsetsToTransaction() { + YQL_ENSURE(HasTopicOperations()); + + const auto& operations = GetTopicOperations(); + + TMaybe<TString> consumer; + if (operations.HasConsumer()) + consumer = operations.GetConsumer(); + + TopicOperations = NTopic::TTopicOperations(); + + for (auto& topic : operations.GetTopics()) { + auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(), + GetDatabase(), topic.path())); + + for (auto& partition : topic.partitions()) { + if (partition.partition_offsets().empty()) { + TopicOperations.AddOperation(path, partition.partition_id()); + } else { + for (auto& range : partition.partition_offsets()) { + YQL_ENSURE(consumer.Defined()); + + TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range); + } + } + } + } +} + +bool TKqpQueryState::TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) { + try { + TxCtx->TopicOperations.Merge(operations); + return true; + } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) { + message = ex.what(); + return false; + } +} + +std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeCacheNavigate() { + auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); + navigate->DatabaseName = CanonizePath(GetDatabase()); + + const auto& operations = GetTopicOperations(); + TMaybe<TString> consumer; + if (operations.HasConsumer()) + consumer = operations.GetConsumer(); + + TopicOperations.FillSchemeCacheNavigate(*navigate, std::move(consumer)); + navigate->UserToken = UserToken; + navigate->Cookie = QueryId; + return navigate; +} + +bool TKqpQueryState::IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, TString& message) { + auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes; + // don't build message string on success path + bool denied = std::any_of(response.ResultSet.begin(), response.ResultSet.end(), [&] (auto& result) { + return result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *UserToken); + }); + + if (!denied) { + return false; + } + + TStringBuilder builder; + builder << "Access for topic(s)"; + for (auto& result : response.ResultSet) { + if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + continue; + } + + if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *UserToken)) { + builder << " '" << JoinPath(result.Path) << "'"; + } + } + + builder << " is denied for subject '" << UserToken->GetUserSID() << "'"; + message = std::move(builder); + + return true; +} + +bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, TString& message) { + if (response.ErrorCount == 0) { + return false; + } + + TStringBuilder builder; + + builder << "Unable to navigate:"; + for (const auto& result : response.ResultSet) { + if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status; + } + } + message = std::move(builder); + + return true; +} } diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index e4687ecbfc3..bc1951b69a5 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -287,6 +287,13 @@ public: google::protobuf::Arena* GetArena() { return RequestEv->GetArena(); } + + //// Topic ops //// + void AddOffsetsToTransaction(); + bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message); + std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigate(); + bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, TString& message); + bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, TString& message); }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 40a7b86d894..7b26e9547a9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -27,7 +27,6 @@ #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/library/yql/utils/actor_log/log.h> -#include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/event_pb.h> @@ -399,42 +398,9 @@ public: return; } - YQL_ENSURE(QueryState->HasTopicOperations()); + QueryState->AddOffsetsToTransaction(); - const NKikimrKqp::TTopicOperations& operations = QueryState->GetTopicOperations(); - - TMaybe<TString> consumer; - if (operations.HasConsumer()) { - consumer = operations.GetConsumer(); - } - - QueryState->TopicOperations = NTopic::TTopicOperations(); - - for (auto& topic : operations.GetTopics()) { - auto path = - CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->GetDatabase(), topic.path())); - - for (auto& partition : topic.partitions()) { - if (partition.partition_offsets().empty()) { - QueryState->TopicOperations.AddOperation(path, partition.partition_id()); - } else { - for (auto& range : partition.partition_offsets()) { - YQL_ENSURE(consumer.Defined()); - - QueryState->TopicOperations.AddOperation(path, partition.partition_id(), - *consumer, - range); - } - } - } - } - - auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); - navigate->DatabaseName = CanonizePath(QueryState->GetDatabase()); - QueryState->TopicOperations.FillSchemeCacheNavigate(*navigate, - std::move(consumer)); - navigate->UserToken = QueryState->UserToken; - navigate->Cookie = QueryState->QueryId; + auto navigate = QueryState->BuildSchemeCacheNavigate(); Become(&TKqpSessionActor::TopicOpsState); ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); @@ -1907,89 +1873,28 @@ private: Ydb::StatusIds_StatusCode status; TString message; - if (IsAccessDenied(*response, message)) { + if (QueryState->IsAccessDenied(*response, message)) { ythrow TRequestFail(Ydb::StatusIds::UNAUTHORIZED) << message; } - if (HasErrors(*response, message)) { + if (QueryState->HasErrors(*response, message)) { ythrow TRequestFail(Ydb::StatusIds::SCHEME_ERROR) << message; } - QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet, - status, - message); + QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet, status, message); if (status != Ydb::StatusIds::SUCCESS) { ythrow TRequestFail(status) << message; } - if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) { + if (!QueryState->TryMergeTopicOffsets(QueryState->TopicOperations, message)) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message; } ReplySuccess(); } - bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, - TString& message) - { - bool denied = false; - - TStringBuilder builder; - builder << "Access for topic(s)"; - for (auto& result : response.ResultSet) { - if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - continue; - } - - auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes; - if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *QueryState->UserToken)) { - builder << " '" << JoinPath(result.Path) << "'"; - denied = true; - } - } - - if (denied) { - builder << " is denied for subject '" << QueryState->UserToken->GetUserSID() << "'"; - message = std::move(builder); - } - - return denied; - } - - bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, - TString& message) - { - if (response.ErrorCount == 0) { - return false; - } - - TStringBuilder builder; - - builder << "Unable to navigate:"; - for (const auto& result : response.ResultSet) { - if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { - builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status; - } - } - message = std::move(builder); - - return true; - } - - bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) { - try { - YQL_ENSURE(QueryState); - QueryState->TxCtx->TopicOperations.Merge(operations); - return true; - } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) { - message = ex.what(); - return false; - } - } - void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) { YQL_ENSURE(QueryState); - ReplyQueryError(Ydb::StatusIds::BAD_SESSION, - "Request cancelled due to explicit session close request"); + ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request"); Counters->ReportSessionActorClosedRequest(Settings.DbCounters); } |