diff options
author | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-03-27 21:49:53 +0300 |
---|---|---|
committer | Ilnaz Nizametdinov <i.nizametdinov@gmail.com> | 2022-03-27 21:49:53 +0300 |
commit | 760769f65e5243098c203338ffbae04aae57d61d (patch) | |
tree | 36e69b83fe61482d0ce2977d948d59eda4c69921 | |
parent | eba7afc473d087b21d8bfdfc1398f50fd8452aaf (diff) | |
download | ydb-760769f65e5243098c203338ffbae04aae57d61d.tar.gz |
PQ & YDS readable logs, tests KIKIMR-14198
ref:b4144cad998d0d2bd0f953e0c03a2456fd819164
28 files changed, 594 insertions, 128 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 26d76167eb..b4969035b3 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -574,6 +574,7 @@ add_subdirectory(ydb/public/lib/deprecated/client) add_subdirectory(ydb/public/lib/value) add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) +add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) add_subdirectory(ydb/core/yq/libs/actors/logging) @@ -752,7 +753,6 @@ add_subdirectory(ydb/services/persqueue_v1) add_subdirectory(ydb/library/persqueue/tests) add_subdirectory(ydb/services/lib/actors) add_subdirectory(ydb/public/lib/jwt) -add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/cms) add_subdirectory(ydb/core/driver_lib/base_utils) add_subdirectory(ydb/core/driver_lib/cli_config_base) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index a80add5dfc..3fba0407c0 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -654,6 +654,7 @@ add_subdirectory(ydb/public/lib/deprecated/client) add_subdirectory(ydb/public/lib/value) add_subdirectory(ydb/library/yql/dq/actors/compute) add_subdirectory(ydb/library/yql/dq/tasks) +add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/tx/long_tx_service/public) add_subdirectory(ydb/core/yq/libs/actors) add_subdirectory(ydb/core/yq/libs/actors/logging) @@ -832,7 +833,6 @@ add_subdirectory(ydb/services/persqueue_v1) add_subdirectory(ydb/library/persqueue/tests) add_subdirectory(ydb/services/lib/actors) add_subdirectory(ydb/public/lib/jwt) -add_subdirectory(ydb/services/lib/sharding) add_subdirectory(ydb/core/cms) add_subdirectory(ydb/core/driver_lib/base_utils) add_subdirectory(ydb/core/driver_lib/cli_config_base) diff --git a/ydb/core/base/path.h b/ydb/core/base/path.h index c2b93d2baf..402365d473 100644 --- a/ydb/core/base/path.h +++ b/ydb/core/base/path.h @@ -30,4 +30,10 @@ TString CombinePath(TIter begin, TIter end, bool canonize = true) { : path; } +inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TString& childName) { + auto path = parentPath; + path.push_back(childName); + return path; +} + } diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp index 6b752a60e8..d27e55b0d0 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp +++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp @@ -215,11 +215,12 @@ private: void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics"); + const auto& msg = *ev->Get(); - SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ev->Get()->SyncVersion, ctx); + SendSchemeCacheRequest(msg.Topics, !msg.PathPrefix.empty(), false, msg.SyncVersion, msg.ShowPrivate, ctx); auto inserted = DescribeTopicsWaiters.insert(std::make_pair( RequestId, - TWaiter{ev->Sender, std::move(ev->Get()->Topics)} + TWaiter{ev->Sender, std::move(msg.Topics)} )).second; Y_VERIFY(inserted); } @@ -252,7 +253,7 @@ private: } if (DescribeAllTopicsWaiters.empty()) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request"); - SendSchemeCacheRequest(CurrentTopics, true, true, false, ctx); + SendSchemeCacheRequest(CurrentTopics, true, true, false, false, ctx); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter"); DescribeAllTopicsWaiters.push(waiter); @@ -262,6 +263,9 @@ private: void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { auto& result = ev->Get()->Request; + LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle SchemeCache response" + << ": result# " << result->ToString(*AppData()->TypeRegistry)); + if (result->Instant == 0) { for (const auto& entry : result->ResultSet) { if (!entry.PQGroupInfo) { @@ -296,10 +300,10 @@ private: } } - - void SendSchemeCacheRequest( - const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, const TActorContext& ctx - ) { + void SendSchemeCacheRequest(const TVector<TString>& topics, + bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, bool showPrivate, + const TActorContext& ctx) + { auto instant = isFullListingRequest ? 0 : ++RequestId; auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant); for (const auto& path : topics) { @@ -311,7 +315,8 @@ private: entry.Path.insert(entry.Path.end(), split.begin(), split.end()); entry.SyncVersion = syncVersion; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; + entry.ShowPrivatePath = showPrivate; + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; schemeCacheRequest->ResultSet.emplace_back(std::move(entry)); } ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release())); diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h index f1d8248ef3..74c9ac1418 100644 --- a/ydb/core/client/server/msgbus_server_pq_metacache.h +++ b/ydb/core/client/server/msgbus_server_pq_metacache.h @@ -66,18 +66,21 @@ struct TEvPqNewMetaCache { TString PathPrefix; TVector<TString> Topics; bool SyncVersion; + bool ShowPrivate = false; TEvDescribeTopicsRequest() = default; - explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true) + explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true, bool showPrivate = false) : Topics(topics) , SyncVersion(syncVersion) + , ShowPrivate(showPrivate) {} - TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true) + TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true, bool showPrivate = false) : PathPrefix(pathPrefix) , Topics(topics) , SyncVersion(syncVersion) + , ShowPrivate(showPrivate) {} }; diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 385b647c5e..6085b91483 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -1231,6 +1231,7 @@ message TModifyScheme { optional EOperationType OperationType = 2; optional bool Internal = 36 [default = false]; // internal operations are not generated directly by the user optional bool FailOnExist = 50 [default = false]; // as a replacement for TEvModifySchemeTransaction.FailOnExist + optional bool AllowAccessToPrivatePaths = 53 [default = false]; optional TMkDir MkDir = 3; optional TTableDescription CreateTable = 4; diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto index 7c85927449..e0343e2140 100644 --- a/ydb/core/protos/pqconfig.proto +++ b/ydb/core/protos/pqconfig.proto @@ -691,10 +691,16 @@ message TYdsNextToken { } message TYdsShardIterator { + enum ETopicKind { + KIND_COMMON = 0; + KIND_CDC = 1; + } + required string StreamName = 1; required string StreamArn = 2; required uint32 ShardId = 3; required uint64 ReadTimestampMs = 4; required uint32 SequenceNumber = 5; required uint64 CreationTimestampMs = 6; + optional ETopicKind Kind = 7; } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 9623365974..9b0c97c731 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -875,6 +875,11 @@ namespace Tests { return Runtime->GetAppData().FunctionRegistry; } + const NYdb::TDriver& TServer::GetDriver() const { + Y_VERIFY(Driver); + return *Driver; + } + TServer::~TServer() { if (Runtime->GetAppData().Mon) { Runtime->GetAppData().Mon->Stop(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index b3673907ae..30f3d13c11 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -240,6 +240,7 @@ namespace Tests { const TServerSettings& GetSettings() const; const NScheme::TTypeRegistry* GetTypeRegistry(); const NMiniKQL::IFunctionRegistry* GetFunctionRegistry(); + const NYdb::TDriver& GetDriver() const; ui32 StaticNodes() const { return Settings->NodeCount; diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt index 842cf3224b..2880f39156 100644 --- a/ydb/core/tx/datashard/CMakeLists.txt +++ b/ydb/core/tx/datashard/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(core-tx-datashard PUBLIC library-cpp-resource cpp-actors-core cpp-containers-flat_hash + cpp-digest-md5 cpp-html-pcdata library-cpp-json cpp-json-yson @@ -44,6 +45,7 @@ target_link_libraries(core-tx-datashard PUBLIC api-protos lib-deprecated-kicli dq-actors-compute + services-lib-sharding ) target_sources(core-tx-datashard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp @@ -251,6 +253,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC library-cpp-resource cpp-actors-core cpp-containers-flat_hash + cpp-digest-md5 cpp-html-pcdata library-cpp-json cpp-json-yson @@ -277,6 +280,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC api-protos lib-deprecated-kicli dq-actors-compute + services-lib-sharding ) target_sources(core-tx-datashard.global PRIVATE ${CMAKE_BINARY_DIR}/ydb/core/tx/datashard/eacf331f0ffc35d4b482f1d15a887d3b.cpp diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp index cce147d67f..6f0f93e9f5 100644 --- a/ydb/core/tx/datashard/change_record.cpp +++ b/ydb/core/tx/datashard/change_record.cpp @@ -2,6 +2,7 @@ #include "export_common.h" #include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> #include <library/cpp/json/yson/json2yson.h> #include <library/cpp/string_utils/base64/base64.h> @@ -14,34 +15,6 @@ namespace NKikimr { namespace NDataShard { -i64 TChangeRecord::GetSeqNo() const { - Y_VERIFY(Order <= Max<i64>()); - return static_cast<i64>(Order); -} - -TConstArrayRef<TCell> TChangeRecord::GetKey() const { - if (Key) { - return *Key; - } - - switch (Kind) { - case EKind::AsyncIndex: - case EKind::CdcDataChange: { - NKikimrChangeExchange::TChangeRecord::TDataChange parsed; - Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size())); - - TSerializedCellVec key; - Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key)); - - Key.ConstructInPlace(key.GetCells()); - break; - } - } - - Y_VERIFY(Key); - return *Key; -} - void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const { record.SetOrder(Order); record.SetGroup(Group); @@ -134,7 +107,7 @@ static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeId type) { static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key, const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in) { - Y_VERIFY(in.TagsSize() != schema->KeyColumnIds.size()); + Y_VERIFY(in.TagsSize() == schema->KeyColumnIds.size()); for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) { Y_VERIFY(in.GetTags(i) == schema->KeyColumnIds.at(i)); } @@ -187,20 +160,25 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value SerializeJsonValue(Schema, value["newImage"], body.GetNewImage()); } - if (!body.HasOldImage() && !body.HasNewImage()) { - switch (body.GetRowOperationCase()) { - case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: + const auto hasAnyImage = body.HasOldImage() || body.HasNewImage(); + switch (body.GetRowOperationCase()) { + case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert: + value["update"].SetType(NJson::EJsonValueType::JSON_MAP); + if (!hasAnyImage) { SerializeJsonValue(Schema, value["update"], body.GetUpsert()); - break; - case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: + } + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset: + value["reset"].SetType(NJson::EJsonValueType::JSON_MAP); + if (!hasAnyImage) { SerializeJsonValue(Schema, value["reset"], body.GetReset()); - break; - case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: - value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); - break; - default: - Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); - } + } + break; + case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase: + value["erase"].SetType(NJson::EJsonValueType::JSON_MAP); + break; + default: + Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase())); } break; @@ -212,6 +190,62 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value } } +TConstArrayRef<TCell> TChangeRecord::GetKey() const { + if (Key) { + return *Key; + } + + switch (Kind) { + case EKind::AsyncIndex: + case EKind::CdcDataChange: { + NKikimrChangeExchange::TChangeRecord::TDataChange parsed; + Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size())); + + TSerializedCellVec key; + Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key)); + + Key.ConstructInPlace(key.GetCells()); + break; + } + } + + Y_VERIFY(Key); + return *Key; +} + +i64 TChangeRecord::GetSeqNo() const { + Y_VERIFY(Order <= Max<i64>()); + return static_cast<i64>(Order); +} + +TString TChangeRecord::GetPartitionKey() const { + if (PartitionKey) { + return *PartitionKey; + } + + switch (Kind) { + case EKind::CdcDataChange: { + Y_VERIFY(Schema); + + NKikimrChangeExchange::TChangeRecord::TDataChange body; + Y_VERIFY(body.ParseFromArray(Body.data(), Body.size())); + + NJson::TJsonValue key; + SerializeJsonKey(Schema, key, body.GetKey()); + + PartitionKey.ConstructInPlace(WriteJson(key, false)); + break; + } + + case EKind::AsyncIndex: { + Y_FAIL("Not supported"); + } + } + + Y_VERIFY(PartitionKey); + return *PartitionKey; +} + TString TChangeRecord::ToString() const { TString result; TStringOutput out(result); diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h index fae0f79373..1ee4259f1b 100644 --- a/ydb/core/tx/datashard/change_record.h +++ b/ydb/core/tx/datashard/change_record.h @@ -36,8 +36,6 @@ public: const TPathId& GetPathId() const { return PathId; } EKind GetKind() const { return Kind; } const TString& GetBody() const { return Body; } - i64 GetSeqNo() const; - TConstArrayRef<TCell> GetKey() const; const TPathId& GetTableId() const { return TableId; } ui64 GetSchemaVersion() const { return SchemaVersion; } @@ -45,6 +43,10 @@ public: void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const; void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const; + TConstArrayRef<TCell> GetKey() const; + i64 GetSeqNo() const; + TString GetPartitionKey() const; + TString ToString() const; void Out(IOutputStream& out) const; @@ -62,6 +64,7 @@ private: TUserTable::TCPtr Schema; mutable TMaybe<TOwnedCellVec> Key; + mutable TMaybe<TString> PartitionKey; }; // TChangeRecord diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp index 1bc6796dbf..bb2d356fb7 100644 --- a/ydb/core/tx/datashard/change_sender_async_index.cpp +++ b/ydb/core/tx/datashard/change_sender_async_index.cpp @@ -613,11 +613,11 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe return KeyDesc && KeyDesc->Partitions; } - ui64 GetPartitionId(TConstArrayRef<TCell> key) const override { + ui64 GetPartitionId(const TChangeRecord& record) const override { Y_VERIFY(KeyDesc); Y_VERIFY(KeyDesc->Partitions); - const auto range = TTableRange(key); + const auto range = TTableRange(record.GetKey()); Y_VERIFY(range.Point); TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 3533a4997d..a526fd7dbe 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -5,12 +5,14 @@ #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/core/log.h> +#include <library/cpp/digest/md5/md5.h> #include <library/cpp/json/json_writer.h> #include <ydb/core/persqueue/partition_key_range/partition_key_range.h> #include <ydb/core/persqueue/writer/source_id_encoding.h> #include <ydb/core/persqueue/writer/writer.h> #include <ydb/core/protos/grpc_pq_old.pb.h> +#include <ydb/services/lib/sharding/sharding.h> namespace NKikimr { namespace NDataShard { @@ -18,7 +20,7 @@ namespace NDataShard { using namespace NPQ; class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> { - static constexpr auto CodecRaw = 1; + static constexpr auto CodecRaw = 0; TStringBuf GetLogPrefix() const { if (!LogPrefix) { @@ -616,28 +618,43 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain> return KeyDesc && KeyDesc->Partitions; } - ui64 GetPartitionId(TConstArrayRef<TCell> key) const override { + ui64 GetPartitionId(const TChangeRecord& record) const override { Y_VERIFY(KeyDesc); Y_VERIFY(KeyDesc->Partitions); - const auto range = TTableRange(key); - Y_VERIFY(range.Point); - - TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( - KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true, - [&](const TKeyDesc::TPartitionInfo& partition, bool) { - const int compares = CompareBorders<true, false>( - partition.EndKeyPrefix.GetCells(), range.From, - partition.IsInclusive || partition.IsPoint, - range.InclusiveFrom || range.Point, KeyDesc->Schema + switch (Format) { + case NKikimrSchemeOp::ECdcStreamFormatProto: { + const auto range = TTableRange(record.GetKey()); + Y_VERIFY(range.Point); + + TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound( + KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true, + [&](const TKeyDesc::TPartitionInfo& partition, bool) { + const int compares = CompareBorders<true, false>( + partition.EndKeyPrefix.GetCells(), range.From, + partition.IsInclusive || partition.IsPoint, + range.InclusiveFrom || range.Point, KeyDesc->Schema + ); + + return (compares < 0); + } ); - return (compares < 0); + Y_VERIFY(it != KeyDesc->Partitions.end()); + return it->PartitionId; + } + + case NKikimrSchemeOp::ECdcStreamFormatJson: { + using namespace NKikimr::NDataStreams::V1; + const auto hashKey = HexBytesToDecimal(MD5::Calc(record.GetPartitionKey())); + return ShardFromDecimal(hashKey, KeyDesc->Partitions.size()); } - ); - Y_VERIFY(it != KeyDesc->Partitions.end()); - return it->PartitionId; + default: { + Y_FAIL_S("Unknown format" + << ": format# " << static_cast<int>(Format)); + } + } } IActor* CreateSender(ui64 partitionId) override { diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp index ee8576248b..026c4033f2 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.cpp +++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp @@ -115,7 +115,7 @@ void TBaseChangeSender::SendRecords() { bool needToResolve = false; while (it != PendingSent.end()) { - const ui64 partitionId = Resolver->GetPartitionId(it->second.GetKey()); + const ui64 partitionId = Resolver->GetPartitionId(it->second); if (!Senders.contains(partitionId)) { needToResolve = true; ++it; diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h index e807a468aa..efff4a722e 100644 --- a/ydb/core/tx/datashard/change_sender_common_ops.h +++ b/ydb/core/tx/datashard/change_sender_common_ops.h @@ -75,7 +75,7 @@ public: virtual void Resolve() = 0; virtual bool IsResolving() const = 0; virtual bool IsResolved() const = 0; - virtual ui64 GetPartitionId(TConstArrayRef<TCell> key) const = 0; + virtual ui64 GetPartitionId(const TChangeRecord& record) const = 0; }; class TBaseChangeSender: public IChangeSender { diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 3af5b4a2e5..5c98f3f605 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -1,5 +1,9 @@ #include "datashard_ut_common.h" +#include <ydb/core/base/path.h> +#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h> + #include <util/string/strip.h> namespace NKikimr { @@ -567,6 +571,314 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) { opts.SetChangesQueueBytesLimit(1); }); } -} + +} // AsyncIndexChangeExchange + +Y_UNIT_TEST_SUITE(Cdc) { + using namespace NYdb::NPersQueue; + using namespace NYdb::NDataStreams::V1; + + using TCdcStream = TShardedTableOptions::TCdcStream; + + template <typename TDerived, typename TClient> + class TTestEnv { + public: + explicit TTestEnv( + const TShardedTableOptions& tableDesc, + const TCdcStream& streamDesc, + const TString& root = "Root", + const TString& tableName = "Table") + { + auto settings = TServerSettings(PortManager.GetPort(2134), {}, DefaultPQConfig()) + .SetDomainName(root) + .SetGrpcPort(PortManager.GetPort(2135)); + + Server = new TServer(settings); + Server->EnableGRpc(settings.GrpcPort); + + const auto database = JoinPath({root}); + auto& runtime = *Server->GetRuntime(); + EdgeActor = runtime.AllocateEdgeActor(); + + SetupLogging(runtime); + InitRoot(Server, EdgeActor); + + CreateShardedTable(Server, EdgeActor, database, tableName, tableDesc); + WaitTxNotification(Server, EdgeActor, AsyncAlterAddStream(Server, database, tableName, streamDesc)); + + Client = TDerived::MakeClient(Server->GetDriver(), database); + } + + TServer::TPtr GetServer() { + UNIT_ASSERT(Server); + return Server; + } + + TClient& GetClient() { + UNIT_ASSERT(Client); + return *Client; + } + + const TActorId& GetEdgeActor() const { + return EdgeActor; + } + + private: + static NKikimrPQ::TPQConfig DefaultPQConfig() { + NKikimrPQ::TPQConfig pqConfig; + pqConfig.SetEnabled(true); + pqConfig.SetEnableProtoSourceIdInfo(true); + pqConfig.SetRoundRobinPartitionMapping(true); + pqConfig.SetTopicsAreFirstClassCitizen(true); + pqConfig.SetMaxReadCookies(10); + pqConfig.AddClientServiceType()->SetName("data-streams"); + return pqConfig; + } + + static void SetupLogging(TTestActorRuntime& runtime) { + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG); + } + + private: + TPortManager PortManager; + TServer::TPtr Server; + TActorId EdgeActor; + THolder<TClient> Client; + + }; // TTestEnv + + class TTestPqEnv: public TTestEnv<TTestPqEnv, TPersQueueClient> { + public: + using TTestEnv<TTestPqEnv, TPersQueueClient>::TTestEnv; + + static THolder<TPersQueueClient> MakeClient(const NYdb::TDriver& driver, const TString& database) { + return MakeHolder<TPersQueueClient>(driver, TPersQueueClientSettings().Database(database)); + } + }; + + class TTestYdsEnv: public TTestEnv<TTestYdsEnv, TDataStreamsClient> { + public: + using TTestEnv<TTestYdsEnv, TDataStreamsClient>::TTestEnv; + + static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) { + return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database)); + } + }; + + TShardedTableOptions SimpleTable() { + return TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", "Uint32", false, false}, + }); + } + + TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format) { + return TCdcStream{ + .Name = "Stream", + .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly, + .Format = format, + }; + } + + TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format) { + return TCdcStream{ + .Name = "Stream", + .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate, + .Format = format, + }; + } + + TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format) { + return TCdcStream{ + .Name = "Stream", + .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages, + .Format = format, + }; + } + + struct PqRunner { + static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, + const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) + { + TTestPqEnv env(tableDesc, streamDesc); + + for (const auto& query : queries) { + ExecSQL(env.GetServer(), env.GetEdgeActor(), query); + } + + auto& client = env.GetClient(); + + // add consumer + { + auto res = client.AddReadRule("/Root/Table/Stream", + TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + // get records + auto reader = client.CreateReadSession(TReadSessionSettings() + .AppendTopics(TString("/Root/Table/Stream")) + .ConsumerName("user") + .DisableClusterDiscovery(true) + ); + + ui32 reads = 0; + while (reads < records.size()) { + auto ev = reader->GetEvent(true); + UNIT_ASSERT(ev); + + if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) { + for (const auto& item : data->GetMessages()) { + const auto& record = records.at(reads++); + UNIT_ASSERT_VALUES_EQUAL(record.first, item.GetPartitionKey()); + UNIT_ASSERT_VALUES_EQUAL(record.second, item.GetData()); + } + } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) { + create->Confirm(); + } else if (auto* destroy = std::get_if<TReadSessionEvent::TDestroyPartitionStreamEvent>(&*ev)) { + destroy->Confirm(); + } else if (std::get_if<TSessionClosedEvent>(&*ev)) { + break; + } + } + + // remove consumer + { + auto res = client.RemoveReadRule("/Root/Table/Stream", + TRemoveReadRuleSettings().ConsumerName("user")).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + } + }; + + struct YdsRunner { + static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, + const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records) + { + TTestYdsEnv env(tableDesc, streamDesc); + + for (const auto& query : queries) { + ExecSQL(env.GetServer(), env.GetEdgeActor(), query); + } + + auto& client = env.GetClient(); + + // add consumer + { + auto res = client.RegisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + + // get shards + TString shardId; + { + auto res = client.ListShards("/Root/Table/Stream", {}).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(res.GetResult().shards().size(), 1); + shardId = res.GetResult().shards().begin()->shard_id(); + } + + // get iterator + TString shardIt; + { + auto res = client.GetShardIterator("/Root/Table/Stream", shardId, Ydb::DataStreams::V1::ShardIteratorType::TRIM_HORIZON).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + shardIt = res.GetResult().shard_iterator(); + } + + // get records + { + auto res = client.GetRecords(shardIt).ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size()); + + for (ui32 i = 0; i < records.size(); ++i) { + const auto& actual = res.GetResult().records().at(i); + const auto& expected = records.at(i); + UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), expected.first); + UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected.second); + } + } + + // remove consumer + { + auto res = client.DeregisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + } + }; + + #define Y_UNIT_TEST_TWIN(N, VAR1, VAR2) \ + template<typename TRunner> void N(NUnitTest::TTestContext&); \ + struct TTestRegistration##N { \ + TTestRegistration##N() { \ + TCurrentTest::AddTest(#N "[" #VAR1 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR1>), false); \ + TCurrentTest::AddTest(#N "[" #VAR2 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR2>), false); \ + } \ + }; \ + static TTestRegistration##N testRegistration##N; \ + template<typename TRunner> \ + void N(NUnitTest::TTestContext&) + + Y_UNIT_TEST_TWIN(KeysOnlyLog, PqRunner, YdsRunner) { + TRunner::Run(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + {"[1]", R"({"update":{}})"}, + {"[2]", R"({"update":{}})"}, + {"[3]", R"({"update":{}})"}, + {"[1]", R"({"erase":{}})"}, + }); + } + + Y_UNIT_TEST_TWIN(UpdatesLog, PqRunner, YdsRunner) { + TRunner::Run(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + {"[1]", R"({"update":{"value":10}})"}, + {"[2]", R"({"update":{"value":20}})"}, + {"[3]", R"({"update":{"value":30}})"}, + {"[1]", R"({"erase":{}})"}, + }); + } + + Y_UNIT_TEST_TWIN(NewAndOldImagesLog, PqRunner, YdsRunner) { + TRunner::Run(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatJson), {R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 10), + (2, 20), + (3, 30); + )", R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 100), + (2, 200), + (3, 300); + )", R"( + DELETE FROM `/Root/Table` WHERE key = 1; + )"}, { + {"[1]", R"({"update":{},"newImage":{"value":10}})"}, + {"[2]", R"({"update":{},"newImage":{"value":20}})"}, + {"[3]", R"({"update":{},"newImage":{"value":30}})"}, + {"[1]", R"({"update":{},"newImage":{"value":100},"oldImage":{"value":10}})"}, + {"[2]", R"({"update":{},"newImage":{"value":200},"oldImage":{"value":20}})"}, + {"[3]", R"({"update":{},"newImage":{"value":300},"oldImage":{"value":30}})"}, + {"[1]", R"({"erase":{},"oldImage":{"value":100}})"}, + }); + } + +} // Cdc } // NKikimr diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt index 179d5251dc..8fabff31b6 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt +++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt @@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC ydb-core-tx udf-service-exception_policy public-lib-yson_value + cpp-client-ydb_datastreams + cpp-client-ydb_persqueue_public cpp-client-ydb_result ) target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt index abb32cd8af..267d3ce3fc 100644 --- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt +++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt @@ -29,6 +29,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC ydb-core-tx udf-service-exception_policy public-lib-yson_value + cpp-client-ydb_datastreams + cpp-client-ydb_persqueue_public cpp-client-ydb_result ) target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 708f62362f..cf0ffbe726 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -825,8 +825,6 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } case NKikimrSchemeOp::EPathTypeTableIndex: return true; - case NKikimrSchemeOp::EPathTypeCdcStream: - return true; default: return false; } diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp index 5cf9a0b0d5..2ccc4b31a1 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.cpp +++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp @@ -72,6 +72,7 @@ TString TSchemeCacheNavigate::ToString(const NScheme::TTypeRegistry& typeRegistr << " ErrorCount: " << ErrorCount << " DatabaseName: " << DatabaseName << " DomainOwnerId: " << DomainOwnerId + << " Instant: " << Instant << " ResultSet [" << ResultSetToString(ResultSet, typeRegistry) << "]" << " }"; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp index 9263fbc929..9bf454440d 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp @@ -421,8 +421,11 @@ public: .IsResolved() .NotDeleted() .IsPQGroup() - .NotUnderOperation() - .IsCommonSensePath(); + .NotUnderOperation(); + + if (!Transaction.GetAllowAccessToPrivatePaths()) { + checks.IsCommonSensePath(); + } if (!checks) { TString explain = TStringBuilder() << "path fail checks" @@ -452,7 +455,6 @@ public: } newTabletConfig = tabletConfig; - TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr); if (!alterData) { result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h index 6dd1cf10e1..758a7e9eb6 100644 --- a/ydb/library/persqueue/topic_parser/topic_parser.h +++ b/ydb/library/persqueue/topic_parser/topic_parser.h @@ -120,6 +120,10 @@ public: CHECK_VALID_AND_RETURN(PrimaryFullPath); } + void SetPrimaryPath(const TString& path) { + PrimaryFullPath = path; + } + TString GetSecondaryPath() { Y_FAIL("UNIMPLEMENTED"); CHECK_VALID_AND_RETURN(SecondaryFullPath); diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 9987c8d4c0..f7b26e8440 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -285,11 +285,10 @@ namespace NKikimr::NDataStreams::V1 { const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { - Y_UNUSED(pqGroupDescription); Y_UNUSED(selfInfo); TString error; - if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error)) { + if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx); } @@ -326,11 +325,10 @@ namespace NKikimr::NDataStreams::V1 { const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription, const NKikimrSchemeOp::TDirEntry& selfInfo ) { - Y_UNUSED(pqGroupDescription); Y_UNUSED(selfInfo); TString error; - if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error) + if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error) || !ValidateWriteSpeedLimit(*GetProtoRequest(), error, ctx) || !ValidateRetentionPeriod(*GetProtoRequest(), groupConfig, Nothing(), error)) { @@ -868,8 +866,8 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------------- - class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest> { - using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest>; + class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest, true> { + using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest, true>; public: TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request); @@ -943,8 +941,8 @@ namespace NKikimr::NDataStreams::V1 { //----------------------------------------------------------------------------------------- - class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest> { - using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest>; + class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest, true> { + using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest, true>; public: TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request); @@ -1101,8 +1099,11 @@ namespace NKikimr::NDataStreams::V1 { auto partitionId = partition.GetPartitionId(); TString shardName = GetShardName(partitionId); if (shardName == ShardId) { - TShardIterator it(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber); - SendResponse(ctx, it); + if (topicInfo->ShowPrivatePath) { + SendResponse(ctx, TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); + } else { + SendResponse(ctx, TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber)); + } return; } } @@ -1187,7 +1188,7 @@ namespace NKikimr::NDataStreams::V1 { TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx); } - SendDescribeProposeRequest(ctx); + SendDescribeProposeRequest(ctx, ShardIterator.IsCdcTopic()); Become(&TGetRecordsActor::StateWork); } @@ -1334,7 +1335,7 @@ namespace NKikimr::NDataStreams::V1 { TShardIterator shardIterator(ShardIterator.GetStreamName(), ShardIterator.GetStreamArn(), ShardIterator.GetShardId(), - timestamp, seqNo); + timestamp, seqNo, ShardIterator.GetKind()); result.set_next_shard_iterator(shardIterator.Serialize()); result.set_millis_behind_latest(millisBehindLatestMs); diff --git a/ydb/services/datastreams/shard_iterator.h b/ydb/services/datastreams/shard_iterator.h index 7b6ce9c5cd..626f5110b7 100644 --- a/ydb/services/datastreams/shard_iterator.h +++ b/ydb/services/datastreams/shard_iterator.h @@ -21,16 +21,29 @@ namespace NKikimr::NDataStreams::V1 { } TShardIterator(const TString& streamName, const TString& streamArn, - ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { + ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber, + NKikimrPQ::TYdsShardIterator::ETopicKind kind = NKikimrPQ::TYdsShardIterator::KIND_COMMON) { Proto.SetStreamName(streamName); Proto.SetStreamArn(streamArn); Proto.SetShardId(shardId); Proto.SetReadTimestampMs(readTimestamp); Proto.SetSequenceNumber(sequenceNumber); Proto.SetCreationTimestampMs(TInstant::Now().MilliSeconds()); + Proto.SetKind(kind); Valid = true; } + static TShardIterator Common(const TString& streamName, const TString& streamArn, + ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { + return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber); + } + + static TShardIterator Cdc(const TString& streamName, const TString& streamArn, + ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) { + return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber, + NKikimrPQ::TYdsShardIterator::KIND_CDC); + } + TString Serialize() const { TString data; bool result = Proto.SerializeToString(&data); @@ -65,6 +78,14 @@ namespace NKikimr::NDataStreams::V1 { Proto.GetCreationTimestampMs() < LIFETIME_MS; } + NKikimrPQ::TYdsShardIterator::ETopicKind GetKind() const { + return Proto.GetKind(); + } + + bool IsCdcTopic() const { + return Proto.GetKind() == NKikimrPQ::TYdsShardIterator::KIND_CDC; + } + bool IsValid() const { return Valid; } diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index b056a05e64..0b4214e7c8 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -58,22 +58,20 @@ namespace NKikimr::NGRpcProxy::V1 { { } - void PrepareTopicPath(const NActors::TActorContext &ctx) { // ToDo !! - TopicPath = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath); - } - TString GetTopicPath(const NActors::TActorContext& ctx) { - PrepareTopicPath(ctx); - return TopicPath; + auto path = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath); + if (PrivateTopicName) { + path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName)); + } + return path; } protected: // TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name); void SendProposeRequest(const NActors::TActorContext &ctx) { - PrepareTopicPath(ctx); std::pair <TString, TString> pathPair; try { - pathPair = NKikimr::NGRpcService::SplitPath(TopicPath); + pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath(ctx)); } catch (const std::exception &ex) { this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex)); return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx); @@ -103,15 +101,15 @@ namespace NKikimr::NGRpcProxy::V1 { } } - void SendDescribeProposeRequest(const NActors::TActorContext& ctx) { - PrepareTopicPath(ctx); + void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) { auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>(); navigateRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse("")); NSchemeCache::TSchemeCacheNavigate::TEntry entry; - entry.Path = NKikimr::SplitPath(TopicPath); + entry.Path = NKikimr::SplitPath(GetTopicPath(ctx)); entry.SyncVersion = true; - entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic; + entry.ShowPrivatePath = showPrivate || PrivateTopicName.Defined(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; navigateRequest->ResultSet.emplace_back(entry); if (this->Request_->GetInternalToken().empty()) { @@ -157,7 +155,13 @@ namespace NKikimr::NGRpcProxy::V1 { switch (response.Status) { case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: { - if (!result->ResultSet.front().PQGroupInfo) { + if (!response.PQGroupInfo) { + if (response.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) { + Y_VERIFY(response.ListNodeEntry->Children.size() == 1); + PrivateTopicName = response.ListNodeEntry->Children.at(0).Name; + return SendDescribeProposeRequest(ctx); + } + this->Request_->RaiseIssue( FillIssue( TStringBuilder() << "path '" << path << "' creation is not completed", @@ -246,12 +250,13 @@ namespace NKikimr::NGRpcProxy::V1 { private: bool IsDead = false; - TString TopicPath; + const TString TopicPath; + TMaybe<TString> PrivateTopicName; }; //----------------------------------------------------------------------------------- - template<class TDerived, class TRequest> + template<class TDerived, class TRequest, bool AllowAccessToPrivatePaths = false> class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest> { using TBase = TPQGrpcSchemaBase<TDerived, TRequest>; @@ -269,12 +274,17 @@ namespace NKikimr::NGRpcProxy::V1 { NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme()); modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup); modifyScheme.SetWorkingDir(workingDir); + modifyScheme.SetAllowAccessToPrivatePaths(AllowAccessToPrivatePaths); auto* config = modifyScheme.MutableAlterPersQueueGroup(); Y_VERIFY(response.Self); Y_VERIFY(response.PQGroupInfo); config->CopyFrom(response.PQGroupInfo->Description); + // keep previous values or set in ModifyPersqueueConfig + config->ClearTotalGroupCount(); + config->MutablePQTabletConfig()->ClearPartitionKeySchema(); + { auto applyIf = modifyScheme.AddApplyIf(); applyIf->SetPathId(response.Self->Info.GetPathId()); diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h index 7c11732d8e..9372254485 100644 --- a/ydb/services/persqueue_v1/grpc_pq_actor.h +++ b/ydb/services/persqueue_v1/grpc_pq_actor.h @@ -708,7 +708,7 @@ private: void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code, const TActorContext& ctx); - + void DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate = false); bool ProcessTopicSchemeCacheResponse(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, THashMap<TString, TTopicHolder>::iterator topicsIter, const TActorContext& ctx); void HandleClientSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx); @@ -1163,8 +1163,8 @@ public: }; -class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest> { - using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest>; +class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true> { + using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true>; public: TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request); @@ -1176,8 +1176,8 @@ public: const NKikimrSchemeOp::TDirEntry& selfInfo); }; -class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest> { - using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest>; +class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true> { + using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true>; public: TRemoveReadRuleActor(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest* request); diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp index d7209ef46b..d83ae00434 100644 --- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp +++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp @@ -17,6 +17,7 @@ #include <library/cpp/protobuf/util/repeated_field_utils.h> #include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h> +#include <util/string/join.h> #include <util/string/strip.h> #include <util/charset/utf8.h> @@ -2630,8 +2631,8 @@ TReadInitAndAuthActor::TReadInitAndAuthActor( , Counters(counters) , LocalCluster(localCluster) { - for (const auto& t : topics) { - Topics[t.first].TopicNameConverter = t.second; + for (const auto& [path, converter] : topics) { + Topics[path].TopicNameConverter = converter; } } @@ -2642,18 +2643,24 @@ TReadInitAndAuthActor::~TReadInitAndAuthActor() = default; void TReadInitAndAuthActor::Bootstrap(const TActorContext &ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth for : " << ClientId); Become(&TThis::StateFunc); + DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token; + DescribeTopics(ctx); +} + +void TReadInitAndAuthActor::DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate) { TVector<TString> topicNames; - for (const auto& topic : Topics) { - topicNames.emplace_back(topic.second.TopicNameConverter->GetPrimaryPath()); + for (const auto& [_, holder] : Topics) { + topicNames.emplace_back(holder.TopicNameConverter->GetPrimaryPath()); } - DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token; - ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true)); + + LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " describe topics: " << JoinSeq(", ", topicNames)); + ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true, showPrivate)); } void TReadInitAndAuthActor::Die(const TActorContext& ctx) { - for (auto& t : Topics) { - if (t.second.PipeClient) - NTabletPipe::CloseClient(ctx, t.second.PipeClient); + for (auto& [_, holder] : Topics) { + if (holder.PipeClient) + NTabletPipe::CloseClient(ctx, holder.PipeClient); } LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth is DEAD"); @@ -2695,12 +2702,26 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse( void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Handle describe topics response"); + + bool reDescribe = false; for (const auto& entry : ev->Get()->Result->ResultSet) { - auto processResult = ProcessMetaCacheTopicResponse(entry); auto path = JoinPath(entry.Path); auto it = Topics.find(path); Y_VERIFY(it != Topics.end()); + if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) { + Y_VERIFY(entry.ListNodeEntry->Children.size() == 1); + const auto& topic = entry.ListNodeEntry->Children.at(0); + + it->second.TopicNameConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name))); + Topics[it->second.TopicNameConverter->GetPrimaryPath()] = it->second; + Topics.erase(it); + + reDescribe = true; + continue; + } + + auto processResult = ProcessMetaCacheTopicResponse(entry); if (processResult.IsFatal) { Topics.erase(it); if (Topics.empty()) { @@ -2711,14 +2732,21 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon continue; } } - if (!ProcessTopicSchemeCacheResponse(entry, it, ctx)) - return; + if (!ProcessTopicSchemeCacheResponse(entry, it, ctx)) { + return; + } } + if (Topics.empty()) { CloseSession("no topics found", PersQueue::ErrorCode::BAD_REQUEST, ctx); return; } + + if (reDescribe) { + return DescribeTopics(ctx, true); + } + // ToDo[migration] - separate option - ? bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen(); if (doCheckClientAcl) { @@ -2812,9 +2840,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate( void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) { TTopicTabletsPairs res; - for (auto& t : Topics) { + for (auto& [_, holder] : Topics) { res.emplace_back(decltype(res)::value_type({ - t.second.TopicNameConverter, t.second.TabletID, t.second.CloudId, t.second.DbId, t.second.FolderId + holder.TopicNameConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId })); } ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res))); |