aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorkomels <komels@ydb.tech>2023-07-24 15:28:58 +0300
committerkomels <komels@ydb.tech>2023-07-24 15:28:58 +0300
commitc3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2 (patch)
treeaeabb07660024aa08aa454f9cf2fc59da21f8ba9
parent477d878e00267a8534b445571b1b2e73ea1435b8 (diff)
downloadydb-c3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2.tar.gz
Describe topic with partlocations
Design doc: https://wiki.yandex-team.ru/users/komels/kafka-topicmeta-design/#implementacija
-rw-r--r--ydb/core/base/events.h3
-rw-r--r--ydb/core/base/tablet_pipe.h20
-rw-r--r--ydb/core/client/server/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/client/server/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/client/server/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/client/server/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/client/server/ic_nodes_cache_service.cpp116
-rw-r--r--ydb/core/client/server/ic_nodes_cache_service.h46
-rw-r--r--ydb/core/client/server/ya.make1
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp18
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h7
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/grpc_services/base/base.h1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp1
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy_handle_methods.h1
-rw-r--r--ydb/core/grpc_services/rpc_calls.h1
-rw-r--r--ydb/core/persqueue/events/global.h24
-rw-r--r--ydb/core/persqueue/read_balancer.cpp78
-rw-r--r--ydb/core/persqueue/read_balancer.h18
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/persqueue/ut/pqrb_describes_ut.cpp79
-rw-r--r--ydb/core/persqueue/ut/ya.make1
-rw-r--r--ydb/core/protos/config.proto1
-rw-r--r--ydb/core/protos/pqconfig.proto17
-rw-r--r--ydb/core/protos/tablet_pipe.proto1
-rw-r--r--ydb/core/quoter/ut_helpers.cpp25
-rw-r--r--ydb/core/tablet/tablet_pipe_client.cpp13
-rw-r--r--ydb/core/tablet/tablet_pipe_server.cpp18
-rw-r--r--ydb/core/tablet/tablet_sys.cpp2
-rw-r--r--ydb/core/testlib/test_client.cpp7
-rw-r--r--ydb/public/api/protos/ydb_topic.proto2
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp96
-rw-r--r--ydb/services/datastreams/put_records_actor.h22
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h366
-rw-r--r--ydb/services/persqueue_v1/actors/events.h46
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.cpp560
-rw-r--r--ydb/services/persqueue_v1/actors/schema_actors.h192
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.cpp10
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.h2
-rw-r--r--ydb/services/persqueue_v1/topic.cpp3
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt81
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt84
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt86
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt17
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt74
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp299
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp35
-rw-r--r--ydb/services/persqueue_v1/ut/describes_ut/ya.make29
-rw-r--r--ydb/services/persqueue_v1/ya.make1
57 files changed, 2202 insertions, 319 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index ce0ee341e9d..59686ca51a8 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -161,7 +161,8 @@ struct TKikimrEvents : TEvents {
ES_DISCOVERY,
ES_EXT_INDEX,
ES_CONVEYOR,
- ES_KQP_SCAN_EXCHANGE
+ ES_KQP_SCAN_EXCHANGE,
+ ES_IC_NODE_CACHE
};
};
diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h
index 8eb63331a4e..d92d4a32ccf 100644
--- a/ydb/core/base/tablet_pipe.h
+++ b/ydb/core/base/tablet_pipe.h
@@ -54,7 +54,7 @@ namespace NKikimr {
struct TEvConnectResult : public TEventPB<TEvConnectResult, NKikimrTabletPipe::TEvConnectResult, EvConnectResult> {
TEvConnectResult() {}
- TEvConnectResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TActorId& clientId, const TActorId& serverId, bool leader)
+ TEvConnectResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TActorId& clientId, const TActorId& serverId, bool leader, ui32 generation)
{
Record.SetStatus(status);
Record.SetTabletId(tabletId);
@@ -62,6 +62,7 @@ namespace NKikimr {
ActorIdToProto(serverId, Record.MutableServerId());
Record.SetLeader(leader);
Record.SetSupportsDataInPayload(true);
+ Record.SetGeneration(generation);
}
};
@@ -120,13 +121,14 @@ namespace NKikimr {
};
struct TEvClientConnected : public TEventLocal<TEvClientConnected, EvClientConnected> {
- TEvClientConnected(ui64 tabletId, NKikimrProto::EReplyStatus status, const TActorId& clientId, const TActorId& serverId, bool leader, bool dead)
+ TEvClientConnected(ui64 tabletId, NKikimrProto::EReplyStatus status, const TActorId& clientId, const TActorId& serverId, bool leader, bool dead, ui64 generation)
: TabletId(tabletId)
, Status(status)
, ClientId(clientId)
, ServerId(serverId)
, Leader(leader)
, Dead(dead)
+ , Generation(generation)
{}
const ui64 TabletId;
@@ -135,6 +137,7 @@ namespace NKikimr {
const TActorId ServerId;
const bool Leader;
const bool Dead;
+ const ui64 Generation;
};
struct TEvServerConnected : public TEventLocal<TEvServerConnected, EvServerConnected> {
@@ -200,17 +203,20 @@ namespace NKikimr {
};
struct TEvActivate : public TEventLocal<TEvActivate, EvActivate> {
- TEvActivate(ui64 tabletId, const TActorId& ownerId, const TActorId& recipientId, bool leader)
+ TEvActivate(ui64 tabletId, const TActorId& ownerId, const TActorId& recipientId, bool leader, ui32 generation)
: TabletId(tabletId)
, OwnerId(ownerId)
, RecipientId(recipientId)
, Leader(leader)
+ , Generation(generation)
+
{}
const ui64 TabletId;
const TActorId OwnerId;
const TActorId RecipientId;
const bool Leader;
+ const ui32 Generation;
};
struct TEvShutdown : public TEventLocal<TEvShutdown, EvShutdown> {
@@ -287,8 +293,8 @@ namespace NKikimr {
// Creates and activated server, returns serverId.
// Created server will forward messages to the specified recipent.
- virtual TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev,
- TActorIdentity owner, TActorId recipient, bool leader = true) = 0;
+ virtual TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipient,
+ bool leader = true, ui64 generation = 0) = 0;
// Rejects connect with an error.
virtual void Reject(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, NKikimrProto::EReplyStatus status, bool leader = true) = 0;
@@ -305,7 +311,7 @@ namespace NKikimr {
// Activates all inactive servers, created by Enqueue.
// All activated servers will forward messages to the specified recipent.
- virtual void Activate(TActorIdentity owner, TActorId recipientId, bool leader = true) = 0;
+ virtual void Activate(TActorIdentity owner, TActorId recipientId, bool leader = true, ui64 generation = 0) = 0;
// Cleanup resources after reset
virtual void Erase(TEvTabletPipe::TEvServerDestroyed::TPtr &ev) = 0;
@@ -393,7 +399,7 @@ namespace NKikimr {
IActor* CreateServer(ui64 tabletId, const TActorId& clientId, const TActorId& interconnectSession, ui32 features, ui64 connectCookie);
// Promotes server actor to the active state.
- void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader);
+ void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation = 0);
// Destroys server actor.
void CloseServer(TActorIdentity owner, TActorId serverId);
diff --git a/ydb/core/client/server/CMakeLists.darwin-x86_64.txt b/ydb/core/client/server/CMakeLists.darwin-x86_64.txt
index b620712f4dd..577a6ca7a50 100644
--- a/ydb/core/client/server/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/client/server/CMakeLists.darwin-x86_64.txt
@@ -95,4 +95,5 @@ target_sources(core-client-server PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp
)
diff --git a/ydb/core/client/server/CMakeLists.linux-aarch64.txt b/ydb/core/client/server/CMakeLists.linux-aarch64.txt
index 36a40015b50..4343eef3ec3 100644
--- a/ydb/core/client/server/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/client/server/CMakeLists.linux-aarch64.txt
@@ -96,4 +96,5 @@ target_sources(core-client-server PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp
)
diff --git a/ydb/core/client/server/CMakeLists.linux-x86_64.txt b/ydb/core/client/server/CMakeLists.linux-x86_64.txt
index 36a40015b50..4343eef3ec3 100644
--- a/ydb/core/client/server/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/client/server/CMakeLists.linux-x86_64.txt
@@ -96,4 +96,5 @@ target_sources(core-client-server PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp
)
diff --git a/ydb/core/client/server/CMakeLists.windows-x86_64.txt b/ydb/core/client/server/CMakeLists.windows-x86_64.txt
index b620712f4dd..577a6ca7a50 100644
--- a/ydb/core/client/server/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/client/server/CMakeLists.windows-x86_64.txt
@@ -95,4 +95,5 @@ target_sources(core-client-server PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp
${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp
)
diff --git a/ydb/core/client/server/ic_nodes_cache_service.cpp b/ydb/core/client/server/ic_nodes_cache_service.cpp
new file mode 100644
index 00000000000..fadff0700ee
--- /dev/null
+++ b/ydb/core/client/server/ic_nodes_cache_service.cpp
@@ -0,0 +1,116 @@
+#include "ic_nodes_cache_service.h"
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+
+namespace NKikimr::NIcNodeCache {
+using namespace NActors;
+
+class TIcNodeCacheServiceActor : public TActorBootstrapped<TIcNodeCacheServiceActor> {
+ using TBase = TActorBootstrapped<TIcNodeCacheServiceActor>;
+public:
+ TIcNodeCacheServiceActor(TIcNodeCacheServiceActor&&) = default;
+ TIcNodeCacheServiceActor& operator=(TIcNodeCacheServiceActor&&) = default;
+
+ TIcNodeCacheServiceActor(const ::NMonitoring::TDynamicCounterPtr& counters,
+ const TDuration& cacheUpdateInterval)
+ : Counters(counters)
+ , CacheUpdateInterval(cacheUpdateInterval)
+ {}
+
+
+private:
+ void RequestNodes() {
+ if (InfoRequested)
+ return;
+
+ const static TActorId nameserviceId = GetNameserviceActorId();
+ InfoRequested = true;
+ this->ActorContext().Send(nameserviceId, new TEvInterconnect::TEvListNodes());
+ }
+
+ void HandleNodesInfo(TEvInterconnect::TEvNodesInfo::TPtr& ev) {
+ Y_VERIFY(InfoRequested);
+ InfoRequested = false;
+ NodesCache.reset(new TNodeInfoVector(ev->Get()->Nodes));
+ NextUpdate = TActivationContext::Now() + CacheUpdateInterval;
+ NodeIdsMapping.reset(new THashMap<ui64, ui64>());
+
+ RespondToAllWaiters();
+ }
+
+ void RespondToAllWaiters() {
+ while (!WaitersOnInit.empty()) {
+ RespondNodesInfo(WaitersOnInit.front());
+ WaitersOnInit.pop_front();
+ }
+ }
+
+ void BuildNodesMap() {
+ Y_VERIFY_DEBUG(NodeIdsMapping->empty());
+ for (auto i = 0u; i < NodesCache->size(); i++) {
+ auto res = NodeIdsMapping->insert(
+ std::make_pair((*NodesCache)[i].NodeId, i)
+ );
+ Y_VERIFY_DEBUG(res.second);
+ }
+ }
+
+ void HandleNodesRequest(TEvICNodesInfoCache::TEvGetAllNodesInfoRequest::TPtr& ev) {
+ if (ActorContext().Now() > NextUpdate) {
+ RequestNodes();
+ }
+
+ if(!NodesCache) {
+ WaitersOnInit.emplace_back(ev->Sender);
+ return;
+ } else {
+ RespondNodesInfo(ev->Sender);
+ }
+ }
+
+ void RespondNodesInfo(const TActorId& recipient) {
+ if (NodeIdsMapping->empty()) {
+ BuildNodesMap();
+ }
+ auto* response = new TEvICNodesInfoCache::TEvGetAllNodesInfoResponse(NodesCache, NodeIdsMapping);
+ ActorContext().Send(recipient, response);
+ }
+
+ void HandleWakeup() {
+ if (TActivationContext::Now() > NextUpdate) {
+ RequestNodes();
+ }
+ ActorContext().Schedule(CacheUpdateInterval / 2, new TEvents::TEvWakeup());
+ }
+
+public:
+ void Bootstrap(const TActorContext&) {
+ Become(&TIcNodeCacheServiceActor::StateFunc);
+ HandleWakeup();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvInterconnect::TEvNodesInfo, HandleNodesInfo)
+ sFunc(TEvents::TEvWakeup, HandleWakeup)
+ hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoRequest, HandleNodesRequest)
+ )
+
+private:
+ TNodeInfoVectorPtr NodesCache;
+ std::shared_ptr<THashMap<ui64, ui64>> NodeIdsMapping;
+ bool InfoRequested = false;
+ ::NMonitoring::TDynamicCounterPtr Counters;
+ TDuration CacheUpdateInterval;
+ TInstant NextUpdate = TInstant::Zero();
+
+ TDeque<TActorId> WaitersOnInit;
+
+};
+
+NActors::IActor* CreateICNodesInfoCacheService(
+ const ::NMonitoring::TDynamicCounterPtr& counters, const TDuration& cacheUpdateTimeout
+) {
+ return new TIcNodeCacheServiceActor(counters, cacheUpdateTimeout);
+}
+} // NKikimr::NIcNodeCache \ No newline at end of file
diff --git a/ydb/core/client/server/ic_nodes_cache_service.h b/ydb/core/client/server/ic_nodes_cache_service.h
new file mode 100644
index 00000000000..f277ae95dbd
--- /dev/null
+++ b/ydb/core/client/server/ic_nodes_cache_service.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <ydb/core/base/events.h>
+
+namespace NKikimr::NIcNodeCache {
+
+NActors::IActor* CreateICNodesInfoCacheService(const ::NMonitoring::TDynamicCounterPtr& counters,
+ const TDuration& cacheUpdateTimeout = TDuration::Seconds(10));
+
+inline NActors::TActorId CreateICNodesInfoCacheServiceId() {
+ return NActors::TActorId(0, "ICNodeCache");
+}
+
+using TNodeInfoVector = TVector<NActors::TEvInterconnect::TNodeInfo>;
+using TNodeInfoVectorPtr = std::shared_ptr<TNodeInfoVector>;
+
+struct TEvICNodesInfoCache {
+ enum EEv {
+ EvWakeup = EventSpaceBegin(TKikimrEvents::ES_IC_NODE_CACHE),
+ EvGetAllNodesInfoRequest,
+ EvGetAllNodesInfoResponse,
+ EvEnd
+ };
+
+ static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_IC_NODE_CACHE),
+ "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_IC_NODE_CACHE)");
+
+
+ struct TEvGetAllNodesInfoRequest : public NActors::TEventLocal<TEvGetAllNodesInfoRequest, EvGetAllNodesInfoRequest> {
+ };
+
+ struct TEvGetAllNodesInfoResponse : public NActors::TEventLocal<TEvGetAllNodesInfoResponse, EvGetAllNodesInfoResponse> {
+ TNodeInfoVectorPtr Nodes;
+ std::shared_ptr<THashMap<ui64, ui64>> NodeIdsMapping;
+
+
+ TEvGetAllNodesInfoResponse(const TNodeInfoVectorPtr& nodesInfo, const std::shared_ptr<THashMap<ui64, ui64>>& nodeIdsMapping)
+ : Nodes(nodesInfo)
+ , NodeIdsMapping(nodeIdsMapping)
+ {}
+ };
+};
+} // NKikimr::NIcNodeCache \ No newline at end of file
diff --git a/ydb/core/client/server/ya.make b/ydb/core/client/server/ya.make
index b02436d2863..01aa4ae53ac 100644
--- a/ydb/core/client/server/ya.make
+++ b/ydb/core/client/server/ya.make
@@ -56,6 +56,7 @@ SRCS(
grpc_server.h
grpc_proxy_status.h
grpc_proxy_status.cpp
+ ic_nodes_cache_service.cpp
)
PEERDIR(
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 75bfa640c46..1081f524793 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -74,6 +74,7 @@ union TBasicKikimrServicesMask {
bool EnableCompConveyor : 1;
bool EnableLocalPgWire:1;
bool EnableKafkaProxy:1;
+ bool EnableIcNodeCacheService:1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 6407a597df2..c790d982684 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -31,6 +31,7 @@
#include <ydb/core/client/minikql_compile/mkql_compile_service.h>
#include <ydb/core/client/server/grpc_proxy_status.h>
#include <ydb/core/client/server/msgbus_server_pq_metacache.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
#include <ydb/core/cms/cms.h>
@@ -2627,5 +2628,22 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu
}
}
+
+TIcNodeCacheServiceInitializer::TIcNodeCacheServiceInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig)
+{
+}
+
+void TIcNodeCacheServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ if (appData->FeatureFlags.GetEnableIcNodeCache()) {
+ setup->LocalServices.emplace_back(
+ TActorId(),
+ TActorSetupCmd(NIcNodeCache::CreateICNodesInfoCacheService(appData->Counters),
+ TMailboxType::HTSwap, appData->UserPoolId)
+ );
+ }
+}
+
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 1b97906e253..cef99ac7088 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -578,5 +578,12 @@ public:
void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};
+class TIcNodeCacheServiceInitializer : public IKikimrServicesInitializer {
+public:
+ TIcNodeCacheServiceInitializer(const TKikimrRunConfig& runConfig);
+
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
} // namespace NKikimrServicesInitializers
} // namespace NKikimr
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 0bdb30a16f3..20263507e75 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1459,6 +1459,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TPersQueueClusterTrackerInitializer(runConfig));
}
+ if (serviceMask.EnableIcNodeCacheService) {
+ sil->AddServiceInitializer(new TIcNodeCacheServiceInitializer(runConfig));
+ }
+
if (BusServer && serviceMask.EnableMessageBusServices) {
sil->AddServiceInitializer(new TMessageBusServicesInitializer(runConfig, *BusServer));
}
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 7f3028da4e5..ac66702ed4d 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -137,6 +137,7 @@ struct TRpcServices {
EvAlterTopic,
EvDescribeTopic,
EvDescribeConsumer,
+ EvDescribePartition,
EvGetDiskSpaceUsage,
EvStopServingDatabase,
EvCoordinationSession,
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 4b99b65dbf3..e10d52aaf60 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -570,6 +570,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) {
HFunc(TEvAlterTopicRequest, PreHandle);
HFunc(TEvDescribeTopicRequest, PreHandle);
HFunc(TEvDescribeConsumerRequest, PreHandle);
+ HFunc(TEvDescribePartitionRequest, PreHandle);
HFunc(TEvNodeCheckRequest, PreHandle);
HFunc(TEvProxyRuntimeEvent, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
index 000c9050312..d8fe632d1cb 100644
--- a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
+++ b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h
@@ -29,6 +29,7 @@ protected:
static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
+ static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
};
}
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index cb2fcc48894..cbeeaec420d 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -81,6 +81,7 @@ using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCrea
using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>;
using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>;
using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>;
+using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>;
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>;
diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h
index 9b0e97ef120..e7b2a53c9db 100644
--- a/ydb/core/persqueue/events/global.h
+++ b/ydb/core/persqueue/events/global.h
@@ -47,6 +47,8 @@ struct TEvPersQueue {
EvProposeTransactionResult,
EvCancelTransactionProposal,
EvPeriodicTopicStats,
+ EvGetPartitionsLocation,
+ EvGetPartitionsLocationResponse,
EvResponse = EvRequest + 256,
EvInternalEvents = EvResponse + 256,
EvEnd
@@ -83,11 +85,18 @@ struct TEvPersQueue {
struct TEvGetReadSessionsInfo: public TEventPB<TEvGetReadSessionsInfo,
NKikimrPQ::TGetReadSessionsInfo, EvGetReadSessionsInfo> {
- TEvGetReadSessionsInfo(const TString& consumer = "") {
+ explicit TEvGetReadSessionsInfo(const TString& consumer = "") {
if (!consumer.empty()) {
Record.SetClientId(consumer);
}
}
+ explicit TEvGetReadSessionsInfo(const TVector<ui32>& partitions) {
+ if (!partitions.empty()) {
+ for (auto p: partitions) {
+ Record.AddPartitions(p);
+ }
+ }
+ }
};
struct TEvReadSessionsInfoResponse: public TEventPB<TEvReadSessionsInfoResponse,
@@ -95,6 +104,19 @@ struct TEvPersQueue {
TEvReadSessionsInfoResponse() {}
};
+ struct TEvGetPartitionsLocation: public TEventPB<TEvGetPartitionsLocation,
+ NKikimrPQ::TGetPartitionsLocation, EvGetPartitionsLocation> {
+ TEvGetPartitionsLocation(const TVector<ui64>& partitionIds = {}) {
+ for (const auto& p : partitionIds) {
+ Record.AddPartitions(p);
+ }
+ }
+ };
+
+ struct TEvGetPartitionsLocationResponse: public TEventPB<TEvGetPartitionsLocationResponse,
+ NKikimrPQ::TPartitionsLocationResponse, EvGetPartitionsLocationResponse> {
+ };
+
struct TEvLockPartition : public TEventPB<TEvLockPartition,
NKikimrPQ::TLockPartition, EvLockPartition> {
TEvLockPartition() {}
diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp
index 52e4b074dda..d05a169ab63 100644
--- a/ydb/core/persqueue/read_balancer.cpp
+++ b/ydb/core/persqueue/read_balancer.cpp
@@ -663,24 +663,33 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr&
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx)
{
- RestartPipe(ev->Get()->TabletId, ctx);
+ ClosePipe(ev->Get()->TabletId, ctx);
RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
}
void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx)
{
+ auto tabletId = ev->Get()->TabletId;
+ PipesRequested.erase(tabletId);
+
if (ev->Get()->Status != NKikimrProto::OK) {
- RestartPipe(ev->Get()->TabletId, ctx);
+ ClosePipe(ev->Get()->TabletId, ctx);
RequestTabletIfNeeded(ev->Get()->TabletId, ctx);
+ } else {
+ auto it = TabletPipes.find(tabletId);
+ if (!it.IsEnd()) {
+ it->second.Generation = ev->Get()->Generation;
+ it->second.NodeId = ev->Get()->ServerId.NodeId();
+ }
}
}
-void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContext& ctx)
+void TPersQueueReadBalancer::ClosePipe(const ui64 tabletId, const TActorContext& ctx)
{
auto it = TabletPipes.find(tabletId);
if (it != TabletPipes.end()) {
- NTabletPipe::CloseClient(ctx, it->second);
+ NTabletPipe::CloseClient(ctx, it->second.PipeActor);
TabletPipes.erase(it);
}
}
@@ -692,9 +701,11 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor
if (it == TabletPipes.end()) {
NTabletPipe::TClientConfig clientConfig;
pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig));
- TabletPipes[tabletId] = pipeClient;
+ TabletPipes[tabletId].PipeActor = pipeClient;
+ auto res = PipesRequested.insert(tabletId);
+ Y_VERIFY(res.second);
} else {
- pipeClient = it->second;
+ pipeClient = it->second.PipeActor;
}
return pipeClient;
@@ -1163,11 +1174,17 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr&
auto it = ClientsInfo.find(record.GetClientId());
THolder<TEvPersQueue::TEvReadSessionsInfoResponse> response(new TEvPersQueue::TEvReadSessionsInfoResponse());
+ THashSet<ui32> partitionsRequested;
+ for (auto p : record.GetPartitions()) {
+ partitionsRequested.insert(p);
+ }
response->Record.SetTabletId(TabletID());
if (it != ClientsInfo.end()) {
for (auto& c : it->second.ClientGroupsInfo) {
for (auto& p : c.second.PartitionsInfo) {
+ if (partitionsRequested && !partitionsRequested.contains(p.first))
+ continue;
auto pi = response->Record.AddPartitionInfo();
pi->SetPartition(p.first);
if (p.second.State == EPS_ACTIVE) {
@@ -1305,6 +1322,55 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev
cit->second.ScheduleBalance(ctx);
}
+void TPersQueueReadBalancer::HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx) {
+ auto* evResponse = new TEvPersQueue::TEvGetPartitionsLocationResponse();
+ evResponse->Record.SetStatus(false);
+ ctx.Send(ev->Sender, evResponse);
+}
+
+
+void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx) {
+ auto* evResponse = new TEvPersQueue::TEvGetPartitionsLocationResponse();
+ const auto& request = ev->Get()->Record;
+ auto addPartitionToResponse = [&](ui64 partitionId, ui64 tabletId) {
+ auto* pResponse = evResponse->Record.AddLocations();
+ pResponse->SetPartitionId(partitionId);
+ if (PipesRequested.contains(tabletId)) {
+ return false;
+ }
+ auto iter = TabletPipes.find(tabletId);
+ if (iter.IsEnd()) {
+ GetPipeClient(tabletId, ctx);
+ return false;
+ }
+ pResponse->SetNodeId(iter->second.NodeId.GetRef());
+ pResponse->SetGeneration(iter->second.Generation.GetRef());
+ return true;
+ };
+ auto sendResponse = [&](bool status) {
+ evResponse->Record.SetStatus(status);
+ ctx.Send(ev->Sender, evResponse);
+ };
+ bool ok = true;
+ if (request.PartitionsSize() == 0) {
+ if (!PipesRequested.empty() || TabletPipes.size() < TabletsInfo.size()) {
+ // Do not have all pipes connected.
+ return sendResponse(false);
+ }
+ for (const auto& [partitionId, partitionInfo] : PartitionsInfo) {
+ ok = addPartitionToResponse(partitionId, partitionInfo.TabletId) && ok;
+ }
+ } else {
+ for (const auto& partitionInRequest : request.GetPartitions()) {
+ auto partitionInfoIter = PartitionsInfo.find(partitionInRequest);
+ if (partitionInfoIter.IsEnd()) {
+ return sendResponse(false);
+ }
+ ok = addPartitionToResponse(partitionInRequest, partitionInfoIter->second.TabletId) && ok;
+ }
+ }
+ return sendResponse(ok);
+}
void TPersQueueReadBalancer::RebuildStructs() {
diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h
index 2813634fa25..33b3b465b30 100644
--- a/ydb/core/persqueue/read_balancer.h
+++ b/ydb/core/persqueue/read_balancer.h
@@ -201,7 +201,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
StopWatchingSubDomainPathId();
for (auto& pipe : TabletPipes) {
- NTabletPipe::CloseClient(ctx, pipe.second);
+ NTabletPipe::CloseClient(ctx, pipe.second.PipeActor);
}
TabletPipes.clear();
TActor<TPersQueueReadBalancer>::Die(ctx);
@@ -276,6 +276,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
void HandleOnInit(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx);
+ void HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx);
+
void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx);
void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&);
void Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPtr&, const TActorContext&);
@@ -293,7 +296,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
TActorId GetPipeClient(const ui64 tabletId, const TActorContext&);
void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&);
- void RestartPipe(const ui64 tabletId, const TActorContext&);
+ void ClosePipe(const ui64 tabletId, const TActorContext&);
void CheckStat(const TActorContext&);
void UpdateCounters(const TActorContext&);
@@ -467,7 +470,14 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa
NMetrics::TResourceMetrics *ResourceMetrics;
- THashMap<ui64, TActorId> TabletPipes;
+ struct TPipeLocation {
+ TActorId PipeActor;
+ TMaybe<ui64> NodeId;
+ TMaybe<ui32> Generation;
+ };
+
+ THashMap<ui64, TPipeLocation> TabletPipes;
+ THashSet<ui64> PipesRequested;
bool WaitingForACL;
@@ -575,6 +585,7 @@ public:
HFunc(TEvPersQueue::TEvGetPartitionIdForWrite, Handle);
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
+ HFunc(TEvPersQueue::TEvGetPartitionsLocation, HandleOnInit);
default:
StateInitImpl(ev, SelfId());
break;
@@ -607,6 +618,7 @@ public:
HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
HFunc(TEvPersQueue::TEvStatus, Handle);
+ HFunc(TEvPersQueue::TEvGetPartitionsLocation, Handle);
default:
HandleDefaultEvents(ev, SelfId());
diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
index dee2d1aef8c..0bc497fc0de 100644
--- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt
@@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp
)
set_property(
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
index adbb8735bcf..3b9bda923cb 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt
@@ -60,6 +60,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp
)
set_property(
diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
index 2533fc4e246..11abf7b9f10 100644
--- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt
@@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp
)
set_property(
diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
index 9b747296e7e..aceb9d8cfa1 100644
--- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt
@@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp
)
set_property(
diff --git a/ydb/core/persqueue/ut/pqrb_describes_ut.cpp b/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
new file mode 100644
index 00000000000..634f3ffe538
--- /dev/null
+++ b/ydb/core/persqueue/ut/pqrb_describes_ut.cpp
@@ -0,0 +1,79 @@
+#include <library/cpp/testing/unittest/registar.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/core/persqueue/events/global.h>
+
+namespace NKikimr::NPQ {
+using namespace NPersQueue;
+
+Y_UNIT_TEST_SUITE(TPQRBDescribes) {
+
+ Y_UNIT_TEST(PartitionLocations) {
+
+ NPersQueue::TTestServer server;
+ TString topicName = "rt3.dc1--topic";
+ TString topicPath = TString("/Root/PQ/") + topicName;
+ ui32 totalPartitions = 5;
+ server.AnnoyingClient->CreateTopic(topicName, totalPartitions);
+
+ auto pathDescr = server.AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf();
+
+ ui64 balancerTabletId = pathDescr.GetBalancerTabletID();
+ auto* runtime = server.CleverServer->GetRuntime();
+ const auto edge = runtime->AllocateEdgeActor();
+
+ auto checkResponse = [&](TEvPersQueue::TEvGetPartitionsLocation* request, bool ok, ui64 partitionsCount = 0) {
+ runtime->SendToPipe(balancerTabletId, edge, request);
+ auto ev = runtime->GrabEdgeEvent<TEvPersQueue::TEvGetPartitionsLocationResponse>();
+ const auto& response = ev->Record;
+ Cerr << "response: " << response.DebugString();
+
+ UNIT_ASSERT(response.GetStatus() == ok);
+ if (!ok) {
+ return ev;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(response.LocationsSize(), partitionsCount);
+ THashSet<ui32> partitionsFound;
+ for (const auto& partitionInResponse : response.GetLocations()) {
+ auto res = partitionsFound.insert(partitionInResponse.GetPartitionId());
+ UNIT_ASSERT(res.second);
+ UNIT_ASSERT_LT(partitionInResponse.GetPartitionId(), totalPartitions);
+ UNIT_ASSERT(partitionInResponse.GetNodeId() > 0);
+ UNIT_ASSERT(partitionInResponse.GetGeneration() > 0);
+ }
+ return ev;
+ };
+ auto pollBalancer = [&] (ui64 retriesCount) {
+ auto waitTime = TDuration::MilliSeconds(500);
+ while (retriesCount) {
+ auto* req = new TEvPersQueue::TEvGetPartitionsLocation();
+ runtime->SendToPipe(balancerTabletId, edge, req);
+ auto ev = runtime->GrabEdgeEvent<TEvPersQueue::TEvGetPartitionsLocationResponse>();
+ if (!ev->Record.GetStatus()) {
+ --retriesCount;
+ Sleep(waitTime);
+ waitTime *= 2;
+ } else {
+ return;
+ }
+ }
+ UNIT_FAIL("Could not get positive response from balancer");
+
+ };
+ pollBalancer(5);
+ checkResponse(new TEvPersQueue::TEvGetPartitionsLocation(), true, totalPartitions);
+ {
+ auto* req = new TEvPersQueue::TEvGetPartitionsLocation();
+ req->Record.AddPartitions(3);
+ auto resp = checkResponse(req, true, 1);
+ UNIT_ASSERT_VALUES_EQUAL(resp->Record.GetLocations(0).GetPartitionId(), 3);
+ }
+ {
+ auto* req = new TEvPersQueue::TEvGetPartitionsLocation();
+ req->Record.AddPartitions(50);
+ checkResponse(req, false);
+ }
+
+ }
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make
index 41413c75f82..a7327da1ff8 100644
--- a/ydb/core/persqueue/ut/ya.make
+++ b/ydb/core/persqueue/ut/ya.make
@@ -38,6 +38,7 @@ SRCS(
sourceid_ut.cpp
type_codecs_ut.cpp
user_info_ut.cpp
+ pqrb_describes_ut.cpp
)
RESOURCE(
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index dcdcc9bf849..29cc4977f61 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -816,6 +816,7 @@ message TFeatureFlags {
optional bool EnableSubscriptionsInDiscovery = 98 [default = false];
optional bool EnableGetNodeLabels = 99 [default = false];
optional bool EnableTopicMessageMeta = 100 [default = false];
+ optional bool EnableIcNodeCache = 101 [default = true];
}
message THttpProxyConfig {
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index e92d51cf9bb..1290f87f5a8 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -485,6 +485,7 @@ message TRegisterReadSession {
message TGetReadSessionsInfo {
optional string ClientId = 1;
+ repeated uint32 Partitions = 2;
}
@@ -534,6 +535,22 @@ message TReadSessionsInfoResponse {
repeated TReadSessionInfo ReadSessions = 3;
}
+
+message TGetPartitionsLocation {
+ repeated uint64 Partitions = 1;
+}
+
+message TPartitionLocation {
+ optional uint32 PartitionId = 1;
+ optional uint64 NodeId = 2;
+ optional uint32 Generation = 3;
+};
+
+message TPartitionsLocationResponse {
+ optional bool Status = 1;
+ repeated TPartitionLocation Locations = 2;
+}
+
message TLockPartition {
optional uint32 Partition = 1;
optional uint64 TabletId = 2;
diff --git a/ydb/core/protos/tablet_pipe.proto b/ydb/core/protos/tablet_pipe.proto
index 1001b23b766..f7369f7d0dd 100644
--- a/ydb/core/protos/tablet_pipe.proto
+++ b/ydb/core/protos/tablet_pipe.proto
@@ -21,6 +21,7 @@ message TEvConnectResult {
optional NActorsProto.TActorId ServerId = 4;
optional bool Leader = 5 [default = true];
optional bool SupportsDataInPayload = 6;
+ optional uint32 Generation = 7;
};
message TEvPush {
diff --git a/ydb/core/quoter/ut_helpers.cpp b/ydb/core/quoter/ut_helpers.cpp
index a1b72386829..82de76413f1 100644
--- a/ydb/core/quoter/ut_helpers.cpp
+++ b/ydb/core/quoter/ut_helpers.cpp
@@ -237,12 +237,23 @@ void TKesusProxyTestSetup::WaitProxyStart() {
void TKesusProxyTestSetup::SendNotConnected(TTestTabletPipeFactory::TTestTabletPipe* pipe) {
WaitProxyStart();
- Runtime->Send(new IEventHandle(KesusProxyId, pipe->GetSelfID(), new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, pipe->GetSelfID(), TActorId(), true, false)), 0, true);
+ Runtime->Send(
+ new IEventHandle(
+ KesusProxyId, pipe->GetSelfID(),
+ new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, pipe->GetSelfID(), TActorId(), true, false, 0)
+ ),
+ 0, true
+ );
}
void TKesusProxyTestSetup::SendConnected(TTestTabletPipeFactory::TTestTabletPipe* pipe) {
WaitProxyStart();
- Runtime->Send(new IEventHandle(KesusProxyId, pipe->GetSelfID(), new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, pipe->GetSelfID(), pipe->GetSelfID(), true, false)), 0, true);
+ Runtime->Send(
+ new IEventHandle(
+ KesusProxyId, pipe->GetSelfID(),
+ new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, pipe->GetSelfID(), pipe->GetSelfID(), true, false, 0)
+ ), 0, true
+ );
}
void TKesusProxyTestSetup::SendDestroyed(TTestTabletPipeFactory::TTestTabletPipe* pipe) {
@@ -477,11 +488,17 @@ void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::HandleResour
}
void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendNotConnected() {
- Send(Parent->Parent->KesusProxyId, new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, SelfID, TActorId(), true, false));
+ Send(
+ Parent->Parent->KesusProxyId,
+ new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, SelfID, TActorId(), true, false, 0)
+ );
}
void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendConnected() {
- Send(Parent->Parent->KesusProxyId, new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, SelfID, SelfID, true, false));
+ Send(
+ Parent->Parent->KesusProxyId,
+ new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, SelfID, SelfID, true, false, 0)
+ );
}
void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendDestroyed() {
diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp
index 84d12678d2b..398b13378bd 100644
--- a/ydb/core/tablet/tablet_pipe_client.cpp
+++ b/ydb/core/tablet/tablet_pipe_client.cpp
@@ -284,6 +284,7 @@ namespace NTabletPipe {
ServerId = ActorIdFromProto(record.GetServerId());
Leader = record.GetLeader();
+ Generation = record.GetGeneration();
SupportsDataInPayload = record.GetSupportsDataInPayload();
Y_VERIFY(!ServerId || record.GetStatus() == NKikimrProto::OK);
@@ -295,7 +296,8 @@ namespace NTabletPipe {
Become(&TThis::StateWork);
- ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::OK, ctx.SelfID, ServerId, Leader, false));
+ ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::OK, ctx.SelfID, ServerId,
+ Leader, false, Generation));
BLOG_D("send queued");
while (TAutoPtr<IEventHandle> x = PayloadQueue->Pop())
@@ -423,7 +425,7 @@ namespace NTabletPipe {
}
}
- ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, definitelyDead));
+ ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, definitelyDead, Generation));
return Die(ctx);
}
@@ -445,7 +447,7 @@ namespace NTabletPipe {
return;
auto *msg = ev->Get();
if (msg->Status != NKikimrProto::OK) {
- ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false));
+ ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation));
return Die(ctx);
}
}
@@ -453,7 +455,7 @@ namespace NTabletPipe {
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx) {
if (HiveClient != ev->Sender)
return;
- ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false));
+ ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation));
return Die(ctx);
}
@@ -468,7 +470,7 @@ namespace NTabletPipe {
Become(&TThis::StateCheckDead, RetryState.MakeCheckDelay(), new TEvTabletPipe::TEvClientCheckDelay());
} else {
BLOG_D("connect failed");
- ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false));
+ ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation));
return Die(ctx);
}
}
@@ -674,6 +676,7 @@ namespace NTabletPipe {
TAutoPtr<TPayloadQueue, TPayloadQueue::TPtrCleanDestructor> PayloadQueue;
TClientRetryState RetryState;
bool Leader;
+ ui64 Generation = 0;
TActorId HiveClient;
ui32 CurrentHiveForwards = 0;
static constexpr ui32 MAX_HIVE_FORWARDS = 10;
diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp
index ea5979c18c6..a1e829a30bd 100644
--- a/ydb/core/tablet/tablet_pipe_server.cpp
+++ b/ydb/core/tablet/tablet_pipe_server.cpp
@@ -196,6 +196,7 @@ namespace NTabletPipe {
OwnerId = ev->Get()->OwnerId;
RecipientId = ev->Get()->RecipientId;
Leader = ev->Get()->Leader;
+ Generation = ev->Get()->Generation;
Y_VERIFY(OwnerId);
Y_VERIFY(RecipientId);
if (InterconnectSession) {
@@ -209,7 +210,7 @@ namespace NTabletPipe {
void OnConnected(const TActorContext& ctx) {
Become(&TThis::StateActive);
- SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader), IEventHandle::FlagTrackDelivery, ConnectCookie);
+ SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader, Generation), IEventHandle::FlagTrackDelivery, ConnectCookie);
ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID));
Connected = true;
}
@@ -263,6 +264,7 @@ namespace NTabletPipe {
bool Leader;
bool Connected;
bool HadShutdown = false;
+ ui32 Generation = 0;
};
class TConnectAcceptor: public IConnectAcceptor {
@@ -274,7 +276,7 @@ namespace NTabletPipe {
{
}
- TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipientId, bool leader) override {
+ TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) override {
Y_VERIFY(ev->Get()->Record.GetTabletId() == TabletId);
const TActorId clientId = ActorIdFromProto(ev->Get()->Record.GetClientId());
IActor* server = CreateServer(TabletId, clientId, ev->InterconnectSession, ev->Get()->Record.GetFeatures(), ev->Cookie);
@@ -282,7 +284,7 @@ namespace NTabletPipe {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]"
<< " Accept Connect Originator# " << ev->Sender);
ServerIds.insert(serverId);
- ActivateServer(TabletId, serverId, owner, recipientId, leader);
+ ActivateServer(TabletId, serverId, owner, recipientId, leader, generation);
return serverId;
}
@@ -291,7 +293,7 @@ namespace NTabletPipe {
const TActorId clientId = ActorIdFromProto(ev->Get()->Record.GetClientId());
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]"
<< " Reject Connect Originator# " << ev->Sender);
- owner.Send(clientId, new TEvTabletPipe::TEvConnectResult(status, TabletId, clientId, TActorId(), leader));
+ owner.Send(clientId, new TEvTabletPipe::TEvConnectResult(status, TabletId, clientId, TActorId(), leader, 0));
}
void Stop(TActorIdentity owner) override {
@@ -326,10 +328,10 @@ namespace NTabletPipe {
return serverId;
}
- void Activate(TActorIdentity owner, TActorId recipientId, bool leader) override {
+ void Activate(TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) override {
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]" << " Activate");
for (const auto& serverId : ActivatePending) {
- ActivateServer(TabletId, serverId, owner, recipientId, leader);
+ ActivateServer(TabletId, serverId, owner, recipientId, leader, generation);
}
ActivatePending.clear();
@@ -367,8 +369,8 @@ namespace NTabletPipe {
return new TConnectAcceptor(tabletId);
}
- void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader) {
- owner.Send(serverId, new TEvTabletPipe::TEvActivate(tabletId, owner, recipientId, leader));
+ void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) {
+ owner.Send(serverId, new TEvTabletPipe::TEvActivate(tabletId, owner, recipientId, leader, generation));
}
void CloseServer(TActorIdentity owner, TActorId serverId) {
diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp
index 715143e4929..86166c17f77 100644
--- a/ydb/core/tablet/tablet_sys.cpp
+++ b/ydb/core/tablet/tablet_sys.cpp
@@ -1505,7 +1505,7 @@ void TTablet::Handle(TEvTabletPipe::TEvConnect::TPtr& ev) {
if (PipeConnectAcceptor->IsStopped()) {
PipeConnectAcceptor->Reject(ev, SelfId(), NKikimrProto::TRYLATER, Leader);
} else if (PipeConnectAcceptor->IsActive()) {
- PipeConnectAcceptor->Accept(ev, SelfId(), UserTablet, Leader);
+ PipeConnectAcceptor->Accept(ev, SelfId(), UserTablet, Leader, SuggestedGeneration);
} else {
PipeConnectAcceptor->Enqueue(ev, SelfId());
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 1d39ebfcfb8..45b39945e68 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -103,6 +103,7 @@
#include <ydb/library/folder_service/mock/mock_folder_service_adapter.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
#include <library/cpp/actors/interconnect/interconnect.h>
@@ -850,7 +851,11 @@ namespace Tests {
}
}
}
-
+ {
+ IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters());
+ TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx);
+ Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx);
+ }
{
auto driverConfig = NYdb::TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << Settings->GrpcPort);
if (!Driver) {
diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto
index 9621b32d98b..b5a95b5f1da 100644
--- a/ydb/public/api/protos/ydb_topic.proto
+++ b/ydb/public/api/protos/ydb_topic.proto
@@ -760,7 +760,7 @@ message PartitionLocation {
int32 node_id = 1;
// Partition generation.
- uint64 generation = 2;
+ int64 generation = 2;
}
// Describe topic request sent from client to server.
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 07d2fae71e0..892a0cddb78 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -77,7 +77,7 @@ namespace NKikimr::NDataStreams::V1 {
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx,
const TString& workingDir, const TString& name);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx);
};
@@ -93,9 +93,8 @@ namespace NKikimr::NDataStreams::V1 {
Become(&TCreateStreamActor::StateWork);
}
- void TCreateStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ void TCreateStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Y_UNUSED(ev);
- Y_UNUSED(ctx);
}
@@ -204,8 +203,7 @@ namespace NKikimr::NDataStreams::V1 {
void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx,
const TString& workingDir, const TString& name);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev,
- const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
private:
bool EnforceDeletion;
@@ -234,9 +232,8 @@ namespace NKikimr::NDataStreams::V1 {
modifyScheme.MutableDrop()->SetName(name);
}
- void TDeleteStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev,
- const TActorContext& ctx) {
- if (ReplyIfNotTopic(ev, ctx)) {
+ void TDeleteStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -247,10 +244,10 @@ namespace NKikimr::NDataStreams::V1 {
if (readRules.size() > 0 && EnforceDeletion == false) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE),
TStringBuilder() << "Stream has registered consumers" <<
- "and EnforceConsumerDeletion flag is false", ctx);
+ "and EnforceConsumerDeletion flag is false", ActorContext());
}
- SendProposeRequest(ctx);
+ SendProposeRequest(ActorContext());
}
//-----------------------------------------------------------------------------------------------------------
@@ -564,7 +561,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) {
if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) {
@@ -625,13 +622,13 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TDescribeStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ void TDescribeStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_VERIFY(result->ResultSet.size() == 1); // describe only one topic
const auto& response = result->ResultSet.front();
const TString path = JoinSeq("/", response.Path);
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -644,7 +641,7 @@ namespace NKikimr::NDataStreams::V1 {
tabletIds.insert(partition.GetTabletId());
}
if (tabletIds.size() == 0) {
- ReplyAndDie(ctx);
+ ReplyAndDie(ActorContext());
}
RequestsInfly = tabletIds.size();
@@ -659,9 +656,9 @@ namespace NKikimr::NDataStreams::V1 {
};
for (auto& tabletId : tabletIds) {
- Pipes.push_back(ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)));
+ Pipes.push_back(ActorContext().Register(NTabletPipe::CreateClient(ActorContext().SelfID, tabletId, clientConfig)));
TAutoPtr<TEvPersQueue::TEvOffsets> req(new TEvPersQueue::TEvOffsets);
- NTabletPipe::SendData(ctx, Pipes.back(), req.Release());
+ NTabletPipe::SendData(ActorContext(), Pipes.back(), req.Release());
}
}
@@ -726,7 +723,8 @@ namespace NKikimr::NDataStreams::V1 {
}
}
if (!startShardFound) {
- return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
+ return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST,
+ TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id(), ctx);
}
return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
}
@@ -909,7 +907,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
protected:
void SendResponse(const TActorContext& ctx, const std::vector<std::pair<TString, ui64>>& readRules, ui32 leftToRead);
@@ -972,11 +970,11 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TListStreamConsumersActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ void TListStreamConsumersActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_VERIFY(result->ResultSet.size() == 1); // describe only one topic
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -991,7 +989,7 @@ namespace NKikimr::NDataStreams::V1 {
if (alreadyRead > (ui32)streamReadRulesNames.size()) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT),
TStringBuilder() << "Provided next_token is malformed - " <<
- "everything is already read", ctx);
+ "everything is already read", ActorContext());
}
const auto rulesToRead = std::min(streamReadRulesNames.size() - alreadyRead, MaxResults);
@@ -1003,7 +1001,7 @@ namespace NKikimr::NDataStreams::V1 {
}
leftToRead = streamReadRulesNames.size() - alreadyRead - rulesToRead;
- SendResponse(ctx, readRules, leftToRead);
+ SendResponse(ActorContext(), readRules, leftToRead);
}
void TListStreamConsumersActor::SendResponse(const TActorContext& ctx, const std::vector<std::pair<TString, ui64>>& readRules, ui32 leftToRead) {
@@ -1176,7 +1174,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
private:
@@ -1248,15 +1246,15 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TGetShardIteratorActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- if (ReplyIfNotTopic(ev, ctx)) {
+ void TGetShardIteratorActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
auto topicInfo = navigate->ResultSet.begin();
StreamName = NKikimr::CanonizePath(topicInfo->Path);
- if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ if (AppData(ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) {
NACLib::TUserToken token(this->Request_->GetSerializedToken());
if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
@@ -1266,7 +1264,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), ctx);
+ << token.GetUserSID(), ActorContext());
}
}
@@ -1276,16 +1274,18 @@ namespace NKikimr::NDataStreams::V1 {
TString shardName = GetShardName(partitionId);
if (shardName == ShardId) {
if (topicInfo->ShowPrivatePath) {
- SendResponse(ctx, TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
+ SendResponse(ActorContext(),
+ TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
} else {
- SendResponse(ctx, TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
+ SendResponse(ActorContext(),
+ TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
}
return;
}
}
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
- TStringBuilder() << "No such shard: " << ShardId, ctx);
+ TStringBuilder() << "No such shard: " << ShardId, ActorContext());
}
void TGetShardIteratorActor::SendResponse(const TActorContext& ctx, const TShardIterator& shardIt) {
@@ -1322,7 +1322,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
@@ -1390,7 +1390,7 @@ namespace NKikimr::NDataStreams::V1 {
);
NKikimrClient::TPersQueueRequest request;
- request.MutablePartitionRequest()->SetTopic(this->GetTopicPath(ctx));
+ request.MutablePartitionRequest()->SetTopic(this->GetTopicPath());
request.MutablePartitionRequest()->SetPartition(ShardIterator.GetShardId());
ActorIdToProto(PipeClient, request.MutablePartitionRequest()->MutablePipeClient());
@@ -1417,12 +1417,11 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TGetRecordsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev,
- const TActorContext& ctx) {
+ void TGetRecordsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const auto &result = ev->Get()->Request.Get();
const auto response = result->ResultSet.front();
- if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ if (AppData(ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) {
NACLib::TUserToken token(this->Request_->GetSerializedToken());
if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow,
@@ -1432,7 +1431,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< ShardIterator.GetStreamName()
<< " is denied for subject "
- << token.GetUserSID(), ctx);
+ << token.GetUserSID(), ActorContext());
}
}
@@ -1444,13 +1443,13 @@ namespace NKikimr::NDataStreams::V1 {
auto partitionId = partition.GetPartitionId();
if (partitionId == ShardIterator.GetShardId()) {
TabletId = partition.GetTabletId();
- return SendReadRequest(ctx);
+ return SendReadRequest(ActorContext());
}
}
}
ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND),
- TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ctx);
+ TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ActorContext());
}
void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) {
@@ -1531,7 +1530,7 @@ namespace NKikimr::NDataStreams::V1 {
case EWakeupTag::RlAllowed:
return SendResponse(ctx);
case EWakeupTag::RlNoResource:
- return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx);
+ return RespondWithCode(Ydb::StatusIds::OVERLOADED);
default:
return HandleWakeup(ev, ctx);
}
@@ -1575,8 +1574,7 @@ namespace NKikimr::NDataStreams::V1 {
void Handle(TEvPersQueue::TEvOffsetsResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev,
- const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
void Die(const TActorContext& ctx) override;
private:
@@ -1666,10 +1664,11 @@ namespace NKikimr::NDataStreams::V1 {
}
}
- void TListShardsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- if (ReplyIfNotTopic(ev, ctx)) {
+ void TListShardsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
+ auto ctx = ActorContext();
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
auto topicInfo = navigate->ResultSet.front();
@@ -1683,7 +1682,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access to stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), ctx);
+ << token.GetUserSID(), ActorContext());
}
}
@@ -1855,8 +1854,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext& ctx);
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev,
- const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
private:
void SendResponse(const TActorContext& ctx);
@@ -1885,9 +1883,9 @@ namespace NKikimr::NDataStreams::V1 {
}
void TDescribeStreamSummaryActor::HandleCacheNavigateResponse(
- TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
) {
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -1900,7 +1898,7 @@ namespace NKikimr::NDataStreams::V1 {
PQGroup = response.PQGroupInfo->Description;
SelfInfo = response.Self->Info;
- SendResponse(ctx);
+ SendResponse(ActorContext());
}
void TDescribeStreamSummaryActor::SendResponse(const TActorContext& ctx) {
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index cac15961120..d9bbb104817 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -227,7 +227,7 @@ namespace NKikimr::NDataStreams::V1 {
void Bootstrap(const NActors::TActorContext &ctx);
void PreparePartitionActors(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
protected:
void Write(const TActorContext& ctx);
@@ -305,7 +305,7 @@ namespace NKikimr::NDataStreams::V1 {
void TPutRecordsActorBase<TDerived, TProto>::SendNavigateRequest(const TActorContext& ctx) {
auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.Path = NKikimr::SplitPath(this->GetTopicPath(ctx));
+ entry.Path = NKikimr::SplitPath(this->GetTopicPath());
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = true;
schemeCacheRequest->ResultSet.emplace_back(entry);
@@ -313,14 +313,14 @@ namespace NKikimr::NDataStreams::V1 {
}
template<class TDerived, class TProto>
- void TPutRecordsActorBase<TDerived, TProto>::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
- if (TBase::ReplyIfNotTopic(ev, ctx)) {
+ void TPutRecordsActorBase<TDerived, TProto>::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ if (TBase::ReplyIfNotTopic(ev)) {
return;
}
const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get();
auto topicInfo = navigate->ResultSet.begin();
- if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ if (AppData(this->ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) {
NACLib::TUserToken token(this->Request_->GetSerializedToken());
if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, token)) {
return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED,
@@ -328,7 +328,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Access for stream "
<< this->GetProtoRequest()->stream_name()
<< " is denied for subject "
- << token.GetUserSID(), ctx);
+ << token.GetUserSID(), this->ActorContext());
}
}
@@ -336,21 +336,21 @@ namespace NKikimr::NDataStreams::V1 {
PQGroupInfo = topicInfo->PQGroupInfo;
SetMeteringMode(PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode());
- if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !PQGroupInfo->Description.GetPQTabletConfig().GetLocalDC()) {
+ if (!AppData(this->ActorContext())->PQConfig.GetTopicsAreFirstClassCitizen() && !PQGroupInfo->Description.GetPQTabletConfig().GetLocalDC()) {
return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST,
Ydb::PersQueue::ErrorCode::BAD_REQUEST,
TStringBuilder() << "write to mirrored stream "
<< this->GetProtoRequest()->stream_name()
- << " is forbidden", ctx);
+ << " is forbidden", this->ActorContext());
}
if (IsQuotaRequired()) {
const auto ru = 1 + CalcRuConsumption(GetPayloadSize());
- Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx));
+ Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, this->ActorContext()));
} else {
- Write(ctx);
+ Write(this->ActorContext());
}
}
@@ -369,7 +369,7 @@ namespace NKikimr::NDataStreams::V1 {
if (items[part].empty()) continue;
PartitionToActor[part].ActorId = ctx.Register(
new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part,
- this->GetTopicPath(ctx), std::move(items[part]),
+ this->GetTopicPath(), std::move(items[part]),
ShouldBeCharged));
}
this->CheckFinish(ctx);
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 83067772feb..5f01f97f944 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -106,99 +106,51 @@ namespace NKikimr::NGRpcProxy::V1 {
}
};
- template<class TDerived, class TRequest>
- class TPQGrpcSchemaBase : public NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest> {
- protected:
- using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>;
-
- using TProtoRequest = typename TRequest::TRequest;
+ template<class TDerived>
+ class TPQSchemaBase {
public:
- TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath)
- : TBase(request)
- , TopicPath(topicPath)
- {
- }
- TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx* request)
- : TBase(request)
- , TopicPath(TBase::GetProtoRequest()->path())
+ TPQSchemaBase(const TString& topicPath, const TString& database)
+ : TopicPath(topicPath)
+ , Database(database)
{
- //auto path = TBase::GetProtoRequest()->path();
- }
-
- TString GetTopicPath(const NActors::TActorContext& ctx) {
- auto path = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath);
- if (PrivateTopicName) {
- path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName));
- }
- return path;
- }
-
- const TMaybe<TString>& GetCdcStreamName() const {
- return CdcStreamName;
}
protected:
- // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name);
- void SendProposeRequest(const NActors::TActorContext &ctx) {
- std::pair <TString, TString> pathPair;
- try {
- pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath(ctx));
- } catch (const std::exception &ex) {
- this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
- return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
- }
-
- const auto &workingDir = pathPair.first;
- const auto &name = pathPair.second;
-
- std::unique_ptr <TEvTxUserProxy::TEvProposeTransaction> proposal(
- new TEvTxUserProxy::TEvProposeTransaction());
-
- SetDatabase(proposal.get(), *this->Request_);
+ virtual TString GetTopicPath() const = 0;
+ virtual void RespondWithCode(Ydb::StatusIds::StatusCode status) = 0;
+ virtual void AddIssue(const NYql::TIssue& issue) = 0;
+ virtual bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const = 0;
- if (this->Request_->GetSerializedToken().empty()) {
- if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
- "Unauthenticated access is forbidden, please provide credentials", ctx);
- }
- } else {
- proposal->Record.SetUserToken(this->Request_->GetSerializedToken());
- }
-
- static_cast<TDerived*>(this)->FillProposeRequest(*proposal, ctx, workingDir, name);
+ virtual bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry) {
+ Y_UNUSED(entry);
+ return false;
+ };
- if (!IsDead) {
- ctx.Send(MakeTxProxyID(), proposal.release());
- }
- }
- void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) {
+ void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate) {
auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
- navigateRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(""));
+ navigateRequest->DatabaseName = CanonizePath(Database);
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.Path = NKikimr::SplitPath(GetTopicPath(ctx));
+ entry.Path = NKikimr::SplitPath(GetTopicPath());
entry.SyncVersion = true;
- entry.ShowPrivatePath = showPrivate || PrivateTopicName.Defined();
+ entry.ShowPrivatePath = showPrivate;
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
navigateRequest->ResultSet.emplace_back(entry);
- if (this->Request_->GetSerializedToken().empty()) {
- if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
- return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
- "Unauthenticated access is forbidden, please provide credentials", ctx);
- }
- } else {
- navigateRequest->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken());
+ if (!SetRequestToken(navigateRequest.get())) {
+ AddIssue(FillIssue("Unauthenticated access is forbidden, please provide credentials",
+ Ydb::PersQueue::ErrorCode::ACCESS_DENIED));
+ return RespondWithCode(Ydb::StatusIds::UNAUTHORIZED);
}
if (!IsDead) {
ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release()));
}
}
- bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_VERIFY(result->ResultSet.size() == 1);
const auto& response = result->ResultSet.front();
@@ -207,16 +159,15 @@ namespace NKikimr::NGRpcProxy::V1 {
if (ev->Get()->Request.Get()->ResultSet.size() != 1 ||
ev->Get()->Request.Get()->ResultSet.begin()->Kind !=
NSchemeCache::TSchemeCacheNavigate::KindTopic) {
- this->Request_->RaiseIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
- Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
- TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ AddIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic",
+ Ydb::PersQueue::ErrorCode::VALIDATION_ERROR));
+ RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
return true;
}
-
return false;
}
- void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_VERIFY(result->ResultSet.size() == 1);
const auto& response = result->ResultSet.front();
@@ -225,124 +176,232 @@ namespace NKikimr::NGRpcProxy::V1 {
switch (response.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: {
if (response.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) {
- if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
- if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
- Y_VERIFY(response.ListNodeEntry->Children.size() == 1);
- PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
-
- if (response.Self) {
- CdcStreamName = response.Self->Info.GetName();
- }
-
- return SendDescribeProposeRequest(ctx);
- }
+ if (ProcessCdc(response)) {
+ return;
}
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' is not compatible scheme object",
Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
} else if (!response.PQGroupInfo) {
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' creation is not completed",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
- return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev, ctx);
+ return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev);
}
break;
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: {
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' does not exist or you " <<
"do not have access rights",
Ydb::PersQueue::ErrorCode::ACCESS_DENIED
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: {
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "table creation is not completed",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
break;
case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: {
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' is not a table",
Ydb::PersQueue::ErrorCode::VALIDATION_ERROR
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
break;
case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: {
- this->Request_->RaiseIssue(
+ AddIssue(
FillIssue(
TStringBuilder() << "unknown database root",
Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT
)
);
- return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
break;
default:
- return TBase::Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
+ return RespondWithCode(Ydb::StatusIds::GENERIC_ERROR);
}
}
- void ReplyWithError(Ydb::StatusIds::StatusCode status,
- Ydb::PersQueue::ErrorCode::ErrorCode pqStatus,
- const TString& messageText, const NActors::TActorContext& ctx) {
- this->Request_->RaiseIssue(FillIssue(messageText, pqStatus));
- this->Request_->ReplyWithYdbStatus(status);
- this->Die(ctx);
- IsDead = true;
+
+ void StateWork(TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ Y_FAIL();
+ }
+ }
+
+ protected:
+ bool IsDead = false;
+ const TString TopicPath;
+ const TString Database;
+ };
+
+
+ template<class TDerived, class TRequest>
+ class TPQGrpcSchemaBase : public NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>,
+ public TPQSchemaBase<TPQGrpcSchemaBase<TDerived, TRequest>> {
+ protected:
+ using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>;
+ using TActorBase = TPQSchemaBase<TPQGrpcSchemaBase<TDerived, TRequest>>;
+
+ using TProtoRequest = typename TRequest::TRequest;
+
+ public:
+
+ TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath)
+ : TBase(request)
+ , TActorBase(topicPath, this->Request_->GetDatabaseName().GetOrElse(""))
+ {
+ }
+ TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx* request)
+ : TBase(request)
+ , TActorBase(TBase::GetProtoRequest()->path(), this->Request_->GetDatabaseName().GetOrElse(""))
+ {
+ }
+
+ TString GetTopicPath() const override {
+ auto path = NPersQueue::GetFullTopicPath(this->ActorContext(), this->Request_->GetDatabaseName(), TActorBase::TopicPath);
+ if (PrivateTopicName) {
+ path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName));
+ }
+ return path;
+ }
+
+ const TMaybe<TString>& GetCdcStreamName() const {
+ return CdcStreamName;
+ }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev);
+ }
+
+
+ protected:
+ // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name);
+ void SendProposeRequest(const NActors::TActorContext &ctx) {
+ std::pair <TString, TString> pathPair;
+ try {
+ pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath());
+ } catch (const std::exception &ex) {
+ this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
+ return this->RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
+ }
+
+ const auto& workingDir = pathPair.first;
+ const auto& name = pathPair.second;
+
+ std::unique_ptr <TEvTxUserProxy::TEvProposeTransaction> proposal(
+ new TEvTxUserProxy::TEvProposeTransaction());
+
+ SetDatabase(proposal.get(), *this->Request_);
+
+ if (this->Request_->GetSerializedToken().empty()) {
+ if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) {
+ return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED,
+ "Unauthenticated access is forbidden, please provide credentials", ctx);
+ }
+ } else {
+ proposal->Record.SetUserToken(this->Request_->GetSerializedToken());
+ }
+
+ static_cast<TDerived*>(this)->FillProposeRequest(*proposal, ctx, workingDir, name);
+
+ if (!TActorBase::IsDead) {
+ ctx.Send(MakeTxProxyID(), proposal.release());
+ }
+ }
+
+ void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) {
+ return TActorBase::SendDescribeProposeRequest(ctx, showPrivate || PrivateTopicName.Defined());
+ }
+
+ bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override {
+ if (this->Request_->GetSerializedToken().empty()) {
+ return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
+ } else {
+ request->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken());
+ return true;
+ }
+ }
+ bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override {
+ if constexpr (THasCdcStreamCompatibility<TDerived>::Value) {
+ if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) {
+ Y_VERIFY(response.ListNodeEntry->Children.size() == 1);
+ PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
+
+ if (response.Self) {
+ CdcStreamName = response.Self->Info.GetName();
+ }
+
+ SendDescribeProposeRequest(TBase::ActorContext());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void AddIssue(const NYql::TIssue& issue) override {
+ this->Request_->RaiseIssue(issue);
}
void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus,
const TString& messageText, const NActors::TActorContext& ctx) {
+ if (TActorBase::IsDead)
+ return;
this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus));
this->Request_->ReplyWithYdbStatus(status);
this->Die(ctx);
- IsDead = true;
+ TActorBase::IsDead = true;
}
- void ReplyWithResult(Ydb::StatusIds::StatusCode status, const NActors::TActorContext& ctx) {
+ void RespondWithCode(Ydb::StatusIds::StatusCode status) override {
+ if (TActorBase::IsDead)
+ return;
this->Request_->ReplyWithYdbStatus(status);
- this->Die(ctx);
- IsDead = true;
+ this->Die(this->ActorContext());
+ TActorBase::IsDead = true;
}
template<class TProtoResult>
void ReplyWithResult(Ydb::StatusIds::StatusCode status, const TProtoResult& result, const NActors::TActorContext& ctx) {
+ if (TActorBase::IsDead)
+ return;
this->Request_->SendResult(result, status);
this->Die(ctx);
- IsDead = true;
+ TActorBase::IsDead = true;
}
void StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
- HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle);
default: TBase::StateWork(ev);
}
}
private:
- bool IsDead = false;
- const TString TopicPath;
TMaybe<TString> PrivateTopicName;
TMaybe<TString> CdcStreamName;
};
@@ -400,11 +459,11 @@ namespace NKikimr::NGRpcProxy::V1 {
this->DescribeSchemeResult.Reset();
}
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get();
Y_VERIFY(result->ResultSet.size() == 1);
DescribeSchemeResult = std::move(ev);
- return this->SendProposeRequest(ctx);
+ return this->SendProposeRequest(this->ActorContext());
}
void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) {
@@ -415,7 +474,7 @@ namespace NKikimr::NGRpcProxy::V1 {
{
return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED,
Ydb::PersQueue::ErrorCode::OVERLOAD,
- TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress",
+ TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress",
ctx);
}
@@ -429,8 +488,77 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- private:
+ protected:
THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult;
};
+
+ template<class TDerived, class TRequest, class TEvResponse>
+ class TPQInternalSchemaActor : public TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>
+ , public TActorBootstrapped<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>
+ {
+ protected:
+ using TBase = TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>;
+
+ public:
+
+ TPQInternalSchemaActor(const TRequest& request, const TActorId& requester)
+ : TBase(request.Topic, request.Database)
+ , Request(request)
+ , Requester(requester)
+ , Response(MakeHolder<TEvResponse>())
+ {
+ }
+
+ virtual void Bootstrap(const TActorContext&) = 0;
+ virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0;
+
+ TString GetTopicPath() const override {
+ return TBase::TopicPath;
+ }
+
+ void SendDescribeProposeRequest() {
+ return TBase::SendDescribeProposeRequest(this->ActorContext(), false);
+ }
+
+ bool HandleCacheNavigateResponseBase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
+ if (this->ReplyIfNotTopic(ev)) {
+ return false;
+ }
+ PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
+ return true;
+ }
+
+ bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override {
+ if (Request.Token.empty()) {
+ return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol());
+ } else {
+ request->UserToken = new NACLib::TUserToken(Request.Token);
+ return true;
+ }
+ }
+
+ void AddIssue(const NYql::TIssue& issue) override {
+ Response->Issues.AddIssue(issue);
+ }
+
+
+ void RespondWithCode(Ydb::StatusIds::StatusCode status) override {
+ Response->Status = status;
+ this->ActorContext().Send(Requester, Response.Release());
+ this->Die(this->ActorContext());
+ TBase::IsDead = true;
+ }
+
+
+ private:
+ TRequest Request;
+ TActorId Requester;
+ TMaybe<TString> PrivateTopicName;
+ protected:
+ THolder<TEvResponse> Response;
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
+ };
+
}
diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h
index 536714d08cc..cf1ee625238 100644
--- a/ydb/services/persqueue_v1/actors/events.h
+++ b/ydb/services/persqueue_v1/actors/events.h
@@ -8,6 +8,7 @@
#include <ydb/core/persqueue/percentile_counter.h>
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/services/lib/actors/type_definitions.h>
@@ -63,6 +64,7 @@ struct TEvPQProxy {
EvTopicUpdateToken,
EvCommitRange,
EvRequestTablet,
+ EvPartitionLocationResponse,
EvEnd
};
@@ -444,5 +446,49 @@ struct TEvPQProxy {
ui64 TabletId;
};
+ struct TLocalResponseBase {
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+ };
+
+ struct TPartitionLocationInfo {
+ ui64 PartitionId;
+ ui64 IncGeneration;
+ ui64 NodeId;
+ TString Hostname;
+ };
+
+ struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvRequestTablet, EvPartitionLocationResponse>
+ , public TLocalResponseBase
+
+ {
+ TEvPartitionLocationResponse() {}
+ TVector<TPartitionLocationInfo> Partitions;
+ };
+
+};
+
+struct TLocalRequestBase {
+ TLocalRequestBase(const TString& topic, const TString& database, const TString& token)
+ : Topic(topic)
+ , Database(database)
+ , Token(token)
+ {}
+
+ TString Topic;
+ TString Database;
+ TString Token;
+
+};
+
+struct TGetPartitionsLocationRequest : public TLocalRequestBase {
+ TGetPartitionsLocationRequest() = default;
+ TGetPartitionsLocationRequest(const TString& topic, const TString& database, const TString& token, const TVector<ui32>& partitionIds)
+ : TLocalRequestBase(topic, database, token)
+ , PartitionIds(partitionIds)
+ {}
+
+ TVector<ui32> PartitionIds;
+
};
}
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp
index 8830ab95086..0d71a4b146d 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.cpp
+++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp
@@ -1,10 +1,11 @@
-#include "schema_actors.h"
+ #include "schema_actors.h"
#include "persqueue_utils.h"
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/persqueue/obfuscate/obfuscate.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
namespace NKikimr::NGRpcProxy::V1 {
@@ -61,9 +62,9 @@ void TPQDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev) {
}
-void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -80,6 +81,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
}
auto settings = result.mutable_settings();
+
if (response.PQGroupInfo) {
const auto &pqDescr = response.PQGroupInfo->Description;
settings->set_partitions_count(pqDescr.GetTotalGroupCount());
@@ -119,7 +121,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
settings->set_message_group_seqno_retention_period_ms(partConfig.GetSourceIdLifetimeSeconds() * 1000);
settings->set_max_partition_message_groups_seqno_stored(partConfig.GetSourceIdMaxCounts());
- if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) {
+ if (local || AppData(ActorContext())->PQConfig.GetTopicsAreFirstClassCitizen()) {
settings->set_max_partition_write_speed(partConfig.GetWriteSpeedInBytesPerSecond());
settings->set_max_partition_write_burst(partConfig.GetBurstSize());
}
@@ -131,10 +133,10 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
settings->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1));
}
- const auto& pqConfig = AppData(ctx)->PQConfig;
+ const auto& pqConfig = AppData(ActorContext())->PQConfig;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
auto rr = settings->add_read_rules();
- auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx);
+ auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ActorContext());
rr->set_consumer_name(consumerName);
rr->set_starting_message_timestamp_ms(config.GetReadFromTimestampsMs(i));
rr->set_supported_format(
@@ -160,7 +162,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
"service type must be set for all read rules",
Ydb::PersQueue::ErrorCode::ERROR
));
- Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
+ Reply(Ydb::StatusIds::INTERNAL_ERROR, ActorContext());
return;
}
rr->set_service_type(pqConfig.GetDefaultClientServiceType().GetName());
@@ -202,14 +204,13 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T
rmr->set_database(partConfig.GetMirrorFrom().GetDatabase());
}
}
- return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx);
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ActorContext());
}
void TPQDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx)
{
TBase::Bootstrap(ctx);
-
SendDescribeProposeRequest(ctx);
Become(&TPQDescribeTopicActor::StateWork);
}
@@ -350,7 +351,7 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti
workingDir, proposal.Record.GetDatabaseName(), LocalCluster);
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(status, ctx);
+ return RespondWithCode(status);
}
}
@@ -359,11 +360,11 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti
if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC()
<< "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
+ return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
if (Count(Clusters, config.GetDC()) == 0 && !Clusters.empty()) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Unknown cluster '" << config.GetDC() << "'", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
+ return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
}
@@ -383,7 +384,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(status, ctx);
+ return RespondWithCode(status);
}
}
@@ -393,11 +394,11 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC()
<< "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
+ return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
if (Count(Clusters, config.GetDC()) == 0 && !Clusters.empty()) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Unknown cluster '" << config.GetDC() << "'", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
+ return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
}
@@ -413,7 +414,7 @@ void TPQAlterTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransactio
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(status, ctx);
+ return RespondWithCode(status);
}
}
@@ -448,20 +449,24 @@ void TAlterTopicActor::ModifyPersqueueConfig(
auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, ctx, error, GetCdcStreamName().Defined());
if (!error.empty()) {
Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(status, ctx);
+ return RespondWithCode(status);
}
}
TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request)
: TBase(request, request->GetProtoRequest()->path())
- , TDescribeTopicActorImpl("")
+ , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeTopic(
+ request->GetProtoRequest()->include_stats(),
+ request->GetProtoRequest()->include_location()))
{
}
TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx)
: TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->path())
- , TDescribeTopicActorImpl("")
+ , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeTopic(
+ dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_stats(),
+ dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_location()))
{
}
@@ -469,20 +474,27 @@ TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx *
TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request)
: TBase(request, request->GetProtoRequest()->path())
- , TDescribeTopicActorImpl(request->GetProtoRequest()->consumer())
+ , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeConsumer(
+ request->GetProtoRequest()->consumer(),
+ request->GetProtoRequest()->include_stats(),
+ request->GetProtoRequest()->include_location()))
{
}
TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx)
: TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->path())
- , TDescribeTopicActorImpl(dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer())
+ , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeConsumer(
+ dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer(),
+ dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_stats(),
+ dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_location()))
{
}
-TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer)
- : Consumer(consumer)
+TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TDescribeTopicActorSettings& settings)
+ : Settings(settings)
{
+
}
@@ -492,6 +504,8 @@ bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActor
HFuncCtx(TEvTabletPipe::TEvClientConnected, Handle, ctx);
HFuncCtx(NKikimr::TEvPersQueue::TEvStatusResponse, Handle, ctx);
HFuncCtx(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle, ctx);
+ HFuncCtx(TEvPersQueue::TEvGetPartitionsLocationResponse, Handle, ctx);
+ HFuncCtx(TEvents::TEvWakeup, HandleWakeup, ctx);
default: return false;
}
return true;
@@ -534,7 +548,6 @@ void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQue
TBase::Reply(status, ctx);
}
-
void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) {
auto it = Tablets.find(tabletId);
if (it == Tablets.end()) return;
@@ -542,6 +555,10 @@ void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext&
if (--it->second.RetriesLeft == 0) {
return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx);
}
+ it->second.Pipe = TActorId{};
+ if (tabletId == BalancerTabletId) {
+ BalancerPipe = nullptr;
+ }
Y_VERIFY(RequestsInfly > 0);
--RequestsInfly;
if (delay == TDuration::Zero()) {
@@ -556,22 +573,92 @@ void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, con
--RequestsInfly;
auto it = Tablets.find(ev->Get()->TabletId);
if (it == Tablets.end()) return;
+ if (ev->Get()->TabletId == BalancerTabletId && PendingLocation) {
+ PendingLocation = false;
+ }
+
RequestTablet(it->second, ctx);
}
+void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) {
+ auto it = Tablets.find(tabletId);
+ if (it != Tablets.end()) {
+ RequestTablet(it->second, ctx);
+ }
+}
+
+TActorId CreatePipe(ui64 tabletId, const TActorContext& ctx) {
+ return ctx.Register(NTabletPipe::CreateClient(
+ ctx.SelfID, tabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries())
+ ));
+}
+
void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorContext& ctx) {
- tablet.Pipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tablet.TabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries())));
+ if (!tablet.Pipe)
+ tablet.Pipe = CreatePipe(tablet.TabletId, ctx);
if (tablet.TabletId == BalancerTabletId) {
- THolder<NKikimr::TEvPersQueue::TEvGetReadSessionsInfo> ev(new NKikimr::TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Consumer, ctx)));
- NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
+ BalancerPipe = &tablet.Pipe;
+ return RequestBalancer(ctx);
} else {
- THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Consumer, ctx), Consumer.empty()));
+ THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(
+ Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx),
+ Settings.Consumer.empty()
+ ));
NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release());
}
++RequestsInfly;
}
+void TDescribeTopicActorImpl::RequestBalancer(const TActorContext& ctx) {
+ Y_VERIFY(BalancerTabletId);
+ if (Settings.RequireLocation && !PendingLocation && !GotLocation) {
+ return RequestPartitionsLocationIfRequired(ctx);
+ }
+ switch (Settings.Mode) {
+ case TDescribeTopicActorSettings::EMode::DescribeConsumer:
+ case TDescribeTopicActorSettings::EMode::DescribeTopic:
+ if (Settings.RequireStats) {
+ NTabletPipe::SendData(
+ ctx, *BalancerPipe,
+ new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx))
+ );
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions");
+ ++RequestsInfly;
+ }
+ break;
+ case TDescribeTopicActorSettings::EMode::DescribePartitions: {
+ break;
+ }
+ }
+}
+
+void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorContext& ctx) {
+ if (!Settings.RequireLocation || PendingLocation)
+ return;
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request location");
+ THashSet<ui64> partIds;
+ TVector<ui64> partsVector;
+ for (auto p : Settings.Partitions) {
+ if (p >= TotalPartitions) {
+ return RaiseError(
+ TStringBuilder() << "No partition " << Settings.Partitions[0] << " in topic",
+ Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::BAD_REQUEST, ctx
+ );
+ }
+ auto res = partIds.insert(p);
+ if (res.second) {
+ partsVector.push_back(p);
+ }
+ }
+ NTabletPipe::SendData(
+ ctx, *BalancerPipe,
+ new TEvPersQueue::TEvGetPartitionsLocation(partsVector)
+ );
+ PendingLocation = true;
+ GotLocation = false;
+}
+
void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
auto it = Tablets.find(ev->Get()->Record.GetTabletId());
if (it == Tablets.end()) return;
@@ -592,7 +679,7 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T
if (RequestsInfly == 0) {
RequestAdditionalInfo(ctx);
- if (RequestsInfly == 0) {
+ if (RequestsInfly == 0 && !PendingLocation) {
Reply(ctx);
}
}
@@ -600,25 +687,58 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T
void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got sessions");
+
if (BalancerTabletId == 0)
return;
auto it = Tablets.find(BalancerTabletId);
Y_VERIFY(it != Tablets.end());
--RequestsInfly;
- NTabletPipe::CloseClient(ctx, it->second.Pipe);
- it->second.Pipe = TActorId{};
+ NTabletPipe::CloseClient(ctx, *BalancerPipe);
+ *BalancerPipe = TActorId{};
BalancerTabletId = 0;
ApplyResponse(it->second, ev, ctx);
if (RequestsInfly == 0) {
RequestAdditionalInfo(ctx);
- if (RequestsInfly == 0) {
+ if (RequestsInfly == 0 && !PendingLocation) {
Reply(ctx);
}
}
}
+void TDescribeTopicActorImpl::Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got location");
+ if (BalancerTabletId == 0)
+ return;
+ auto it = Tablets.find(BalancerTabletId);
+ Y_VERIFY(it != Tablets.end());
+ PendingLocation = false;
+ const auto& record = ev->Get()->Record;
+ if (record.GetStatus()) {
+ auto res = ApplyResponse(ev, ctx);
+ if (res) {
+ GotLocation = true;
+ CheckCloseBalancerPipe(ctx);
+ if (!RequestsInfly && !PendingLocation) {
+ Reply(ctx);
+ }
+ return;
+ }
+ }
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Something wrong on location, retry");
+ //Something gone wrong, retry
+ ctx.Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup());
+}
+
+void TDescribeTopicActorImpl::HandleWakeup(TEvents::TEvWakeup::TPtr&, const NActors::TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Wakeup");
+ RequestPartitionsLocationIfRequired(ctx);
+ if (!RequestsInfly && !PendingLocation) {
+ return Reply(ctx);
+ }
+}
void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) {
if (BalancerTabletId) {
@@ -626,10 +746,16 @@ void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) {
}
}
-void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) {
- auto it = Tablets.find(tabletId);
- if (it != Tablets.end()) {
- RequestTablet(it->second, ctx);
+void TDescribeTopicActorImpl::CheckCloseBalancerPipe(const TActorContext& ctx) {
+ switch (Settings.Mode) {
+ case TDescribeTopicActorSettings::EMode::DescribePartitions:
+ if (RequestsInfly || PendingLocation)
+ return;
+ // no break;
+ default:
+ NTabletPipe::CloseClient(ctx, *BalancerPipe);
+ *BalancerPipe = TActorId{};
+ BalancerTabletId = 0;
}
}
@@ -648,6 +774,11 @@ void UpdateProtoTime(T* proto, const ui64 ms, bool storeMin) {
}
}
+void SetPartitionLocation(const NKikimrPQ::TPartitionLocation& location, Ydb::Topic::PartitionLocation* result) {
+ result->set_node_id(location.GetNodeId());
+ result->set_generation(location.GetGeneration());
+}
+
void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
@@ -663,6 +794,19 @@ void AddWindowsStat(Ydb::Topic::MultipleWindowsStat *stat, ui64 perMin, ui64 per
stat->set_per_day(stat->per_day() + perDay);
}
+void FillPartitionStats(const NKikimrPQ::TStatusResponse::TPartResult& partResult, Ydb::Topic::PartitionStats* partStats, ui64 nodeId) {
+ partStats->set_store_size_bytes(partResult.GetPartitionSize());
+ partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset());
+ partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset());
+
+ SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs());
+ SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs());
+
+ AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay());
+
+ partStats->set_partition_node_id(nodeId);
+}
+
void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) {
Y_UNUSED(ctx);
@@ -718,30 +862,41 @@ void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPer
for (auto& partRes : *(Result.mutable_partitions())) {
auto it = res.find(partRes.partition_id());
- if (it == res.end()) continue;
-
- const auto& partResult = it->second;
- auto partStats = partRes.mutable_partition_stats();
-
- partStats->set_store_size_bytes(partResult.GetPartitionSize());
- partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset());
- partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset());
+ if (it == res.end())
+ continue;
+ FillPartitionStats(it->second, partRes.mutable_partition_stats(), tabletInfo.NodeId);
+ }
+}
- SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs());
- SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs());
+bool TDescribeTopicActor::ApplyResponse(
+ TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
+) {
+ const auto& record = ev->Get()->Record;
+ Y_VERIFY(record.LocationsSize() == TotalPartitions);
+ Y_VERIFY(Settings.RequireLocation);
- AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay());
+ for (auto i = 0u; i < TotalPartitions; ++i) {
+ const auto& location = record.GetLocations(i);
+ auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location();
+ SetPartitionLocation(location, locationResult);
- partStats->set_partition_node_id(tabletInfo.NodeId);
}
+ return true;
}
+
void TDescribeTopicActor::Reply(const TActorContext& ctx) {
+ if (TBase::IsDead) {
+ return;
+ }
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}
void TDescribeConsumerActor::Reply(const TActorContext& ctx) {
+ if (TBase::IsDead) {
+ return;
+ }
return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}
@@ -798,7 +953,7 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv
partStats->set_partition_node_id(tabletInfo.NodeId);
- if (Consumer) {
+ if (Settings.Consumer) {
auto consStats = partRes.mutable_partition_consumer_stats();
consStats->set_last_read_offset(partResult.GetLagsInfo().GetReadPosition().GetOffset());
@@ -831,7 +986,21 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv
}
}
+bool TDescribeConsumerActor::ApplyResponse(
+ TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
+) {
+ const auto& record = ev->Get()->Record;
+ Y_VERIFY(record.LocationsSize() == TotalPartitions);
+ Y_VERIFY(Settings.RequireLocation);
+ for (auto i = 0u; i < TotalPartitions; ++i) {
+ const auto& location = record.GetLocations(i);
+ auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location();
+ SetPartitionLocation(location, locationResult);
+ }
+ return true;
+}
+
bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i,
const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error)
@@ -871,9 +1040,9 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi
return true;
}
-void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
@@ -888,7 +1057,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
}
if (response.PQGroupInfo) {
- const auto &pqDescr = response.PQGroupInfo->Description;
+ const auto& pqDescr = response.PQGroupInfo->Description;
Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount());
for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) {
auto part = Result.add_partitions();
@@ -929,7 +1098,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
(*Result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000);
(*Result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts();
- const auto& pqConfig = AppData(ctx)->PQConfig;
+ const auto& pqConfig = AppData(ActorContext())->PQConfig;
if (local || pqConfig.GetTopicsAreFirstClassCitizen()) {
Result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond());
@@ -958,34 +1127,37 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv
break;
}
}
- auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx);
+ auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext());
bool found = false;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
if (consumerName == config.GetReadRules(i)) found = true;
auto rr = Result.add_consumers();
Ydb::StatusIds::StatusCode status;
TString error;
- if (!FillConsumerProto(rr, config, i, ctx, status, error)) {
- return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx);
+ if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) {
+ return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext());
}
}
- if (GetProtoRequest()->include_stats()) {
- if (Consumer && !found) {
- Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ if (GetProtoRequest()->include_stats() || GetProtoRequest()->include_location()) {
+ if (Settings.Consumer && !found) {
+ Request_->RaiseIssue(FillIssue(
+ TStringBuilder() << "no consumer '" << Settings.Consumer << "' in topic",
+ Ydb::PersQueue::ErrorCode::BAD_REQUEST
+ ));
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
- ProcessTablets(pqDescr, ctx);
+ ProcessTablets(pqDescr, ActorContext());
return;
}
}
- return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ActorContext());
}
-void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
+void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
- if (ReplyIfNotTopic(ev, ctx)) {
+ if (ReplyIfNotTopic(ev)) {
return;
}
const auto& response = ev->Get()->Request.Get()->ResultSet.front();
@@ -998,7 +1170,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::
if (const auto& name = GetCdcStreamName()) {
selfEntry->set_name(*name);
}
- selfEntry->set_name(selfEntry->name() + "/" + Consumer);
+ selfEntry->set_name(selfEntry->name() + "/" + Settings.Consumer);
if (response.PQGroupInfo) {
const auto& pqDescr = response.PQGroupInfo->Description;
@@ -1010,7 +1182,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::
part->set_active(true);
}
- auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx);
+ auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext());
bool found = false;
for (ui32 i = 0; i < config.ReadRulesSize(); ++i) {
if (consumerName != config.GetReadRules(i))
@@ -1019,44 +1191,71 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::
auto rr = Result.mutable_consumer();
Ydb::StatusIds::StatusCode status;
TString error;
- if (!FillConsumerProto(rr, config, i, ctx, status, error)) {
- return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx);
+ if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) {
+ return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext());
}
break;
}
if (!found) {
- Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
- return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx);
+ Request_->RaiseIssue(FillIssue(
+ TStringBuilder() << "no consumer '" << Settings.Consumer << "' in topic",
+ Ydb::PersQueue::ErrorCode::BAD_REQUEST
+ ));
+ return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR);
}
- if (GetProtoRequest()->include_stats()) {
- ProcessTablets(pqDescr, ctx);
+ if (GetProtoRequest()->include_stats() || GetProtoRequest()->include_location()) {
+ ProcessTablets(pqDescr, ActorContext());
return;
}
}
- return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ActorContext());
}
-bool TDescribeTopicActorImpl::ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx) {
+
+bool TDescribeTopicActorImpl::ProcessTablets(
+ const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx
+) {
+ auto addBalancer = [&] {
+ BalancerTabletId = pqDescr.GetBalancerTabletID();
+ Tablets[BalancerTabletId].TabletId = BalancerTabletId;
+ };
+
+ auto partitionFilter = [&] (ui32 partId) {
+ if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribePartitions) {
+ return Settings.RequireStats && partId == Settings.Partitions[0];
+ } else {
+ return Settings.RequireStats;
+ }
+ return true;
+ };
+ TotalPartitions = pqDescr.GetTotalGroupCount();
+
for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) {
const auto& pi = pqDescr.GetPartitions(i);
+ if (!partitionFilter(pi.GetPartitionId())) {
+ continue;
+ }
Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId());
Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId();
}
for (auto& pair : Tablets) {
RequestTablet(pair.second, ctx);
}
- if (!Consumer.empty()) {
- BalancerTabletId = pqDescr.GetBalancerTabletID();
- Tablets[BalancerTabletId].TabletId = BalancerTabletId;
+
+ if (Settings.RequireLocation) {
+ addBalancer();
+ RequestAdditionalInfo(ctx);
+ } else if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer) {
+ addBalancer();
}
-
- if (RequestsInfly == 0) {
+ if (RequestsInfly == 0 && !PendingLocation) {
Reply(ctx);
return false;
}
+
return true;
}
@@ -1066,6 +1265,7 @@ void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx)
SendDescribeProposeRequest(ctx);
Become(&TDescribeTopicActor::StateWork);
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "Describe topic actor for path " << GetProtoRequest()->path());
}
void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx)
@@ -1076,4 +1276,206 @@ void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx)
Become(&TDescribeConsumerActor::StateWork);
}
+
+template<class TProtoType>
+TDescribeTopicActorSettings SettingsFromDescribePartRequest(TProtoType* request) {
+ return TDescribeTopicActorSettings::DescribePartitionSettings(
+ request->partition_id(), request->include_stats(), request->include_location()
+ );
+}
+
+TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::TEvDescribePartitionRequest* request)
+ : TBase(request, request->GetProtoRequest()->path())
+ , TDescribeTopicActorImpl(SettingsFromDescribePartRequest(request->GetProtoRequest()))
+{
+}
+
+TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::IRequestOpCtx* ctx)
+ : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribePartitionRequest*>(ctx->GetRequest())->path())
+ , TDescribeTopicActorImpl(SettingsFromDescribePartRequest(dynamic_cast<const Ydb::Topic::DescribePartitionRequest*>(ctx->GetRequest())))
+{
+}
+
+void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)
+{
+ TBase::Bootstrap(ctx);
+ SendDescribeProposeRequest(ctx);
+ Become(&TDescribePartitionActor::StateWork);
+}
+
+void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ default:
+ if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
+ TBase::StateWork(ev);
+ };
+ }
+}
+
+void TDescribePartitionActor::HandleCacheNavigateResponse(
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
+) {
+ Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic
+ if (ReplyIfNotTopic(ev)) {
+ return;
+ }
+ PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo;
+ auto* partRes = Result.mutable_partition();
+ partRes->set_partition_id(Settings.Partitions[0]);
+ partRes->set_active(true);
+ ProcessTablets(PQGroupInfo->Description, this->ActorContext());
+}
+
+void TDescribePartitionActor::ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&, const TActorContext&) {
+ Y_FAIL("");
+}
+
+void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext&) {
+ auto* partResult = Result.mutable_partition();
+
+ const auto& record = ev->Get()->Record;
+ for (auto partData : record.GetPartResult()) {
+ if ((ui32)partData.GetPartition() != Settings.Partitions[0])
+ continue;
+
+ Y_VERIFY((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
+ partResult->set_partition_id(partData.GetPartition());
+ partResult->set_active(true);
+ FillPartitionStats(partData, partResult->mutable_partition_stats(), tabletInfo.NodeId);
+ }
+}
+
+bool TDescribePartitionActor::ApplyResponse(
+ TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
+) {
+ const auto& record = ev->Get()->Record;
+ if (Settings.Partitions) {
+ Y_VERIFY(record.LocationsSize() == 1);
+ }
+
+ const auto& location = record.GetLocations(0);
+ auto* pResult = Result.mutable_partition();
+ pResult->set_partition_id(location.GetPartitionId());
+ pResult->set_active(true);
+ auto* locationResult = pResult->mutable_partition_location();
+ SetPartitionLocation(location, locationResult);
+ return true;
+}
+
+void TDescribePartitionActor::RaiseError(
+ const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status,
+ const TActorContext&
+) {
+ if (TBase::IsDead)
+ return;
+ this->Request_->RaiseIssue(FillIssue(error, errorCode));
+ TBase::RespondWithCode(status);
+}
+
+void TDescribePartitionActor::Reply(const TActorContext& ctx) {
+ if (TBase::IsDead) {
+ return;
+ }
+ if (Settings.RequireLocation) {
+ Y_VERIFY(Result.partition().has_partition_location());
+ }
+ return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
+}
+
+using namespace NIcNodeCache;
+
+TPartitionsLocationActor::TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester)
+ : TBase(request, requester)
+ , TDescribeTopicActorImpl(TDescribeTopicActorSettings::GetPartitionsLocation(request.PartitionIds))
+{
+}
+
+
+void TPartitionsLocationActor::Bootstrap(const NActors::TActorContext&)
+{
+ SendDescribeProposeRequest();
+ Become(&TPartitionsLocationActor::StateWork);
+ SendNodesRequest();
+
+}
+
+void TPartitionsLocationActor::StateWork(TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle);
+ default:
+ if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
+ TBase::StateWork(ev);
+ };
+ }
+}
+
+void TPartitionsLocationActor::HandleCacheNavigateResponse(
+ TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev
+) {
+ if (!TBase::HandleCacheNavigateResponseBase(ev)) {
+ return;
+ }
+ ProcessTablets(PQGroupInfo->Description, this->ActorContext());
+}
+
+bool TPartitionsLocationActor::ApplyResponse(
+ TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext&
+) {
+ const auto& record = ev->Get()->Record;
+ for (auto i = 0u; i < record.LocationsSize(); i++) {
+ const auto& part = record.GetLocations(i);
+ TEvPQProxy::TPartitionLocationInfo partLocation;
+ ui64 nodeId = part.GetNodeId();
+
+ partLocation.PartitionId = part.GetPartitionId();
+ partLocation.IncGeneration = part.GetGeneration() + 1;
+ partLocation.NodeId = nodeId;
+ Response->Partitions.emplace_back(std::move(partLocation));
+ }
+ if (GotNodesInfo)
+ Finalize();
+ else
+ GotPartitions = true;
+ return true;
+}
+
+void TPartitionsLocationActor::SendNodesRequest() const {
+ auto* icEv = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest();
+ ActorContext().Send(CreateICNodesInfoCacheServiceId(), icEv);
+
}
+
+void TPartitionsLocationActor::Handle(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev) {
+ NodesInfoEv = ev;
+ if (GotPartitions)
+ Finalize();
+ else
+ GotNodesInfo = true;
+}
+
+void TPartitionsLocationActor::Finalize() {
+ if (Settings.Partitions) {
+ Y_VERIFY(Response->Partitions.size() == Settings.Partitions.size());
+ } else {
+ Y_VERIFY(Response->Partitions.size() == PQGroupInfo->Description.PartitionsSize());
+ }
+ for (auto& pInResponse : Response->Partitions) {
+ auto iter = NodesInfoEv->Get()->NodeIdsMapping->find(pInResponse.NodeId);
+ if (iter.IsEnd()) {
+ return RaiseError(
+ TStringBuilder() << "Hostname not found for nodeId " << pInResponse.NodeId,
+ Ydb::PersQueue::ErrorCode::ERROR,
+ Ydb::StatusIds::INTERNAL_ERROR, ActorContext()
+ );
+ }
+ pInResponse.Hostname = (*NodesInfoEv->Get()->Nodes)[iter->second].Host;
+ }
+ TBase::RespondWithCode(Ydb::StatusIds::SUCCESS);
+}
+
+void TPartitionsLocationActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) {
+ this->AddIssue(FillIssue(error, errorCode));
+ this->RespondWithCode(status);
+}
+
+} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h
index 1579ab50274..e5d25dcaee9 100644
--- a/ydb/services/persqueue_v1/actors/schema_actors.h
+++ b/ydb/services/persqueue_v1/actors/schema_actors.h
@@ -1,13 +1,14 @@
#pragma once
#include "events.h"
-#include <ydb/services/lib/actors/pq_schema_actor.h>
#include <ydb/core/persqueue/events/global.h>
+#include <ydb/services/lib/actors/pq_schema_actor.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
+
namespace NKikimr::NGRpcProxy::V1 {
using namespace NKikimr::NGRpcService;
-
class TDropPropose {
public:
TDropPropose() {}
@@ -26,7 +27,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
};
class TDropTopicActor : public TPQGrpcSchemaBase<TDropTopicActor, NKikimr::NGRpcService::TEvDropTopicRequest>, public TDropPropose {
@@ -39,7 +40,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
};
class TPQDescribeTopicActor : public TPQGrpcSchemaBase<TPQDescribeTopicActor, NKikimr::NGRpcService::TEvPQDescribeTopicRequest>
@@ -55,7 +56,58 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+};
+
+struct TDescribeTopicActorSettings {
+ enum class EMode {
+ DescribeTopic,
+ DescribeConsumer,
+ DescribePartitions,
+ };
+ EMode Mode;
+ TString Consumer;
+ TVector<ui32> Partitions;
+ bool RequireStats = false;
+ bool RequireLocation = false;
+
+ TDescribeTopicActorSettings()
+ : Mode(EMode::DescribeTopic)
+ {}
+
+ TDescribeTopicActorSettings(const TString& consumer)
+ : Mode(EMode::DescribeConsumer)
+ , Consumer(consumer)
+ {}
+ TDescribeTopicActorSettings(EMode mode, bool requireStats, bool requireLocation)
+ : Mode(mode)
+ , RequireStats(requireStats)
+ , RequireLocation(requireLocation)
+ {}
+
+ static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation) {
+ return TDescribeTopicActorSettings{EMode::DescribeTopic, requireStats, requireLocation};
+ }
+
+ static TDescribeTopicActorSettings DescribeConsumer(const TString& consumer, bool requireStats, bool requireLocation)
+ {
+ TDescribeTopicActorSettings res{EMode::DescribeConsumer, requireStats, requireLocation};
+ res.Consumer = consumer;
+ return res;
+ }
+
+ static TDescribeTopicActorSettings GetPartitionsLocation(const TVector<ui32>& partitions) {
+ TDescribeTopicActorSettings res{EMode::DescribePartitions, false, true};
+ res.Partitions = partitions;
+ return res;
+ }
+
+ static TDescribeTopicActorSettings DescribePartitionSettings(ui32 partition, bool stats, bool location) {
+ TDescribeTopicActorSettings res{EMode::DescribePartitions, stats, location};
+ res.Partitions = {partition};
+ return res;
+ }
+
};
class TDescribeTopicActorImpl
@@ -67,9 +119,15 @@ protected:
TActorId Pipe;
ui32 NodeId = 0;
ui32 RetriesLeft = 3;
+
+ TTabletInfo() = default;
+ TTabletInfo(ui64 tabletId)
+ : TabletId(tabletId)
+ {}
};
+
public:
- TDescribeTopicActorImpl(const TString& consumer);
+ TDescribeTopicActorImpl(const TDescribeTopicActorSettings& settings);
virtual ~TDescribeTopicActorImpl() = default;
void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx);
@@ -77,36 +135,49 @@ public:
void Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx);
+ void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx);
+
bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx);
void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx);
void RequestTablet(ui64 tabletId, const TActorContext& ctx);
void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero());
void RequestAdditionalInfo(const TActorContext& ctx);
+ void RequestBalancer(const TActorContext& ctx);
+ void RequestPartitionsLocationIfRequired(const TActorContext& ctx);
+ void CheckCloseBalancerPipe(const TActorContext& ctx);
bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx);
- void Bootstrap(const NActors::TActorContext& ctx);
+ virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0;
- virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) = 0;
+ virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode,
+ const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0;
+ virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev,
+ const TActorContext& ctx) = 0;
+ virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev,
+ const TActorContext& ctx) = 0;
+ virtual bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr&, const TActorContext&) = 0;
- virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0;
- virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) = 0;
- virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) = 0;
virtual void Reply(const TActorContext& ctx) = 0;
private:
std::map<ui64, TTabletInfo> Tablets;
ui32 RequestsInfly = 0;
+ bool PendingLocation = false;
+ bool GotLocation = false;
ui64 BalancerTabletId = 0;
+ TActorId* BalancerPipe = nullptr;
protected:
- TString Consumer;
+ ui32 TotalPartitions = 0;
+ TDescribeTopicActorSettings Settings;
};
class TDescribeTopicActor : public TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest>
@@ -127,9 +198,10 @@ public:
void StateWork(TAutoPtr<IEventHandle>& ev);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override;
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
+ bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;
virtual void Reply(const TActorContext& ctx) override;
private:
@@ -154,16 +226,45 @@ public:
void StateWork(TAutoPtr<IEventHandle>& ev);
void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override;
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override;
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
+ bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;
virtual void Reply(const TActorContext& ctx) override;
private:
Ydb::Topic::DescribeConsumerResult Result;
};
+class TDescribePartitionActor : public TPQGrpcSchemaBase<TDescribePartitionActor, NKikimr::NGRpcService::TEvDescribePartitionRequest>
+ , public TDescribeTopicActorImpl
+{
+using TBase = TPQGrpcSchemaBase<TDescribePartitionActor, NKikimr::NGRpcService::TEvDescribePartitionRequest>;
+using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo;
+
+public:
+ TDescribePartitionActor(NKikimr::NGRpcService::TEvDescribePartitionRequest* request);
+ TDescribePartitionActor(NKikimr::NGRpcService::IRequestOpCtx * ctx);
+
+ ~TDescribePartitionActor() = default;
+ void Bootstrap(const NActors::TActorContext& ctx);
+
+ void StateWork(TAutoPtr<IEventHandle>& ev);
+
+ void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode,
+ const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override;
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override;
+ void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override;
+ bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;
+
+ virtual void Reply(const TActorContext& ctx) override;
+
+private:
+ TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo;
+ Ydb::Topic::DescribePartitionResult Result;
+};
class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest>
, public TCdcStreamCompatible
@@ -173,7 +274,7 @@ class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddR
public:
TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request);
- void Bootstrap(const NActors::TActorContext &ctx);
+ void Bootstrap(const NActors::TActorContext& ctx);
void ModifyPersqueueConfig(const TActorContext& ctx,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
@@ -197,7 +298,7 @@ public:
class TPQCreateTopicActor : public TPQGrpcSchemaBase<TPQCreateTopicActor, NKikimr::NGRpcService::TEvPQCreateTopicRequest> {
-using TBase = TPQGrpcSchemaBase<TPQCreateTopicActor, TEvPQCreateTopicRequest>;
+ using TBase = TPQGrpcSchemaBase<TPQCreateTopicActor, TEvPQCreateTopicRequest>;
public:
TPQCreateTopicActor(NKikimr::NGRpcService::TEvPQCreateTopicRequest* request, const TString& localCluster, const TVector<TString>& clusters);
@@ -208,7 +309,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
private:
TString LocalCluster;
@@ -217,7 +318,7 @@ private:
class TCreateTopicActor : public TPQGrpcSchemaBase<TCreateTopicActor, NKikimr::NGRpcService::TEvCreateTopicRequest> {
-using TBase = TPQGrpcSchemaBase<TCreateTopicActor, TEvCreateTopicRequest>;
+ using TBase = TPQGrpcSchemaBase<TCreateTopicActor, TEvCreateTopicRequest>;
public:
TCreateTopicActor(NKikimr::NGRpcService::TEvCreateTopicRequest* request, const TString& localCluster, const TVector<TString>& clusters);
@@ -229,7 +330,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
private:
TString LocalCluster;
@@ -238,7 +339,7 @@ private:
class TPQAlterTopicActor : public TPQGrpcSchemaBase<TPQAlterTopicActor, NKikimr::NGRpcService::TEvPQAlterTopicRequest> {
-using TBase = TPQGrpcSchemaBase<TPQAlterTopicActor, TEvPQAlterTopicRequest>;
+ using TBase = TPQGrpcSchemaBase<TPQAlterTopicActor, TEvPQAlterTopicRequest>;
public:
TPQAlterTopicActor(NKikimr::NGRpcService::TEvPQAlterTopicRequest* request, const TString& localCluster);
@@ -249,7 +350,7 @@ public:
void Bootstrap(const NActors::TActorContext& ctx);
- void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); }
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); }
private:
TString LocalCluster;
@@ -265,7 +366,7 @@ public:
TAlterTopicActor(NKikimr::NGRpcService::TEvAlterTopicRequest *request);
TAlterTopicActor(NKikimr::NGRpcService::IRequestOpCtx* request);
- void Bootstrap(const NActors::TActorContext &ctx);
+ void Bootstrap(const NActors::TActorContext& ctx);
void ModifyPersqueueConfig(const TActorContext& ctx,
NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig,
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
@@ -273,4 +374,49 @@ public:
};
-}
+class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor,
+ TGetPartitionsLocationRequest,
+ TEvPQProxy::TEvPartitionLocationResponse>
+ , public TDescribeTopicActorImpl {
+
+using TBase = TPQInternalSchemaActor<TPartitionsLocationActor, TGetPartitionsLocationRequest,
+ TEvPQProxy::TEvPartitionLocationResponse>;
+
+public:
+ TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester);
+
+ ~TPartitionsLocationActor() = default;
+
+ void Bootstrap(const NActors::TActorContext& ctx) override;
+
+ void StateWork(TAutoPtr<IEventHandle>& ev);
+
+ void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override;
+ void ApplyResponse(TTabletInfo&,
+ NKikimr::TEvPersQueue::TEvStatusResponse::TPtr&,
+ const TActorContext&) override {
+ Y_FAIL();
+ }
+ virtual void ApplyResponse(TTabletInfo&, TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&,
+ const TActorContext&) override {
+ Y_FAIL();
+ }
+
+ void Finalize();
+
+ bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override;
+ void Reply(const TActorContext&) override {};
+
+ void Handle(NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev);
+ void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) override;
+private:
+ void SendNodesRequest() const;
+
+ NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr NodesInfoEv;
+ THashSet<ui64> PartitionIds;
+
+ bool GotPartitions = false;
+ bool GotNodesInfo = false;
+};
+
+} // namespace NKikimr::NGRpcProxy::V1
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
index dae9a44de5e..95693c0aad1 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp
@@ -141,6 +141,11 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest:
ctx.Register(new TDescribeConsumerActor(ev->Release().Release()));
}
+void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) {
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe partition request");
+ ctx.Register(new TDescribePartitionActor(ev->Release().Release()));
+}
+
}
namespace NKikimr {
@@ -178,6 +183,10 @@ void TGRpcRequestProxyHandleMethods::Handle(TEvDescribeTopicRequest::TPtr& ev, c
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
+void TGRpcRequestProxyHandleMethods::Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) {
+ ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
+}
+
void NKikimr::NGRpcService::TGRpcRequestProxyHandleMethods::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) {
ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release());
}
@@ -201,6 +210,7 @@ void TGRpcRequestProxyHandleMethods::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev
DECLARE_RPC(DescribeTopic);
DECLARE_RPC(DescribeConsumer);
+DECLARE_RPC(DescribePartition);
#undef DECLARE_RPC
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h
index 58ba33eb587..4fdba0955a1 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.h
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.h
@@ -42,6 +42,7 @@ private:
HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle);
HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle);
HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle);
+ HFunc(NKikimr::NGRpcService::TEvDescribePartitionRequest, Handle);
hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle);
}
}
@@ -58,6 +59,7 @@ private:
void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx);
+ void Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx);
void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev);
diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp
index a74b98de949..42d1a31946e 100644
--- a/ydb/services/persqueue_v1/topic.cpp
+++ b/ydb/services/persqueue_v1/topic.cpp
@@ -115,6 +115,9 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, {
ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed()));
})
+ ADD_REQUEST(DescribePartition, TopicService, DescribePartitionRequest, DescribePartitionResponse, {
+ ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribePartitionRequest(ctx, IsRlAllowed()));
+ })
#undef ADD_REQUEST
#ifdef ADD_REQUEST_LIMIT
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
index 872b8eb58f9..fc7ec1c7c30 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(describes_ut)
add_subdirectory(new_schemecache_ut)
add_executable(ydb-services-persqueue_v1-ut)
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
index ca936d0636a..7e21a17b083 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(describes_ut)
add_subdirectory(new_schemecache_ut)
add_executable(ydb-services-persqueue_v1-ut)
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
index 7a009a2e332..657b3aae640 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(describes_ut)
add_subdirectory(new_schemecache_ut)
add_executable(ydb-services-persqueue_v1-ut)
diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
index 7f2067278e5..3e155ee8ccf 100644
--- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(describes_ut)
add_subdirectory(new_schemecache_ut)
add_executable(ydb-services-persqueue_v1-ut)
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 00000000000..2933016185f
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,81 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-describes_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ core-testlib-default
+ core-client-server
+ ydb_persqueue_core-ut-ut_utils
+)
+target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-persqueue_v1-ut-describes_ut
+ system_allocator
+)
+vcs_info(ydb-services-persqueue_v1-ut-describes_ut)
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 00000000000..733eac0df59
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,84 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-describes_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ core-testlib-default
+ core-client-server
+ ydb_persqueue_core-ut-ut_utils
+)
+target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-persqueue_v1-ut-describes_ut
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-services-persqueue_v1-ut-describes_ut)
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 00000000000..fb655893e2f
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-describes_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ core-testlib-default
+ core-client-server
+ ydb_persqueue_core-ut-ut_utils
+)
+target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-persqueue_v1-ut-describes_ut
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-services-persqueue_v1-ut-describes_ut)
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt
new file mode 100644
index 00000000000..f8b31df0c11
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 00000000000..311d4cc3736
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,74 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-persqueue_v1-ut-describes_ut)
+target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1
+)
+target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-persqueue_v1
+ core-testlib-default
+ core-client-server
+ ydb_persqueue_core-ut-ut_utils
+)
+target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_TARGET
+ ydb-services-persqueue_v1-ut-describes_ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-persqueue_v1-ut-describes_ut
+ PROPERTY
+ TIMEOUT
+ 300
+)
+target_allocator(ydb-services-persqueue_v1-ut-describes_ut
+ system_allocator
+)
+vcs_info(ydb-services-persqueue_v1-ut-describes_ut)
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
new file mode 100644
index 00000000000..382aee6829f
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp
@@ -0,0 +1,299 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/services/persqueue_v1/ut/test_utils.h>
+#include <ydb/services/persqueue_v1/actors/schema_actors.h>
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
+
+
+namespace NKikimr::NPersQueueTests {
+
+using namespace NKikimr::NGRpcProxy::V1;
+using namespace NIcNodeCache;
+
+const static TString topicName = "rt3.dc1--topic-x";
+
+class TDescribeTestServer {
+public:
+ NPersQueue::TTestServer Server;
+ bool UseBadTopic = false;
+
+ TDescribeTestServer(ui32 partsCount = 15)
+ : Server()
+ , PartsCount(partsCount)
+ {
+ Server.EnableLogs({ NKikimrServices::KQP_PROXY }, NActors::NLog::PRI_EMERG);
+ Server.EnableLogs({ NKikimrServices::PERSQUEUE, NKikimrServices::PQ_METACACHE }, NActors::NLog::PRI_INFO);
+ Server.EnableLogs({ NKikimrServices::PERSQUEUE_CLUSTER_TRACKER }, NActors::NLog::PRI_INFO);
+
+ Server.AnnoyingClient->CreateTopicNoLegacy(topicName, partsCount);
+ Channel = grpc::CreateChannel(
+ "localhost:" + ToString(Server.GrpcPort), grpc::InsecureChannelCredentials()
+ );
+ Stub = Ydb::Topic::V1::TopicService::NewStub(Channel);
+
+ }
+
+ TTestActorRuntime* GetRuntime() const {
+ return Server.CleverServer->GetRuntime();
+ }
+ bool DescribePartition(ui32 partId, bool askLocation, bool askStats, bool mayFail = false) {
+ grpc::ClientContext rcontext;
+ Ydb::Topic::DescribePartitionRequest request;
+ Ydb::Topic::DescribePartitionResponse response;
+ Ydb::Topic::DescribePartitionResult result;
+ request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName}));
+ request.set_partition_id(partId);
+ if (askLocation)
+ request.set_include_location(true);
+ if (askStats)
+ request.set_include_stats(true);
+
+ Stub->DescribePartition(&rcontext, request, &response);
+ Cerr << "Got response: " << response.DebugString() << Endl;
+ if (UseBadTopic) {
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR);
+ return true;
+ }
+ if (partId < PartsCount) {
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS);
+ } else {
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::BAD_REQUEST);
+ NYql::TIssues opIssues;
+ NYql::IssuesFromMessage(response.operation().issues(), opIssues);
+ TString expectedError= TStringBuilder() << "No partition " << partId << " in topic";
+ UNIT_ASSERT(opIssues.ToOneLineString().Contains(expectedError));
+ return true;
+ }
+
+ auto unpackOk = response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT(unpackOk);
+
+ if (askStats) {
+ UNIT_ASSERT(result.partition().has_partition_stats());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(result.partition().partition_id(), partId);
+ if (askLocation) {
+ UNIT_ASSERT(result.partition().has_partition_location());
+ UNIT_ASSERT(result.partition().partition_location().node_id() > 0);
+ auto genComparizon = (result.partition().partition_location().generation() > 0);
+ if (mayFail) {
+ return genComparizon;
+ } else {
+ UNIT_ASSERT_C(genComparizon, response.DebugString().c_str());
+ }
+ }
+ return true;
+ }
+ void DescribeTopic(bool askStats, bool askLocation) {
+ grpc::ClientContext rcontext;
+ Ydb::Topic::DescribeTopicRequest request;
+ Ydb::Topic::DescribeTopicResponse response;
+ Ydb::Topic::DescribeTopicResult result;
+ request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName}));
+ if (askStats)
+ request.set_include_stats(true);
+ if (askLocation)
+ request.set_include_location(true);
+
+ Stub->DescribeTopic(&rcontext, request, &response);
+ Cerr << "Got response: " << response.DebugString() << Endl;
+
+ if (UseBadTopic) {
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR);
+ return;
+ }
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS);
+
+ auto unpackOk = response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT(unpackOk);
+ if (askStats) {
+ UNIT_ASSERT(result.has_topic_stats());
+ } else {
+ UNIT_ASSERT(!result.has_topic_stats());
+ }
+ UNIT_ASSERT_VALUES_EQUAL(result.partitions_size(), PartsCount);
+ for (const auto& p : result.partitions()) {
+ UNIT_ASSERT(askStats == p.has_partition_stats());
+ UNIT_ASSERT(askLocation == p.has_partition_location());
+ if (askLocation) {
+ UNIT_ASSERT(p.partition_location().node_id() > 0);
+ }
+ }
+ }
+ void DescribeConsumer(const TString& consumerName, bool askStats, bool askLocation) {
+ grpc::ClientContext rcontext;
+ Ydb::Topic::DescribeConsumerRequest request;
+ Ydb::Topic::DescribeConsumerResponse response;
+ Ydb::Topic::DescribeConsumerResult result;
+ request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName}));
+ if (askStats)
+ request.set_include_stats(true);
+ if (askLocation)
+ request.set_include_location(true);
+ request.set_consumer(consumerName);
+ Stub->DescribeConsumer(&rcontext, request, &response);
+ Cerr << "Got response: " << response.DebugString() << Endl;
+
+ if (UseBadTopic) {
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR);
+ return;
+ }
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS);
+
+ auto unpackOk = response.operation().result().UnpackTo(&result);
+ UNIT_ASSERT(unpackOk);
+ UNIT_ASSERT_VALUES_EQUAL(result.partitions_size(), PartsCount);
+ for (const auto& p : result.partitions()) {
+ UNIT_ASSERT(askStats == p.has_partition_stats());
+ UNIT_ASSERT(askStats == p.has_partition_consumer_stats());
+ UNIT_ASSERT(askLocation == p.has_partition_location());
+ if (askLocation) {
+ UNIT_ASSERT(p.partition_location().node_id() > 0);
+ }
+ }
+ }
+ void AddConsumer(const TString& consumer) {
+ Ydb::Topic::AlterTopicRequest request;
+ request.set_path(TStringBuilder() << "/Root/PQ/" << topicName);
+
+ auto addConsumer = request.add_add_consumers();
+ addConsumer->set_name(consumer);
+ addConsumer->set_important(true);
+ grpc::ClientContext rcontext;
+ Ydb::Topic::AlterTopicResponse response;
+ Stub->AlterTopic(&rcontext, request, &response);
+ UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS);
+ }
+
+private:
+ std::shared_ptr<grpc::Channel> Channel;
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> Stub;
+ ui32 PartsCount;
+};
+
+Y_UNIT_TEST_SUITE(TTopicApiDescribes) {
+ void KillTopicTablets(NPersQueue::TTestServer& server, const TString& topicName) {
+ auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription()
+ .GetPersQueueGroup();
+
+ THashSet<ui64> tablets;
+ for (const auto& p : pqGroup.GetPartitions()) {
+ auto res = tablets.insert(p.GetTabletId());
+ if (res.second) {
+ server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId());
+ }
+ }
+ server.CleverServer->GetRuntime()->DispatchEvents();
+ }
+
+ Y_UNIT_TEST(GetLocalDescribe) {
+ TDescribeTestServer server{};
+ auto* runtime = server.GetRuntime();
+
+ runtime->GetAppData().FeatureFlags.SetEnableIcNodeCache(true);
+ runtime->SetUseRealInterconnect();
+ const auto edge = runtime->AllocateEdgeActor();
+
+ TString currentTopicName = topicName;
+ auto getDescribe = [&] (const TVector<ui32>& parts, bool fails = false) {
+ auto partitionActor = runtime->Register(new TPartitionsLocationActor(
+ TGetPartitionsLocationRequest{TString("/Root/PQ/") + currentTopicName, "", "", parts}, edge
+ ));
+ runtime->EnableScheduleForActor(partitionActor);
+ runtime->DispatchEvents();
+ auto ev = runtime->GrabEdgeEvent<TEvPQProxy::TEvPartitionLocationResponse>();
+ if (currentTopicName != topicName) {
+ UNIT_ASSERT_VALUES_EQUAL(ev->Status, Ydb::StatusIds::SCHEME_ERROR);
+ return ev;
+ }
+ if (fails) {
+ UNIT_ASSERT_VALUES_EQUAL(ev->Status, Ydb::StatusIds::BAD_REQUEST);
+ return ev;
+ }
+ UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SUCCESS, ev->Issues.ToString());
+ UNIT_ASSERT_VALUES_EQUAL(ev->Partitions.size(), parts ? parts.size() : 15);
+ return ev;
+
+ };
+
+ auto ev = getDescribe({});
+
+ THashSet<ui64> allParts;
+ for (const auto& p : ev->Partitions) {
+ UNIT_ASSERT(!p.Hostname.Empty());
+ UNIT_ASSERT(p.NodeId > 0);
+ UNIT_ASSERT(p.IncGeneration > 0);
+ UNIT_ASSERT(p.PartitionId < 15);
+ allParts.insert(p.PartitionId);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(allParts.size(), 15);
+
+ ev = getDescribe({1, 3, 5});
+ allParts.clear();
+ for (const auto& p : ev->Partitions) {
+ auto res = allParts.insert(p.PartitionId);
+ UNIT_ASSERT(res.second);
+ UNIT_ASSERT(p.PartitionId == 1 || p.PartitionId == 3 || p.PartitionId == 5);
+ }
+
+ getDescribe({1000}, true);
+ currentTopicName = "bad-topic";
+ getDescribe({}, true);
+ }
+ Y_UNIT_TEST(GetPartitionDescribe) {
+ ui32 partsCount = 15;
+ TDescribeTestServer server(partsCount);
+ auto* runtime = server.GetRuntime();
+ runtime->SetRegistrationObserverFunc(
+ [&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) {
+ runtime.EnableScheduleForActor(parentId);
+ runtime.EnableScheduleForActor(actorId);
+ }
+ );
+
+ server.DescribePartition(150, true, false);
+ server.DescribePartition(2, false, true);
+ server.DescribePartition(0, false, false);
+ KillTopicTablets(server.Server, topicName);
+ bool checkRes = server.DescribePartition(1, true, false, true);
+
+ if (!checkRes) {
+ KillTopicTablets(server.Server, topicName);
+ server.DescribePartition(1, true, false);
+ }
+ server.DescribePartition(3, true, true);
+ server.UseBadTopic = true;
+ server.DescribePartition(0, true, true);
+
+ }
+ Y_UNIT_TEST(DescribeTopic) {
+ auto server = TDescribeTestServer();
+ Cerr << "Describe topic with stats and location\n";
+ server.DescribeTopic(true, true);
+ KillTopicTablets(server.Server, topicName);
+ Cerr << "Describe topic with stats\n";
+ server.DescribeTopic(true, false);
+ Cerr << "Describe topic with location\n";
+ server.DescribeTopic(false, true);
+ Cerr << "Describe topic with no stats or location\n";
+ server.DescribeTopic(false, false);
+ server.UseBadTopic = true;
+ Cerr << "Describe bad topic\n";
+ server.DescribeTopic(true, true);
+ }
+ Y_UNIT_TEST(DescribeConsumer) {
+ auto server = TDescribeTestServer();
+ server.AddConsumer("my-consumer");
+ server.DescribeConsumer("my-consumer", true, true);
+ KillTopicTablets(server.Server, topicName);
+ server.DescribeConsumer("my-consumer",true, false);
+ server.DescribeConsumer("my-consumer",false, true);
+ server.DescribeConsumer("my-consumer",false, false);
+ server.UseBadTopic = true;
+ server.DescribeConsumer("my-consumer",true, true);
+ }
+}
+
+} // namespace
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
new file mode 100644
index 00000000000..5fbd9a4723f
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp
@@ -0,0 +1,35 @@
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/core/client/server/ic_nodes_cache_service.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h>
+#include <ydb/services/persqueue_v1/ut/test_utils.h>
+
+
+namespace NKikimr::NPersQueueTests {
+
+using namespace NIcNodeCache;
+
+Y_UNIT_TEST_SUITE(TIcNodeCache) {
+ Y_UNIT_TEST(GetNodesInfoTest) {
+ NPersQueue::TTestServer server;
+ auto* runtime = server.CleverServer->GetRuntime();
+ runtime->GetAppData().FeatureFlags.SetEnableIcNodeCache(true);
+
+ const auto edge = runtime->AllocateEdgeActor();
+ runtime->RegisterService(CreateICNodesInfoCacheServiceId(), runtime->Register(
+ CreateICNodesInfoCacheService(nullptr, TDuration::Seconds(1)))
+ );
+ auto makeRequest = [&]() {
+ auto* request = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest();
+ runtime->Send(CreateICNodesInfoCacheServiceId(), edge, request);
+ auto ev = runtime->GrabEdgeEvent<TEvICNodesInfoCache::TEvGetAllNodesInfoResponse>();
+ UNIT_ASSERT_VALUES_EQUAL(ev->Nodes->size(), 2);
+ };
+ makeRequest();
+ for (auto i = 0u; i < 6; i++) {
+ Sleep(TDuration::MilliSeconds(500));
+ makeRequest();
+ }
+ }
+};
+} // NKikimr::NPersQueueTests \ No newline at end of file
diff --git a/ydb/services/persqueue_v1/ut/describes_ut/ya.make b/ydb/services/persqueue_v1/ut/describes_ut/ya.make
new file mode 100644
index 00000000000..94f0ad74cde
--- /dev/null
+++ b/ydb/services/persqueue_v1/ut/describes_ut/ya.make
@@ -0,0 +1,29 @@
+UNITTEST_FOR(ydb/services/persqueue_v1)
+
+FORK_SUBTESTS()
+
+IF (SANITIZER_TYPE OR WITH_VALGRIND)
+ TIMEOUT(1200)
+ SIZE(LARGE)
+ TAG(ya:fat)
+ REQUIREMENTS(ram:32)
+ELSE()
+ TIMEOUT(300)
+ SIZE(MEDIUM)
+ENDIF()
+
+SRCS(
+ ic_cache_ut.cpp
+ describe_topic_ut.cpp
+)
+
+PEERDIR(
+ ydb/core/testlib/default
+ ydb/core/client/server
+ ydb/services/persqueue_v1
+ ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make
index aeb1026c03a..f88136af9cd 100644
--- a/ydb/services/persqueue_v1/ya.make
+++ b/ydb/services/persqueue_v1/ya.make
@@ -46,4 +46,5 @@ RECURSE(
RECURSE_FOR_TESTS(
ut
ut/new_schemecache_ut
+ ut/describes_ut
)