diff options
author | abcdef <akotov@ydb.tech> | 2022-09-01 10:29:01 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2022-09-01 10:29:01 +0300 |
commit | cc3dd66211358c885715bc6dc0ab24283d040dea (patch) | |
tree | eabdfb65948b5c5e643eb2b591453cfcea0223c3 | |
parent | 14291f2f6aea6fdce47fc32e6a278eb8970a4cbf (diff) | |
download | ydb-cc3dd66211358c885715bc6dc0ab24283d040dea.tar.gz |
27 files changed, 1017 insertions, 24 deletions
diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h index adf8493daa..dd7dd90035 100644 --- a/ydb/core/grpc_services/service_table.h +++ b/ydb/core/grpc_services/service_table.h @@ -31,5 +31,7 @@ void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFaci void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&); +void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + } } diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt index bda9e7d47c..f07836ba55 100644 --- a/ydb/core/kqp/common/CMakeLists.txt +++ b/ydb/core/kqp/common/CMakeLists.txt @@ -34,6 +34,7 @@ target_sources(core-kqp-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_topic.cpp ) generate_enum_serilization(core-kqp-common ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h diff --git a/ydb/core/kqp/common/kqp_topic.cpp b/ydb/core/kqp/common/kqp_topic.cpp new file mode 100644 index 0000000000..7aaaca9e75 --- /dev/null +++ b/ydb/core/kqp/common/kqp_topic.cpp @@ -0,0 +1,158 @@ +#include "kqp_topic.h" +#include <ydb/core/base/path.h> + +#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg) + +namespace NKikimr::NKqp::NTopic { + +// +// TPartition +// +void TPartition::AddRange(const Ydb::Topic::OffsetsRange &range) +{ + AddRangeImpl(range.start(), range.end()); +} + +void TPartition::SetTabletId(ui64 value) +{ + TabletId_ = value; +} + +void TPartition::Merge(const TPartition& rhs) +{ + for (auto& range : rhs.Offsets_) { + AddRangeImpl(range.first, range.second); + } +} + +void TPartition::AddRangeImpl(ui64 begin, ui64 end) +{ + if (Offsets_.Intersects(begin, end)) { + ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect"; + } + + Offsets_.InsertInterval(begin, end); +} + +// +// TTopic +// +TPartition& TTopic::AddPartition(ui32 id) +{ + return Partitions_[id]; +} + +TPartition* TTopic::GetPartition(ui32 id) +{ + auto p = Partitions_.find(id); + if (p == Partitions_.end()) { + return nullptr; + } + return &p->second; +} + +void TTopic::Merge(const TTopic& rhs) +{ + for (auto& [name, partition] : rhs.Partitions_) { + Partitions_[name].Merge(partition); + } +} + +// +// TOffsetsInfo +// +TTopic& TOffsetsInfo::AddTopic(const TString &path) +{ + return Topics_[path]; +} + +TTopic *TOffsetsInfo::GetTopic(const TString &path) +{ + auto p = Topics_.find(path); + if (p == Topics_.end()) { + return nullptr; + } + return &p->second; +} + +void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const +{ + for (auto& [topic, _] : Topics_) { + NSchemeCache::TSchemeCacheNavigate::TEntry entry; + entry.Path = NKikimr::SplitPath(topic); + entry.SyncVersion = true; + entry.ShowPrivatePath = true; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; + + navigate.ResultSet.push_back(std::move(entry)); + } +} + +bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results, + Ydb::StatusIds_StatusCode& status, + TString& message) +{ + Y_VERIFY(results.size() == Topics_.size()); + + if (results.empty()) { + status = Ydb::StatusIds::BAD_REQUEST; + message = "empty request"; + return false; + } + + TStringBuilder builder; + + for (auto& result : results) { + if (result.Kind != NSchemeCache::TSchemeCacheNavigate::KindTopic) { + builder << "Path '" << JoinPath(result.Path) << "' is not a topic"; + + status = Ydb::StatusIds::SCHEME_ERROR; + message = std::move(builder); + + return false; + } + + if (result.PQGroupInfo) { + const NKikimrSchemeOp::TPersQueueGroupDescription& description = + result.PQGroupInfo->Description; + + TTopic* topic = GetTopic(CanonizePath(result.Path)); + + // + // TODO: если топика нет, то отправить сообщение с ошибкой SCHEME_ERROR + // + Y_VERIFY(topic != nullptr); + + for (auto& p : description.GetPartitions()) { + if (auto partition = topic->GetPartition(p.GetPartitionId()); partition) { + LOG_D("topic=" << description.GetName() + << ", partition=" << p.GetPartitionId() + << ", tabletId=" << p.GetTabletId()); + + partition->SetTabletId(p.GetTabletId()); + } + } + } else { + builder << "The '" << JoinPath(result.Path) << "' topic is missing"; + + status = Ydb::StatusIds::SCHEME_ERROR; + message = std::move(builder); + + return false; + } + } + + status = Ydb::StatusIds::SUCCESS; + message = ""; + + return true; +} + +void TOffsetsInfo::Merge(const TOffsetsInfo& rhs) +{ + for (auto& [name, topic] : rhs.Topics_) { + Topics_[name].Merge(topic); + } +} + +} diff --git a/ydb/core/kqp/common/kqp_topic.h b/ydb/core/kqp/common/kqp_topic.h new file mode 100644 index 0000000000..e319c441dc --- /dev/null +++ b/ydb/core/kqp/common/kqp_topic.h @@ -0,0 +1,61 @@ +#pragma once + +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/public/api/protos/ydb_topic.pb.h> + +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <library/cpp/actors/core/actor.h> + +#include <util/system/types.h> + +#include <util/generic/fwd.h> +#include <util/generic/hash.h> +#include <util/generic/ylimits.h> + +#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> + +#include <vector> + +namespace NKikimr::NKqp::NTopic { + +class TOffsetsRangeIntersectExpection : public yexception { +}; + +struct TPartition { + void AddRange(const Ydb::Topic::OffsetsRange &range); + void SetTabletId(ui64 value); + + void Merge(const TPartition& rhs); + +private: + void AddRangeImpl(ui64 begin, ui64 end); + + TDisjointIntervalTree<ui64> Offsets_; + ui64 TabletId_ = Max<ui64>(); +}; + +struct TTopic { + TPartition& AddPartition(ui32 id); + TPartition* GetPartition(ui32 id); + + void Merge(const TTopic& rhs); + + THashMap<ui32, TPartition> Partitions_; +}; + +struct TOffsetsInfo { + TTopic& AddTopic(const TString &path); + TTopic* GetTopic(const TString &path); + + void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const; + bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results, + Ydb::StatusIds_StatusCode& status, + TString& message); + + void Merge(const TOffsetsInfo& rhs); + + THashMap<TString, TTopic> Topics_; +}; + +} diff --git a/ydb/core/kqp/common/kqp_transform.cpp b/ydb/core/kqp/common/kqp_transform.cpp index d07f42f3b3..7db2407390 100644 --- a/ydb/core/kqp/common/kqp_transform.cpp +++ b/ydb/core/kqp/common/kqp_transform.cpp @@ -18,14 +18,14 @@ IGraphTransformer::TStatus TLogExprTransformer::operator()(const TExprNode::TPtr return IGraphTransformer::TStatus::Ok; } -TAutoPtr<IGraphTransformer> TLogExprTransformer::Sync(const TString& description, NLog::EComponent component, - NLog::ELevel level) +TAutoPtr<IGraphTransformer> TLogExprTransformer::Sync(const TString& description, NYql::NLog::EComponent component, + NYql::NLog::ELevel level) { return CreateFunctorTransformer(TLogExprTransformer(description, component, level)); } -void TLogExprTransformer::LogExpr(const TExprNode& input, TExprContext& ctx, const TString& description, NLog::EComponent component, - NLog::ELevel level) +void TLogExprTransformer::LogExpr(const TExprNode& input, TExprContext& ctx, const TString& description, NYql::NLog::EComponent component, + NYql::NLog::ELevel level) { YQL_CVLOG(level, component) << description << ":\n" << KqpExprToPrettyString(input, ctx); } diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h index 3e60200e46..93e85ab7b3 100644 --- a/ydb/core/kqp/common/kqp_transform.h +++ b/ydb/core/kqp/common/kqp_transform.h @@ -1,8 +1,10 @@ #pragma once #include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> + #include <ydb/core/kqp/common/kqp_gateway.h> #include <ydb/core/kqp/common/kqp_tx_info.h> +#include <ydb/core/kqp/common/kqp_topic.h> #include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> @@ -239,6 +241,10 @@ public: ForceNewEngineSettings.ForceNewEngineLevel = level; } + void MergeTopicOffsets(const NTopic::TOffsetsInfo &offsets) { + TopicOffsets.Merge(offsets); + } + public: struct TParamsState : public TThrRefBase { TParamValueMap Values; @@ -259,6 +265,7 @@ public: TKqpTxLocks Locks; TDeferredEffects DeferredEffects; + NTopic::TOffsetsInfo TopicOffsets; TIntrusivePtr<TParamsState> ParamsState; IKqpGateway::TKqpSnapshotHandle SnapshotHandle; diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 0f6abe1d13..55ec3c395e 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -161,8 +161,8 @@ public: KqlTypeAnnTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*typesCtx), *typesCtx); - auto logLevel = NLog::ELevel::TRACE; - auto logComp = NLog::EComponent::ProviderKqp; + auto logLevel = NYql::NLog::ELevel::TRACE; + auto logComp = NYql::NLog::EComponent::ProviderKqp; KqlOptimizeTransformer = TTransformationPipeline(typesCtx) .AddServiceTransformers() diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index 0525d171e2..9aac53c984 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -16,11 +16,13 @@ #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/appdata.h> #include <ydb/core/base/cputime.h> +#include <ydb/core/base/path.h> #include <ydb/core/base/wilson.h> #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/sys_view/service/sysview_service.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/library/yql/utils/actor_log/log.h> +#include <ydb/library/persqueue/topic_parser/topic_parser.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/event_pb.h> @@ -91,6 +93,8 @@ struct TKqpQueryState { TString TxId; // User tx bool Commit = false; + + NTopic::TOffsetsInfo Offsets; }; struct TKqpCleanupCtx { @@ -291,7 +295,7 @@ public: } } - void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev) { + void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) { ui64 proxyRequestId = ev->Cookie; auto& event = ev->Get()->Record; auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId()); @@ -392,6 +396,10 @@ public: event.MutableRequest()->Swap(&QueryState->Request); ForwardRequest(ev); return; + + case NKikimrKqp::QUERY_ACTION_TOPIC: + AddOffsetsToTransaction(ctx); + return; } StopIdleTimer(); @@ -399,6 +407,38 @@ public: CompileQuery(); } + void AddOffsetsToTransaction(const NActors::TActorContext& ctx) { + YQL_ENSURE(QueryState); + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + + if (!PrepareQueryTransaction(requestInfo)) { + return; + } + + QueryState->Offsets = NTopic::TOffsetsInfo(); + + for (auto& topic : QueryState->Request.topics()) { + auto path = CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path())); + auto& t = QueryState->Offsets.AddTopic(path); + + for (auto& partition : topic.partitions()) { + auto& p = t.AddPartition(partition.partition_id()); + + for (auto& range : partition.partition_offsets()) { + p.AddRange(range); + } + } + } + + auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); + navigate->DatabaseName = CanonizePath(QueryState->Request.GetDatabase()); + QueryState->Offsets.FillSchemeCacheNavigate(*navigate); + navigate->UserToken = new NACLib::TUserToken(QueryState->UserToken); + + Become(&TKqpSessionActor::TopicOpsState); + ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release())); + } + void CompileQuery() { YQL_ENSURE(QueryState); auto& queryRequest = QueryState->Request; @@ -638,10 +678,7 @@ public: return {success, ctx.IssueManager.GetIssues()}; } - bool PrepareQueryContext() { - YQL_ENSURE(QueryState); - auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); - + bool PrepareQueryTransaction(const TKqpRequestInfo& requestInfo) { auto& queryRequest = QueryState->Request; if (queryRequest.HasTxControl()) { @@ -676,6 +713,17 @@ public: QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED; } + return true; + } + + bool PrepareQueryContext() { + YQL_ENSURE(QueryState); + auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); + + if (!PrepareQueryTransaction(requestInfo)) { + return false; + } + auto& queryCtx = QueryState->QueryCtx; queryCtx->TimeProvider = TAppData::TimeProvider; queryCtx->RandomProvider = TAppData::RandomProvider; @@ -688,6 +736,7 @@ public: return false; } + auto& queryRequest = QueryState->Request; auto action = queryRequest.GetAction(); auto type = queryRequest.GetType(); @@ -1845,10 +1894,10 @@ public: Cleanup(IsFatalError(ydbStatus)); } - STATEFN(ReadyState) { + STFUNC(ReadyState) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvKqp::TEvQueryRequest, HandleReady); + HFunc(TEvKqp::TEvQueryRequest, HandleReady); hFunc(TEvKqp::TEvPingSessionRequest, Handle); hFunc(TEvKqp::TEvIdleTimeout, Handle); @@ -1942,6 +1991,28 @@ public: } } + STATEFN(TopicOpsState) { + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps); + + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps); + + hFunc(TEvKqp::TEvPingSessionRequest, Handle); + hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps); + hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle); + hFunc(TEvKqp::TEvContinueShutdown, Handle); + hFunc(TEvKqp::TEvIdleTimeout, HandleNoop); + default: + UnexpectedEvent("TopicOpsState", ev); + } + } catch (const TRequestFail& ex) { + ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues); + } catch (const yexception& ex) { + InternalError(ex.what()); + } + } + private: void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) { InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " << @@ -1958,6 +2029,106 @@ private: } } + void HandleTopicOps(TEvKqp::TEvQueryRequest::TPtr& ev) { + ReplyBusy(ev); + } + + void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + TKqpRequestInfo requestInfo(QueryState->TraceId, SessionId); + NSchemeCache::TSchemeCacheNavigate* response = ev->Get()->Request.Get(); + + Ydb::StatusIds_StatusCode status; + TString message; + + if (IsAccessDenied(*response, message)) { + ythrow TRequestFail(requestInfo, Ydb::StatusIds::UNAUTHORIZED) << message; + } + if (HasErrors(*response, message)) { + ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message; + } + + QueryState->Offsets.ProcessSchemeCacheNavigate(response->ResultSet, + status, + message); + if (status != Ydb::StatusIds::SUCCESS) { + ythrow TRequestFail(requestInfo, status) << message; + } + + if (!TryMergeTopicOffsets(QueryState->Offsets, message)) { + ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message; + } + + ReplySuccess(); + } + + bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response, + TString& message) + { + bool denied = false; + + TStringBuilder builder; + NACLib::TUserToken token(QueryState->UserToken); + + builder << "Access for topic(s)"; + for (auto& result : response.ResultSet) { + if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + continue; + } + + auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes; + if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, token)) { + builder << " '" << JoinPath(result.Path) << "'"; + denied = true; + } + } + + if (denied) { + builder << " is denied for subject '" << token.GetUserSID() << "'"; + + message = std::move(builder); + } + + return denied; + } + + bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response, + TString& message) + { + if (response.ErrorCount == 0) { + return false; + } + + TStringBuilder builder; + + builder << "Unable to navigate:"; + for (const auto& result : response.ResultSet) { + if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status; + } + } + message = std::move(builder); + + return true; + } + + bool TryMergeTopicOffsets(const NTopic::TOffsetsInfo &offsets, TString& message) { + try { + YQL_ENSURE(QueryState); + QueryState->TxCtx->MergeTopicOffsets(offsets); + return true; + } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) { + message = ex.what(); + return false; + } + } + + void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) { + YQL_ENSURE(QueryState); + ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION, + "Request cancelled due to explicit session close request"); + Counters->ReportSessionActorClosedRequest(Settings.DbCounters); + } + private: TActorId Owner; TString SessionId; @@ -1983,7 +2154,6 @@ private: TActorId IdleTimerActorId; ui32 IdleTimerId = 0; std::optional<TSessionShutdownState> ShutdownState; - }; } // namespace diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp index 711788e367..197c7dc774 100644 --- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp +++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp @@ -162,7 +162,7 @@ private: static TMaybeNode<TExprList> BuildTxResults(const TKqlQueryResultList& results, TVector<TDqPhyStage>& stages, TExprContext& ctx) { - if (NLog::YqlLogger().NeedToLog(NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE)) { + if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) { TStringBuilder sb; sb << "-- BuildTxResults" << Endl; sb << " results:" << Endl; @@ -400,7 +400,7 @@ public: DataTxTransformer = TTransformationPipeline(&typesCtx) .AddServiceTransformers() - .Add(TExprLogTransformer::Sync("TxOpt", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxOpt") + .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt") .Add(*TypeAnnTransformer, "TypeAnnotation") .AddPostTypeAnnotation(/* forSubgraph */ true) .Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false), "BuildPhysicalStages") @@ -410,7 +410,7 @@ public: ScanTxTransformer = TTransformationPipeline(&typesCtx) .AddServiceTransformers() - .Add(TExprLogTransformer::Sync("TxOpt", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxOpt") + .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt") .Add(*TypeAnnTransformer, "TypeAnnotation") .AddPostTypeAnnotation(/* forSubgraph */ true) .Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled()), "BuildPhysicalStages") diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp index 61c8a5ddcf..5483697143 100644 --- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp +++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp @@ -19,7 +19,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase { public: TKqpLogicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx, const TKikimrConfiguration::TPtr& config) - : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {}) + : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {}) , TypesCtx(typesCtx) , KqpCtx(*kqpCtx) , Config(config) diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp index 5c1f1cabc0..65bae1cebe 100644 --- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp +++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp @@ -28,7 +28,7 @@ using TStatus = IGraphTransformer::TStatus; class TKqpPeepholeTransformer : public TOptimizeTransformerBase { public: TKqpPeepholeTransformer() - : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {}) + : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {}) { #define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name) AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate)); @@ -260,7 +260,7 @@ public: { TxTransformer = TTransformationPipeline(&typesCtx) .AddServiceTransformers() - .Add(TLogExprTransformer::Sync("TxsPeephole", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxsPeephole") + .Add(TLogExprTransformer::Sync("TxsPeephole", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxsPeephole") .Add(*TypeAnnTransformer, "TypeAnnotation") .AddPostTypeAnnotation(/* forSubgraph */ true) .Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config), "Peephole") diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp index 3619cf6fb5..223c276354 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp @@ -21,7 +21,7 @@ using TStatus = IGraphTransformer::TStatus; class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase { public: TKqpPhysicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx) - : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {}) + : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {}) , TypesCtx(typesCtx) , KqpCtx(*kqpCtx) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index e0c220ad21..3a4e101ff5 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -10,6 +10,7 @@ import "ydb/public/api/protos/ydb_status_codes.proto"; import "ydb/public/api/protos/ydb_table.proto"; import "ydb/public/api/protos/ydb_value.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/ydb_topic.proto"; import "ydb/core/protos/kqp_query_settings.proto"; import "ydb/library/mkql_proto/protos/minikql.proto"; import "ydb/library/yql/dq/actors/protos/dq_events.proto"; @@ -47,6 +48,7 @@ enum EQueryAction { QUERY_ACTION_COMMIT_TX = 7; QUERY_ACTION_ROLLBACK_TX = 8; QUERY_ACTION_PARSE = 9; + QUERY_ACTION_TOPIC = 10; }; enum EIsolationLevel { @@ -93,6 +95,7 @@ message TQueryRequest { reserved 19; // (deprecated) StatsMode optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21; + repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 22; } message TKqpPathIdProto { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index fb2d8cf6fd..1072532178 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -315,6 +315,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true)); + GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicServiceTx(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true)); GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true)); diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt index d95ff4a020..6da5beb110 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.txt @@ -27,6 +27,7 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto + ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto ) target_proto_addincls(api-grpc-draft ./ diff --git a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto new file mode 100644 index 0000000000..120f32f500 --- /dev/null +++ b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Ydb.Topic.V1; + +option java_package = "com.yandex.ydb.topic.v1"; + +import "ydb/public/api/protos/ydb_topic.proto"; + +service TopicServiceTx { + // Add offsets to transaction + rpc AddOffsetsToTransaction(AddOffsetsToTransactionRequest) returns (AddOffsetsToTransactionResponse); +} + + + diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 9b13a292e6..4501e3fee7 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -3,6 +3,7 @@ import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/ydb_scheme.proto"; import "ydb/public/api/protos/ydb_status_codes.proto"; import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/ydb_table.proto"; import "ydb/public/api/protos/annotations/validation.proto"; import "google/protobuf/duration.proto"; @@ -495,6 +496,45 @@ message StreamReadMessage { } } +// Add offsets to transaction request sent from client to server. +message AddOffsetsToTransactionRequest { + Ydb.Operations.OperationParams operation_params = 1; + + // Session identifier from TableService. + string session_id = 2; + + // Transaction identifier from TableService. + Ydb.Table.TransactionControl tx_control = 3; + + // Ranges of offsets by topics. + repeated TopicOffsets topics = 4; + + message TopicOffsets { + // Topic path. + string path = 1; + + // Ranges of offsets by partitions. + repeated PartitionOffsets partitions = 2; + + message PartitionOffsets { + // Partition identifier. + int64 partition_id = 1; + + // List of offset ranges. + repeated OffsetsRange partition_offsets = 2; + } + } +} + +// Add offsets to transaction response sent from server to client. +message AddOffsetsToTransactionResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +// Add offsets to transaction result message that will be inside AddOffsetsToTransactionResponse.operation. +message AddOffsetsToTransactionResult { +} //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Control messages diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp index a5cfc8f32f..d16cb6e8b8 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp @@ -479,6 +479,5 @@ Y_UNIT_TEST_SUITE(BasicUsage) { }, event); } - } } // namespace NYdb::NPersQueue::NTests diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.txt b/ydb/services/persqueue_v1/actors/CMakeLists.txt index 39e69eccf9..255fa7270e 100644 --- a/ydb/services/persqueue_v1/actors/CMakeLists.txt +++ b/ydb/services/persqueue_v1/actors/CMakeLists.txt @@ -33,4 +33,5 @@ target_sources(services-persqueue_v1-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp ) diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp new file mode 100644 index 0000000000..b6d1e4933b --- /dev/null +++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp @@ -0,0 +1,103 @@ +#include "add_offsets_to_transaction_actor.h" + +namespace NKikimr::NGRpcService { + +TAddOffsetsToTransactionActor::TAddOffsetsToTransactionActor(IRequestOpCtx* request) + : TBase{request} +{ +} + +void TAddOffsetsToTransactionActor::Bootstrap(const NActors::TActorContext& ctx) +{ + TBase::Bootstrap(ctx); + Become(&TAddOffsetsToTransactionActor::StateWork); + Proceed(ctx); +} + +void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx) +{ + const auto req = GetProtoRequest(); + + auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>(); + SetAuthToken(ev, *Request_); + SetDatabase(ev, *Request_); + + NYql::TIssues issues; + if (CheckSession(req->session_id(), issues)) { + ev->Record.MutableRequest()->SetSessionId(req->session_id()); + } else { + return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); + } + + // + // TODO: новый тип запроса. например, QUERY_TYPE_TOPIC + // + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC); + + if (const auto traceId = Request_->GetTraceId(); traceId) { + ev->Record.SetTraceId(traceId.GetRef()); + } + + if (const auto requestType = Request_->GetRequestType(); requestType) { + ev->Record.SetRequestType(requestType.GetRef()); + } + + ev->Record.MutableRequest()->SetCancelAfterMs(GetCancelAfter().MilliSeconds()); + ev->Record.MutableRequest()->SetTimeoutMs(GetOperationTimeout().MilliSeconds()); + + if (!req->has_tx_control()) { + NYql::TIssues issues; + issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty tx_control.")); + return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx); + } + + // + // TODO: проверить комбинацию значений атрибутов tx_control + // + + ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control()); + + // + // скопировать информацию о смещениях + // + *ev->Record.MutableRequest()->MutableTopics() = req->Gettopics(); + + ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release()); +} + +void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx) +{ + const auto& record = ev->Get()->Record.GetRef(); + SetCost(record.GetConsumedRu()); + AddServerHintsIfAny(record); + + if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { + const auto& kqpResponse = record.GetResponse(); + const auto& issueMessage = kqpResponse.GetQueryIssues(); + auto queryResult = TEvAddOffsetsToTransactionRequest::AllocateResult<Ydb::Topic::AddOffsetsToTransactionResult>(Request_); + + // + // TODO: сохранить результат + // + + // + // TODO: статистика + // + + // + // TODO: tx_meta + // + + ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, *queryResult, ctx); + } else { + OnGenericQueryResponseError(record, ctx); + } +} + +void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) +{ + TActivationContext::AsActorContext().Register(new TAddOffsetsToTransactionActor(p.release())); +} + +} diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h new file mode 100644 index 0000000000..ea8212668f --- /dev/null +++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h @@ -0,0 +1,42 @@ +#pragma once + +#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/rpc_kqp_base.h> + +#include <ydb/core/kqp/kqp.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> + +#include <memory> + +namespace NKikimr::NGRpcService { + +using TEvAddOffsetsToTransactionRequest = + TGrpcRequestOperationCall<Ydb::Topic::AddOffsetsToTransactionRequest, Ydb::Topic::AddOffsetsToTransactionResponse>; + +class TAddOffsetsToTransactionActor : public TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest> { +public: + using TBase = TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest>; + using TResult = Ydb::Topic::AddOffsetsToTransactionResult; + + explicit TAddOffsetsToTransactionActor(IRequestOpCtx* msg); + + void Bootstrap(const NActors::TActorContext& ctx); + +private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle); + default: + TBase::StateWork(ev, ctx); + break; + } + } + + void Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx); + + void Proceed(const NActors::TActorContext& ctx); +}; + +} diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index c46a80e129..0604c67749 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4966,6 +4966,5 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { UNIT_ASSERT(partId == 1 || partId == 3); } } - } } diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index 8dd541760e..badaf32eb0 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -4,6 +4,7 @@ #include <ydb/core/base/counters.h> #include <ydb/core/grpc_services/rpc_calls.h> #include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/service_table.h> #include <ydb/core/tx/scheme_board/cache.h> #include "grpc_pq_read.h" @@ -121,11 +122,68 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { }) #undef ADD_REQUEST +} +void TGRpcTopicService::StopService() noexcept { + TGrpcServiceBase::StopService(); +} +// +// TGRpcTopicServiceTx +// +TGRpcTopicServiceTx::TGRpcTopicServiceTx(NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& grpcRequestProxy) + : ActorSystem(system) + , Counters(counters) + , GRpcRequestProxy(grpcRequestProxy) +{ } -void TGRpcTopicService::StopService() noexcept { +void TGRpcTopicServiceTx::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { + CQ = cq; + + if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) { + SetupIncomingRequests(std::move(logger)); + } +} + +void TGRpcTopicServiceTx::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter = limiter; +} + +bool TGRpcTopicServiceTx::IncRequest() { + return Limiter->Inc(); +} + +void TGRpcTopicServiceTx::DecRequest() { + Limiter->Dec(); +} + +void TGRpcTopicServiceTx::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem); + +#ifdef ADD_REQUEST_LIMIT +#error ADD_REQUEST_LIMIT macro already defined +#endif + +#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \ + MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicServiceTx>> \ + (this, this->GetService(), CQ, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \ + ActorSystem->Send(GRpcRequestProxy, \ + new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \ + }, &Ydb::Topic::V1::TopicServiceTx::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("topic", #NAME))->Run(); + + ADD_REQUEST_LIMIT(AddOffsetsToTransaction, DoAddOffsetsToTransaction, Ru) + +#undef ADD_REQUEST_LIMIT +} + +void TGRpcTopicServiceTx::StopService() noexcept { TGrpcServiceBase::StopService(); } diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h index a75f1b8375..ee5f8be7b1 100644 --- a/ydb/services/persqueue_v1/topic.h +++ b/ydb/services/persqueue_v1/topic.h @@ -3,6 +3,7 @@ #include <library/cpp/actors/core/actorsystem.h> #include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h> +#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h> #include <library/cpp/grpc/server/grpc_server.h> @@ -35,6 +36,34 @@ private: NActors::TActorId NewSchemeCache; }; +class TGRpcTopicServiceTx + : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx> +{ +public: + TGRpcTopicServiceTx(NActors::TActorSystem* system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const NActors::TActorId& grpcRequestProxy); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + void StopService() noexcept override; + + using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>::GetService; + + bool IncRequest(); + void DecRequest(); + +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + + NActors::TActorSystem* ActorSystem; + grpc::ServerCompletionQueue* CQ = nullptr; + + TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters; + NGrpc::TGlobalLimiter* Limiter = nullptr; + NActors::TActorId GRpcRequestProxy; +}; + } // namespace V1 } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt index 774adc10d7..a3f22ab07a 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt @@ -48,6 +48,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ) add_test( NAME diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt index 0becc47bd6..cf4cd9f3a7 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt @@ -52,6 +52,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp ) add_test( NAME diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp new file mode 100644 index 0000000000..089a960073 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp @@ -0,0 +1,300 @@ +#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h> + +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> + +#include <ydb/core/protos/services.pb.h> + +#include <util/stream/output.h> +#include <util/string/builder.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NPersQueueTests { + +Y_UNIT_TEST_SUITE(TopicService) { + +NYdb::NTable::TSession CreateSession(NYdb::TDriver &driver) { + NYdb::NTable::TClientSettings settings; + NYdb::NTable::TTableClient client(driver, settings); + + auto result = client.CreateSession().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + return result.GetSession(); +} + +NYdb::NTable::TTransaction BeginTransaction(NYdb::NTable::TSession &session) { + auto result = session.BeginTransaction().ExtractValueSync(); + UNIT_ASSERT_EQUAL(result.IsTransportError(), false); + + return result.GetTransaction(); +} + +template<class T> +std::unique_ptr<typename T::Stub> CreateServiceStub(const NPersQueue::TTestServer &server) { + std::shared_ptr<grpc::Channel> channel; + std::unique_ptr<typename T::Stub> stub; + + channel = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials()); + stub = T::NewStub(channel); + + return stub; +} + +std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> CreateTopicServiceTxStub(const NPersQueue::TTestServer &server) { + return CreateServiceStub<Ydb::Topic::V1::TopicServiceTx>(server); +} + +struct TOffsetRange { + ui64 Begin; + ui64 End; +}; + +struct TPartition { + ui64 Id; + TVector<TOffsetRange> Offsets; +}; + +struct TTopic { + TString Path; + TVector<TPartition> Partitions; +}; + +void AppendOffsetsRange(const TOffsetRange& r, google::protobuf::RepeatedPtrField<Ydb::Topic::OffsetsRange> *offsets) +{ + auto* range = offsets->Add(); + + range->set_start(r.Begin); + range->set_end(r.End); +} + +void AppendPartition(const TPartition& p, + google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets_PartitionOffsets> *partitions) +{ + auto* partition = partitions->Add(); + + partition->set_partition_id(p.Id); + + for (auto& r : p.Offsets) { + AppendOffsetsRange(r, partition->mutable_partition_offsets()); + } +} + +void AppendTopic(const TTopic &t, + google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets> *topics) +{ + auto* topic = topics->Add(); + + topic->set_path(t.Path); + + for (auto& p : t.Partitions) { + AppendPartition(p, topic->mutable_partitions()); + } +} + +Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id, + const TString& tx_id, + const TVector<TTopic>& topics) +{ + Ydb::Topic::AddOffsetsToTransactionRequest request; + + request.set_session_id(session_id); + request.mutable_tx_control()->set_tx_id(tx_id); + + for (auto& t : topics) { + AppendTopic(t, request.mutable_topics()); + } + + return request; +} + +class TAddOffsetToTransactionFixture : public NUnitTest::TBaseFixture { +protected: + TMaybe<NPersQueue::TTestServer> server; + TMaybe<NYdb::NTable::TSession> session; + TMaybe<NYdb::NTable::TTransaction> tx; + std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> stub; + + const TString DATABASE = "/Root"; + const TString TOPIC_PARENT = "/Root/PQ"; + + const TString VALID_TOPIC_NAME = "rt3.dc1--topic1"; + const TString VALID_SHORT_TOPIC_NAME = "topic1"; + const TString VALID_TOPIC_PATH = TOPIC_PARENT + "/" + VALID_TOPIC_NAME; + + const TString INVALID_TOPIC_NAME = VALID_TOPIC_NAME + "_2"; + const TString INVALID_SHORT_TOPIC_NAME = VALID_SHORT_TOPIC_NAME + "_2"; + const TString INVALID_TOPIC_PATH = TOPIC_PARENT + "/" + INVALID_TOPIC_NAME; + + const TString AUTH_TOKEN = "x-user-x@builtin"; + + void SetUp(NUnitTest::TTestContext&) override { + server = NPersQueue::TTestServer(false); + server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true); + server->StartServer(); + server->EnableLogs({NKikimrServices::PQ_WRITE_PROXY + , NKikimrServices::PQ_READ_PROXY + , NKikimrServices::TX_PROXY_SCHEME_CACHE + , NKikimrServices::KQP_PROXY + , NKikimrServices::PERSQUEUE + , NKikimrServices::KQP_SESSION}, NActors::NLog::PRI_DEBUG); + + auto partsCount = 5u; + server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount); + + NACLib::TDiffACL acl; + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, AUTH_TOKEN); + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN); + acl.AddAccess(NACLib::EAccessType::Allow, NACLib::WriteAttributes, AUTH_TOKEN); + server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString()); + + auto driverCfg = NYdb::TDriverConfig() + .SetEndpoint(TStringBuilder() << "localhost:" << server->GrpcPort) + .SetDatabase(DATABASE) + .SetAuthToken(AUTH_TOKEN); + + auto ydbDriver = std::make_shared<NYdb::TDriver>(driverCfg); + + session = CreateSession(*ydbDriver); + tx = BeginTransaction(*session); + + stub = CreateTopicServiceTxStub(*server); + } + + Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics) { + grpc::ClientContext rcontext; + rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN); + rcontext.AddMetadata("x-ydb-database", DATABASE); + + Ydb::Topic::AddOffsetsToTransactionResponse response; + + grpc::Status status = stub->AddOffsetsToTransaction(&rcontext, + CreateRequest(session->GetId(), tx->GetId(), topics), + &response); + UNIT_ASSERT(status.ok()); + + return response; + } + + void TestTopicPaths(const TString& path1, const TString& path2) { + const auto PARTITION_ID = 1; + const auto BEGIN = 4; + const auto END = 7; + + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=path1, .Partitions={ + TPartition{.Id=PARTITION_ID, .Offsets={ + TOffsetRange{.Begin=BEGIN, .End=END} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + response = CallAddOffsetsToTransaction({ + TTopic{.Path=path2, .Partitions={ + TPartition{.Id=PARTITION_ID, .Offsets={ + TOffsetRange{.Begin=BEGIN, .End=END} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); + } +}; + +Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) { + Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=1, .End=3}, + TOffsetRange{.Begin=5, .End=8} + }}, + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=2, .End=6} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=8, .End=11} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); +} + +Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) { + Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=1, .End=3}, + TOffsetRange{.Begin=5, .End=8} + }}, + TPartition{.Id=1, .Offsets={ + TOffsetRange{.Begin=2, .End=6} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=4, .End=7} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST); +} + +Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=4, .End=7} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR); +} + +Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TAddOffsetToTransactionFixture) { + TestTopicPaths("//Root//PQ//rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1"); +} + +Y_UNIT_TEST_F(RelativePath, TAddOffsetToTransactionFixture) { + TestTopicPaths("PQ/rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1"); +} + +Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) { + auto response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=4, .End=7} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS); + + NACLib::TDiffACL acl; + acl.RemoveAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN); + server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString()); + + response = CallAddOffsetsToTransaction({ + TTopic{.Path=VALID_TOPIC_PATH, .Partitions={ + TPartition{.Id=4, .Offsets={ + TOffsetRange{.Begin=14, .End=17} + }} + }} + }); + UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED); +} + +} + +} |