aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2023-03-24 10:37:19 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2023-03-24 10:37:19 +0300
commitbaf4ccf438a3abeb6530ad6d10b1dd3468784c89 (patch)
tree83abb37af056a1f217bbdab1c6f79fcea6cce0a6
parent7d1555dfe2c8d5948a187d0e6f64d35f5f4762bb (diff)
downloadydb-baf4ccf438a3abeb6530ad6d10b1dd3468784c89.tar.gz
Move TopicOperations outside of SessionActor
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.cpp102
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h7
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp109
3 files changed, 116 insertions, 102 deletions
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp
index 523613690b4..4220ba7cee5 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.cpp
+++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp
@@ -1,5 +1,7 @@
#include "kqp_query_state.h"
+#include <ydb/library/persqueue/topic_parser/topic_parser.h>
+
namespace NKikimr::NKqp {
using namespace NSchemeCache;
@@ -191,4 +193,104 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
CompileResult->Query, compileDeadline, DbCounters, std::move(Orbit));
}
+void TKqpQueryState::AddOffsetsToTransaction() {
+ YQL_ENSURE(HasTopicOperations());
+
+ const auto& operations = GetTopicOperations();
+
+ TMaybe<TString> consumer;
+ if (operations.HasConsumer())
+ consumer = operations.GetConsumer();
+
+ TopicOperations = NTopic::TTopicOperations();
+
+ for (auto& topic : operations.GetTopics()) {
+ auto path = CanonizePath(NPersQueue::GetFullTopicPath(TlsActivationContext->AsActorContext(),
+ GetDatabase(), topic.path()));
+
+ for (auto& partition : topic.partitions()) {
+ if (partition.partition_offsets().empty()) {
+ TopicOperations.AddOperation(path, partition.partition_id());
+ } else {
+ for (auto& range : partition.partition_offsets()) {
+ YQL_ENSURE(consumer.Defined());
+
+ TopicOperations.AddOperation(path, partition.partition_id(), *consumer, range);
+ }
+ }
+ }
+ }
+}
+
+bool TKqpQueryState::TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) {
+ try {
+ TxCtx->TopicOperations.Merge(operations);
+ return true;
+ } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) {
+ message = ex.what();
+ return false;
+ }
+}
+
+std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> TKqpQueryState::BuildSchemeCacheNavigate() {
+ auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+ navigate->DatabaseName = CanonizePath(GetDatabase());
+
+ const auto& operations = GetTopicOperations();
+ TMaybe<TString> consumer;
+ if (operations.HasConsumer())
+ consumer = operations.GetConsumer();
+
+ TopicOperations.FillSchemeCacheNavigate(*navigate, std::move(consumer));
+ navigate->UserToken = UserToken;
+ navigate->Cookie = QueryId;
+ return navigate;
+}
+
+bool TKqpQueryState::IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, TString& message) {
+ auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes;
+ // don't build message string on success path
+ bool denied = std::any_of(response.ResultSet.begin(), response.ResultSet.end(), [&] (auto& result) {
+ return result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *UserToken);
+ });
+
+ if (!denied) {
+ return false;
+ }
+
+ TStringBuilder builder;
+ builder << "Access for topic(s)";
+ for (auto& result : response.ResultSet) {
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ continue;
+ }
+
+ if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *UserToken)) {
+ builder << " '" << JoinPath(result.Path) << "'";
+ }
+ }
+
+ builder << " is denied for subject '" << UserToken->GetUserSID() << "'";
+ message = std::move(builder);
+
+ return true;
+}
+
+bool TKqpQueryState::HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, TString& message) {
+ if (response.ErrorCount == 0) {
+ return false;
+ }
+
+ TStringBuilder builder;
+
+ builder << "Unable to navigate:";
+ for (const auto& result : response.ResultSet) {
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status;
+ }
+ }
+ message = std::move(builder);
+
+ return true;
+}
}
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index e4687ecbfc3..bc1951b69a5 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -287,6 +287,13 @@ public:
google::protobuf::Arena* GetArena() {
return RequestEv->GetArena();
}
+
+ //// Topic ops ////
+ void AddOffsetsToTransaction();
+ bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message);
+ std::unique_ptr<NSchemeCache::TSchemeCacheNavigate> BuildSchemeCacheNavigate();
+ bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, TString& message);
+ bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, TString& message);
};
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 40a7b86d894..7b26e9547a9 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -27,7 +27,6 @@
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/utils/actor_log/log.h>
-#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -399,42 +398,9 @@ public:
return;
}
- YQL_ENSURE(QueryState->HasTopicOperations());
+ QueryState->AddOffsetsToTransaction();
- const NKikimrKqp::TTopicOperations& operations = QueryState->GetTopicOperations();
-
- TMaybe<TString> consumer;
- if (operations.HasConsumer()) {
- consumer = operations.GetConsumer();
- }
-
- QueryState->TopicOperations = NTopic::TTopicOperations();
-
- for (auto& topic : operations.GetTopics()) {
- auto path =
- CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->GetDatabase(), topic.path()));
-
- for (auto& partition : topic.partitions()) {
- if (partition.partition_offsets().empty()) {
- QueryState->TopicOperations.AddOperation(path, partition.partition_id());
- } else {
- for (auto& range : partition.partition_offsets()) {
- YQL_ENSURE(consumer.Defined());
-
- QueryState->TopicOperations.AddOperation(path, partition.partition_id(),
- *consumer,
- range);
- }
- }
- }
- }
-
- auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
- navigate->DatabaseName = CanonizePath(QueryState->GetDatabase());
- QueryState->TopicOperations.FillSchemeCacheNavigate(*navigate,
- std::move(consumer));
- navigate->UserToken = QueryState->UserToken;
- navigate->Cookie = QueryState->QueryId;
+ auto navigate = QueryState->BuildSchemeCacheNavigate();
Become(&TKqpSessionActor::TopicOpsState);
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
@@ -1907,89 +1873,28 @@ private:
Ydb::StatusIds_StatusCode status;
TString message;
- if (IsAccessDenied(*response, message)) {
+ if (QueryState->IsAccessDenied(*response, message)) {
ythrow TRequestFail(Ydb::StatusIds::UNAUTHORIZED) << message;
}
- if (HasErrors(*response, message)) {
+ if (QueryState->HasErrors(*response, message)) {
ythrow TRequestFail(Ydb::StatusIds::SCHEME_ERROR) << message;
}
- QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet,
- status,
- message);
+ QueryState->TopicOperations.ProcessSchemeCacheNavigate(response->ResultSet, status, message);
if (status != Ydb::StatusIds::SUCCESS) {
ythrow TRequestFail(status) << message;
}
- if (!TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
+ if (!QueryState->TryMergeTopicOffsets(QueryState->TopicOperations, message)) {
ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << message;
}
ReplySuccess();
}
- bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response,
- TString& message)
- {
- bool denied = false;
-
- TStringBuilder builder;
- builder << "Access for topic(s)";
- for (auto& result : response.ResultSet) {
- if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- continue;
- }
-
- auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes;
- if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, *QueryState->UserToken)) {
- builder << " '" << JoinPath(result.Path) << "'";
- denied = true;
- }
- }
-
- if (denied) {
- builder << " is denied for subject '" << QueryState->UserToken->GetUserSID() << "'";
- message = std::move(builder);
- }
-
- return denied;
- }
-
- bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response,
- TString& message)
- {
- if (response.ErrorCount == 0) {
- return false;
- }
-
- TStringBuilder builder;
-
- builder << "Unable to navigate:";
- for (const auto& result : response.ResultSet) {
- if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
- builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status;
- }
- }
- message = std::move(builder);
-
- return true;
- }
-
- bool TryMergeTopicOffsets(const NTopic::TTopicOperations &operations, TString& message) {
- try {
- YQL_ENSURE(QueryState);
- QueryState->TxCtx->TopicOperations.Merge(operations);
- return true;
- } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) {
- message = ex.what();
- return false;
- }
- }
-
void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) {
YQL_ENSURE(QueryState);
- ReplyQueryError(Ydb::StatusIds::BAD_SESSION,
- "Request cancelled due to explicit session close request");
+ ReplyQueryError(Ydb::StatusIds::BAD_SESSION, "Request cancelled due to explicit session close request");
Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
}