aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2022-09-01 10:29:01 +0300
committerabcdef <akotov@ydb.tech>2022-09-01 10:29:01 +0300
commitcc3dd66211358c885715bc6dc0ab24283d040dea (patch)
treeeabdfb65948b5c5e643eb2b591453cfcea0223c3
parent14291f2f6aea6fdce47fc32e6a278eb8970a4cbf (diff)
downloadydb-cc3dd66211358c885715bc6dc0ab24283d040dea.tar.gz
-rw-r--r--ydb/core/grpc_services/service_table.h2
-rw-r--r--ydb/core/kqp/common/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/common/kqp_topic.cpp158
-rw-r--r--ydb/core/kqp/common/kqp_topic.h61
-rw-r--r--ydb/core/kqp/common/kqp_transform.cpp8
-rw-r--r--ydb/core/kqp/common/kqp_transform.h7
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp4
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp186
-rw-r--r--ydb/core/kqp/opt/kqp_opt_build_txs.cpp6
-rw-r--r--ydb/core/kqp/opt/logical/kqp_opt_log.cpp2
-rw-r--r--ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp4
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy.cpp2
-rw-r--r--ydb/core/protos/kqp.proto3
-rw-r--r--ydb/core/testlib/test_client.cpp1
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.txt1
-rw-r--r--ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto16
-rw-r--r--ydb/public/api/protos/ydb_topic.proto40
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp1
-rw-r--r--ydb/services/persqueue_v1/actors/CMakeLists.txt1
-rw-r--r--ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp103
-rw-r--r--ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h42
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp1
-rw-r--r--ydb/services/persqueue_v1/topic.cpp60
-rw-r--r--ydb/services/persqueue_v1/topic.h29
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/topic_service_ut.cpp300
27 files changed, 1017 insertions, 24 deletions
diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h
index adf8493daa..dd7dd90035 100644
--- a/ydb/core/grpc_services/service_table.h
+++ b/ydb/core/grpc_services/service_table.h
@@ -31,5 +31,7 @@ void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFaci
void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
+void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
}
}
diff --git a/ydb/core/kqp/common/CMakeLists.txt b/ydb/core/kqp/common/CMakeLists.txt
index bda9e7d47c..f07836ba55 100644
--- a/ydb/core/kqp/common/CMakeLists.txt
+++ b/ydb/core/kqp/common/CMakeLists.txt
@@ -34,6 +34,7 @@ target_sources(core-kqp-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_yql.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_timeouts.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_lwtrace_probes.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_topic.cpp
)
generate_enum_serilization(core-kqp-common
${CMAKE_SOURCE_DIR}/ydb/core/kqp/common/kqp_tx_info.h
diff --git a/ydb/core/kqp/common/kqp_topic.cpp b/ydb/core/kqp/common/kqp_topic.cpp
new file mode 100644
index 0000000000..7aaaca9e75
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_topic.cpp
@@ -0,0 +1,158 @@
+#include "kqp_topic.h"
+#include <ydb/core/base/path.h>
+
+#define LOG_D(msg) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_SESSION, msg)
+
+namespace NKikimr::NKqp::NTopic {
+
+//
+// TPartition
+//
+void TPartition::AddRange(const Ydb::Topic::OffsetsRange &range)
+{
+ AddRangeImpl(range.start(), range.end());
+}
+
+void TPartition::SetTabletId(ui64 value)
+{
+ TabletId_ = value;
+}
+
+void TPartition::Merge(const TPartition& rhs)
+{
+ for (auto& range : rhs.Offsets_) {
+ AddRangeImpl(range.first, range.second);
+ }
+}
+
+void TPartition::AddRangeImpl(ui64 begin, ui64 end)
+{
+ if (Offsets_.Intersects(begin, end)) {
+ ythrow TOffsetsRangeIntersectExpection() << "offset ranges intersect";
+ }
+
+ Offsets_.InsertInterval(begin, end);
+}
+
+//
+// TTopic
+//
+TPartition& TTopic::AddPartition(ui32 id)
+{
+ return Partitions_[id];
+}
+
+TPartition* TTopic::GetPartition(ui32 id)
+{
+ auto p = Partitions_.find(id);
+ if (p == Partitions_.end()) {
+ return nullptr;
+ }
+ return &p->second;
+}
+
+void TTopic::Merge(const TTopic& rhs)
+{
+ for (auto& [name, partition] : rhs.Partitions_) {
+ Partitions_[name].Merge(partition);
+ }
+}
+
+//
+// TOffsetsInfo
+//
+TTopic& TOffsetsInfo::AddTopic(const TString &path)
+{
+ return Topics_[path];
+}
+
+TTopic *TOffsetsInfo::GetTopic(const TString &path)
+{
+ auto p = Topics_.find(path);
+ if (p == Topics_.end()) {
+ return nullptr;
+ }
+ return &p->second;
+}
+
+void TOffsetsInfo::FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const
+{
+ for (auto& [topic, _] : Topics_) {
+ NSchemeCache::TSchemeCacheNavigate::TEntry entry;
+ entry.Path = NKikimr::SplitPath(topic);
+ entry.SyncVersion = true;
+ entry.ShowPrivatePath = true;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
+
+ navigate.ResultSet.push_back(std::move(entry));
+ }
+}
+
+bool TOffsetsInfo::ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
+ Ydb::StatusIds_StatusCode& status,
+ TString& message)
+{
+ Y_VERIFY(results.size() == Topics_.size());
+
+ if (results.empty()) {
+ status = Ydb::StatusIds::BAD_REQUEST;
+ message = "empty request";
+ return false;
+ }
+
+ TStringBuilder builder;
+
+ for (auto& result : results) {
+ if (result.Kind != NSchemeCache::TSchemeCacheNavigate::KindTopic) {
+ builder << "Path '" << JoinPath(result.Path) << "' is not a topic";
+
+ status = Ydb::StatusIds::SCHEME_ERROR;
+ message = std::move(builder);
+
+ return false;
+ }
+
+ if (result.PQGroupInfo) {
+ const NKikimrSchemeOp::TPersQueueGroupDescription& description =
+ result.PQGroupInfo->Description;
+
+ TTopic* topic = GetTopic(CanonizePath(result.Path));
+
+ //
+ // TODO: если топика нет, то отправить сообщение с ошибкой SCHEME_ERROR
+ //
+ Y_VERIFY(topic != nullptr);
+
+ for (auto& p : description.GetPartitions()) {
+ if (auto partition = topic->GetPartition(p.GetPartitionId()); partition) {
+ LOG_D("topic=" << description.GetName()
+ << ", partition=" << p.GetPartitionId()
+ << ", tabletId=" << p.GetTabletId());
+
+ partition->SetTabletId(p.GetTabletId());
+ }
+ }
+ } else {
+ builder << "The '" << JoinPath(result.Path) << "' topic is missing";
+
+ status = Ydb::StatusIds::SCHEME_ERROR;
+ message = std::move(builder);
+
+ return false;
+ }
+ }
+
+ status = Ydb::StatusIds::SUCCESS;
+ message = "";
+
+ return true;
+}
+
+void TOffsetsInfo::Merge(const TOffsetsInfo& rhs)
+{
+ for (auto& [name, topic] : rhs.Topics_) {
+ Topics_[name].Merge(topic);
+ }
+}
+
+}
diff --git a/ydb/core/kqp/common/kqp_topic.h b/ydb/core/kqp/common/kqp_topic.h
new file mode 100644
index 0000000000..e319c441dc
--- /dev/null
+++ b/ydb/core/kqp/common/kqp_topic.h
@@ -0,0 +1,61 @@
+#pragma once
+
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+#include <ydb/public/api/protos/ydb_topic.pb.h>
+
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+#include <util/system/types.h>
+
+#include <util/generic/fwd.h>
+#include <util/generic/hash.h>
+#include <util/generic/ylimits.h>
+
+#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
+
+#include <vector>
+
+namespace NKikimr::NKqp::NTopic {
+
+class TOffsetsRangeIntersectExpection : public yexception {
+};
+
+struct TPartition {
+ void AddRange(const Ydb::Topic::OffsetsRange &range);
+ void SetTabletId(ui64 value);
+
+ void Merge(const TPartition& rhs);
+
+private:
+ void AddRangeImpl(ui64 begin, ui64 end);
+
+ TDisjointIntervalTree<ui64> Offsets_;
+ ui64 TabletId_ = Max<ui64>();
+};
+
+struct TTopic {
+ TPartition& AddPartition(ui32 id);
+ TPartition* GetPartition(ui32 id);
+
+ void Merge(const TTopic& rhs);
+
+ THashMap<ui32, TPartition> Partitions_;
+};
+
+struct TOffsetsInfo {
+ TTopic& AddTopic(const TString &path);
+ TTopic* GetTopic(const TString &path);
+
+ void FillSchemeCacheNavigate(NSchemeCache::TSchemeCacheNavigate& navigate) const;
+ bool ProcessSchemeCacheNavigate(const NSchemeCache::TSchemeCacheNavigate::TResultSet& results,
+ Ydb::StatusIds_StatusCode& status,
+ TString& message);
+
+ void Merge(const TOffsetsInfo& rhs);
+
+ THashMap<TString, TTopic> Topics_;
+};
+
+}
diff --git a/ydb/core/kqp/common/kqp_transform.cpp b/ydb/core/kqp/common/kqp_transform.cpp
index d07f42f3b3..7db2407390 100644
--- a/ydb/core/kqp/common/kqp_transform.cpp
+++ b/ydb/core/kqp/common/kqp_transform.cpp
@@ -18,14 +18,14 @@ IGraphTransformer::TStatus TLogExprTransformer::operator()(const TExprNode::TPtr
return IGraphTransformer::TStatus::Ok;
}
-TAutoPtr<IGraphTransformer> TLogExprTransformer::Sync(const TString& description, NLog::EComponent component,
- NLog::ELevel level)
+TAutoPtr<IGraphTransformer> TLogExprTransformer::Sync(const TString& description, NYql::NLog::EComponent component,
+ NYql::NLog::ELevel level)
{
return CreateFunctorTransformer(TLogExprTransformer(description, component, level));
}
-void TLogExprTransformer::LogExpr(const TExprNode& input, TExprContext& ctx, const TString& description, NLog::EComponent component,
- NLog::ELevel level)
+void TLogExprTransformer::LogExpr(const TExprNode& input, TExprContext& ctx, const TString& description, NYql::NLog::EComponent component,
+ NYql::NLog::ELevel level)
{
YQL_CVLOG(level, component) << description << ":\n" << KqpExprToPrettyString(input, ctx);
}
diff --git a/ydb/core/kqp/common/kqp_transform.h b/ydb/core/kqp/common/kqp_transform.h
index 3e60200e46..93e85ab7b3 100644
--- a/ydb/core/kqp/common/kqp_transform.h
+++ b/ydb/core/kqp/common/kqp_transform.h
@@ -1,8 +1,10 @@
#pragma once
#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h>
+
#include <ydb/core/kqp/common/kqp_gateway.h>
#include <ydb/core/kqp/common/kqp_tx_info.h>
+#include <ydb/core/kqp/common/kqp_topic.h>
#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
@@ -239,6 +241,10 @@ public:
ForceNewEngineSettings.ForceNewEngineLevel = level;
}
+ void MergeTopicOffsets(const NTopic::TOffsetsInfo &offsets) {
+ TopicOffsets.Merge(offsets);
+ }
+
public:
struct TParamsState : public TThrRefBase {
TParamValueMap Values;
@@ -259,6 +265,7 @@ public:
TKqpTxLocks Locks;
TDeferredEffects DeferredEffects;
+ NTopic::TOffsetsInfo TopicOffsets;
TIntrusivePtr<TParamsState> ParamsState;
IKqpGateway::TKqpSnapshotHandle SnapshotHandle;
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index 0f6abe1d13..55ec3c395e 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -161,8 +161,8 @@ public:
KqlTypeAnnTransformer = CreateTypeAnnotationTransformer(CreateExtCallableTypeAnnotationTransformer(*typesCtx),
*typesCtx);
- auto logLevel = NLog::ELevel::TRACE;
- auto logComp = NLog::EComponent::ProviderKqp;
+ auto logLevel = NYql::NLog::ELevel::TRACE;
+ auto logComp = NYql::NLog::EComponent::ProviderKqp;
KqlOptimizeTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index 0525d171e2..9aac53c984 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -16,11 +16,13 @@
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/cputime.h>
+#include <ydb/core/base/path.h>
#include <ydb/core/base/wilson.h>
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/sys_view/service/sysview_service.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/library/yql/utils/actor_log/log.h>
+#include <ydb/library/persqueue/topic_parser/topic_parser.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/event_pb.h>
@@ -91,6 +93,8 @@ struct TKqpQueryState {
TString TxId; // User tx
bool Commit = false;
+
+ NTopic::TOffsetsInfo Offsets;
};
struct TKqpCleanupCtx {
@@ -291,7 +295,7 @@ public:
}
}
- void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ void HandleReady(TEvKqp::TEvQueryRequest::TPtr& ev, const NActors::TActorContext& ctx) {
ui64 proxyRequestId = ev->Cookie;
auto& event = ev->Get()->Record;
auto requestInfo = TKqpRequestInfo(event.GetTraceId(), event.GetRequest().GetSessionId());
@@ -392,6 +396,10 @@ public:
event.MutableRequest()->Swap(&QueryState->Request);
ForwardRequest(ev);
return;
+
+ case NKikimrKqp::QUERY_ACTION_TOPIC:
+ AddOffsetsToTransaction(ctx);
+ return;
}
StopIdleTimer();
@@ -399,6 +407,38 @@ public:
CompileQuery();
}
+ void AddOffsetsToTransaction(const NActors::TActorContext& ctx) {
+ YQL_ENSURE(QueryState);
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+
+ if (!PrepareQueryTransaction(requestInfo)) {
+ return;
+ }
+
+ QueryState->Offsets = NTopic::TOffsetsInfo();
+
+ for (auto& topic : QueryState->Request.topics()) {
+ auto path = CanonizePath(NPersQueue::GetFullTopicPath(ctx, QueryState->Request.GetDatabase(), topic.path()));
+ auto& t = QueryState->Offsets.AddTopic(path);
+
+ for (auto& partition : topic.partitions()) {
+ auto& p = t.AddPartition(partition.partition_id());
+
+ for (auto& range : partition.partition_offsets()) {
+ p.AddRange(range);
+ }
+ }
+ }
+
+ auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
+ navigate->DatabaseName = CanonizePath(QueryState->Request.GetDatabase());
+ QueryState->Offsets.FillSchemeCacheNavigate(*navigate);
+ navigate->UserToken = new NACLib::TUserToken(QueryState->UserToken);
+
+ Become(&TKqpSessionActor::TopicOpsState);
+ ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()));
+ }
+
void CompileQuery() {
YQL_ENSURE(QueryState);
auto& queryRequest = QueryState->Request;
@@ -638,10 +678,7 @@ public:
return {success, ctx.IssueManager.GetIssues()};
}
- bool PrepareQueryContext() {
- YQL_ENSURE(QueryState);
- auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
-
+ bool PrepareQueryTransaction(const TKqpRequestInfo& requestInfo) {
auto& queryRequest = QueryState->Request;
if (queryRequest.HasTxControl()) {
@@ -676,6 +713,17 @@ public:
QueryState->TxCtx->EffectiveIsolationLevel = NKikimrKqp::ISOLATION_LEVEL_UNDEFINED;
}
+ return true;
+ }
+
+ bool PrepareQueryContext() {
+ YQL_ENSURE(QueryState);
+ auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
+
+ if (!PrepareQueryTransaction(requestInfo)) {
+ return false;
+ }
+
auto& queryCtx = QueryState->QueryCtx;
queryCtx->TimeProvider = TAppData::TimeProvider;
queryCtx->RandomProvider = TAppData::RandomProvider;
@@ -688,6 +736,7 @@ public:
return false;
}
+ auto& queryRequest = QueryState->Request;
auto action = queryRequest.GetAction();
auto type = queryRequest.GetType();
@@ -1845,10 +1894,10 @@ public:
Cleanup(IsFatalError(ydbStatus));
}
- STATEFN(ReadyState) {
+ STFUNC(ReadyState) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqp::TEvQueryRequest, HandleReady);
+ HFunc(TEvKqp::TEvQueryRequest, HandleReady);
hFunc(TEvKqp::TEvPingSessionRequest, Handle);
hFunc(TEvKqp::TEvIdleTimeout, Handle);
@@ -1942,6 +1991,28 @@ public:
}
}
+ STATEFN(TopicOpsState) {
+ try {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvKqp::TEvQueryRequest, HandleTopicOps);
+
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, HandleTopicOps);
+
+ hFunc(TEvKqp::TEvPingSessionRequest, Handle);
+ hFunc(TEvKqp::TEvCloseSessionRequest, HandleTopicOps);
+ hFunc(TEvKqp::TEvInitiateSessionShutdown, Handle);
+ hFunc(TEvKqp::TEvContinueShutdown, Handle);
+ hFunc(TEvKqp::TEvIdleTimeout, HandleNoop);
+ default:
+ UnexpectedEvent("TopicOpsState", ev);
+ }
+ } catch (const TRequestFail& ex) {
+ ReplyQueryError(ex.RequestInfo, ex.Status, ex.what(), ex.Issues);
+ } catch (const yexception& ex) {
+ InternalError(ex.what());
+ }
+ }
+
private:
void UnexpectedEvent(const TString& state, TAutoPtr<NActors::IEventHandle>& ev) {
InternalError(TStringBuilder() << "TKqpSessionActor in state " << state << " recieve unexpected event " <<
@@ -1958,6 +2029,106 @@ private:
}
}
+ void HandleTopicOps(TEvKqp::TEvQueryRequest::TPtr& ev) {
+ ReplyBusy(ev);
+ }
+
+ void HandleTopicOps(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ TKqpRequestInfo requestInfo(QueryState->TraceId, SessionId);
+ NSchemeCache::TSchemeCacheNavigate* response = ev->Get()->Request.Get();
+
+ Ydb::StatusIds_StatusCode status;
+ TString message;
+
+ if (IsAccessDenied(*response, message)) {
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::UNAUTHORIZED) << message;
+ }
+ if (HasErrors(*response, message)) {
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::SCHEME_ERROR) << message;
+ }
+
+ QueryState->Offsets.ProcessSchemeCacheNavigate(response->ResultSet,
+ status,
+ message);
+ if (status != Ydb::StatusIds::SUCCESS) {
+ ythrow TRequestFail(requestInfo, status) << message;
+ }
+
+ if (!TryMergeTopicOffsets(QueryState->Offsets, message)) {
+ ythrow TRequestFail(requestInfo, Ydb::StatusIds::BAD_REQUEST) << message;
+ }
+
+ ReplySuccess();
+ }
+
+ bool IsAccessDenied(const NSchemeCache::TSchemeCacheNavigate& response,
+ TString& message)
+ {
+ bool denied = false;
+
+ TStringBuilder builder;
+ NACLib::TUserToken token(QueryState->UserToken);
+
+ builder << "Access for topic(s)";
+ for (auto& result : response.ResultSet) {
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ continue;
+ }
+
+ auto rights = NACLib::EAccessRights::ReadAttributes | NACLib::EAccessRights::WriteAttributes;
+ if (result.SecurityObject && !result.SecurityObject->CheckAccess(rights, token)) {
+ builder << " '" << JoinPath(result.Path) << "'";
+ denied = true;
+ }
+ }
+
+ if (denied) {
+ builder << " is denied for subject '" << token.GetUserSID() << "'";
+
+ message = std::move(builder);
+ }
+
+ return denied;
+ }
+
+ bool HasErrors(const NSchemeCache::TSchemeCacheNavigate& response,
+ TString& message)
+ {
+ if (response.ErrorCount == 0) {
+ return false;
+ }
+
+ TStringBuilder builder;
+
+ builder << "Unable to navigate:";
+ for (const auto& result : response.ResultSet) {
+ if (result.Status != NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ builder << "path: '" << JoinPath(result.Path) << "' status: " << result.Status;
+ }
+ }
+ message = std::move(builder);
+
+ return true;
+ }
+
+ bool TryMergeTopicOffsets(const NTopic::TOffsetsInfo &offsets, TString& message) {
+ try {
+ YQL_ENSURE(QueryState);
+ QueryState->TxCtx->MergeTopicOffsets(offsets);
+ return true;
+ } catch (const NTopic::TOffsetsRangeIntersectExpection &ex) {
+ message = ex.what();
+ return false;
+ }
+ }
+
+ void HandleTopicOps(TEvKqp::TEvCloseSessionRequest::TPtr&) {
+ YQL_ENSURE(QueryState);
+ ReplyQueryError(TKqpRequestInfo(QueryState->TraceId, SessionId), Ydb::StatusIds::BAD_SESSION,
+ "Request cancelled due to explicit session close request");
+ Counters->ReportSessionActorClosedRequest(Settings.DbCounters);
+ }
+
private:
TActorId Owner;
TString SessionId;
@@ -1983,7 +2154,6 @@ private:
TActorId IdleTimerActorId;
ui32 IdleTimerId = 0;
std::optional<TSessionShutdownState> ShutdownState;
-
};
} // namespace
diff --git a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
index 711788e367..197c7dc774 100644
--- a/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
+++ b/ydb/core/kqp/opt/kqp_opt_build_txs.cpp
@@ -162,7 +162,7 @@ private:
static TMaybeNode<TExprList> BuildTxResults(const TKqlQueryResultList& results, TVector<TDqPhyStage>& stages,
TExprContext& ctx)
{
- if (NLog::YqlLogger().NeedToLog(NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE)) {
+ if (NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE)) {
TStringBuilder sb;
sb << "-- BuildTxResults" << Endl;
sb << " results:" << Endl;
@@ -400,7 +400,7 @@ public:
DataTxTransformer = TTransformationPipeline(&typesCtx)
.AddServiceTransformers()
- .Add(TExprLogTransformer::Sync("TxOpt", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxOpt")
+ .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt")
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(CreateKqpBuildPhyStagesTransformer(/* allowDependantConsumers */ false), "BuildPhysicalStages")
@@ -410,7 +410,7 @@ public:
ScanTxTransformer = TTransformationPipeline(&typesCtx)
.AddServiceTransformers()
- .Add(TExprLogTransformer::Sync("TxOpt", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxOpt")
+ .Add(TExprLogTransformer::Sync("TxOpt", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxOpt")
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(CreateKqpBuildPhyStagesTransformer(config->SpillingEnabled()), "BuildPhysicalStages")
diff --git a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
index 61c8a5ddcf..5483697143 100644
--- a/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
+++ b/ydb/core/kqp/opt/logical/kqp_opt_log.cpp
@@ -19,7 +19,7 @@ class TKqpLogicalOptTransformer : public TOptimizeTransformerBase {
public:
TKqpLogicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx,
const TKikimrConfiguration::TPtr& config)
- : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {})
+ : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
, TypesCtx(typesCtx)
, KqpCtx(*kqpCtx)
, Config(config)
diff --git a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
index 5c1f1cabc0..65bae1cebe 100644
--- a/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
+++ b/ydb/core/kqp/opt/peephole/kqp_opt_peephole.cpp
@@ -28,7 +28,7 @@ using TStatus = IGraphTransformer::TStatus;
class TKqpPeepholeTransformer : public TOptimizeTransformerBase {
public:
TKqpPeepholeTransformer()
- : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {})
+ : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
{
#define HNDL(name) "KqpPeephole-"#name, Hndl(&TKqpPeepholeTransformer::name)
AddHandler(0, &TDqReplicate::Match, HNDL(RewriteReplicate));
@@ -260,7 +260,7 @@ public:
{
TxTransformer = TTransformationPipeline(&typesCtx)
.AddServiceTransformers()
- .Add(TLogExprTransformer::Sync("TxsPeephole", NLog::EComponent::ProviderKqp, NLog::ELevel::TRACE), "TxsPeephole")
+ .Add(TLogExprTransformer::Sync("TxsPeephole", NYql::NLog::EComponent::ProviderKqp, NYql::NLog::ELevel::TRACE), "TxsPeephole")
.Add(*TypeAnnTransformer, "TypeAnnotation")
.AddPostTypeAnnotation(/* forSubgraph */ true)
.Add(CreateKqpTxPeepholeTransformer(TypeAnnTransformer.Get(), typesCtx, config), "Peephole")
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
index 3619cf6fb5..223c276354 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy.cpp
@@ -21,7 +21,7 @@ using TStatus = IGraphTransformer::TStatus;
class TKqpPhysicalOptTransformer : public TOptimizeTransformerBase {
public:
TKqpPhysicalOptTransformer(TTypeAnnotationContext& typesCtx, const TIntrusivePtr<TKqpOptimizeContext>& kqpCtx)
- : TOptimizeTransformerBase(nullptr, NLog::EComponent::ProviderKqp, {})
+ : TOptimizeTransformerBase(nullptr, NYql::NLog::EComponent::ProviderKqp, {})
, TypesCtx(typesCtx)
, KqpCtx(*kqpCtx)
{
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index e0c220ad21..3a4e101ff5 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -10,6 +10,7 @@ import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_table.proto";
import "ydb/public/api/protos/ydb_value.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
+import "ydb/public/api/protos/ydb_topic.proto";
import "ydb/core/protos/kqp_query_settings.proto";
import "ydb/library/mkql_proto/protos/minikql.proto";
import "ydb/library/yql/dq/actors/protos/dq_events.proto";
@@ -47,6 +48,7 @@ enum EQueryAction {
QUERY_ACTION_COMMIT_TX = 7;
QUERY_ACTION_ROLLBACK_TX = 8;
QUERY_ACTION_PARSE = 9;
+ QUERY_ACTION_TOPIC = 10;
};
enum EIsolationLevel {
@@ -93,6 +95,7 @@ message TQueryRequest {
reserved 19; // (deprecated) StatsMode
optional NYql.NDqProto.EDqStatsMode StatsMode = 20; // deprecated
optional Ydb.Table.QueryStatsCollection.Mode CollectStats = 21;
+ repeated Ydb.Topic.AddOffsetsToTransactionRequest.TopicOffsets Topics = 22;
}
message TKqpPathIdProto {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index fb2d8cf6fd..1072532178 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -315,6 +315,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcOperationService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::V1::TGRpcPersQueueService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicService(system, counters, NMsgBusProxy::CreatePersQueueMetaCacheV2Id(), grpcRequestProxyId, true));
+ GRpcServer->AddService(new NGRpcService::V1::TGRpcTopicServiceTx(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcPQClusterDiscoveryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NKesus::TKesusGRpcService(system, counters, grpcRequestProxyId, true));
GRpcServer->AddService(new NGRpcService::TGRpcCmsService(system, counters, grpcRequestProxyId, true));
diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt
index d95ff4a020..6da5beb110 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.txt
@@ -27,6 +27,7 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
)
target_proto_addincls(api-grpc-draft
./
diff --git a/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
new file mode 100644
index 0000000000..120f32f500
--- /dev/null
+++ b/ydb/public/api/grpc/draft/ydb_topic_tx_v1.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package Ydb.Topic.V1;
+
+option java_package = "com.yandex.ydb.topic.v1";
+
+import "ydb/public/api/protos/ydb_topic.proto";
+
+service TopicServiceTx {
+ // Add offsets to transaction
+ rpc AddOffsetsToTransaction(AddOffsetsToTransactionRequest) returns (AddOffsetsToTransactionResponse);
+}
+
+
+
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 9b13a292e6..4501e3fee7 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -3,6 +3,7 @@ import "ydb/public/api/protos/ydb_operation.proto";
import "ydb/public/api/protos/ydb_scheme.proto";
import "ydb/public/api/protos/ydb_status_codes.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";
+import "ydb/public/api/protos/ydb_table.proto";
import "ydb/public/api/protos/annotations/validation.proto";
import "google/protobuf/duration.proto";
@@ -495,6 +496,45 @@ message StreamReadMessage {
}
}
+// Add offsets to transaction request sent from client to server.
+message AddOffsetsToTransactionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+
+ // Session identifier from TableService.
+ string session_id = 2;
+
+ // Transaction identifier from TableService.
+ Ydb.Table.TransactionControl tx_control = 3;
+
+ // Ranges of offsets by topics.
+ repeated TopicOffsets topics = 4;
+
+ message TopicOffsets {
+ // Topic path.
+ string path = 1;
+
+ // Ranges of offsets by partitions.
+ repeated PartitionOffsets partitions = 2;
+
+ message PartitionOffsets {
+ // Partition identifier.
+ int64 partition_id = 1;
+
+ // List of offset ranges.
+ repeated OffsetsRange partition_offsets = 2;
+ }
+ }
+}
+
+// Add offsets to transaction response sent from server to client.
+message AddOffsetsToTransactionResponse {
+ // Result of request will be inside operation.
+ Ydb.Operations.Operation operation = 1;
+}
+
+// Add offsets to transaction result message that will be inside AddOffsetsToTransactionResponse.operation.
+message AddOffsetsToTransactionResult {
+}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Control messages
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
index a5cfc8f32f..d16cb6e8b8 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
@@ -479,6 +479,5 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}, event);
}
-
}
} // namespace NYdb::NPersQueue::NTests
diff --git a/ydb/services/persqueue_v1/actors/CMakeLists.txt b/ydb/services/persqueue_v1/actors/CMakeLists.txt
index 39e69eccf9..255fa7270e 100644
--- a/ydb/services/persqueue_v1/actors/CMakeLists.txt
+++ b/ydb/services/persqueue_v1/actors/CMakeLists.txt
@@ -33,4 +33,5 @@ target_sources(services-persqueue_v1-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_init_auth_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/read_info_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/schema_actors.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
)
diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
new file mode 100644
index 0000000000..b6d1e4933b
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.cpp
@@ -0,0 +1,103 @@
+#include "add_offsets_to_transaction_actor.h"
+
+namespace NKikimr::NGRpcService {
+
+TAddOffsetsToTransactionActor::TAddOffsetsToTransactionActor(IRequestOpCtx* request)
+ : TBase{request}
+{
+}
+
+void TAddOffsetsToTransactionActor::Bootstrap(const NActors::TActorContext& ctx)
+{
+ TBase::Bootstrap(ctx);
+ Become(&TAddOffsetsToTransactionActor::StateWork);
+ Proceed(ctx);
+}
+
+void TAddOffsetsToTransactionActor::Proceed(const NActors::TActorContext& ctx)
+{
+ const auto req = GetProtoRequest();
+
+ auto ev = MakeHolder<NKqp::TEvKqp::TEvQueryRequest>();
+ SetAuthToken(ev, *Request_);
+ SetDatabase(ev, *Request_);
+
+ NYql::TIssues issues;
+ if (CheckSession(req->session_id(), issues)) {
+ ev->Record.MutableRequest()->SetSessionId(req->session_id());
+ } else {
+ return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
+ }
+
+ //
+ // TODO: новый тип запроса. например, QUERY_TYPE_TOPIC
+ //
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_UNDEFINED);
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_TOPIC);
+
+ if (const auto traceId = Request_->GetTraceId(); traceId) {
+ ev->Record.SetTraceId(traceId.GetRef());
+ }
+
+ if (const auto requestType = Request_->GetRequestType(); requestType) {
+ ev->Record.SetRequestType(requestType.GetRef());
+ }
+
+ ev->Record.MutableRequest()->SetCancelAfterMs(GetCancelAfter().MilliSeconds());
+ ev->Record.MutableRequest()->SetTimeoutMs(GetOperationTimeout().MilliSeconds());
+
+ if (!req->has_tx_control()) {
+ NYql::TIssues issues;
+ issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Empty tx_control."));
+ return Reply(Ydb::StatusIds::BAD_REQUEST, issues, ctx);
+ }
+
+ //
+ // TODO: проверить комбинацию значений атрибутов tx_control
+ //
+
+ ev->Record.MutableRequest()->MutableTxControl()->CopyFrom(req->tx_control());
+
+ //
+ // скопировать информацию о смещениях
+ //
+ *ev->Record.MutableRequest()->MutableTopics() = req->Gettopics();
+
+ ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release());
+}
+
+void TAddOffsetsToTransactionActor::Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx)
+{
+ const auto& record = ev->Get()->Record.GetRef();
+ SetCost(record.GetConsumedRu());
+ AddServerHintsIfAny(record);
+
+ if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
+ const auto& kqpResponse = record.GetResponse();
+ const auto& issueMessage = kqpResponse.GetQueryIssues();
+ auto queryResult = TEvAddOffsetsToTransactionRequest::AllocateResult<Ydb::Topic::AddOffsetsToTransactionResult>(Request_);
+
+ //
+ // TODO: сохранить результат
+ //
+
+ //
+ // TODO: статистика
+ //
+
+ //
+ // TODO: tx_meta
+ //
+
+ ReplyWithResult(Ydb::StatusIds::SUCCESS, issueMessage, *queryResult, ctx);
+ } else {
+ OnGenericQueryResponseError(record, ctx);
+ }
+}
+
+void DoAddOffsetsToTransaction(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &)
+{
+ TActivationContext::AsActorContext().Register(new TAddOffsetsToTransactionActor(p.release()));
+}
+
+}
diff --git a/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h
new file mode 100644
index 0000000000..ea8212668f
--- /dev/null
+++ b/ydb/services/persqueue_v1/actors/add_offsets_to_transaction_actor.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/rpc_kqp_base.h>
+
+#include <ydb/core/kqp/kqp.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+#include <memory>
+
+namespace NKikimr::NGRpcService {
+
+using TEvAddOffsetsToTransactionRequest =
+ TGrpcRequestOperationCall<Ydb::Topic::AddOffsetsToTransactionRequest, Ydb::Topic::AddOffsetsToTransactionResponse>;
+
+class TAddOffsetsToTransactionActor : public TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest> {
+public:
+ using TBase = TRpcKqpRequestActor<TAddOffsetsToTransactionActor, TEvAddOffsetsToTransactionRequest>;
+ using TResult = Ydb::Topic::AddOffsetsToTransactionResult;
+
+ explicit TAddOffsetsToTransactionActor(IRequestOpCtx* msg);
+
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+private:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NKqp::TEvKqp::TEvQueryResponse, Handle);
+ default:
+ TBase::StateWork(ev, ctx);
+ break;
+ }
+ }
+
+ void Handle(const NKqp::TEvKqp::TEvQueryResponse::TPtr& ev, const NActors::TActorContext& ctx);
+
+ void Proceed(const NActors::TActorContext& ctx);
+};
+
+}
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index c46a80e129..0604c67749 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4966,6 +4966,5 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
UNIT_ASSERT(partId == 1 || partId == 3);
}
}
-
}
}
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index 8dd541760e..badaf32eb0 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -4,6 +4,7 @@
#include <ydb/core/base/counters.h>
#include <ydb/core/grpc_services/rpc_calls.h>
#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/service_table.h>
#include <ydb/core/tx/scheme_board/cache.h>
#include "grpc_pq_read.h"
@@ -121,11 +122,68 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
})
#undef ADD_REQUEST
+}
+void TGRpcTopicService::StopService() noexcept {
+ TGrpcServiceBase::StopService();
+}
+//
+// TGRpcTopicServiceTx
+//
+TGRpcTopicServiceTx::TGRpcTopicServiceTx(NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& grpcRequestProxy)
+ : ActorSystem(system)
+ , Counters(counters)
+ , GRpcRequestProxy(grpcRequestProxy)
+{
}
-void TGRpcTopicService::StopService() noexcept {
+void TGRpcTopicServiceTx::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
+ CQ = cq;
+
+ if (ActorSystem->AppData<TAppData>()->PQConfig.GetEnabled()) {
+ SetupIncomingRequests(std::move(logger));
+ }
+}
+
+void TGRpcTopicServiceTx::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter = limiter;
+}
+
+bool TGRpcTopicServiceTx::IncRequest() {
+ return Limiter->Inc();
+}
+
+void TGRpcTopicServiceTx::DecRequest() {
+ Limiter->Dec();
+}
+
+void TGRpcTopicServiceTx::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = NKikimr::NGRpcService::CreateCounterCb(Counters, ActorSystem);
+
+#ifdef ADD_REQUEST_LIMIT
+#error ADD_REQUEST_LIMIT macro already defined
+#endif
+
+#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
+ MakeIntrusive<TGRpcRequest<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response, TGRpcTopicServiceTx>> \
+ (this, this->GetService(), CQ, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem, ctx->GetPeer()); \
+ ActorSystem->Send(GRpcRequestProxy, \
+ new TGrpcRequestOperationCall<Ydb::Topic::NAME##Request, Ydb::Topic::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ }, &Ydb::Topic::V1::TopicServiceTx::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("topic", #NAME))->Run();
+
+ ADD_REQUEST_LIMIT(AddOffsetsToTransaction, DoAddOffsetsToTransaction, Ru)
+
+#undef ADD_REQUEST_LIMIT
+}
+
+void TGRpcTopicServiceTx::StopService() noexcept {
TGrpcServiceBase::StopService();
}
diff --git a/ydb/services/persqueue_v1/topic.h b/ydb/services/persqueue_v1/topic.h
index a75f1b8375..ee5f8be7b1 100644
--- a/ydb/services/persqueue_v1/topic.h
+++ b/ydb/services/persqueue_v1/topic.h
@@ -3,6 +3,7 @@
#include <library/cpp/actors/core/actorsystem.h>
#include <ydb/public/api/grpc/ydb_topic_v1.grpc.pb.h>
+#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h>
#include <library/cpp/grpc/server/grpc_server.h>
@@ -35,6 +36,34 @@ private:
NActors::TActorId NewSchemeCache;
};
+class TGRpcTopicServiceTx
+ : public NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>
+{
+public:
+ TGRpcTopicServiceTx(NActors::TActorSystem* system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const NActors::TActorId& grpcRequestProxy);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+ void StopService() noexcept override;
+
+ using NGrpc::TGrpcServiceBase<Ydb::Topic::V1::TopicServiceTx>::GetService;
+
+ bool IncRequest();
+ void DecRequest();
+
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+ NActors::TActorSystem* ActorSystem;
+ grpc::ServerCompletionQueue* CQ = nullptr;
+
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters;
+ NGrpc::TGlobalLimiter* Limiter = nullptr;
+ NActors::TActorId GRpcRequestProxy;
+};
+
} // namespace V1
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
index 774adc10d7..a3f22ab07a 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin.txt
@@ -48,6 +48,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
)
add_test(
NAME
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
index 0becc47bd6..cf4cd9f3a7 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux.txt
@@ -52,6 +52,7 @@ target_sources(ydb-services-persqueue_v1-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/persqueue_compat_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/rate_limiter_test_setup.cpp
${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/functions_executor_wrapper.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
)
add_test(
NAME
diff --git a/ydb/services/persqueue_v1/ut/topic_service_ut.cpp b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
new file mode 100644
index 0000000000..089a960073
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/topic_service_ut.cpp
@@ -0,0 +1,300 @@
+#include <ydb/public/api/grpc/draft/ydb_topic_tx_v1.grpc.pb.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <util/stream/output.h>
+#include <util/string/builder.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NPersQueueTests {
+
+Y_UNIT_TEST_SUITE(TopicService) {
+
+NYdb::NTable::TSession CreateSession(NYdb::TDriver &driver) {
+ NYdb::NTable::TClientSettings settings;
+ NYdb::NTable::TTableClient client(driver, settings);
+
+ auto result = client.CreateSession().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ return result.GetSession();
+}
+
+NYdb::NTable::TTransaction BeginTransaction(NYdb::NTable::TSession &session) {
+ auto result = session.BeginTransaction().ExtractValueSync();
+ UNIT_ASSERT_EQUAL(result.IsTransportError(), false);
+
+ return result.GetTransaction();
+}
+
+template<class T>
+std::unique_ptr<typename T::Stub> CreateServiceStub(const NPersQueue::TTestServer &server) {
+ std::shared_ptr<grpc::Channel> channel;
+ std::unique_ptr<typename T::Stub> stub;
+
+ channel = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
+ stub = T::NewStub(channel);
+
+ return stub;
+}
+
+std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> CreateTopicServiceTxStub(const NPersQueue::TTestServer &server) {
+ return CreateServiceStub<Ydb::Topic::V1::TopicServiceTx>(server);
+}
+
+struct TOffsetRange {
+ ui64 Begin;
+ ui64 End;
+};
+
+struct TPartition {
+ ui64 Id;
+ TVector<TOffsetRange> Offsets;
+};
+
+struct TTopic {
+ TString Path;
+ TVector<TPartition> Partitions;
+};
+
+void AppendOffsetsRange(const TOffsetRange& r, google::protobuf::RepeatedPtrField<Ydb::Topic::OffsetsRange> *offsets)
+{
+ auto* range = offsets->Add();
+
+ range->set_start(r.Begin);
+ range->set_end(r.End);
+}
+
+void AppendPartition(const TPartition& p,
+ google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets_PartitionOffsets> *partitions)
+{
+ auto* partition = partitions->Add();
+
+ partition->set_partition_id(p.Id);
+
+ for (auto& r : p.Offsets) {
+ AppendOffsetsRange(r, partition->mutable_partition_offsets());
+ }
+}
+
+void AppendTopic(const TTopic &t,
+ google::protobuf::RepeatedPtrField<Ydb::Topic::AddOffsetsToTransactionRequest_TopicOffsets> *topics)
+{
+ auto* topic = topics->Add();
+
+ topic->set_path(t.Path);
+
+ for (auto& p : t.Partitions) {
+ AppendPartition(p, topic->mutable_partitions());
+ }
+}
+
+Ydb::Topic::AddOffsetsToTransactionRequest CreateRequest(const TString& session_id,
+ const TString& tx_id,
+ const TVector<TTopic>& topics)
+{
+ Ydb::Topic::AddOffsetsToTransactionRequest request;
+
+ request.set_session_id(session_id);
+ request.mutable_tx_control()->set_tx_id(tx_id);
+
+ for (auto& t : topics) {
+ AppendTopic(t, request.mutable_topics());
+ }
+
+ return request;
+}
+
+class TAddOffsetToTransactionFixture : public NUnitTest::TBaseFixture {
+protected:
+ TMaybe<NPersQueue::TTestServer> server;
+ TMaybe<NYdb::NTable::TSession> session;
+ TMaybe<NYdb::NTable::TTransaction> tx;
+ std::unique_ptr<Ydb::Topic::V1::TopicServiceTx::Stub> stub;
+
+ const TString DATABASE = "/Root";
+ const TString TOPIC_PARENT = "/Root/PQ";
+
+ const TString VALID_TOPIC_NAME = "rt3.dc1--topic1";
+ const TString VALID_SHORT_TOPIC_NAME = "topic1";
+ const TString VALID_TOPIC_PATH = TOPIC_PARENT + "/" + VALID_TOPIC_NAME;
+
+ const TString INVALID_TOPIC_NAME = VALID_TOPIC_NAME + "_2";
+ const TString INVALID_SHORT_TOPIC_NAME = VALID_SHORT_TOPIC_NAME + "_2";
+ const TString INVALID_TOPIC_PATH = TOPIC_PARENT + "/" + INVALID_TOPIC_NAME;
+
+ const TString AUTH_TOKEN = "x-user-x@builtin";
+
+ void SetUp(NUnitTest::TTestContext&) override {
+ server = NPersQueue::TTestServer(false);
+ server->ServerSettings.PQConfig.SetTopicsAreFirstClassCitizen(true);
+ server->StartServer();
+ server->EnableLogs({NKikimrServices::PQ_WRITE_PROXY
+ , NKikimrServices::PQ_READ_PROXY
+ , NKikimrServices::TX_PROXY_SCHEME_CACHE
+ , NKikimrServices::KQP_PROXY
+ , NKikimrServices::PERSQUEUE
+ , NKikimrServices::KQP_SESSION}, NActors::NLog::PRI_DEBUG);
+
+ auto partsCount = 5u;
+ server->AnnoyingClient->CreateTopicNoLegacy(VALID_TOPIC_PATH, partsCount);
+
+ NACLib::TDiffACL acl;
+ acl.AddAccess(NACLib::EAccessType::Allow, NACLib::DescribeSchema, AUTH_TOKEN);
+ acl.AddAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN);
+ acl.AddAccess(NACLib::EAccessType::Allow, NACLib::WriteAttributes, AUTH_TOKEN);
+ server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString());
+
+ auto driverCfg = NYdb::TDriverConfig()
+ .SetEndpoint(TStringBuilder() << "localhost:" << server->GrpcPort)
+ .SetDatabase(DATABASE)
+ .SetAuthToken(AUTH_TOKEN);
+
+ auto ydbDriver = std::make_shared<NYdb::TDriver>(driverCfg);
+
+ session = CreateSession(*ydbDriver);
+ tx = BeginTransaction(*session);
+
+ stub = CreateTopicServiceTxStub(*server);
+ }
+
+ Ydb::Topic::AddOffsetsToTransactionResponse CallAddOffsetsToTransaction(const TVector<TTopic>& topics) {
+ grpc::ClientContext rcontext;
+ rcontext.AddMetadata("x-ydb-auth-ticket", AUTH_TOKEN);
+ rcontext.AddMetadata("x-ydb-database", DATABASE);
+
+ Ydb::Topic::AddOffsetsToTransactionResponse response;
+
+ grpc::Status status = stub->AddOffsetsToTransaction(&rcontext,
+ CreateRequest(session->GetId(), tx->GetId(), topics),
+ &response);
+ UNIT_ASSERT(status.ok());
+
+ return response;
+ }
+
+ void TestTopicPaths(const TString& path1, const TString& path2) {
+ const auto PARTITION_ID = 1;
+ const auto BEGIN = 4;
+ const auto END = 7;
+
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=path1, .Partitions={
+ TPartition{.Id=PARTITION_ID, .Offsets={
+ TOffsetRange{.Begin=BEGIN, .End=END}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ response = CallAddOffsetsToTransaction({
+ TTopic{.Path=path2, .Partitions={
+ TPartition{.Id=PARTITION_ID, .Offsets={
+ TOffsetRange{.Begin=BEGIN, .End=END}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+ }
+};
+
+Y_UNIT_TEST_F(TheRangesDoNotOverlap, TAddOffsetToTransactionFixture) {
+ Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=1, .End=3},
+ TOffsetRange{.Begin=5, .End=8}
+ }},
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=2, .End=6}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=8, .End=11}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+}
+
+Y_UNIT_TEST_F(TheRangesOverlap, TAddOffsetToTransactionFixture) {
+ Ydb::Topic::AddOffsetsToTransactionResponse response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=1, .End=3},
+ TOffsetRange{.Begin=5, .End=8}
+ }},
+ TPartition{.Id=1, .Offsets={
+ TOffsetRange{.Begin=2, .End=6}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=4, .End=7}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::BAD_REQUEST);
+}
+
+Y_UNIT_TEST_F(UnknownTopic, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=INVALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=4, .End=7}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SCHEME_ERROR);
+}
+
+Y_UNIT_TEST_F(UseDoubleSlashInTopicPath, TAddOffsetToTransactionFixture) {
+ TestTopicPaths("//Root//PQ//rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1");
+}
+
+Y_UNIT_TEST_F(RelativePath, TAddOffsetToTransactionFixture) {
+ TestTopicPaths("PQ/rt3.dc1--topic1", "/Root/PQ/rt3.dc1--topic1");
+}
+
+Y_UNIT_TEST_F(AccessRights, TAddOffsetToTransactionFixture) {
+ auto response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=4, .End=7}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::SUCCESS);
+
+ NACLib::TDiffACL acl;
+ acl.RemoveAccess(NACLib::EAccessType::Allow, NACLib::ReadAttributes, AUTH_TOKEN);
+ server->AnnoyingClient->ModifyACL(TOPIC_PARENT, VALID_TOPIC_NAME, acl.SerializeAsString());
+
+ response = CallAddOffsetsToTransaction({
+ TTopic{.Path=VALID_TOPIC_PATH, .Partitions={
+ TPartition{.Id=4, .Offsets={
+ TOffsetRange{.Begin=14, .End=17}
+ }}
+ }}
+ });
+ UNIT_ASSERT_VALUES_EQUAL(response.operation().status(), Ydb::StatusIds::UNAUTHORIZED);
+}
+
+}
+
+}