diff options
author | komels <komels@ydb.tech> | 2023-07-24 15:28:58 +0300 |
---|---|---|
committer | komels <komels@ydb.tech> | 2023-07-24 15:28:58 +0300 |
commit | c3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2 (patch) | |
tree | aeabb07660024aa08aa454f9cf2fc59da21f8ba9 | |
parent | 477d878e00267a8534b445571b1b2e73ea1435b8 (diff) | |
download | ydb-c3f6b5b8a4c8e976c40f36f3ae4e36011ed24ab2.tar.gz |
Describe topic with partlocations
Design doc:
https://wiki.yandex-team.ru/users/komels/kafka-topicmeta-design/#implementacija
57 files changed, 2202 insertions, 319 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index ce0ee341e9d..59686ca51a8 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -161,7 +161,8 @@ struct TKikimrEvents : TEvents { ES_DISCOVERY, ES_EXT_INDEX, ES_CONVEYOR, - ES_KQP_SCAN_EXCHANGE + ES_KQP_SCAN_EXCHANGE, + ES_IC_NODE_CACHE }; }; diff --git a/ydb/core/base/tablet_pipe.h b/ydb/core/base/tablet_pipe.h index 8eb63331a4e..d92d4a32ccf 100644 --- a/ydb/core/base/tablet_pipe.h +++ b/ydb/core/base/tablet_pipe.h @@ -54,7 +54,7 @@ namespace NKikimr { struct TEvConnectResult : public TEventPB<TEvConnectResult, NKikimrTabletPipe::TEvConnectResult, EvConnectResult> { TEvConnectResult() {} - TEvConnectResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TActorId& clientId, const TActorId& serverId, bool leader) + TEvConnectResult(NKikimrProto::EReplyStatus status, ui64 tabletId, const TActorId& clientId, const TActorId& serverId, bool leader, ui32 generation) { Record.SetStatus(status); Record.SetTabletId(tabletId); @@ -62,6 +62,7 @@ namespace NKikimr { ActorIdToProto(serverId, Record.MutableServerId()); Record.SetLeader(leader); Record.SetSupportsDataInPayload(true); + Record.SetGeneration(generation); } }; @@ -120,13 +121,14 @@ namespace NKikimr { }; struct TEvClientConnected : public TEventLocal<TEvClientConnected, EvClientConnected> { - TEvClientConnected(ui64 tabletId, NKikimrProto::EReplyStatus status, const TActorId& clientId, const TActorId& serverId, bool leader, bool dead) + TEvClientConnected(ui64 tabletId, NKikimrProto::EReplyStatus status, const TActorId& clientId, const TActorId& serverId, bool leader, bool dead, ui64 generation) : TabletId(tabletId) , Status(status) , ClientId(clientId) , ServerId(serverId) , Leader(leader) , Dead(dead) + , Generation(generation) {} const ui64 TabletId; @@ -135,6 +137,7 @@ namespace NKikimr { const TActorId ServerId; const bool Leader; const bool Dead; + const ui64 Generation; }; struct TEvServerConnected : public TEventLocal<TEvServerConnected, EvServerConnected> { @@ -200,17 +203,20 @@ namespace NKikimr { }; struct TEvActivate : public TEventLocal<TEvActivate, EvActivate> { - TEvActivate(ui64 tabletId, const TActorId& ownerId, const TActorId& recipientId, bool leader) + TEvActivate(ui64 tabletId, const TActorId& ownerId, const TActorId& recipientId, bool leader, ui32 generation) : TabletId(tabletId) , OwnerId(ownerId) , RecipientId(recipientId) , Leader(leader) + , Generation(generation) + {} const ui64 TabletId; const TActorId OwnerId; const TActorId RecipientId; const bool Leader; + const ui32 Generation; }; struct TEvShutdown : public TEventLocal<TEvShutdown, EvShutdown> { @@ -287,8 +293,8 @@ namespace NKikimr { // Creates and activated server, returns serverId. // Created server will forward messages to the specified recipent. - virtual TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, - TActorIdentity owner, TActorId recipient, bool leader = true) = 0; + virtual TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipient, + bool leader = true, ui64 generation = 0) = 0; // Rejects connect with an error. virtual void Reject(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, NKikimrProto::EReplyStatus status, bool leader = true) = 0; @@ -305,7 +311,7 @@ namespace NKikimr { // Activates all inactive servers, created by Enqueue. // All activated servers will forward messages to the specified recipent. - virtual void Activate(TActorIdentity owner, TActorId recipientId, bool leader = true) = 0; + virtual void Activate(TActorIdentity owner, TActorId recipientId, bool leader = true, ui64 generation = 0) = 0; // Cleanup resources after reset virtual void Erase(TEvTabletPipe::TEvServerDestroyed::TPtr &ev) = 0; @@ -393,7 +399,7 @@ namespace NKikimr { IActor* CreateServer(ui64 tabletId, const TActorId& clientId, const TActorId& interconnectSession, ui32 features, ui64 connectCookie); // Promotes server actor to the active state. - void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader); + void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation = 0); // Destroys server actor. void CloseServer(TActorIdentity owner, TActorId serverId); diff --git a/ydb/core/client/server/CMakeLists.darwin-x86_64.txt b/ydb/core/client/server/CMakeLists.darwin-x86_64.txt index b620712f4dd..577a6ca7a50 100644 --- a/ydb/core/client/server/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/client/server/CMakeLists.darwin-x86_64.txt @@ -95,4 +95,5 @@ target_sources(core-client-server PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp ) diff --git a/ydb/core/client/server/CMakeLists.linux-aarch64.txt b/ydb/core/client/server/CMakeLists.linux-aarch64.txt index 36a40015b50..4343eef3ec3 100644 --- a/ydb/core/client/server/CMakeLists.linux-aarch64.txt +++ b/ydb/core/client/server/CMakeLists.linux-aarch64.txt @@ -96,4 +96,5 @@ target_sources(core-client-server PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp ) diff --git a/ydb/core/client/server/CMakeLists.linux-x86_64.txt b/ydb/core/client/server/CMakeLists.linux-x86_64.txt index 36a40015b50..4343eef3ec3 100644 --- a/ydb/core/client/server/CMakeLists.linux-x86_64.txt +++ b/ydb/core/client/server/CMakeLists.linux-x86_64.txt @@ -96,4 +96,5 @@ target_sources(core-client-server PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp ) diff --git a/ydb/core/client/server/CMakeLists.windows-x86_64.txt b/ydb/core/client/server/CMakeLists.windows-x86_64.txt index b620712f4dd..577a6ca7a50 100644 --- a/ydb/core/client/server/CMakeLists.windows-x86_64.txt +++ b/ydb/core/client/server/CMakeLists.windows-x86_64.txt @@ -95,4 +95,5 @@ target_sources(core-client-server PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/client/server/msgbus_server_whoami.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_server.cpp ${CMAKE_SOURCE_DIR}/ydb/core/client/server/grpc_proxy_status.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/client/server/ic_nodes_cache_service.cpp ) diff --git a/ydb/core/client/server/ic_nodes_cache_service.cpp b/ydb/core/client/server/ic_nodes_cache_service.cpp new file mode 100644 index 00000000000..fadff0700ee --- /dev/null +++ b/ydb/core/client/server/ic_nodes_cache_service.cpp @@ -0,0 +1,116 @@ +#include "ic_nodes_cache_service.h" +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> + + +namespace NKikimr::NIcNodeCache { +using namespace NActors; + +class TIcNodeCacheServiceActor : public TActorBootstrapped<TIcNodeCacheServiceActor> { + using TBase = TActorBootstrapped<TIcNodeCacheServiceActor>; +public: + TIcNodeCacheServiceActor(TIcNodeCacheServiceActor&&) = default; + TIcNodeCacheServiceActor& operator=(TIcNodeCacheServiceActor&&) = default; + + TIcNodeCacheServiceActor(const ::NMonitoring::TDynamicCounterPtr& counters, + const TDuration& cacheUpdateInterval) + : Counters(counters) + , CacheUpdateInterval(cacheUpdateInterval) + {} + + +private: + void RequestNodes() { + if (InfoRequested) + return; + + const static TActorId nameserviceId = GetNameserviceActorId(); + InfoRequested = true; + this->ActorContext().Send(nameserviceId, new TEvInterconnect::TEvListNodes()); + } + + void HandleNodesInfo(TEvInterconnect::TEvNodesInfo::TPtr& ev) { + Y_VERIFY(InfoRequested); + InfoRequested = false; + NodesCache.reset(new TNodeInfoVector(ev->Get()->Nodes)); + NextUpdate = TActivationContext::Now() + CacheUpdateInterval; + NodeIdsMapping.reset(new THashMap<ui64, ui64>()); + + RespondToAllWaiters(); + } + + void RespondToAllWaiters() { + while (!WaitersOnInit.empty()) { + RespondNodesInfo(WaitersOnInit.front()); + WaitersOnInit.pop_front(); + } + } + + void BuildNodesMap() { + Y_VERIFY_DEBUG(NodeIdsMapping->empty()); + for (auto i = 0u; i < NodesCache->size(); i++) { + auto res = NodeIdsMapping->insert( + std::make_pair((*NodesCache)[i].NodeId, i) + ); + Y_VERIFY_DEBUG(res.second); + } + } + + void HandleNodesRequest(TEvICNodesInfoCache::TEvGetAllNodesInfoRequest::TPtr& ev) { + if (ActorContext().Now() > NextUpdate) { + RequestNodes(); + } + + if(!NodesCache) { + WaitersOnInit.emplace_back(ev->Sender); + return; + } else { + RespondNodesInfo(ev->Sender); + } + } + + void RespondNodesInfo(const TActorId& recipient) { + if (NodeIdsMapping->empty()) { + BuildNodesMap(); + } + auto* response = new TEvICNodesInfoCache::TEvGetAllNodesInfoResponse(NodesCache, NodeIdsMapping); + ActorContext().Send(recipient, response); + } + + void HandleWakeup() { + if (TActivationContext::Now() > NextUpdate) { + RequestNodes(); + } + ActorContext().Schedule(CacheUpdateInterval / 2, new TEvents::TEvWakeup()); + } + +public: + void Bootstrap(const TActorContext&) { + Become(&TIcNodeCacheServiceActor::StateFunc); + HandleWakeup(); + } + + STRICT_STFUNC(StateFunc, + hFunc(TEvInterconnect::TEvNodesInfo, HandleNodesInfo) + sFunc(TEvents::TEvWakeup, HandleWakeup) + hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoRequest, HandleNodesRequest) + ) + +private: + TNodeInfoVectorPtr NodesCache; + std::shared_ptr<THashMap<ui64, ui64>> NodeIdsMapping; + bool InfoRequested = false; + ::NMonitoring::TDynamicCounterPtr Counters; + TDuration CacheUpdateInterval; + TInstant NextUpdate = TInstant::Zero(); + + TDeque<TActorId> WaitersOnInit; + +}; + +NActors::IActor* CreateICNodesInfoCacheService( + const ::NMonitoring::TDynamicCounterPtr& counters, const TDuration& cacheUpdateTimeout +) { + return new TIcNodeCacheServiceActor(counters, cacheUpdateTimeout); +} +} // NKikimr::NIcNodeCache
\ No newline at end of file diff --git a/ydb/core/client/server/ic_nodes_cache_service.h b/ydb/core/client/server/ic_nodes_cache_service.h new file mode 100644 index 00000000000..f277ae95dbd --- /dev/null +++ b/ydb/core/client/server/ic_nodes_cache_service.h @@ -0,0 +1,46 @@ +#pragma once + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/core/event_local.h> +#include <ydb/core/base/events.h> + +namespace NKikimr::NIcNodeCache { + +NActors::IActor* CreateICNodesInfoCacheService(const ::NMonitoring::TDynamicCounterPtr& counters, + const TDuration& cacheUpdateTimeout = TDuration::Seconds(10)); + +inline NActors::TActorId CreateICNodesInfoCacheServiceId() { + return NActors::TActorId(0, "ICNodeCache"); +} + +using TNodeInfoVector = TVector<NActors::TEvInterconnect::TNodeInfo>; +using TNodeInfoVectorPtr = std::shared_ptr<TNodeInfoVector>; + +struct TEvICNodesInfoCache { + enum EEv { + EvWakeup = EventSpaceBegin(TKikimrEvents::ES_IC_NODE_CACHE), + EvGetAllNodesInfoRequest, + EvGetAllNodesInfoResponse, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_IC_NODE_CACHE), + "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_IC_NODE_CACHE)"); + + + struct TEvGetAllNodesInfoRequest : public NActors::TEventLocal<TEvGetAllNodesInfoRequest, EvGetAllNodesInfoRequest> { + }; + + struct TEvGetAllNodesInfoResponse : public NActors::TEventLocal<TEvGetAllNodesInfoResponse, EvGetAllNodesInfoResponse> { + TNodeInfoVectorPtr Nodes; + std::shared_ptr<THashMap<ui64, ui64>> NodeIdsMapping; + + + TEvGetAllNodesInfoResponse(const TNodeInfoVectorPtr& nodesInfo, const std::shared_ptr<THashMap<ui64, ui64>>& nodeIdsMapping) + : Nodes(nodesInfo) + , NodeIdsMapping(nodeIdsMapping) + {} + }; +}; +} // NKikimr::NIcNodeCache
\ No newline at end of file diff --git a/ydb/core/client/server/ya.make b/ydb/core/client/server/ya.make index b02436d2863..01aa4ae53ac 100644 --- a/ydb/core/client/server/ya.make +++ b/ydb/core/client/server/ya.make @@ -56,6 +56,7 @@ SRCS( grpc_server.h grpc_proxy_status.h grpc_proxy_status.cpp + ic_nodes_cache_service.cpp ) PEERDIR( diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 75bfa640c46..1081f524793 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -74,6 +74,7 @@ union TBasicKikimrServicesMask { bool EnableCompConveyor : 1; bool EnableLocalPgWire:1; bool EnableKafkaProxy:1; + bool EnableIcNodeCacheService:1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 6407a597df2..c790d982684 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -31,6 +31,7 @@ #include <ydb/core/client/minikql_compile/mkql_compile_service.h> #include <ydb/core/client/server/grpc_proxy_status.h> #include <ydb/core/client/server/msgbus_server_pq_metacache.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> #include <ydb/core/client/server/msgbus_server_tracer.h> #include <ydb/core/cms/cms.h> @@ -2627,5 +2628,22 @@ void TKafkaProxyServiceInitializer::InitializeServices(NActors::TActorSystemSetu } } + +TIcNodeCacheServiceInitializer::TIcNodeCacheServiceInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{ +} + +void TIcNodeCacheServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + if (appData->FeatureFlags.GetEnableIcNodeCache()) { + setup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(NIcNodeCache::CreateICNodesInfoCacheService(appData->Counters), + TMailboxType::HTSwap, appData->UserPoolId) + ); + } +} + + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 1b97906e253..cef99ac7088 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -578,5 +578,12 @@ public: void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; }; +class TIcNodeCacheServiceInitializer : public IKikimrServicesInitializer { +public: + TIcNodeCacheServiceInitializer(const TKikimrRunConfig& runConfig); + + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + } // namespace NKikimrServicesInitializers } // namespace NKikimr diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 0bdb30a16f3..20263507e75 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1459,6 +1459,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TPersQueueClusterTrackerInitializer(runConfig)); } + if (serviceMask.EnableIcNodeCacheService) { + sil->AddServiceInitializer(new TIcNodeCacheServiceInitializer(runConfig)); + } + if (BusServer && serviceMask.EnableMessageBusServices) { sil->AddServiceInitializer(new TMessageBusServicesInitializer(runConfig, *BusServer)); } diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 7f3028da4e5..ac66702ed4d 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -137,6 +137,7 @@ struct TRpcServices { EvAlterTopic, EvDescribeTopic, EvDescribeConsumer, + EvDescribePartition, EvGetDiskSpaceUsage, EvStopServingDatabase, EvCoordinationSession, diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 4b99b65dbf3..e10d52aaf60 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -570,6 +570,7 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev) { HFunc(TEvAlterTopicRequest, PreHandle); HFunc(TEvDescribeTopicRequest, PreHandle); HFunc(TEvDescribeConsumerRequest, PreHandle); + HFunc(TEvDescribePartitionRequest, PreHandle); HFunc(TEvNodeCheckRequest, PreHandle); HFunc(TEvProxyRuntimeEvent, PreHandle); diff --git a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h index 000c9050312..d8fe632d1cb 100644 --- a/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h +++ b/ydb/core/grpc_services/grpc_request_proxy_handle_methods.h @@ -29,6 +29,7 @@ protected: static void Handle(TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); static void Handle(TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); + static void Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx); }; } diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index cb2fcc48894..cbeeaec420d 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -81,6 +81,7 @@ using TEvCreateTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCrea using TEvAlterTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTopic, Ydb::Topic::AlterTopicRequest, Ydb::Topic::AlterTopicResponse, true, TRateLimiterMode::Rps>; using TEvDescribeTopicRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeTopic, Ydb::Topic::DescribeTopicRequest, Ydb::Topic::DescribeTopicResponse, true, TRateLimiterMode::Rps>; using TEvDescribeConsumerRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribeConsumer, Ydb::Topic::DescribeConsumerRequest, Ydb::Topic::DescribeConsumerResponse, true, TRateLimiterMode::Rps>; +using TEvDescribePartitionRequest = TGRpcRequestValidationWrapper<TRpcServices::EvDescribePartition, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse, true, TRateLimiterMode::Rps>; using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>; using TEvListFederationDatabasesRequest = TGRpcRequestWrapper<TRpcServices::EvListFederationDatabases, Ydb::FederationDiscovery::ListFederationDatabasesRequest, Ydb::FederationDiscovery::ListFederationDatabasesResponse, true>; diff --git a/ydb/core/persqueue/events/global.h b/ydb/core/persqueue/events/global.h index 9b0e97ef120..e7b2a53c9db 100644 --- a/ydb/core/persqueue/events/global.h +++ b/ydb/core/persqueue/events/global.h @@ -47,6 +47,8 @@ struct TEvPersQueue { EvProposeTransactionResult, EvCancelTransactionProposal, EvPeriodicTopicStats, + EvGetPartitionsLocation, + EvGetPartitionsLocationResponse, EvResponse = EvRequest + 256, EvInternalEvents = EvResponse + 256, EvEnd @@ -83,11 +85,18 @@ struct TEvPersQueue { struct TEvGetReadSessionsInfo: public TEventPB<TEvGetReadSessionsInfo, NKikimrPQ::TGetReadSessionsInfo, EvGetReadSessionsInfo> { - TEvGetReadSessionsInfo(const TString& consumer = "") { + explicit TEvGetReadSessionsInfo(const TString& consumer = "") { if (!consumer.empty()) { Record.SetClientId(consumer); } } + explicit TEvGetReadSessionsInfo(const TVector<ui32>& partitions) { + if (!partitions.empty()) { + for (auto p: partitions) { + Record.AddPartitions(p); + } + } + } }; struct TEvReadSessionsInfoResponse: public TEventPB<TEvReadSessionsInfoResponse, @@ -95,6 +104,19 @@ struct TEvPersQueue { TEvReadSessionsInfoResponse() {} }; + struct TEvGetPartitionsLocation: public TEventPB<TEvGetPartitionsLocation, + NKikimrPQ::TGetPartitionsLocation, EvGetPartitionsLocation> { + TEvGetPartitionsLocation(const TVector<ui64>& partitionIds = {}) { + for (const auto& p : partitionIds) { + Record.AddPartitions(p); + } + } + }; + + struct TEvGetPartitionsLocationResponse: public TEventPB<TEvGetPartitionsLocationResponse, + NKikimrPQ::TPartitionsLocationResponse, EvGetPartitionsLocationResponse> { + }; + struct TEvLockPartition : public TEventPB<TEvLockPartition, NKikimrPQ::TLockPartition, EvLockPartition> { TEvLockPartition() {} diff --git a/ydb/core/persqueue/read_balancer.cpp b/ydb/core/persqueue/read_balancer.cpp index 52e4b074dda..d05a169ab63 100644 --- a/ydb/core/persqueue/read_balancer.cpp +++ b/ydb/core/persqueue/read_balancer.cpp @@ -663,24 +663,33 @@ void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx) { - RestartPipe(ev->Get()->TabletId, ctx); + ClosePipe(ev->Get()->TabletId, ctx); RequestTabletIfNeeded(ev->Get()->TabletId, ctx); } void TPersQueueReadBalancer::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { + auto tabletId = ev->Get()->TabletId; + PipesRequested.erase(tabletId); + if (ev->Get()->Status != NKikimrProto::OK) { - RestartPipe(ev->Get()->TabletId, ctx); + ClosePipe(ev->Get()->TabletId, ctx); RequestTabletIfNeeded(ev->Get()->TabletId, ctx); + } else { + auto it = TabletPipes.find(tabletId); + if (!it.IsEnd()) { + it->second.Generation = ev->Get()->Generation; + it->second.NodeId = ev->Get()->ServerId.NodeId(); + } } } -void TPersQueueReadBalancer::RestartPipe(const ui64 tabletId, const TActorContext& ctx) +void TPersQueueReadBalancer::ClosePipe(const ui64 tabletId, const TActorContext& ctx) { auto it = TabletPipes.find(tabletId); if (it != TabletPipes.end()) { - NTabletPipe::CloseClient(ctx, it->second); + NTabletPipe::CloseClient(ctx, it->second.PipeActor); TabletPipes.erase(it); } } @@ -692,9 +701,11 @@ TActorId TPersQueueReadBalancer::GetPipeClient(const ui64 tabletId, const TActor if (it == TabletPipes.end()) { NTabletPipe::TClientConfig clientConfig; pipeClient = ctx.RegisterWithSameMailbox(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig)); - TabletPipes[tabletId] = pipeClient; + TabletPipes[tabletId].PipeActor = pipeClient; + auto res = PipesRequested.insert(tabletId); + Y_VERIFY(res.second); } else { - pipeClient = it->second; + pipeClient = it->second.PipeActor; } return pipeClient; @@ -1163,11 +1174,17 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr& auto it = ClientsInfo.find(record.GetClientId()); THolder<TEvPersQueue::TEvReadSessionsInfoResponse> response(new TEvPersQueue::TEvReadSessionsInfoResponse()); + THashSet<ui32> partitionsRequested; + for (auto p : record.GetPartitions()) { + partitionsRequested.insert(p); + } response->Record.SetTabletId(TabletID()); if (it != ClientsInfo.end()) { for (auto& c : it->second.ClientGroupsInfo) { for (auto& p : c.second.PartitionsInfo) { + if (partitionsRequested && !partitionsRequested.contains(p.first)) + continue; auto pi = response->Record.AddPartitionInfo(); pi->SetPartition(p.first); if (p.second.State == EPS_ACTIVE) { @@ -1305,6 +1322,55 @@ void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvPartitionReleased::TPtr& ev cit->second.ScheduleBalance(ctx); } +void TPersQueueReadBalancer::HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx) { + auto* evResponse = new TEvPersQueue::TEvGetPartitionsLocationResponse(); + evResponse->Record.SetStatus(false); + ctx.Send(ev->Sender, evResponse); +} + + +void TPersQueueReadBalancer::Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx) { + auto* evResponse = new TEvPersQueue::TEvGetPartitionsLocationResponse(); + const auto& request = ev->Get()->Record; + auto addPartitionToResponse = [&](ui64 partitionId, ui64 tabletId) { + auto* pResponse = evResponse->Record.AddLocations(); + pResponse->SetPartitionId(partitionId); + if (PipesRequested.contains(tabletId)) { + return false; + } + auto iter = TabletPipes.find(tabletId); + if (iter.IsEnd()) { + GetPipeClient(tabletId, ctx); + return false; + } + pResponse->SetNodeId(iter->second.NodeId.GetRef()); + pResponse->SetGeneration(iter->second.Generation.GetRef()); + return true; + }; + auto sendResponse = [&](bool status) { + evResponse->Record.SetStatus(status); + ctx.Send(ev->Sender, evResponse); + }; + bool ok = true; + if (request.PartitionsSize() == 0) { + if (!PipesRequested.empty() || TabletPipes.size() < TabletsInfo.size()) { + // Do not have all pipes connected. + return sendResponse(false); + } + for (const auto& [partitionId, partitionInfo] : PartitionsInfo) { + ok = addPartitionToResponse(partitionId, partitionInfo.TabletId) && ok; + } + } else { + for (const auto& partitionInRequest : request.GetPartitions()) { + auto partitionInfoIter = PartitionsInfo.find(partitionInRequest); + if (partitionInfoIter.IsEnd()) { + return sendResponse(false); + } + ok = addPartitionToResponse(partitionInRequest, partitionInfoIter->second.TabletId) && ok; + } + } + return sendResponse(ok); +} void TPersQueueReadBalancer::RebuildStructs() { diff --git a/ydb/core/persqueue/read_balancer.h b/ydb/core/persqueue/read_balancer.h index 2813634fa25..33b3b465b30 100644 --- a/ydb/core/persqueue/read_balancer.h +++ b/ydb/core/persqueue/read_balancer.h @@ -201,7 +201,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa StopWatchingSubDomainPathId(); for (auto& pipe : TabletPipes) { - NTabletPipe::CloseClient(ctx, pipe.second); + NTabletPipe::CloseClient(ctx, pipe.second.PipeActor); } TabletPipes.clear(); TActor<TPersQueueReadBalancer>::Die(ctx); @@ -276,6 +276,9 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa void HandleOnInit(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvRegisterReadSession::TPtr &ev, const TActorContext& ctx); + void HandleOnInit(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvGetPartitionsLocation::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvGetReadSessionsInfo::TPtr &ev, const TActorContext& ctx); void Handle(TEvPersQueue::TEvCheckACL::TPtr&, const TActorContext&); void Handle(TEvPersQueue::TEvGetPartitionIdForWrite::TPtr&, const TActorContext&); @@ -293,7 +296,7 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa TActorId GetPipeClient(const ui64 tabletId, const TActorContext&); void RequestTabletIfNeeded(const ui64 tabletId, const TActorContext&); - void RestartPipe(const ui64 tabletId, const TActorContext&); + void ClosePipe(const ui64 tabletId, const TActorContext&); void CheckStat(const TActorContext&); void UpdateCounters(const TActorContext&); @@ -467,7 +470,14 @@ class TPersQueueReadBalancer : public TActor<TPersQueueReadBalancer>, public TTa NMetrics::TResourceMetrics *ResourceMetrics; - THashMap<ui64, TActorId> TabletPipes; + struct TPipeLocation { + TActorId PipeActor; + TMaybe<ui64> NodeId; + TMaybe<ui32> Generation; + }; + + THashMap<ui64, TPipeLocation> TabletPipes; + THashSet<ui64> PipesRequested; bool WaitingForACL; @@ -575,6 +585,7 @@ public: HFunc(TEvPersQueue::TEvGetPartitionIdForWrite, Handle); HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle); HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); + HFunc(TEvPersQueue::TEvGetPartitionsLocation, HandleOnInit); default: StateInitImpl(ev, SelfId()); break; @@ -607,6 +618,7 @@ public: HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle); HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); HFunc(TEvPersQueue::TEvStatus, Handle); + HFunc(TEvPersQueue::TEvGetPartitionsLocation, Handle); default: HandleDefaultEvents(ev, SelfId()); diff --git a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt index dee2d1aef8c..0bc497fc0de 100644 --- a/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.darwin-x86_64.txt @@ -57,6 +57,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp ${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp ) set_property( diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt index adbb8735bcf..3b9bda923cb 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-aarch64.txt @@ -60,6 +60,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp ${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp ) set_property( diff --git a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt index 2533fc4e246..11abf7b9f10 100644 --- a/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.linux-x86_64.txt @@ -61,6 +61,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp ${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp ) set_property( diff --git a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt index 9b747296e7e..aceb9d8cfa1 100644 --- a/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/core/persqueue/ut/CMakeLists.windows-x86_64.txt @@ -50,6 +50,7 @@ target_sources(ydb-core-persqueue-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/sourceid_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/type_codecs_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/user_info_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/persqueue/ut/pqrb_describes_ut.cpp ${CMAKE_BINARY_DIR}/ydb/core/persqueue/ut/20fb34d39c23be7518eb4226481d815e.cpp ) set_property( diff --git a/ydb/core/persqueue/ut/pqrb_describes_ut.cpp b/ydb/core/persqueue/ut/pqrb_describes_ut.cpp new file mode 100644 index 00000000000..634f3ffe538 --- /dev/null +++ b/ydb/core/persqueue/ut/pqrb_describes_ut.cpp @@ -0,0 +1,79 @@ +#include <library/cpp/testing/unittest/registar.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/core/persqueue/events/global.h> + +namespace NKikimr::NPQ { +using namespace NPersQueue; + +Y_UNIT_TEST_SUITE(TPQRBDescribes) { + + Y_UNIT_TEST(PartitionLocations) { + + NPersQueue::TTestServer server; + TString topicName = "rt3.dc1--topic"; + TString topicPath = TString("/Root/PQ/") + topicName; + ui32 totalPartitions = 5; + server.AnnoyingClient->CreateTopic(topicName, totalPartitions); + + auto pathDescr = server.AnnoyingClient->Ls(topicPath)->Record.GetPathDescription().GetSelf(); + + ui64 balancerTabletId = pathDescr.GetBalancerTabletID(); + auto* runtime = server.CleverServer->GetRuntime(); + const auto edge = runtime->AllocateEdgeActor(); + + auto checkResponse = [&](TEvPersQueue::TEvGetPartitionsLocation* request, bool ok, ui64 partitionsCount = 0) { + runtime->SendToPipe(balancerTabletId, edge, request); + auto ev = runtime->GrabEdgeEvent<TEvPersQueue::TEvGetPartitionsLocationResponse>(); + const auto& response = ev->Record; + Cerr << "response: " << response.DebugString(); + + UNIT_ASSERT(response.GetStatus() == ok); + if (!ok) { + return ev; + } + UNIT_ASSERT_VALUES_EQUAL(response.LocationsSize(), partitionsCount); + THashSet<ui32> partitionsFound; + for (const auto& partitionInResponse : response.GetLocations()) { + auto res = partitionsFound.insert(partitionInResponse.GetPartitionId()); + UNIT_ASSERT(res.second); + UNIT_ASSERT_LT(partitionInResponse.GetPartitionId(), totalPartitions); + UNIT_ASSERT(partitionInResponse.GetNodeId() > 0); + UNIT_ASSERT(partitionInResponse.GetGeneration() > 0); + } + return ev; + }; + auto pollBalancer = [&] (ui64 retriesCount) { + auto waitTime = TDuration::MilliSeconds(500); + while (retriesCount) { + auto* req = new TEvPersQueue::TEvGetPartitionsLocation(); + runtime->SendToPipe(balancerTabletId, edge, req); + auto ev = runtime->GrabEdgeEvent<TEvPersQueue::TEvGetPartitionsLocationResponse>(); + if (!ev->Record.GetStatus()) { + --retriesCount; + Sleep(waitTime); + waitTime *= 2; + } else { + return; + } + } + UNIT_FAIL("Could not get positive response from balancer"); + + }; + pollBalancer(5); + checkResponse(new TEvPersQueue::TEvGetPartitionsLocation(), true, totalPartitions); + { + auto* req = new TEvPersQueue::TEvGetPartitionsLocation(); + req->Record.AddPartitions(3); + auto resp = checkResponse(req, true, 1); + UNIT_ASSERT_VALUES_EQUAL(resp->Record.GetLocations(0).GetPartitionId(), 3); + } + { + auto* req = new TEvPersQueue::TEvGetPartitionsLocation(); + req->Record.AddPartitions(50); + checkResponse(req, false); + } + + } +}; + +}
\ No newline at end of file diff --git a/ydb/core/persqueue/ut/ya.make b/ydb/core/persqueue/ut/ya.make index 41413c75f82..a7327da1ff8 100644 --- a/ydb/core/persqueue/ut/ya.make +++ b/ydb/core/persqueue/ut/ya.make @@ -38,6 +38,7 @@ SRCS( sourceid_ut.cpp type_codecs_ut.cpp user_info_ut.cpp + pqrb_describes_ut.cpp ) RESOURCE( diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index dcdcc9bf849..29cc4977f61 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -816,6 +816,7 @@ message TFeatureFlags { optional bool EnableSubscriptionsInDiscovery = 98 [default = false]; optional bool EnableGetNodeLabels = 99 [default = false]; optional bool EnableTopicMessageMeta = 100 [default = false]; + optional bool EnableIcNodeCache = 101 [default = true]; } message THttpProxyConfig { diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index e92d51cf9bb..1290f87f5a8 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -485,6 +485,7 @@ message TRegisterReadSession { message TGetReadSessionsInfo { optional string ClientId = 1; + repeated uint32 Partitions = 2; } @@ -534,6 +535,22 @@ message TReadSessionsInfoResponse { repeated TReadSessionInfo ReadSessions = 3; } + +message TGetPartitionsLocation { + repeated uint64 Partitions = 1; +} + +message TPartitionLocation { + optional uint32 PartitionId = 1; + optional uint64 NodeId = 2; + optional uint32 Generation = 3; +}; + +message TPartitionsLocationResponse { + optional bool Status = 1; + repeated TPartitionLocation Locations = 2; +} + message TLockPartition { optional uint32 Partition = 1; optional uint64 TabletId = 2; diff --git a/ydb/core/protos/tablet_pipe.proto b/ydb/core/protos/tablet_pipe.proto index 1001b23b766..f7369f7d0dd 100644 --- a/ydb/core/protos/tablet_pipe.proto +++ b/ydb/core/protos/tablet_pipe.proto @@ -21,6 +21,7 @@ message TEvConnectResult { optional NActorsProto.TActorId ServerId = 4; optional bool Leader = 5 [default = true]; optional bool SupportsDataInPayload = 6; + optional uint32 Generation = 7; }; message TEvPush { diff --git a/ydb/core/quoter/ut_helpers.cpp b/ydb/core/quoter/ut_helpers.cpp index a1b72386829..82de76413f1 100644 --- a/ydb/core/quoter/ut_helpers.cpp +++ b/ydb/core/quoter/ut_helpers.cpp @@ -237,12 +237,23 @@ void TKesusProxyTestSetup::WaitProxyStart() { void TKesusProxyTestSetup::SendNotConnected(TTestTabletPipeFactory::TTestTabletPipe* pipe) { WaitProxyStart(); - Runtime->Send(new IEventHandle(KesusProxyId, pipe->GetSelfID(), new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, pipe->GetSelfID(), TActorId(), true, false)), 0, true); + Runtime->Send( + new IEventHandle( + KesusProxyId, pipe->GetSelfID(), + new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, pipe->GetSelfID(), TActorId(), true, false, 0) + ), + 0, true + ); } void TKesusProxyTestSetup::SendConnected(TTestTabletPipeFactory::TTestTabletPipe* pipe) { WaitProxyStart(); - Runtime->Send(new IEventHandle(KesusProxyId, pipe->GetSelfID(), new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, pipe->GetSelfID(), pipe->GetSelfID(), true, false)), 0, true); + Runtime->Send( + new IEventHandle( + KesusProxyId, pipe->GetSelfID(), + new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, pipe->GetSelfID(), pipe->GetSelfID(), true, false, 0) + ), 0, true + ); } void TKesusProxyTestSetup::SendDestroyed(TTestTabletPipeFactory::TTestTabletPipe* pipe) { @@ -477,11 +488,17 @@ void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::HandleResour } void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendNotConnected() { - Send(Parent->Parent->KesusProxyId, new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, SelfID, TActorId(), true, false)); + Send( + Parent->Parent->KesusProxyId, + new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::ERROR, SelfID, TActorId(), true, false, 0) + ); } void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendConnected() { - Send(Parent->Parent->KesusProxyId, new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, SelfID, SelfID, true, false)); + Send( + Parent->Parent->KesusProxyId, + new TEvTabletPipe::TEvClientConnected(KESUS_TABLET_ID, NKikimrProto::OK, SelfID, SelfID, true, false, 0) + ); } void TKesusProxyTestSetup::TTestTabletPipeFactory::TTestTabletPipe::SendDestroyed() { diff --git a/ydb/core/tablet/tablet_pipe_client.cpp b/ydb/core/tablet/tablet_pipe_client.cpp index 84d12678d2b..398b13378bd 100644 --- a/ydb/core/tablet/tablet_pipe_client.cpp +++ b/ydb/core/tablet/tablet_pipe_client.cpp @@ -284,6 +284,7 @@ namespace NTabletPipe { ServerId = ActorIdFromProto(record.GetServerId()); Leader = record.GetLeader(); + Generation = record.GetGeneration(); SupportsDataInPayload = record.GetSupportsDataInPayload(); Y_VERIFY(!ServerId || record.GetStatus() == NKikimrProto::OK); @@ -295,7 +296,8 @@ namespace NTabletPipe { Become(&TThis::StateWork); - ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::OK, ctx.SelfID, ServerId, Leader, false)); + ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::OK, ctx.SelfID, ServerId, + Leader, false, Generation)); BLOG_D("send queued"); while (TAutoPtr<IEventHandle> x = PayloadQueue->Pop()) @@ -423,7 +425,7 @@ namespace NTabletPipe { } } - ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, definitelyDead)); + ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, definitelyDead, Generation)); return Die(ctx); } @@ -445,7 +447,7 @@ namespace NTabletPipe { return; auto *msg = ev->Get(); if (msg->Status != NKikimrProto::OK) { - ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false)); + ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation)); return Die(ctx); } } @@ -453,7 +455,7 @@ namespace NTabletPipe { void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx) { if (HiveClient != ev->Sender) return; - ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false)); + ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation)); return Die(ctx); } @@ -468,7 +470,7 @@ namespace NTabletPipe { Become(&TThis::StateCheckDead, RetryState.MakeCheckDelay(), new TEvTabletPipe::TEvClientCheckDelay()); } else { BLOG_D("connect failed"); - ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false)); + ctx.Send(Owner, new TEvTabletPipe::TEvClientConnected(TabletId, NKikimrProto::ERROR, SelfId(), TActorId(), Leader, false, Generation)); return Die(ctx); } } @@ -674,6 +676,7 @@ namespace NTabletPipe { TAutoPtr<TPayloadQueue, TPayloadQueue::TPtrCleanDestructor> PayloadQueue; TClientRetryState RetryState; bool Leader; + ui64 Generation = 0; TActorId HiveClient; ui32 CurrentHiveForwards = 0; static constexpr ui32 MAX_HIVE_FORWARDS = 10; diff --git a/ydb/core/tablet/tablet_pipe_server.cpp b/ydb/core/tablet/tablet_pipe_server.cpp index ea5979c18c6..a1e829a30bd 100644 --- a/ydb/core/tablet/tablet_pipe_server.cpp +++ b/ydb/core/tablet/tablet_pipe_server.cpp @@ -196,6 +196,7 @@ namespace NTabletPipe { OwnerId = ev->Get()->OwnerId; RecipientId = ev->Get()->RecipientId; Leader = ev->Get()->Leader; + Generation = ev->Get()->Generation; Y_VERIFY(OwnerId); Y_VERIFY(RecipientId); if (InterconnectSession) { @@ -209,7 +210,7 @@ namespace NTabletPipe { void OnConnected(const TActorContext& ctx) { Become(&TThis::StateActive); - SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader), IEventHandle::FlagTrackDelivery, ConnectCookie); + SendToClient(ctx, new TEvTabletPipe::TEvConnectResult(NKikimrProto::OK, TabletId, ClientId, ctx.SelfID, Leader, Generation), IEventHandle::FlagTrackDelivery, ConnectCookie); ctx.Send(RecipientId, new TEvTabletPipe::TEvServerConnected(TabletId, ClientId, ctx.SelfID)); Connected = true; } @@ -263,6 +264,7 @@ namespace NTabletPipe { bool Leader; bool Connected; bool HadShutdown = false; + ui32 Generation = 0; }; class TConnectAcceptor: public IConnectAcceptor { @@ -274,7 +276,7 @@ namespace NTabletPipe { { } - TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipientId, bool leader) override { + TActorId Accept(TEvTabletPipe::TEvConnect::TPtr &ev, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) override { Y_VERIFY(ev->Get()->Record.GetTabletId() == TabletId); const TActorId clientId = ActorIdFromProto(ev->Get()->Record.GetClientId()); IActor* server = CreateServer(TabletId, clientId, ev->InterconnectSession, ev->Get()->Record.GetFeatures(), ev->Cookie); @@ -282,7 +284,7 @@ namespace NTabletPipe { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]" << " Accept Connect Originator# " << ev->Sender); ServerIds.insert(serverId); - ActivateServer(TabletId, serverId, owner, recipientId, leader); + ActivateServer(TabletId, serverId, owner, recipientId, leader, generation); return serverId; } @@ -291,7 +293,7 @@ namespace NTabletPipe { const TActorId clientId = ActorIdFromProto(ev->Get()->Record.GetClientId()); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]" << " Reject Connect Originator# " << ev->Sender); - owner.Send(clientId, new TEvTabletPipe::TEvConnectResult(status, TabletId, clientId, TActorId(), leader)); + owner.Send(clientId, new TEvTabletPipe::TEvConnectResult(status, TabletId, clientId, TActorId(), leader, 0)); } void Stop(TActorIdentity owner) override { @@ -326,10 +328,10 @@ namespace NTabletPipe { return serverId; } - void Activate(TActorIdentity owner, TActorId recipientId, bool leader) override { + void Activate(TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) override { LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::PIPE_SERVER, "[" << TabletId << "]" << " Activate"); for (const auto& serverId : ActivatePending) { - ActivateServer(TabletId, serverId, owner, recipientId, leader); + ActivateServer(TabletId, serverId, owner, recipientId, leader, generation); } ActivatePending.clear(); @@ -367,8 +369,8 @@ namespace NTabletPipe { return new TConnectAcceptor(tabletId); } - void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader) { - owner.Send(serverId, new TEvTabletPipe::TEvActivate(tabletId, owner, recipientId, leader)); + void ActivateServer(ui64 tabletId, TActorId serverId, TActorIdentity owner, TActorId recipientId, bool leader, ui64 generation) { + owner.Send(serverId, new TEvTabletPipe::TEvActivate(tabletId, owner, recipientId, leader, generation)); } void CloseServer(TActorIdentity owner, TActorId serverId) { diff --git a/ydb/core/tablet/tablet_sys.cpp b/ydb/core/tablet/tablet_sys.cpp index 715143e4929..86166c17f77 100644 --- a/ydb/core/tablet/tablet_sys.cpp +++ b/ydb/core/tablet/tablet_sys.cpp @@ -1505,7 +1505,7 @@ void TTablet::Handle(TEvTabletPipe::TEvConnect::TPtr& ev) { if (PipeConnectAcceptor->IsStopped()) { PipeConnectAcceptor->Reject(ev, SelfId(), NKikimrProto::TRYLATER, Leader); } else if (PipeConnectAcceptor->IsActive()) { - PipeConnectAcceptor->Accept(ev, SelfId(), UserTablet, Leader); + PipeConnectAcceptor->Accept(ev, SelfId(), UserTablet, Leader, SuggestedGeneration); } else { PipeConnectAcceptor->Enqueue(ev, SelfId()); } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 1d39ebfcfb8..45b39945e68 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -103,6 +103,7 @@ #include <ydb/library/folder_service/mock/mock_folder_service_adapter.h> #include <ydb/core/client/server/msgbus_server_tracer.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> #include <library/cpp/actors/interconnect/interconnect.h> @@ -850,7 +851,11 @@ namespace Tests { } } } - + { + IActor* icNodeCache = NIcNodeCache::CreateICNodesInfoCacheService(Runtime->GetDynamicCounters()); + TActorId icCacheId = Runtime->Register(icNodeCache, nodeIdx); + Runtime->RegisterService(NIcNodeCache::CreateICNodesInfoCacheServiceId(), icCacheId, nodeIdx); + } { auto driverConfig = NYdb::TDriverConfig().SetEndpoint(TStringBuilder() << "localhost:" << Settings->GrpcPort); if (!Driver) { diff --git a/ydb/public/api/protos/ydb_topic.proto b/ydb/public/api/protos/ydb_topic.proto index 9621b32d98b..b5a95b5f1da 100644 --- a/ydb/public/api/protos/ydb_topic.proto +++ b/ydb/public/api/protos/ydb_topic.proto @@ -760,7 +760,7 @@ message PartitionLocation { int32 node_id = 1; // Partition generation. - uint64 generation = 2; + int64 generation = 2; } // Describe topic request sent from client to server. diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 07d2fae71e0..892a0cddb78 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -77,7 +77,7 @@ namespace NKikimr::NDataStreams::V1 { void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx, const TString& workingDir, const TString& name); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx); }; @@ -93,9 +93,8 @@ namespace NKikimr::NDataStreams::V1 { Become(&TCreateStreamActor::StateWork); } - void TCreateStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + void TCreateStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { Y_UNUSED(ev); - Y_UNUSED(ctx); } @@ -204,8 +203,7 @@ namespace NKikimr::NDataStreams::V1 { void FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction& proposal, const TActorContext& ctx, const TString& workingDir, const TString& name); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, - const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); private: bool EnforceDeletion; @@ -234,9 +232,8 @@ namespace NKikimr::NDataStreams::V1 { modifyScheme.MutableDrop()->SetName(name); } - void TDeleteStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, - const TActorContext& ctx) { - if (ReplyIfNotTopic(ev, ctx)) { + void TDeleteStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (ReplyIfNotTopic(ev)) { return; } @@ -247,10 +244,10 @@ namespace NKikimr::NDataStreams::V1 { if (readRules.size() > 0 && EnforceDeletion == false) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::IN_USE), TStringBuilder() << "Stream has registered consumers" << - "and EnforceConsumerDeletion flag is false", ctx); + "and EnforceConsumerDeletion flag is false", ActorContext()); } - SendProposeRequest(ctx); + SendProposeRequest(ActorContext()); } //----------------------------------------------------------------------------------------------------------- @@ -564,7 +561,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx) { if (ev->Get()->Status != NKikimrProto::EReplyStatus::OK) { @@ -625,13 +622,13 @@ namespace NKikimr::NDataStreams::V1 { } } - void TDescribeStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + void TDescribeStreamActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_VERIFY(result->ResultSet.size() == 1); // describe only one topic const auto& response = result->ResultSet.front(); const TString path = JoinSeq("/", response.Path); - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } @@ -644,7 +641,7 @@ namespace NKikimr::NDataStreams::V1 { tabletIds.insert(partition.GetTabletId()); } if (tabletIds.size() == 0) { - ReplyAndDie(ctx); + ReplyAndDie(ActorContext()); } RequestsInfly = tabletIds.size(); @@ -659,9 +656,9 @@ namespace NKikimr::NDataStreams::V1 { }; for (auto& tabletId : tabletIds) { - Pipes.push_back(ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tabletId, clientConfig))); + Pipes.push_back(ActorContext().Register(NTabletPipe::CreateClient(ActorContext().SelfID, tabletId, clientConfig))); TAutoPtr<TEvPersQueue::TEvOffsets> req(new TEvPersQueue::TEvOffsets); - NTabletPipe::SendData(ctx, Pipes.back(), req.Release()); + NTabletPipe::SendData(ActorContext(), Pipes.back(), req.Release()); } } @@ -726,7 +723,8 @@ namespace NKikimr::NDataStreams::V1 { } } if (!startShardFound) { - return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); + return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, + TStringBuilder() << "Bad shard id " << GetProtoRequest()->exclusive_start_shard_id(), ctx); } return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); } @@ -909,7 +907,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); protected: void SendResponse(const TActorContext& ctx, const std::vector<std::pair<TString, ui64>>& readRules, ui32 leftToRead); @@ -972,11 +970,11 @@ namespace NKikimr::NDataStreams::V1 { } } - void TListStreamConsumersActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + void TListStreamConsumersActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_VERIFY(result->ResultSet.size() == 1); // describe only one topic - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } @@ -991,7 +989,7 @@ namespace NKikimr::NDataStreams::V1 { if (alreadyRead > (ui32)streamReadRulesNames.size()) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::INVALID_ARGUMENT), TStringBuilder() << "Provided next_token is malformed - " << - "everything is already read", ctx); + "everything is already read", ActorContext()); } const auto rulesToRead = std::min(streamReadRulesNames.size() - alreadyRead, MaxResults); @@ -1003,7 +1001,7 @@ namespace NKikimr::NDataStreams::V1 { } leftToRead = streamReadRulesNames.size() - alreadyRead - rulesToRead; - SendResponse(ctx, readRules, leftToRead); + SendResponse(ActorContext(), readRules, leftToRead); } void TListStreamConsumersActor::SendResponse(const TActorContext& ctx, const std::vector<std::pair<TString, ui64>>& readRules, ui32 leftToRead) { @@ -1176,7 +1174,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); private: @@ -1248,15 +1246,15 @@ namespace NKikimr::NDataStreams::V1 { } } - void TGetShardIteratorActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - if (ReplyIfNotTopic(ev, ctx)) { + void TGetShardIteratorActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (ReplyIfNotTopic(ev)) { return; } const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); auto topicInfo = navigate->ResultSet.begin(); StreamName = NKikimr::CanonizePath(topicInfo->Path); - if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + if (AppData(ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) { NACLib::TUserToken token(this->Request_->GetSerializedToken()); if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, @@ -1266,7 +1264,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), ctx); + << token.GetUserSID(), ActorContext()); } } @@ -1276,16 +1274,18 @@ namespace NKikimr::NDataStreams::V1 { TString shardName = GetShardName(partitionId); if (shardName == ShardId) { if (topicInfo->ShowPrivatePath) { - SendResponse(ctx, TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); + SendResponse(ActorContext(), + TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); } else { - SendResponse(ctx, TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); + SendResponse(ActorContext(), + TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); } return; } } ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), - TStringBuilder() << "No such shard: " << ShardId, ctx); + TStringBuilder() << "No such shard: " << ShardId, ActorContext()); } void TGetShardIteratorActor::SendResponse(const TActorContext& ctx, const TShardIterator& shardIt) { @@ -1322,7 +1322,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); @@ -1390,7 +1390,7 @@ namespace NKikimr::NDataStreams::V1 { ); NKikimrClient::TPersQueueRequest request; - request.MutablePartitionRequest()->SetTopic(this->GetTopicPath(ctx)); + request.MutablePartitionRequest()->SetTopic(this->GetTopicPath()); request.MutablePartitionRequest()->SetPartition(ShardIterator.GetShardId()); ActorIdToProto(PipeClient, request.MutablePartitionRequest()->MutablePipeClient()); @@ -1417,12 +1417,11 @@ namespace NKikimr::NDataStreams::V1 { } } - void TGetRecordsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, - const TActorContext& ctx) { + void TGetRecordsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const auto &result = ev->Get()->Request.Get(); const auto response = result->ResultSet.front(); - if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + if (AppData(ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) { NACLib::TUserToken token(this->Request_->GetSerializedToken()); if (!response.SecurityObject->CheckAccess(NACLib::EAccessRights::SelectRow, @@ -1432,7 +1431,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << ShardIterator.GetStreamName() << " is denied for subject " - << token.GetUserSID(), ctx); + << token.GetUserSID(), ActorContext()); } } @@ -1444,13 +1443,13 @@ namespace NKikimr::NDataStreams::V1 { auto partitionId = partition.GetPartitionId(); if (partitionId == ShardIterator.GetShardId()) { TabletId = partition.GetTabletId(); - return SendReadRequest(ctx); + return SendReadRequest(ActorContext()); } } } ReplyWithError(Ydb::StatusIds::BAD_REQUEST, static_cast<size_t>(NYds::EErrorCodes::NOT_FOUND), - TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ctx); + TStringBuilder() << "No such shard: " << ShardIterator.GetShardId(), ActorContext()); } void TGetRecordsActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorContext& ctx) { @@ -1531,7 +1530,7 @@ namespace NKikimr::NDataStreams::V1 { case EWakeupTag::RlAllowed: return SendResponse(ctx); case EWakeupTag::RlNoResource: - return ReplyWithResult(Ydb::StatusIds::OVERLOADED, ctx); + return RespondWithCode(Ydb::StatusIds::OVERLOADED); default: return HandleWakeup(ev, ctx); } @@ -1575,8 +1574,7 @@ namespace NKikimr::NDataStreams::V1 { void Handle(TEvPersQueue::TEvOffsetsResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, - const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); void Die(const TActorContext& ctx) override; private: @@ -1666,10 +1664,11 @@ namespace NKikimr::NDataStreams::V1 { } } - void TListShardsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - if (ReplyIfNotTopic(ev, ctx)) { + void TListShardsActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (ReplyIfNotTopic(ev)) { return; } + auto ctx = ActorContext(); const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); auto topicInfo = navigate->ResultSet.front(); @@ -1683,7 +1682,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access to stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), ctx); + << token.GetUserSID(), ActorContext()); } } @@ -1855,8 +1854,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext& ctx); void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, - const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); private: void SendResponse(const TActorContext& ctx); @@ -1885,9 +1883,9 @@ namespace NKikimr::NDataStreams::V1 { } void TDescribeStreamSummaryActor::HandleCacheNavigateResponse( - TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx + TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev ) { - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } @@ -1900,7 +1898,7 @@ namespace NKikimr::NDataStreams::V1 { PQGroup = response.PQGroupInfo->Description; SelfInfo = response.Self->Info; - SendResponse(ctx); + SendResponse(ActorContext()); } void TDescribeStreamSummaryActor::SendResponse(const TActorContext& ctx) { diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index cac15961120..d9bbb104817 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -227,7 +227,7 @@ namespace NKikimr::NDataStreams::V1 { void Bootstrap(const NActors::TActorContext &ctx); void PreparePartitionActors(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); protected: void Write(const TActorContext& ctx); @@ -305,7 +305,7 @@ namespace NKikimr::NDataStreams::V1 { void TPutRecordsActorBase<TDerived, TProto>::SendNavigateRequest(const TActorContext& ctx) { auto schemeCacheRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = NKikimr::SplitPath(this->GetTopicPath(ctx)); + entry.Path = NKikimr::SplitPath(this->GetTopicPath()); entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; entry.SyncVersion = true; schemeCacheRequest->ResultSet.emplace_back(entry); @@ -313,14 +313,14 @@ namespace NKikimr::NDataStreams::V1 { } template<class TDerived, class TProto> - void TPutRecordsActorBase<TDerived, TProto>::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { - if (TBase::ReplyIfNotTopic(ev, ctx)) { + void TPutRecordsActorBase<TDerived, TProto>::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + if (TBase::ReplyIfNotTopic(ev)) { return; } const NSchemeCache::TSchemeCacheNavigate* navigate = ev->Get()->Request.Get(); auto topicInfo = navigate->ResultSet.begin(); - if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + if (AppData(this->ActorContext())->PQConfig.GetRequireCredentialsInNewProtocol()) { NACLib::TUserToken token(this->Request_->GetSerializedToken()); if (!topicInfo->SecurityObject->CheckAccess(NACLib::EAccessRights::UpdateRow, token)) { return this->ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, @@ -328,7 +328,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Access for stream " << this->GetProtoRequest()->stream_name() << " is denied for subject " - << token.GetUserSID(), ctx); + << token.GetUserSID(), this->ActorContext()); } } @@ -336,21 +336,21 @@ namespace NKikimr::NDataStreams::V1 { PQGroupInfo = topicInfo->PQGroupInfo; SetMeteringMode(PQGroupInfo->Description.GetPQTabletConfig().GetMeteringMode()); - if (!AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen() && !PQGroupInfo->Description.GetPQTabletConfig().GetLocalDC()) { + if (!AppData(this->ActorContext())->PQConfig.GetTopicsAreFirstClassCitizen() && !PQGroupInfo->Description.GetPQTabletConfig().GetLocalDC()) { return this->ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, TStringBuilder() << "write to mirrored stream " << this->GetProtoRequest()->stream_name() - << " is forbidden", ctx); + << " is forbidden", this->ActorContext()); } if (IsQuotaRequired()) { const auto ru = 1 + CalcRuConsumption(GetPayloadSize()); - Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, ctx)); + Y_VERIFY(MaybeRequestQuota(ru, EWakeupTag::RlAllowed, this->ActorContext())); } else { - Write(ctx); + Write(this->ActorContext()); } } @@ -369,7 +369,7 @@ namespace NKikimr::NDataStreams::V1 { if (items[part].empty()) continue; PartitionToActor[part].ActorId = ctx.Register( new TDatastreamsPartitionActor(ctx.SelfID, partition.GetTabletId(), part, - this->GetTopicPath(ctx), std::move(items[part]), + this->GetTopicPath(), std::move(items[part]), ShouldBeCharged)); } this->CheckFinish(ctx); diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 83067772feb..5f01f97f944 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -106,99 +106,51 @@ namespace NKikimr::NGRpcProxy::V1 { } }; - template<class TDerived, class TRequest> - class TPQGrpcSchemaBase : public NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest> { - protected: - using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>; - - using TProtoRequest = typename TRequest::TRequest; + template<class TDerived> + class TPQSchemaBase { public: - TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath) - : TBase(request) - , TopicPath(topicPath) - { - } - TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx* request) - : TBase(request) - , TopicPath(TBase::GetProtoRequest()->path()) + TPQSchemaBase(const TString& topicPath, const TString& database) + : TopicPath(topicPath) + , Database(database) { - //auto path = TBase::GetProtoRequest()->path(); - } - - TString GetTopicPath(const NActors::TActorContext& ctx) { - auto path = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath); - if (PrivateTopicName) { - path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName)); - } - return path; - } - - const TMaybe<TString>& GetCdcStreamName() const { - return CdcStreamName; } protected: - // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name); - void SendProposeRequest(const NActors::TActorContext &ctx) { - std::pair <TString, TString> pathPair; - try { - pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath(ctx)); - } catch (const std::exception &ex) { - this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); - return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); - } - - const auto &workingDir = pathPair.first; - const auto &name = pathPair.second; - - std::unique_ptr <TEvTxUserProxy::TEvProposeTransaction> proposal( - new TEvTxUserProxy::TEvProposeTransaction()); - - SetDatabase(proposal.get(), *this->Request_); + virtual TString GetTopicPath() const = 0; + virtual void RespondWithCode(Ydb::StatusIds::StatusCode status) = 0; + virtual void AddIssue(const NYql::TIssue& issue) = 0; + virtual bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const = 0; - if (this->Request_->GetSerializedToken().empty()) { - if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, - "Unauthenticated access is forbidden, please provide credentials", ctx); - } - } else { - proposal->Record.SetUserToken(this->Request_->GetSerializedToken()); - } - - static_cast<TDerived*>(this)->FillProposeRequest(*proposal, ctx, workingDir, name); + virtual bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry) { + Y_UNUSED(entry); + return false; + }; - if (!IsDead) { - ctx.Send(MakeTxProxyID(), proposal.release()); - } - } - void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) { + void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate) { auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); - navigateRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse("")); + navigateRequest->DatabaseName = CanonizePath(Database); NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = NKikimr::SplitPath(GetTopicPath(ctx)); + entry.Path = NKikimr::SplitPath(GetTopicPath()); entry.SyncVersion = true; - entry.ShowPrivatePath = showPrivate || PrivateTopicName.Defined(); + entry.ShowPrivatePath = showPrivate; entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; navigateRequest->ResultSet.emplace_back(entry); - if (this->Request_->GetSerializedToken().empty()) { - if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { - return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, - "Unauthenticated access is forbidden, please provide credentials", ctx); - } - } else { - navigateRequest->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken()); + if (!SetRequestToken(navigateRequest.get())) { + AddIssue(FillIssue("Unauthenticated access is forbidden, please provide credentials", + Ydb::PersQueue::ErrorCode::ACCESS_DENIED)); + return RespondWithCode(Ydb::StatusIds::UNAUTHORIZED); } if (!IsDead) { ctx.Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(navigateRequest.release())); } } - bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + bool ReplyIfNotTopic(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_VERIFY(result->ResultSet.size() == 1); const auto& response = result->ResultSet.front(); @@ -207,16 +159,15 @@ namespace NKikimr::NGRpcProxy::V1 { if (ev->Get()->Request.Get()->ResultSet.size() != 1 || ev->Get()->Request.Get()->ResultSet.begin()->Kind != NSchemeCache::TSchemeCacheNavigate::KindTopic) { - this->Request_->RaiseIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic", - Ydb::PersQueue::ErrorCode::VALIDATION_ERROR)); - TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + AddIssue(FillIssue(TStringBuilder() << "path '" << path << "' is not a topic", + Ydb::PersQueue::ErrorCode::VALIDATION_ERROR)); + RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); return true; } - return false; } - void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_VERIFY(result->ResultSet.size() == 1); const auto& response = result->ResultSet.front(); @@ -225,124 +176,232 @@ namespace NKikimr::NGRpcProxy::V1 { switch (response.Status) { case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: { if (response.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) { - if constexpr (THasCdcStreamCompatibility<TDerived>::Value) { - if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) { - Y_VERIFY(response.ListNodeEntry->Children.size() == 1); - PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; - - if (response.Self) { - CdcStreamName = response.Self->Info.GetName(); - } - - return SendDescribeProposeRequest(ctx); - } + if (ProcessCdc(response)) { + return; } - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "path '" << path << "' is not compatible scheme object", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } else if (!response.PQGroupInfo) { - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "path '" << path << "' creation is not completed", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } - return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev, ctx); + return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev); } break; case NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown: { - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "path '" << path << "' does not exist or you " << "do not have access rights", Ydb::PersQueue::ErrorCode::ACCESS_DENIED ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } case NSchemeCache::TSchemeCacheNavigate::EStatus::TableCreationNotComplete: { - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "table creation is not completed", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } break; case NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable: { - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "path '" << path << "' is not a table", Ydb::PersQueue::ErrorCode::VALIDATION_ERROR ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } break; case NSchemeCache::TSchemeCacheNavigate::EStatus::RootUnknown: { - this->Request_->RaiseIssue( + AddIssue( FillIssue( TStringBuilder() << "unknown database root", Ydb::PersQueue::ErrorCode::INVALID_ARGUMENT ) ); - return TBase::Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } break; default: - return TBase::Reply(Ydb::StatusIds::GENERIC_ERROR, ctx); + return RespondWithCode(Ydb::StatusIds::GENERIC_ERROR); } } - void ReplyWithError(Ydb::StatusIds::StatusCode status, - Ydb::PersQueue::ErrorCode::ErrorCode pqStatus, - const TString& messageText, const NActors::TActorContext& ctx) { - this->Request_->RaiseIssue(FillIssue(messageText, pqStatus)); - this->Request_->ReplyWithYdbStatus(status); - this->Die(ctx); - IsDead = true; + + void StateWork(TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + Y_FAIL(); + } + } + + protected: + bool IsDead = false; + const TString TopicPath; + const TString Database; + }; + + + template<class TDerived, class TRequest> + class TPQGrpcSchemaBase : public NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>, + public TPQSchemaBase<TPQGrpcSchemaBase<TDerived, TRequest>> { + protected: + using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>; + using TActorBase = TPQSchemaBase<TPQGrpcSchemaBase<TDerived, TRequest>>; + + using TProtoRequest = typename TRequest::TRequest; + + public: + + TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath) + : TBase(request) + , TActorBase(topicPath, this->Request_->GetDatabaseName().GetOrElse("")) + { + } + TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx* request) + : TBase(request) + , TActorBase(TBase::GetProtoRequest()->path(), this->Request_->GetDatabaseName().GetOrElse("")) + { + } + + TString GetTopicPath() const override { + auto path = NPersQueue::GetFullTopicPath(this->ActorContext(), this->Request_->GetDatabaseName(), TActorBase::TopicPath); + if (PrivateTopicName) { + path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName)); + } + return path; + } + + const TMaybe<TString>& GetCdcStreamName() const { + return CdcStreamName; + } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + return static_cast<TDerived*>(this)->HandleCacheNavigateResponse(ev); + } + + + protected: + // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name); + void SendProposeRequest(const NActors::TActorContext &ctx) { + std::pair <TString, TString> pathPair; + try { + pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath()); + } catch (const std::exception &ex) { + this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); + return this->RespondWithCode(Ydb::StatusIds::BAD_REQUEST); + } + + const auto& workingDir = pathPair.first; + const auto& name = pathPair.second; + + std::unique_ptr <TEvTxUserProxy::TEvProposeTransaction> proposal( + new TEvTxUserProxy::TEvProposeTransaction()); + + SetDatabase(proposal.get(), *this->Request_); + + if (this->Request_->GetSerializedToken().empty()) { + if (AppData(ctx)->PQConfig.GetRequireCredentialsInNewProtocol()) { + return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, Ydb::PersQueue::ErrorCode::ACCESS_DENIED, + "Unauthenticated access is forbidden, please provide credentials", ctx); + } + } else { + proposal->Record.SetUserToken(this->Request_->GetSerializedToken()); + } + + static_cast<TDerived*>(this)->FillProposeRequest(*proposal, ctx, workingDir, name); + + if (!TActorBase::IsDead) { + ctx.Send(MakeTxProxyID(), proposal.release()); + } + } + + void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) { + return TActorBase::SendDescribeProposeRequest(ctx, showPrivate || PrivateTopicName.Defined()); + } + + bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override { + if (this->Request_->GetSerializedToken().empty()) { + return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol()); + } else { + request->UserToken = new NACLib::TUserToken(this->Request_->GetSerializedToken()); + return true; + } + } + bool ProcessCdc(const NSchemeCache::TSchemeCacheNavigate::TEntry& response) override { + if constexpr (THasCdcStreamCompatibility<TDerived>::Value) { + if (static_cast<TDerived*>(this)->IsCdcStreamCompatible()) { + Y_VERIFY(response.ListNodeEntry->Children.size() == 1); + PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; + + if (response.Self) { + CdcStreamName = response.Self->Info.GetName(); + } + + SendDescribeProposeRequest(TBase::ActorContext()); + return true; + } + } + return false; + } + + void AddIssue(const NYql::TIssue& issue) override { + this->Request_->RaiseIssue(issue); } void ReplyWithError(Ydb::StatusIds::StatusCode status, size_t additionalStatus, const TString& messageText, const NActors::TActorContext& ctx) { + if (TActorBase::IsDead) + return; this->Request_->RaiseIssue(FillIssue(messageText, additionalStatus)); this->Request_->ReplyWithYdbStatus(status); this->Die(ctx); - IsDead = true; + TActorBase::IsDead = true; } - void ReplyWithResult(Ydb::StatusIds::StatusCode status, const NActors::TActorContext& ctx) { + void RespondWithCode(Ydb::StatusIds::StatusCode status) override { + if (TActorBase::IsDead) + return; this->Request_->ReplyWithYdbStatus(status); - this->Die(ctx); - IsDead = true; + this->Die(this->ActorContext()); + TActorBase::IsDead = true; } template<class TProtoResult> void ReplyWithResult(Ydb::StatusIds::StatusCode status, const TProtoResult& result, const NActors::TActorContext& ctx) { + if (TActorBase::IsDead) + return; this->Request_->SendResult(result, status); this->Die(ctx); - IsDead = true; + TActorBase::IsDead = true; } void StateWork(TAutoPtr<IEventHandle>& ev) { switch (ev->GetTypeRewrite()) { - HFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, TActorBase::Handle); default: TBase::StateWork(ev); } } private: - bool IsDead = false; - const TString TopicPath; TMaybe<TString> PrivateTopicName; TMaybe<TString> CdcStreamName; }; @@ -400,11 +459,11 @@ namespace NKikimr::NGRpcProxy::V1 { this->DescribeSchemeResult.Reset(); } - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { const NSchemeCache::TSchemeCacheNavigate* result = ev->Get()->Request.Get(); Y_VERIFY(result->ResultSet.size() == 1); DescribeSchemeResult = std::move(ev); - return this->SendProposeRequest(ctx); + return this->SendProposeRequest(this->ActorContext()); } void Handle(TEvTxUserProxy::TEvProposeTransactionStatus::TPtr& ev, const TActorContext& ctx) { @@ -415,7 +474,7 @@ namespace NKikimr::NGRpcProxy::V1 { { return TBase::ReplyWithError(Ydb::StatusIds::OVERLOADED, Ydb::PersQueue::ErrorCode::OVERLOAD, - TStringBuilder() << "Topic with name " << TBase::GetTopicPath(ctx) << " has another alter in progress", + TStringBuilder() << "Topic with name " << TBase::GetTopicPath() << " has another alter in progress", ctx); } @@ -429,8 +488,77 @@ namespace NKikimr::NGRpcProxy::V1 { } } - private: + protected: THolder<NActors::TEventHandle<TEvTxProxySchemeCache::TEvNavigateKeySetResult>> DescribeSchemeResult; }; + + template<class TDerived, class TRequest, class TEvResponse> + class TPQInternalSchemaActor : public TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>> + , public TActorBootstrapped<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>> + { + protected: + using TBase = TPQSchemaBase<TPQInternalSchemaActor<TDerived, TRequest, TEvResponse>>; + + public: + + TPQInternalSchemaActor(const TRequest& request, const TActorId& requester) + : TBase(request.Topic, request.Database) + , Request(request) + , Requester(requester) + , Response(MakeHolder<TEvResponse>()) + { + } + + virtual void Bootstrap(const TActorContext&) = 0; + virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0; + + TString GetTopicPath() const override { + return TBase::TopicPath; + } + + void SendDescribeProposeRequest() { + return TBase::SendDescribeProposeRequest(this->ActorContext(), false); + } + + bool HandleCacheNavigateResponseBase(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic + if (this->ReplyIfNotTopic(ev)) { + return false; + } + PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo; + return true; + } + + bool SetRequestToken(NSchemeCache::TSchemeCacheNavigate* request) const override { + if (Request.Token.empty()) { + return !(AppData()->PQConfig.GetRequireCredentialsInNewProtocol()); + } else { + request->UserToken = new NACLib::TUserToken(Request.Token); + return true; + } + } + + void AddIssue(const NYql::TIssue& issue) override { + Response->Issues.AddIssue(issue); + } + + + void RespondWithCode(Ydb::StatusIds::StatusCode status) override { + Response->Status = status; + this->ActorContext().Send(Requester, Response.Release()); + this->Die(this->ActorContext()); + TBase::IsDead = true; + } + + + private: + TRequest Request; + TActorId Requester; + TMaybe<TString> PrivateTopicName; + protected: + THolder<TEvResponse> Response; + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; + }; + } diff --git a/ydb/services/persqueue_v1/actors/events.h b/ydb/services/persqueue_v1/actors/events.h index 536714d08cc..cf1ee625238 100644 --- a/ydb/services/persqueue_v1/actors/events.h +++ b/ydb/services/persqueue_v1/actors/events.h @@ -8,6 +8,7 @@ #include <ydb/core/persqueue/percentile_counter.h> #include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h> +#include <ydb/public/api/protos/ydb_status_codes.pb.h> #include <ydb/services/lib/actors/type_definitions.h> @@ -63,6 +64,7 @@ struct TEvPQProxy { EvTopicUpdateToken, EvCommitRange, EvRequestTablet, + EvPartitionLocationResponse, EvEnd }; @@ -444,5 +446,49 @@ struct TEvPQProxy { ui64 TabletId; }; + struct TLocalResponseBase { + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + }; + + struct TPartitionLocationInfo { + ui64 PartitionId; + ui64 IncGeneration; + ui64 NodeId; + TString Hostname; + }; + + struct TEvPartitionLocationResponse : public NActors::TEventLocal<TEvRequestTablet, EvPartitionLocationResponse> + , public TLocalResponseBase + + { + TEvPartitionLocationResponse() {} + TVector<TPartitionLocationInfo> Partitions; + }; + +}; + +struct TLocalRequestBase { + TLocalRequestBase(const TString& topic, const TString& database, const TString& token) + : Topic(topic) + , Database(database) + , Token(token) + {} + + TString Topic; + TString Database; + TString Token; + +}; + +struct TGetPartitionsLocationRequest : public TLocalRequestBase { + TGetPartitionsLocationRequest() = default; + TGetPartitionsLocationRequest(const TString& topic, const TString& database, const TString& token, const TVector<ui32>& partitionIds) + : TLocalRequestBase(topic, database, token) + , PartitionIds(partitionIds) + {} + + TVector<ui32> PartitionIds; + }; } diff --git a/ydb/services/persqueue_v1/actors/schema_actors.cpp b/ydb/services/persqueue_v1/actors/schema_actors.cpp index 8830ab95086..0d71a4b146d 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.cpp +++ b/ydb/services/persqueue_v1/actors/schema_actors.cpp @@ -1,10 +1,11 @@ -#include "schema_actors.h" + #include "schema_actors.h" #include "persqueue_utils.h" #include <ydb/core/ydb_convert/ydb_convert.h> #include <ydb/library/persqueue/obfuscate/obfuscate.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> namespace NKikimr::NGRpcProxy::V1 { @@ -61,9 +62,9 @@ void TPQDescribeTopicActor::StateWork(TAutoPtr<IEventHandle>& ev) { } -void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { +void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } @@ -80,6 +81,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T } auto settings = result.mutable_settings(); + if (response.PQGroupInfo) { const auto &pqDescr = response.PQGroupInfo->Description; settings->set_partitions_count(pqDescr.GetTotalGroupCount()); @@ -119,7 +121,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T settings->set_message_group_seqno_retention_period_ms(partConfig.GetSourceIdLifetimeSeconds() * 1000); settings->set_max_partition_message_groups_seqno_stored(partConfig.GetSourceIdMaxCounts()); - if (local || AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen()) { + if (local || AppData(ActorContext())->PQConfig.GetTopicsAreFirstClassCitizen()) { settings->set_max_partition_write_speed(partConfig.GetWriteSpeedInBytesPerSecond()); settings->set_max_partition_write_burst(partConfig.GetBurstSize()); } @@ -131,10 +133,10 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T settings->add_supported_codecs((Ydb::PersQueue::V1::Codec) (codec + 1)); } - const auto& pqConfig = AppData(ctx)->PQConfig; + const auto& pqConfig = AppData(ActorContext())->PQConfig; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { auto rr = settings->add_read_rules(); - auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ctx); + auto consumerName = NPersQueue::ConvertOldConsumerName(config.GetReadRules(i), ActorContext()); rr->set_consumer_name(consumerName); rr->set_starting_message_timestamp_ms(config.GetReadFromTimestampsMs(i)); rr->set_supported_format( @@ -160,7 +162,7 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T "service type must be set for all read rules", Ydb::PersQueue::ErrorCode::ERROR )); - Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); + Reply(Ydb::StatusIds::INTERNAL_ERROR, ActorContext()); return; } rr->set_service_type(pqConfig.GetDefaultClientServiceType().GetName()); @@ -202,14 +204,13 @@ void TPQDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::T rmr->set_database(partConfig.GetMirrorFrom().GetDatabase()); } } - return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ctx); + return ReplyWithResult(Ydb::StatusIds::SUCCESS, result, ActorContext()); } void TPQDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx) { TBase::Bootstrap(ctx); - SendDescribeProposeRequest(ctx); Become(&TPQDescribeTopicActor::StateWork); } @@ -350,7 +351,7 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti workingDir, proposal.Record.GetDatabaseName(), LocalCluster); if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(status, ctx); + return RespondWithCode(status); } } @@ -359,11 +360,11 @@ void TPQCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransacti if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) { Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC() << "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); + return RespondWithCode(Ydb::StatusIds::BAD_REQUEST); } if (Count(Clusters, config.GetDC()) == 0 && !Clusters.empty()) { Request_->RaiseIssue(FillIssue(TStringBuilder() << "Unknown cluster '" << config.GetDC() << "'", Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); + return RespondWithCode(Ydb::StatusIds::BAD_REQUEST); } } @@ -383,7 +384,7 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(status, ctx); + return RespondWithCode(status); } } @@ -393,11 +394,11 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction if (!LocalCluster.empty() && config.GetLocalDC() && config.GetDC() != LocalCluster) { Request_->RaiseIssue(FillIssue(TStringBuilder() << "Local cluster is not correct - provided '" << config.GetDC() << "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); + return RespondWithCode(Ydb::StatusIds::BAD_REQUEST); } if (Count(Clusters, config.GetDC()) == 0 && !Clusters.empty()) { Request_->RaiseIssue(FillIssue(TStringBuilder() << "Unknown cluster '" << config.GetDC() << "'", Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); + return RespondWithCode(Ydb::StatusIds::BAD_REQUEST); } } @@ -413,7 +414,7 @@ void TPQAlterTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransactio if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(status, ctx); + return RespondWithCode(status); } } @@ -448,20 +449,24 @@ void TAlterTopicActor::ModifyPersqueueConfig( auto status = FillProposeRequestImpl(*GetProtoRequest(), groupConfig, ctx, error, GetCdcStreamName().Defined()); if (!error.empty()) { Request_->RaiseIssue(FillIssue(error, Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(status, ctx); + return RespondWithCode(status); } } TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::TEvDescribeTopicRequest* request) : TBase(request, request->GetProtoRequest()->path()) - , TDescribeTopicActorImpl("") + , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeTopic( + request->GetProtoRequest()->include_stats(), + request->GetProtoRequest()->include_location())) { } TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * ctx) : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->path()) - , TDescribeTopicActorImpl("") + , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeTopic( + dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_stats(), + dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_location())) { } @@ -469,20 +474,27 @@ TDescribeTopicActor::TDescribeTopicActor(NKikimr::NGRpcService::IRequestOpCtx * TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::TEvDescribeConsumerRequest* request) : TBase(request, request->GetProtoRequest()->path()) - , TDescribeTopicActorImpl(request->GetProtoRequest()->consumer()) + , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeConsumer( + request->GetProtoRequest()->consumer(), + request->GetProtoRequest()->include_stats(), + request->GetProtoRequest()->include_location())) { } TDescribeConsumerActor::TDescribeConsumerActor(NKikimr::NGRpcService::IRequestOpCtx * ctx) : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->path()) - , TDescribeTopicActorImpl(dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer()) + , TDescribeTopicActorImpl(TDescribeTopicActorSettings::DescribeConsumer( + dynamic_cast<const Ydb::Topic::DescribeConsumerRequest*>(ctx->GetRequest())->consumer(), + dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_stats(), + dynamic_cast<const Ydb::Topic::DescribeTopicRequest*>(ctx->GetRequest())->include_location())) { } -TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TString& consumer) - : Consumer(consumer) +TDescribeTopicActorImpl::TDescribeTopicActorImpl(const TDescribeTopicActorSettings& settings) + : Settings(settings) { + } @@ -492,6 +504,8 @@ bool TDescribeTopicActorImpl::StateWork(TAutoPtr<IEventHandle>& ev, const TActor HFuncCtx(TEvTabletPipe::TEvClientConnected, Handle, ctx); HFuncCtx(NKikimr::TEvPersQueue::TEvStatusResponse, Handle, ctx); HFuncCtx(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse, Handle, ctx); + HFuncCtx(TEvPersQueue::TEvGetPartitionsLocationResponse, Handle, ctx); + HFuncCtx(TEvents::TEvWakeup, HandleWakeup, ctx); default: return false; } return true; @@ -534,7 +548,6 @@ void TDescribeConsumerActor::RaiseError(const TString& error, const Ydb::PersQue TBase::Reply(status, ctx); } - void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe, const TDuration& delay) { auto it = Tablets.find(tabletId); if (it == Tablets.end()) return; @@ -542,6 +555,10 @@ void TDescribeTopicActorImpl::RestartTablet(ui64 tabletId, const TActorContext& if (--it->second.RetriesLeft == 0) { return RaiseError(TStringBuilder() << "Tablet " << tabletId << " unresponsible", Ydb::PersQueue::ErrorCode::ERROR, Ydb::StatusIds::INTERNAL_ERROR, ctx); } + it->second.Pipe = TActorId{}; + if (tabletId == BalancerTabletId) { + BalancerPipe = nullptr; + } Y_VERIFY(RequestsInfly > 0); --RequestsInfly; if (delay == TDuration::Zero()) { @@ -556,22 +573,92 @@ void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, con --RequestsInfly; auto it = Tablets.find(ev->Get()->TabletId); if (it == Tablets.end()) return; + if (ev->Get()->TabletId == BalancerTabletId && PendingLocation) { + PendingLocation = false; + } + RequestTablet(it->second, ctx); } +void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) { + auto it = Tablets.find(tabletId); + if (it != Tablets.end()) { + RequestTablet(it->second, ctx); + } +} + +TActorId CreatePipe(ui64 tabletId, const TActorContext& ctx) { + return ctx.Register(NTabletPipe::CreateClient( + ctx.SelfID, tabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries()) + )); +} + void TDescribeTopicActorImpl::RequestTablet(TTabletInfo& tablet, const TActorContext& ctx) { - tablet.Pipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, tablet.TabletId, NTabletPipe::TClientConfig(NTabletPipe::TClientRetryPolicy::WithRetries()))); + if (!tablet.Pipe) + tablet.Pipe = CreatePipe(tablet.TabletId, ctx); if (tablet.TabletId == BalancerTabletId) { - THolder<NKikimr::TEvPersQueue::TEvGetReadSessionsInfo> ev(new NKikimr::TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Consumer, ctx))); - NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); + BalancerPipe = &tablet.Pipe; + return RequestBalancer(ctx); } else { - THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus(Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Consumer, ctx), Consumer.empty())); + THolder<NKikimr::TEvPersQueue::TEvStatus> ev(new NKikimr::TEvPersQueue::TEvStatus( + Settings.Consumer.empty() ? "" : NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx), + Settings.Consumer.empty() + )); NTabletPipe::SendData(ctx, tablet.Pipe, ev.Release()); } ++RequestsInfly; } +void TDescribeTopicActorImpl::RequestBalancer(const TActorContext& ctx) { + Y_VERIFY(BalancerTabletId); + if (Settings.RequireLocation && !PendingLocation && !GotLocation) { + return RequestPartitionsLocationIfRequired(ctx); + } + switch (Settings.Mode) { + case TDescribeTopicActorSettings::EMode::DescribeConsumer: + case TDescribeTopicActorSettings::EMode::DescribeTopic: + if (Settings.RequireStats) { + NTabletPipe::SendData( + ctx, *BalancerPipe, + new TEvPersQueue::TEvGetReadSessionsInfo(NPersQueue::ConvertNewConsumerName(Settings.Consumer, ctx)) + ); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request sessions"); + ++RequestsInfly; + } + break; + case TDescribeTopicActorSettings::EMode::DescribePartitions: { + break; + } + } +} + +void TDescribeTopicActorImpl::RequestPartitionsLocationIfRequired(const TActorContext& ctx) { + if (!Settings.RequireLocation || PendingLocation) + return; + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Request location"); + THashSet<ui64> partIds; + TVector<ui64> partsVector; + for (auto p : Settings.Partitions) { + if (p >= TotalPartitions) { + return RaiseError( + TStringBuilder() << "No partition " << Settings.Partitions[0] << " in topic", + Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::BAD_REQUEST, ctx + ); + } + auto res = partIds.insert(p); + if (res.second) { + partsVector.push_back(p); + } + } + NTabletPipe::SendData( + ctx, *BalancerPipe, + new TEvPersQueue::TEvGetPartitionsLocation(partsVector) + ); + PendingLocation = true; + GotLocation = false; +} + void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { auto it = Tablets.find(ev->Get()->Record.GetTabletId()); if (it == Tablets.end()) return; @@ -592,7 +679,7 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T if (RequestsInfly == 0) { RequestAdditionalInfo(ctx); - if (RequestsInfly == 0) { + if (RequestsInfly == 0 && !PendingLocation) { Reply(ctx); } } @@ -600,25 +687,58 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got sessions"); + if (BalancerTabletId == 0) return; auto it = Tablets.find(BalancerTabletId); Y_VERIFY(it != Tablets.end()); --RequestsInfly; - NTabletPipe::CloseClient(ctx, it->second.Pipe); - it->second.Pipe = TActorId{}; + NTabletPipe::CloseClient(ctx, *BalancerPipe); + *BalancerPipe = TActorId{}; BalancerTabletId = 0; ApplyResponse(it->second, ev, ctx); if (RequestsInfly == 0) { RequestAdditionalInfo(ctx); - if (RequestsInfly == 0) { + if (RequestsInfly == 0 && !PendingLocation) { Reply(ctx); } } } +void TDescribeTopicActorImpl::Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Got location"); + if (BalancerTabletId == 0) + return; + auto it = Tablets.find(BalancerTabletId); + Y_VERIFY(it != Tablets.end()); + PendingLocation = false; + const auto& record = ev->Get()->Record; + if (record.GetStatus()) { + auto res = ApplyResponse(ev, ctx); + if (res) { + GotLocation = true; + CheckCloseBalancerPipe(ctx); + if (!RequestsInfly && !PendingLocation) { + Reply(ctx); + } + return; + } + } + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Something wrong on location, retry"); + //Something gone wrong, retry + ctx.Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup()); +} + +void TDescribeTopicActorImpl::HandleWakeup(TEvents::TEvWakeup::TPtr&, const NActors::TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "DescribeTopicImpl " << ctx.SelfID.ToString() << ": Wakeup"); + RequestPartitionsLocationIfRequired(ctx); + if (!RequestsInfly && !PendingLocation) { + return Reply(ctx); + } +} void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) { if (BalancerTabletId) { @@ -626,10 +746,16 @@ void TDescribeTopicActorImpl::RequestAdditionalInfo(const TActorContext& ctx) { } } -void TDescribeTopicActorImpl::RequestTablet(ui64 tabletId, const TActorContext& ctx) { - auto it = Tablets.find(tabletId); - if (it != Tablets.end()) { - RequestTablet(it->second, ctx); +void TDescribeTopicActorImpl::CheckCloseBalancerPipe(const TActorContext& ctx) { + switch (Settings.Mode) { + case TDescribeTopicActorSettings::EMode::DescribePartitions: + if (RequestsInfly || PendingLocation) + return; + // no break; + default: + NTabletPipe::CloseClient(ctx, *BalancerPipe); + *BalancerPipe = TActorId{}; + BalancerTabletId = 0; } } @@ -648,6 +774,11 @@ void UpdateProtoTime(T* proto, const ui64 ms, bool storeMin) { } } +void SetPartitionLocation(const NKikimrPQ::TPartitionLocation& location, Ydb::Topic::PartitionLocation* result) { + result->set_node_id(location.GetNodeId()); + result->set_generation(location.GetGeneration()); +} + void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); @@ -663,6 +794,19 @@ void AddWindowsStat(Ydb::Topic::MultipleWindowsStat *stat, ui64 perMin, ui64 per stat->set_per_day(stat->per_day() + perDay); } +void FillPartitionStats(const NKikimrPQ::TStatusResponse::TPartResult& partResult, Ydb::Topic::PartitionStats* partStats, ui64 nodeId) { + partStats->set_store_size_bytes(partResult.GetPartitionSize()); + partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset()); + partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset()); + + SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs()); + SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs()); + + AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay()); + + partStats->set_partition_node_id(nodeId); +} + void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) { Y_UNUSED(ctx); @@ -718,30 +862,41 @@ void TDescribeTopicActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPer for (auto& partRes : *(Result.mutable_partitions())) { auto it = res.find(partRes.partition_id()); - if (it == res.end()) continue; - - const auto& partResult = it->second; - auto partStats = partRes.mutable_partition_stats(); - - partStats->set_store_size_bytes(partResult.GetPartitionSize()); - partStats->mutable_partition_offsets()->set_start(partResult.GetStartOffset()); - partStats->mutable_partition_offsets()->set_end(partResult.GetEndOffset()); + if (it == res.end()) + continue; + FillPartitionStats(it->second, partRes.mutable_partition_stats(), tabletInfo.NodeId); + } +} - SetProtoTime(partStats->mutable_last_write_time(), partResult.GetLastWriteTimestampMs()); - SetProtoTime(partStats->mutable_max_write_time_lag(), partResult.GetWriteLagMs()); +bool TDescribeTopicActor::ApplyResponse( + TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& +) { + const auto& record = ev->Get()->Record; + Y_VERIFY(record.LocationsSize() == TotalPartitions); + Y_VERIFY(Settings.RequireLocation); - AddWindowsStat(partStats->mutable_bytes_written(), partResult.GetAvgWriteSpeedPerMin(), partResult.GetAvgWriteSpeedPerHour(), partResult.GetAvgWriteSpeedPerDay()); + for (auto i = 0u; i < TotalPartitions; ++i) { + const auto& location = record.GetLocations(i); + auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); + SetPartitionLocation(location, locationResult); - partStats->set_partition_node_id(tabletInfo.NodeId); } + return true; } + void TDescribeTopicActor::Reply(const TActorContext& ctx) { + if (TBase::IsDead) { + return; + } return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); } void TDescribeConsumerActor::Reply(const TActorContext& ctx) { + if (TBase::IsDead) { + return; + } return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); } @@ -798,7 +953,7 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv partStats->set_partition_node_id(tabletInfo.NodeId); - if (Consumer) { + if (Settings.Consumer) { auto consStats = partRes.mutable_partition_consumer_stats(); consStats->set_last_read_offset(partResult.GetLagsInfo().GetReadPosition().GetOffset()); @@ -831,7 +986,21 @@ void TDescribeConsumerActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEv } } +bool TDescribeConsumerActor::ApplyResponse( + TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& +) { + const auto& record = ev->Get()->Record; + Y_VERIFY(record.LocationsSize() == TotalPartitions); + Y_VERIFY(Settings.RequireLocation); + for (auto i = 0u; i < TotalPartitions; ++i) { + const auto& location = record.GetLocations(i); + auto* locationResult = Result.mutable_partitions(i)->mutable_partition_location(); + SetPartitionLocation(location, locationResult); + } + return true; +} + bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i, const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error) @@ -871,9 +1040,9 @@ bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfi return true; } -void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { +void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } @@ -888,7 +1057,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv } if (response.PQGroupInfo) { - const auto &pqDescr = response.PQGroupInfo->Description; + const auto& pqDescr = response.PQGroupInfo->Description; Result.mutable_partitioning_settings()->set_min_active_partitions(pqDescr.GetTotalGroupCount()); for(ui32 i = 0; i < pqDescr.GetTotalGroupCount(); ++i) { auto part = Result.add_partitions(); @@ -929,7 +1098,7 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv (*Result.mutable_attributes())["_message_group_seqno_retention_period_ms"] = TStringBuilder() << (partConfig.GetSourceIdLifetimeSeconds() * 1000); (*Result.mutable_attributes())["__max_partition_message_groups_seqno_stored"] = TStringBuilder() << partConfig.GetSourceIdMaxCounts(); - const auto& pqConfig = AppData(ctx)->PQConfig; + const auto& pqConfig = AppData(ActorContext())->PQConfig; if (local || pqConfig.GetTopicsAreFirstClassCitizen()) { Result.set_partition_write_speed_bytes_per_second(partConfig.GetWriteSpeedInBytesPerSecond()); @@ -958,34 +1127,37 @@ void TDescribeTopicActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEv break; } } - auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx); + auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { if (consumerName == config.GetReadRules(i)) found = true; auto rr = Result.add_consumers(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ctx, status, error)) { - return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx); + if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } } - if (GetProtoRequest()->include_stats()) { - if (Consumer && !found) { - Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx); + if (GetProtoRequest()->include_stats() || GetProtoRequest()->include_location()) { + if (Settings.Consumer && !found) { + Request_->RaiseIssue(FillIssue( + TStringBuilder() << "no consumer '" << Settings.Consumer << "' in topic", + Ydb::PersQueue::ErrorCode::BAD_REQUEST + )); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } - ProcessTablets(pqDescr, ctx); + ProcessTablets(pqDescr, ActorContext()); return; } } - return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ActorContext()); } -void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { +void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic - if (ReplyIfNotTopic(ev, ctx)) { + if (ReplyIfNotTopic(ev)) { return; } const auto& response = ev->Get()->Request.Get()->ResultSet.front(); @@ -998,7 +1170,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: if (const auto& name = GetCdcStreamName()) { selfEntry->set_name(*name); } - selfEntry->set_name(selfEntry->name() + "/" + Consumer); + selfEntry->set_name(selfEntry->name() + "/" + Settings.Consumer); if (response.PQGroupInfo) { const auto& pqDescr = response.PQGroupInfo->Description; @@ -1010,7 +1182,7 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: part->set_active(true); } - auto consumerName = NPersQueue::ConvertNewConsumerName(Consumer, ctx); + auto consumerName = NPersQueue::ConvertNewConsumerName(Settings.Consumer, ActorContext()); bool found = false; for (ui32 i = 0; i < config.ReadRulesSize(); ++i) { if (consumerName != config.GetReadRules(i)) @@ -1019,44 +1191,71 @@ void TDescribeConsumerActor::HandleCacheNavigateResponse(TEvTxProxySchemeCache:: auto rr = Result.mutable_consumer(); Ydb::StatusIds::StatusCode status; TString error; - if (!FillConsumerProto(rr, config, i, ctx, status, error)) { - return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ctx); + if (!FillConsumerProto(rr, config, i, ActorContext(), status, error)) { + return RaiseError(error, Ydb::PersQueue::ErrorCode::ERROR, status, ActorContext()); } break; } if (!found) { - Request_->RaiseIssue(FillIssue(TStringBuilder() << "no consumer '" << Consumer << "' in topic", Ydb::PersQueue::ErrorCode::BAD_REQUEST)); - return ReplyWithResult(Ydb::StatusIds::SCHEME_ERROR, ctx); + Request_->RaiseIssue(FillIssue( + TStringBuilder() << "no consumer '" << Settings.Consumer << "' in topic", + Ydb::PersQueue::ErrorCode::BAD_REQUEST + )); + return RespondWithCode(Ydb::StatusIds::SCHEME_ERROR); } - if (GetProtoRequest()->include_stats()) { - ProcessTablets(pqDescr, ctx); + if (GetProtoRequest()->include_stats() || GetProtoRequest()->include_location()) { + ProcessTablets(pqDescr, ActorContext()); return; } } - return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ActorContext()); } -bool TDescribeTopicActorImpl::ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx) { + +bool TDescribeTopicActorImpl::ProcessTablets( + const NKikimrSchemeOp::TPersQueueGroupDescription& pqDescr, const TActorContext& ctx +) { + auto addBalancer = [&] { + BalancerTabletId = pqDescr.GetBalancerTabletID(); + Tablets[BalancerTabletId].TabletId = BalancerTabletId; + }; + + auto partitionFilter = [&] (ui32 partId) { + if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribePartitions) { + return Settings.RequireStats && partId == Settings.Partitions[0]; + } else { + return Settings.RequireStats; + } + return true; + }; + TotalPartitions = pqDescr.GetTotalGroupCount(); + for (ui32 i = 0; i < pqDescr.PartitionsSize(); ++i) { const auto& pi = pqDescr.GetPartitions(i); + if (!partitionFilter(pi.GetPartitionId())) { + continue; + } Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId()); Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId(); } for (auto& pair : Tablets) { RequestTablet(pair.second, ctx); } - if (!Consumer.empty()) { - BalancerTabletId = pqDescr.GetBalancerTabletID(); - Tablets[BalancerTabletId].TabletId = BalancerTabletId; + + if (Settings.RequireLocation) { + addBalancer(); + RequestAdditionalInfo(ctx); + } else if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer) { + addBalancer(); } - - if (RequestsInfly == 0) { + if (RequestsInfly == 0 && !PendingLocation) { Reply(ctx); return false; } + return true; } @@ -1066,6 +1265,7 @@ void TDescribeTopicActor::Bootstrap(const NActors::TActorContext& ctx) SendDescribeProposeRequest(ctx); Become(&TDescribeTopicActor::StateWork); + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "Describe topic actor for path " << GetProtoRequest()->path()); } void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx) @@ -1076,4 +1276,206 @@ void TDescribeConsumerActor::Bootstrap(const NActors::TActorContext& ctx) Become(&TDescribeConsumerActor::StateWork); } + +template<class TProtoType> +TDescribeTopicActorSettings SettingsFromDescribePartRequest(TProtoType* request) { + return TDescribeTopicActorSettings::DescribePartitionSettings( + request->partition_id(), request->include_stats(), request->include_location() + ); +} + +TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::TEvDescribePartitionRequest* request) + : TBase(request, request->GetProtoRequest()->path()) + , TDescribeTopicActorImpl(SettingsFromDescribePartRequest(request->GetProtoRequest())) +{ +} + +TDescribePartitionActor::TDescribePartitionActor(NKikimr::NGRpcService::IRequestOpCtx* ctx) + : TBase(ctx, dynamic_cast<const Ydb::Topic::DescribePartitionRequest*>(ctx->GetRequest())->path()) + , TDescribeTopicActorImpl(SettingsFromDescribePartRequest(dynamic_cast<const Ydb::Topic::DescribePartitionRequest*>(ctx->GetRequest()))) +{ +} + +void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx) +{ + TBase::Bootstrap(ctx); + SendDescribeProposeRequest(ctx); + Become(&TDescribePartitionActor::StateWork); +} + +void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + default: + if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) { + TBase::StateWork(ev); + }; + } +} + +void TDescribePartitionActor::HandleCacheNavigateResponse( + TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev +) { + Y_VERIFY(ev->Get()->Request.Get()->ResultSet.size() == 1); // describe for only one topic + if (ReplyIfNotTopic(ev)) { + return; + } + PQGroupInfo = ev->Get()->Request->ResultSet[0].PQGroupInfo; + auto* partRes = Result.mutable_partition(); + partRes->set_partition_id(Settings.Partitions[0]); + partRes->set_active(true); + ProcessTablets(PQGroupInfo->Description, this->ActorContext()); +} + +void TDescribePartitionActor::ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&, const TActorContext&) { + Y_FAIL(""); +} + +void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext&) { + auto* partResult = Result.mutable_partition(); + + const auto& record = ev->Get()->Record; + for (auto partData : record.GetPartResult()) { + if ((ui32)partData.GetPartition() != Settings.Partitions[0]) + continue; + + Y_VERIFY((ui32)(partData.GetPartition()) == Settings.Partitions[0]); + partResult->set_partition_id(partData.GetPartition()); + partResult->set_active(true); + FillPartitionStats(partData, partResult->mutable_partition_stats(), tabletInfo.NodeId); + } +} + +bool TDescribePartitionActor::ApplyResponse( + TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& +) { + const auto& record = ev->Get()->Record; + if (Settings.Partitions) { + Y_VERIFY(record.LocationsSize() == 1); + } + + const auto& location = record.GetLocations(0); + auto* pResult = Result.mutable_partition(); + pResult->set_partition_id(location.GetPartitionId()); + pResult->set_active(true); + auto* locationResult = pResult->mutable_partition_location(); + SetPartitionLocation(location, locationResult); + return true; +} + +void TDescribePartitionActor::RaiseError( + const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, + const TActorContext& +) { + if (TBase::IsDead) + return; + this->Request_->RaiseIssue(FillIssue(error, errorCode)); + TBase::RespondWithCode(status); +} + +void TDescribePartitionActor::Reply(const TActorContext& ctx) { + if (TBase::IsDead) { + return; + } + if (Settings.RequireLocation) { + Y_VERIFY(Result.partition().has_partition_location()); + } + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); +} + +using namespace NIcNodeCache; + +TPartitionsLocationActor::TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester) + : TBase(request, requester) + , TDescribeTopicActorImpl(TDescribeTopicActorSettings::GetPartitionsLocation(request.PartitionIds)) +{ +} + + +void TPartitionsLocationActor::Bootstrap(const NActors::TActorContext&) +{ + SendDescribeProposeRequest(); + Become(&TPartitionsLocationActor::StateWork); + SendNodesRequest(); + +} + +void TPartitionsLocationActor::StateWork(TAutoPtr<IEventHandle>& ev) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle); + default: + if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) { + TBase::StateWork(ev); + }; + } +} + +void TPartitionsLocationActor::HandleCacheNavigateResponse( + TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev +) { + if (!TBase::HandleCacheNavigateResponseBase(ev)) { + return; + } + ProcessTablets(PQGroupInfo->Description, this->ActorContext()); +} + +bool TPartitionsLocationActor::ApplyResponse( + TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& +) { + const auto& record = ev->Get()->Record; + for (auto i = 0u; i < record.LocationsSize(); i++) { + const auto& part = record.GetLocations(i); + TEvPQProxy::TPartitionLocationInfo partLocation; + ui64 nodeId = part.GetNodeId(); + + partLocation.PartitionId = part.GetPartitionId(); + partLocation.IncGeneration = part.GetGeneration() + 1; + partLocation.NodeId = nodeId; + Response->Partitions.emplace_back(std::move(partLocation)); + } + if (GotNodesInfo) + Finalize(); + else + GotPartitions = true; + return true; +} + +void TPartitionsLocationActor::SendNodesRequest() const { + auto* icEv = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest(); + ActorContext().Send(CreateICNodesInfoCacheServiceId(), icEv); + } + +void TPartitionsLocationActor::Handle(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev) { + NodesInfoEv = ev; + if (GotPartitions) + Finalize(); + else + GotNodesInfo = true; +} + +void TPartitionsLocationActor::Finalize() { + if (Settings.Partitions) { + Y_VERIFY(Response->Partitions.size() == Settings.Partitions.size()); + } else { + Y_VERIFY(Response->Partitions.size() == PQGroupInfo->Description.PartitionsSize()); + } + for (auto& pInResponse : Response->Partitions) { + auto iter = NodesInfoEv->Get()->NodeIdsMapping->find(pInResponse.NodeId); + if (iter.IsEnd()) { + return RaiseError( + TStringBuilder() << "Hostname not found for nodeId " << pInResponse.NodeId, + Ydb::PersQueue::ErrorCode::ERROR, + Ydb::StatusIds::INTERNAL_ERROR, ActorContext() + ); + } + pInResponse.Hostname = (*NodesInfoEv->Get()->Nodes)[iter->second].Host; + } + TBase::RespondWithCode(Ydb::StatusIds::SUCCESS); +} + +void TPartitionsLocationActor::RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) { + this->AddIssue(FillIssue(error, errorCode)); + this->RespondWithCode(status); +} + +} // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/services/persqueue_v1/actors/schema_actors.h b/ydb/services/persqueue_v1/actors/schema_actors.h index 1579ab50274..e5d25dcaee9 100644 --- a/ydb/services/persqueue_v1/actors/schema_actors.h +++ b/ydb/services/persqueue_v1/actors/schema_actors.h @@ -1,13 +1,14 @@ #pragma once #include "events.h" -#include <ydb/services/lib/actors/pq_schema_actor.h> #include <ydb/core/persqueue/events/global.h> +#include <ydb/services/lib/actors/pq_schema_actor.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> + namespace NKikimr::NGRpcProxy::V1 { using namespace NKikimr::NGRpcService; - class TDropPropose { public: TDropPropose() {} @@ -26,7 +27,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } }; class TDropTopicActor : public TPQGrpcSchemaBase<TDropTopicActor, NKikimr::NGRpcService::TEvDropTopicRequest>, public TDropPropose { @@ -39,7 +40,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } }; class TPQDescribeTopicActor : public TPQGrpcSchemaBase<TPQDescribeTopicActor, NKikimr::NGRpcService::TEvPQDescribeTopicRequest> @@ -55,7 +56,58 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); +}; + +struct TDescribeTopicActorSettings { + enum class EMode { + DescribeTopic, + DescribeConsumer, + DescribePartitions, + }; + EMode Mode; + TString Consumer; + TVector<ui32> Partitions; + bool RequireStats = false; + bool RequireLocation = false; + + TDescribeTopicActorSettings() + : Mode(EMode::DescribeTopic) + {} + + TDescribeTopicActorSettings(const TString& consumer) + : Mode(EMode::DescribeConsumer) + , Consumer(consumer) + {} + TDescribeTopicActorSettings(EMode mode, bool requireStats, bool requireLocation) + : Mode(mode) + , RequireStats(requireStats) + , RequireLocation(requireLocation) + {} + + static TDescribeTopicActorSettings DescribeTopic(bool requireStats, bool requireLocation) { + return TDescribeTopicActorSettings{EMode::DescribeTopic, requireStats, requireLocation}; + } + + static TDescribeTopicActorSettings DescribeConsumer(const TString& consumer, bool requireStats, bool requireLocation) + { + TDescribeTopicActorSettings res{EMode::DescribeConsumer, requireStats, requireLocation}; + res.Consumer = consumer; + return res; + } + + static TDescribeTopicActorSettings GetPartitionsLocation(const TVector<ui32>& partitions) { + TDescribeTopicActorSettings res{EMode::DescribePartitions, false, true}; + res.Partitions = partitions; + return res; + } + + static TDescribeTopicActorSettings DescribePartitionSettings(ui32 partition, bool stats, bool location) { + TDescribeTopicActorSettings res{EMode::DescribePartitions, stats, location}; + res.Partitions = {partition}; + return res; + } + }; class TDescribeTopicActorImpl @@ -67,9 +119,15 @@ protected: TActorId Pipe; ui32 NodeId = 0; ui32 RetriesLeft = 3; + + TTabletInfo() = default; + TTabletInfo(ui64 tabletId) + : TabletId(tabletId) + {} }; + public: - TDescribeTopicActorImpl(const TString& consumer); + TDescribeTopicActorImpl(const TDescribeTopicActorSettings& settings); virtual ~TDescribeTopicActorImpl() = default; void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext& ctx); @@ -77,36 +135,49 @@ public: void Handle(NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx); void Handle(NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx); + void Handle(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx); void Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, const TActorContext& ctx); + void HandleWakeup(TEvents::TEvWakeup::TPtr& ev, const TActorContext& ctx); + bool ProcessTablets(const NKikimrSchemeOp::TPersQueueGroupDescription& description, const TActorContext& ctx); void RequestTablet(TTabletInfo& tablet, const TActorContext& ctx); void RequestTablet(ui64 tabletId, const TActorContext& ctx); void RestartTablet(ui64 tabletId, const TActorContext& ctx, TActorId pipe = {}, const TDuration& delay = TDuration::Zero()); void RequestAdditionalInfo(const TActorContext& ctx); + void RequestBalancer(const TActorContext& ctx); + void RequestPartitionsLocationIfRequired(const TActorContext& ctx); + void CheckCloseBalancerPipe(const TActorContext& ctx); bool StateWork(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx); - void Bootstrap(const NActors::TActorContext& ctx); + virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) = 0; - virtual void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) = 0; + virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, + const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0; + virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, + const TActorContext& ctx) = 0; + virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, + const TActorContext& ctx) = 0; + virtual bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr&, const TActorContext&) = 0; - virtual void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) = 0; - virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) = 0; - virtual void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) = 0; virtual void Reply(const TActorContext& ctx) = 0; private: std::map<ui64, TTabletInfo> Tablets; ui32 RequestsInfly = 0; + bool PendingLocation = false; + bool GotLocation = false; ui64 BalancerTabletId = 0; + TActorId* BalancerPipe = nullptr; protected: - TString Consumer; + ui32 TotalPartitions = 0; + TDescribeTopicActorSettings Settings; }; class TDescribeTopicActor : public TPQGrpcSchemaBase<TDescribeTopicActor, NKikimr::NGRpcService::TEvDescribeTopicRequest> @@ -127,9 +198,10 @@ public: void StateWork(TAutoPtr<IEventHandle>& ev); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override; + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override; + bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override; virtual void Reply(const TActorContext& ctx) override; private: @@ -154,16 +226,45 @@ public: void StateWork(TAutoPtr<IEventHandle>& ev); void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override; - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) override; + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override; + bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override; virtual void Reply(const TActorContext& ctx) override; private: Ydb::Topic::DescribeConsumerResult Result; }; +class TDescribePartitionActor : public TPQGrpcSchemaBase<TDescribePartitionActor, NKikimr::NGRpcService::TEvDescribePartitionRequest> + , public TDescribeTopicActorImpl +{ +using TBase = TPQGrpcSchemaBase<TDescribePartitionActor, NKikimr::NGRpcService::TEvDescribePartitionRequest>; +using TTabletInfo = TDescribeTopicActorImpl::TTabletInfo; + +public: + TDescribePartitionActor(NKikimr::NGRpcService::TEvDescribePartitionRequest* request); + TDescribePartitionActor(NKikimr::NGRpcService::IRequestOpCtx * ctx); + + ~TDescribePartitionActor() = default; + void Bootstrap(const NActors::TActorContext& ctx); + + void StateWork(TAutoPtr<IEventHandle>& ev); + + void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, + const Ydb::StatusIds::StatusCode status, const TActorContext& ctx) override; + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext& ctx) override; + void ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvReadSessionsInfoResponse::TPtr& ev, const TActorContext& ctx) override; + bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override; + + virtual void Reply(const TActorContext& ctx) override; + +private: + TIntrusiveConstPtr<NSchemeCache::TSchemeCacheNavigate::TPQGroupInfo> PQGroupInfo; + Ydb::Topic::DescribePartitionResult Result; +}; class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest> , public TCdcStreamCompatible @@ -173,7 +274,7 @@ class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddR public: TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request); - void Bootstrap(const NActors::TActorContext &ctx); + void Bootstrap(const NActors::TActorContext& ctx); void ModifyPersqueueConfig(const TActorContext& ctx, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, @@ -197,7 +298,7 @@ public: class TPQCreateTopicActor : public TPQGrpcSchemaBase<TPQCreateTopicActor, NKikimr::NGRpcService::TEvPQCreateTopicRequest> { -using TBase = TPQGrpcSchemaBase<TPQCreateTopicActor, TEvPQCreateTopicRequest>; + using TBase = TPQGrpcSchemaBase<TPQCreateTopicActor, TEvPQCreateTopicRequest>; public: TPQCreateTopicActor(NKikimr::NGRpcService::TEvPQCreateTopicRequest* request, const TString& localCluster, const TVector<TString>& clusters); @@ -208,7 +309,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } private: TString LocalCluster; @@ -217,7 +318,7 @@ private: class TCreateTopicActor : public TPQGrpcSchemaBase<TCreateTopicActor, NKikimr::NGRpcService::TEvCreateTopicRequest> { -using TBase = TPQGrpcSchemaBase<TCreateTopicActor, TEvCreateTopicRequest>; + using TBase = TPQGrpcSchemaBase<TCreateTopicActor, TEvCreateTopicRequest>; public: TCreateTopicActor(NKikimr::NGRpcService::TEvCreateTopicRequest* request, const TString& localCluster, const TVector<TString>& clusters); @@ -229,7 +330,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } private: TString LocalCluster; @@ -238,7 +339,7 @@ private: class TPQAlterTopicActor : public TPQGrpcSchemaBase<TPQAlterTopicActor, NKikimr::NGRpcService::TEvPQAlterTopicRequest> { -using TBase = TPQGrpcSchemaBase<TPQAlterTopicActor, TEvPQAlterTopicRequest>; + using TBase = TPQGrpcSchemaBase<TPQAlterTopicActor, TEvPQAlterTopicRequest>; public: TPQAlterTopicActor(NKikimr::NGRpcService::TEvPQAlterTopicRequest* request, const TString& localCluster); @@ -249,7 +350,7 @@ public: void Bootstrap(const NActors::TActorContext& ctx); - void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx){ Y_UNUSED(ev); Y_UNUSED(ctx); } + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev){ Y_UNUSED(ev); } private: TString LocalCluster; @@ -265,7 +366,7 @@ public: TAlterTopicActor(NKikimr::NGRpcService::TEvAlterTopicRequest *request); TAlterTopicActor(NKikimr::NGRpcService::IRequestOpCtx* request); - void Bootstrap(const NActors::TActorContext &ctx); + void Bootstrap(const NActors::TActorContext& ctx); void ModifyPersqueueConfig(const TActorContext& ctx, NKikimrSchemeOp::TPersQueueGroupDescription& groupConfig, const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, @@ -273,4 +374,49 @@ public: }; -} +class TPartitionsLocationActor : public TPQInternalSchemaActor<TPartitionsLocationActor, + TGetPartitionsLocationRequest, + TEvPQProxy::TEvPartitionLocationResponse> + , public TDescribeTopicActorImpl { + +using TBase = TPQInternalSchemaActor<TPartitionsLocationActor, TGetPartitionsLocationRequest, + TEvPQProxy::TEvPartitionLocationResponse>; + +public: + TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester); + + ~TPartitionsLocationActor() = default; + + void Bootstrap(const NActors::TActorContext& ctx) override; + + void StateWork(TAutoPtr<IEventHandle>& ev); + + void HandleCacheNavigateResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) override; + void ApplyResponse(TTabletInfo&, + NKikimr::TEvPersQueue::TEvStatusResponse::TPtr&, + const TActorContext&) override { + Y_FAIL(); + } + virtual void ApplyResponse(TTabletInfo&, TEvPersQueue::TEvReadSessionsInfoResponse::TPtr&, + const TActorContext&) override { + Y_FAIL(); + } + + void Finalize(); + + bool ApplyResponse(TEvPersQueue::TEvGetPartitionsLocationResponse::TPtr& ev, const TActorContext& ctx) override; + void Reply(const TActorContext&) override {}; + + void Handle(NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr& ev); + void RaiseError(const TString& error, const Ydb::PersQueue::ErrorCode::ErrorCode errorCode, const Ydb::StatusIds::StatusCode status, const TActorContext&) override; +private: + void SendNodesRequest() const; + + NIcNodeCache::TEvICNodesInfoCache::TEvGetAllNodesInfoResponse::TPtr NodesInfoEv; + THashSet<ui64> PartitionIds; + + bool GotPartitions = false; + bool GotNodesInfo = false; +}; + +} // namespace NKikimr::NGRpcProxy::V1 diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.cpp b/ydb/services/persqueue_v1/grpc_pq_schema.cpp index dae9a44de5e..95693c0aad1 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_schema.cpp @@ -141,6 +141,11 @@ void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest: ctx.Register(new TDescribeConsumerActor(ev->Release().Release())); } +void TPQSchemaService::Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) { + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, "new Describe partition request"); + ctx.Register(new TDescribePartitionActor(ev->Release().Release())); +} + } namespace NKikimr { @@ -178,6 +183,10 @@ void TGRpcRequestProxyHandleMethods::Handle(TEvDescribeTopicRequest::TPtr& ev, c ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } +void TGRpcRequestProxyHandleMethods::Handle(TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx) { + ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); +} + void NKikimr::NGRpcService::TGRpcRequestProxyHandleMethods::Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx) { ctx.Send(NKikimr::NGRpcProxy::V1::GetPQSchemaServiceActorID(), ev->Release().Release()); } @@ -201,6 +210,7 @@ void TGRpcRequestProxyHandleMethods::Handle(TEvPQRemoveReadRuleRequest::TPtr& ev DECLARE_RPC(DescribeTopic); DECLARE_RPC(DescribeConsumer); +DECLARE_RPC(DescribePartition); #undef DECLARE_RPC diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h index 58ba33eb587..4fdba0955a1 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.h +++ b/ydb/services/persqueue_v1/grpc_pq_schema.h @@ -42,6 +42,7 @@ private: HFunc(NKikimr::NGRpcService::TEvAlterTopicRequest, Handle); HFunc(NKikimr::NGRpcService::TEvDescribeTopicRequest, Handle); HFunc(NKikimr::NGRpcService::TEvDescribeConsumerRequest, Handle); + HFunc(NKikimr::NGRpcService::TEvDescribePartitionRequest, Handle); hFunc(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate, Handle); } } @@ -58,6 +59,7 @@ private: void Handle(NKikimr::NGRpcService::TEvAlterTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(NKikimr::NGRpcService::TEvDescribeTopicRequest::TPtr& ev, const TActorContext& ctx); void Handle(NKikimr::NGRpcService::TEvDescribeConsumerRequest::TPtr& ev, const TActorContext& ctx); + void Handle(NKikimr::NGRpcService::TEvDescribePartitionRequest::TPtr& ev, const TActorContext& ctx); void Handle(NPQ::NClusterTracker::TEvClusterTracker::TEvClustersUpdate::TPtr& ev); diff --git a/ydb/services/persqueue_v1/topic.cpp b/ydb/services/persqueue_v1/topic.cpp index a74b98de949..42d1a31946e 100644 --- a/ydb/services/persqueue_v1/topic.cpp +++ b/ydb/services/persqueue_v1/topic.cpp @@ -115,6 +115,9 @@ void TGRpcTopicService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { ADD_REQUEST(DescribeConsumer, TopicService, DescribeConsumerRequest, DescribeConsumerResponse, { ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribeConsumerRequest(ctx, IsRlAllowed())); }) + ADD_REQUEST(DescribePartition, TopicService, DescribePartitionRequest, DescribePartitionResponse, { + ActorSystem_->Send(GRpcRequestProxyId_, new NGRpcService::TEvDescribePartitionRequest(ctx, IsRlAllowed())); + }) #undef ADD_REQUEST #ifdef ADD_REQUEST_LIMIT diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt index 872b8eb58f9..fc7ec1c7c30 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(describes_ut) add_subdirectory(new_schemecache_ut) add_executable(ydb-services-persqueue_v1-ut) diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt index ca936d0636a..7e21a17b083 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(describes_ut) add_subdirectory(new_schemecache_ut) add_executable(ydb-services-persqueue_v1-ut) diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt index 7a009a2e332..657b3aae640 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(describes_ut) add_subdirectory(new_schemecache_ut) add_executable(ydb-services-persqueue_v1-ut) diff --git a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt index 7f2067278e5..3e155ee8ccf 100644 --- a/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/services/persqueue_v1/ut/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(describes_ut) add_subdirectory(new_schemecache_ut) add_executable(ydb-services-persqueue_v1-ut) diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..2933016185f --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,81 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-describes_ut) +target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-persqueue_v1 + core-testlib-default + core-client-server + ydb_persqueue_core-ut-ut_utils +) +target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +) +set_property( + TARGET + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-persqueue_v1-ut-describes_ut + TEST_TARGET + ydb-services-persqueue_v1-ut-describes_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-persqueue_v1-ut-describes_ut + system_allocator +) +vcs_info(ydb-services-persqueue_v1-ut-describes_ut) diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..733eac0df59 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-describes_ut) +target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + ydb-services-persqueue_v1 + core-testlib-default + core-client-server + ydb_persqueue_core-ut-ut_utils +) +target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +) +set_property( + TARGET + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-persqueue_v1-ut-describes_ut + TEST_TARGET + ydb-services-persqueue_v1-ut-describes_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-persqueue_v1-ut-describes_ut + cpp-malloc-jemalloc +) +vcs_info(ydb-services-persqueue_v1-ut-describes_ut) diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..fb655893e2f --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,86 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-describes_ut) +target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-persqueue_v1 + core-testlib-default + core-client-server + ydb_persqueue_core-ut-ut_utils +) +target_link_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +) +set_property( + TARGET + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-persqueue_v1-ut-describes_ut + TEST_TARGET + ydb-services-persqueue_v1-ut-describes_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-persqueue_v1-ut-describes_ut + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-services-persqueue_v1-ut-describes_ut) diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt new file mode 100644 index 00000000000..f8b31df0c11 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..311d4cc3736 --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,74 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-persqueue_v1-ut-describes_ut) +target_compile_options(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1 +) +target_link_libraries(ydb-services-persqueue_v1-ut-describes_ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-persqueue_v1 + core-testlib-default + core-client-server + ydb_persqueue_core-ut-ut_utils +) +target_sources(ydb-services-persqueue_v1-ut-describes_ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp +) +set_property( + TARGET + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-services-persqueue_v1-ut-describes_ut + TEST_TARGET + ydb-services-persqueue_v1-ut-describes_ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-persqueue_v1-ut-describes_ut + PROPERTY + TIMEOUT + 300 +) +target_allocator(ydb-services-persqueue_v1-ut-describes_ut + system_allocator +) +vcs_info(ydb-services-persqueue_v1-ut-describes_ut) diff --git a/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp new file mode 100644 index 00000000000..382aee6829f --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/describe_topic_ut.cpp @@ -0,0 +1,299 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <ydb/core/client/server/ic_nodes_cache_service.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/services/persqueue_v1/ut/test_utils.h> +#include <ydb/services/persqueue_v1/actors/schema_actors.h> +#include <ydb/core/client/server/ic_nodes_cache_service.h> + + +namespace NKikimr::NPersQueueTests { + +using namespace NKikimr::NGRpcProxy::V1; +using namespace NIcNodeCache; + +const static TString topicName = "rt3.dc1--topic-x"; + +class TDescribeTestServer { +public: + NPersQueue::TTestServer Server; + bool UseBadTopic = false; + + TDescribeTestServer(ui32 partsCount = 15) + : Server() + , PartsCount(partsCount) + { + Server.EnableLogs({ NKikimrServices::KQP_PROXY }, NActors::NLog::PRI_EMERG); + Server.EnableLogs({ NKikimrServices::PERSQUEUE, NKikimrServices::PQ_METACACHE }, NActors::NLog::PRI_INFO); + Server.EnableLogs({ NKikimrServices::PERSQUEUE_CLUSTER_TRACKER }, NActors::NLog::PRI_INFO); + + Server.AnnoyingClient->CreateTopicNoLegacy(topicName, partsCount); + Channel = grpc::CreateChannel( + "localhost:" + ToString(Server.GrpcPort), grpc::InsecureChannelCredentials() + ); + Stub = Ydb::Topic::V1::TopicService::NewStub(Channel); + + } + + TTestActorRuntime* GetRuntime() const { + return Server.CleverServer->GetRuntime(); + } + bool DescribePartition(ui32 partId, bool askLocation, bool askStats, bool mayFail = false) { + grpc::ClientContext rcontext; + Ydb::Topic::DescribePartitionRequest request; + Ydb::Topic::DescribePartitionResponse response; + Ydb::Topic::DescribePartitionResult result; + request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName})); + request.set_partition_id(partId); + if (askLocation) + request.set_include_location(true); + if (askStats) + request.set_include_stats(true); + + Stub->DescribePartition(&rcontext, request, &response); + Cerr << "Got response: " << response.DebugString() << Endl; + if (UseBadTopic) { + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR); + return true; + } + if (partId < PartsCount) { + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS); + } else { + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::BAD_REQUEST); + NYql::TIssues opIssues; + NYql::IssuesFromMessage(response.operation().issues(), opIssues); + TString expectedError= TStringBuilder() << "No partition " << partId << " in topic"; + UNIT_ASSERT(opIssues.ToOneLineString().Contains(expectedError)); + return true; + } + + auto unpackOk = response.operation().result().UnpackTo(&result); + UNIT_ASSERT(unpackOk); + + if (askStats) { + UNIT_ASSERT(result.partition().has_partition_stats()); + } + UNIT_ASSERT_VALUES_EQUAL(result.partition().partition_id(), partId); + if (askLocation) { + UNIT_ASSERT(result.partition().has_partition_location()); + UNIT_ASSERT(result.partition().partition_location().node_id() > 0); + auto genComparizon = (result.partition().partition_location().generation() > 0); + if (mayFail) { + return genComparizon; + } else { + UNIT_ASSERT_C(genComparizon, response.DebugString().c_str()); + } + } + return true; + } + void DescribeTopic(bool askStats, bool askLocation) { + grpc::ClientContext rcontext; + Ydb::Topic::DescribeTopicRequest request; + Ydb::Topic::DescribeTopicResponse response; + Ydb::Topic::DescribeTopicResult result; + request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName})); + if (askStats) + request.set_include_stats(true); + if (askLocation) + request.set_include_location(true); + + Stub->DescribeTopic(&rcontext, request, &response); + Cerr << "Got response: " << response.DebugString() << Endl; + + if (UseBadTopic) { + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR); + return; + } + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS); + + auto unpackOk = response.operation().result().UnpackTo(&result); + UNIT_ASSERT(unpackOk); + if (askStats) { + UNIT_ASSERT(result.has_topic_stats()); + } else { + UNIT_ASSERT(!result.has_topic_stats()); + } + UNIT_ASSERT_VALUES_EQUAL(result.partitions_size(), PartsCount); + for (const auto& p : result.partitions()) { + UNIT_ASSERT(askStats == p.has_partition_stats()); + UNIT_ASSERT(askLocation == p.has_partition_location()); + if (askLocation) { + UNIT_ASSERT(p.partition_location().node_id() > 0); + } + } + } + void DescribeConsumer(const TString& consumerName, bool askStats, bool askLocation) { + grpc::ClientContext rcontext; + Ydb::Topic::DescribeConsumerRequest request; + Ydb::Topic::DescribeConsumerResponse response; + Ydb::Topic::DescribeConsumerResult result; + request.set_path(JoinPath({"/Root/PQ/", UseBadTopic ? "bad-topic" : topicName})); + if (askStats) + request.set_include_stats(true); + if (askLocation) + request.set_include_location(true); + request.set_consumer(consumerName); + Stub->DescribeConsumer(&rcontext, request, &response); + Cerr << "Got response: " << response.DebugString() << Endl; + + if (UseBadTopic) { + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SCHEME_ERROR); + return; + } + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS); + + auto unpackOk = response.operation().result().UnpackTo(&result); + UNIT_ASSERT(unpackOk); + UNIT_ASSERT_VALUES_EQUAL(result.partitions_size(), PartsCount); + for (const auto& p : result.partitions()) { + UNIT_ASSERT(askStats == p.has_partition_stats()); + UNIT_ASSERT(askStats == p.has_partition_consumer_stats()); + UNIT_ASSERT(askLocation == p.has_partition_location()); + if (askLocation) { + UNIT_ASSERT(p.partition_location().node_id() > 0); + } + } + } + void AddConsumer(const TString& consumer) { + Ydb::Topic::AlterTopicRequest request; + request.set_path(TStringBuilder() << "/Root/PQ/" << topicName); + + auto addConsumer = request.add_add_consumers(); + addConsumer->set_name(consumer); + addConsumer->set_important(true); + grpc::ClientContext rcontext; + Ydb::Topic::AlterTopicResponse response; + Stub->AlterTopic(&rcontext, request, &response); + UNIT_ASSERT(response.operation().status() == Ydb::StatusIds::SUCCESS); + } + +private: + std::shared_ptr<grpc::Channel> Channel; + std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> Stub; + ui32 PartsCount; +}; + +Y_UNIT_TEST_SUITE(TTopicApiDescribes) { + void KillTopicTablets(NPersQueue::TTestServer& server, const TString& topicName) { + auto pqGroup = server.AnnoyingClient->Ls(TString("/Root/PQ/" + topicName))->Record.GetPathDescription() + .GetPersQueueGroup(); + + THashSet<ui64> tablets; + for (const auto& p : pqGroup.GetPartitions()) { + auto res = tablets.insert(p.GetTabletId()); + if (res.second) { + server.AnnoyingClient->KillTablet(*server.CleverServer, p.GetTabletId()); + } + } + server.CleverServer->GetRuntime()->DispatchEvents(); + } + + Y_UNIT_TEST(GetLocalDescribe) { + TDescribeTestServer server{}; + auto* runtime = server.GetRuntime(); + + runtime->GetAppData().FeatureFlags.SetEnableIcNodeCache(true); + runtime->SetUseRealInterconnect(); + const auto edge = runtime->AllocateEdgeActor(); + + TString currentTopicName = topicName; + auto getDescribe = [&] (const TVector<ui32>& parts, bool fails = false) { + auto partitionActor = runtime->Register(new TPartitionsLocationActor( + TGetPartitionsLocationRequest{TString("/Root/PQ/") + currentTopicName, "", "", parts}, edge + )); + runtime->EnableScheduleForActor(partitionActor); + runtime->DispatchEvents(); + auto ev = runtime->GrabEdgeEvent<TEvPQProxy::TEvPartitionLocationResponse>(); + if (currentTopicName != topicName) { + UNIT_ASSERT_VALUES_EQUAL(ev->Status, Ydb::StatusIds::SCHEME_ERROR); + return ev; + } + if (fails) { + UNIT_ASSERT_VALUES_EQUAL(ev->Status, Ydb::StatusIds::BAD_REQUEST); + return ev; + } + UNIT_ASSERT_C(ev->Status == Ydb::StatusIds::SUCCESS, ev->Issues.ToString()); + UNIT_ASSERT_VALUES_EQUAL(ev->Partitions.size(), parts ? parts.size() : 15); + return ev; + + }; + + auto ev = getDescribe({}); + + THashSet<ui64> allParts; + for (const auto& p : ev->Partitions) { + UNIT_ASSERT(!p.Hostname.Empty()); + UNIT_ASSERT(p.NodeId > 0); + UNIT_ASSERT(p.IncGeneration > 0); + UNIT_ASSERT(p.PartitionId < 15); + allParts.insert(p.PartitionId); + } + UNIT_ASSERT_VALUES_EQUAL(allParts.size(), 15); + + ev = getDescribe({1, 3, 5}); + allParts.clear(); + for (const auto& p : ev->Partitions) { + auto res = allParts.insert(p.PartitionId); + UNIT_ASSERT(res.second); + UNIT_ASSERT(p.PartitionId == 1 || p.PartitionId == 3 || p.PartitionId == 5); + } + + getDescribe({1000}, true); + currentTopicName = "bad-topic"; + getDescribe({}, true); + } + Y_UNIT_TEST(GetPartitionDescribe) { + ui32 partsCount = 15; + TDescribeTestServer server(partsCount); + auto* runtime = server.GetRuntime(); + runtime->SetRegistrationObserverFunc( + [&](TTestActorRuntimeBase& runtime, const TActorId& parentId, const TActorId& actorId) { + runtime.EnableScheduleForActor(parentId); + runtime.EnableScheduleForActor(actorId); + } + ); + + server.DescribePartition(150, true, false); + server.DescribePartition(2, false, true); + server.DescribePartition(0, false, false); + KillTopicTablets(server.Server, topicName); + bool checkRes = server.DescribePartition(1, true, false, true); + + if (!checkRes) { + KillTopicTablets(server.Server, topicName); + server.DescribePartition(1, true, false); + } + server.DescribePartition(3, true, true); + server.UseBadTopic = true; + server.DescribePartition(0, true, true); + + } + Y_UNIT_TEST(DescribeTopic) { + auto server = TDescribeTestServer(); + Cerr << "Describe topic with stats and location\n"; + server.DescribeTopic(true, true); + KillTopicTablets(server.Server, topicName); + Cerr << "Describe topic with stats\n"; + server.DescribeTopic(true, false); + Cerr << "Describe topic with location\n"; + server.DescribeTopic(false, true); + Cerr << "Describe topic with no stats or location\n"; + server.DescribeTopic(false, false); + server.UseBadTopic = true; + Cerr << "Describe bad topic\n"; + server.DescribeTopic(true, true); + } + Y_UNIT_TEST(DescribeConsumer) { + auto server = TDescribeTestServer(); + server.AddConsumer("my-consumer"); + server.DescribeConsumer("my-consumer", true, true); + KillTopicTablets(server.Server, topicName); + server.DescribeConsumer("my-consumer",true, false); + server.DescribeConsumer("my-consumer",false, true); + server.DescribeConsumer("my-consumer",false, false); + server.UseBadTopic = true; + server.DescribeConsumer("my-consumer",true, true); + } +} + +} // namespace diff --git a/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp b/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp new file mode 100644 index 00000000000..5fbd9a4723f --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/ic_cache_ut.cpp @@ -0,0 +1,35 @@ +#include <library/cpp/testing/unittest/registar.h> + +#include <ydb/core/client/server/ic_nodes_cache_service.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils/test_server.h> +#include <ydb/services/persqueue_v1/ut/test_utils.h> + + +namespace NKikimr::NPersQueueTests { + +using namespace NIcNodeCache; + +Y_UNIT_TEST_SUITE(TIcNodeCache) { + Y_UNIT_TEST(GetNodesInfoTest) { + NPersQueue::TTestServer server; + auto* runtime = server.CleverServer->GetRuntime(); + runtime->GetAppData().FeatureFlags.SetEnableIcNodeCache(true); + + const auto edge = runtime->AllocateEdgeActor(); + runtime->RegisterService(CreateICNodesInfoCacheServiceId(), runtime->Register( + CreateICNodesInfoCacheService(nullptr, TDuration::Seconds(1))) + ); + auto makeRequest = [&]() { + auto* request = new TEvICNodesInfoCache::TEvGetAllNodesInfoRequest(); + runtime->Send(CreateICNodesInfoCacheServiceId(), edge, request); + auto ev = runtime->GrabEdgeEvent<TEvICNodesInfoCache::TEvGetAllNodesInfoResponse>(); + UNIT_ASSERT_VALUES_EQUAL(ev->Nodes->size(), 2); + }; + makeRequest(); + for (auto i = 0u; i < 6; i++) { + Sleep(TDuration::MilliSeconds(500)); + makeRequest(); + } + } +}; +} // NKikimr::NPersQueueTests
\ No newline at end of file diff --git a/ydb/services/persqueue_v1/ut/describes_ut/ya.make b/ydb/services/persqueue_v1/ut/describes_ut/ya.make new file mode 100644 index 00000000000..94f0ad74cde --- /dev/null +++ b/ydb/services/persqueue_v1/ut/describes_ut/ya.make @@ -0,0 +1,29 @@ +UNITTEST_FOR(ydb/services/persqueue_v1) + +FORK_SUBTESTS() + +IF (SANITIZER_TYPE OR WITH_VALGRIND) + TIMEOUT(1200) + SIZE(LARGE) + TAG(ya:fat) + REQUIREMENTS(ram:32) +ELSE() + TIMEOUT(300) + SIZE(MEDIUM) +ENDIF() + +SRCS( + ic_cache_ut.cpp + describe_topic_ut.cpp +) + +PEERDIR( + ydb/core/testlib/default + ydb/core/client/server + ydb/services/persqueue_v1 + ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/ut_utils +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/services/persqueue_v1/ya.make b/ydb/services/persqueue_v1/ya.make index aeb1026c03a..f88136af9cd 100644 --- a/ydb/services/persqueue_v1/ya.make +++ b/ydb/services/persqueue_v1/ya.make @@ -46,4 +46,5 @@ RECURSE( RECURSE_FOR_TESTS( ut ut/new_schemecache_ut + ut/describes_ut ) |