aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-03-27 21:49:53 +0300
committerIlnaz Nizametdinov <i.nizametdinov@gmail.com>2022-03-27 21:49:53 +0300
commit760769f65e5243098c203338ffbae04aae57d61d (patch)
tree36e69b83fe61482d0ce2977d948d59eda4c69921
parenteba7afc473d087b21d8bfdfc1398f50fd8452aaf (diff)
downloadydb-760769f65e5243098c203338ffbae04aae57d61d.tar.gz
PQ & YDS readable logs, tests KIKIMR-14198
ref:b4144cad998d0d2bd0f953e0c03a2456fd819164
-rw-r--r--CMakeLists.darwin.txt2
-rw-r--r--CMakeLists.linux.txt2
-rw-r--r--ydb/core/base/path.h6
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.cpp21
-rw-r--r--ydb/core/client/server/msgbus_server_pq_metacache.h7
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/protos/pqconfig.proto6
-rw-r--r--ydb/core/testlib/test_client.cpp5
-rw-r--r--ydb/core/testlib/test_client.h1
-rw-r--r--ydb/core/tx/datashard/CMakeLists.txt4
-rw-r--r--ydb/core/tx/datashard/change_record.cpp116
-rw-r--r--ydb/core/tx/datashard/change_record.h7
-rw-r--r--ydb/core/tx/datashard/change_sender_async_index.cpp4
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp49
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.cpp2
-rw-r--r--ydb/core/tx/datashard/change_sender_common_ops.h2
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp314
-rw-r--r--ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt2
-rw-r--r--ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt2
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp2
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp8
-rw-r--r--ydb/library/persqueue/topic_parser/topic_parser.h4
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp25
-rw-r--r--ydb/services/datastreams/shard_iterator.h23
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h40
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_actor.h10
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_read_actor.cpp56
28 files changed, 594 insertions, 128 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index 26d76167eb..b4969035b3 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -574,6 +574,7 @@ add_subdirectory(ydb/public/lib/deprecated/client)
add_subdirectory(ydb/public/lib/value)
add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
+add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
add_subdirectory(ydb/core/yq/libs/actors/logging)
@@ -752,7 +753,6 @@ add_subdirectory(ydb/services/persqueue_v1)
add_subdirectory(ydb/library/persqueue/tests)
add_subdirectory(ydb/services/lib/actors)
add_subdirectory(ydb/public/lib/jwt)
-add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/cms)
add_subdirectory(ydb/core/driver_lib/base_utils)
add_subdirectory(ydb/core/driver_lib/cli_config_base)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index a80add5dfc..3fba0407c0 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -654,6 +654,7 @@ add_subdirectory(ydb/public/lib/deprecated/client)
add_subdirectory(ydb/public/lib/value)
add_subdirectory(ydb/library/yql/dq/actors/compute)
add_subdirectory(ydb/library/yql/dq/tasks)
+add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/tx/long_tx_service/public)
add_subdirectory(ydb/core/yq/libs/actors)
add_subdirectory(ydb/core/yq/libs/actors/logging)
@@ -832,7 +833,6 @@ add_subdirectory(ydb/services/persqueue_v1)
add_subdirectory(ydb/library/persqueue/tests)
add_subdirectory(ydb/services/lib/actors)
add_subdirectory(ydb/public/lib/jwt)
-add_subdirectory(ydb/services/lib/sharding)
add_subdirectory(ydb/core/cms)
add_subdirectory(ydb/core/driver_lib/base_utils)
add_subdirectory(ydb/core/driver_lib/cli_config_base)
diff --git a/ydb/core/base/path.h b/ydb/core/base/path.h
index c2b93d2baf..402365d473 100644
--- a/ydb/core/base/path.h
+++ b/ydb/core/base/path.h
@@ -30,4 +30,10 @@ TString CombinePath(TIter begin, TIter end, bool canonize = true) {
: path;
}
+inline TVector<TString> ChildPath(const TVector<TString>& parentPath, const TString& childName) {
+ auto path = parentPath;
+ path.push_back(childName);
+ return path;
+}
+
}
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.cpp b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
index 6b752a60e8..d27e55b0d0 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.cpp
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.cpp
@@ -215,11 +215,12 @@ private:
void HandleDescribeTopics(TEvPqNewMetaCache::TEvDescribeTopicsRequest::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle describe topics");
+ const auto& msg = *ev->Get();
- SendSchemeCacheRequest(ev->Get()->Topics, !ev->Get()->PathPrefix.empty(), false, ev->Get()->SyncVersion, ctx);
+ SendSchemeCacheRequest(msg.Topics, !msg.PathPrefix.empty(), false, msg.SyncVersion, msg.ShowPrivate, ctx);
auto inserted = DescribeTopicsWaiters.insert(std::make_pair(
RequestId,
- TWaiter{ev->Sender, std::move(ev->Get()->Topics)}
+ TWaiter{ev->Sender, std::move(msg.Topics)}
)).second;
Y_VERIFY(inserted);
}
@@ -252,7 +253,7 @@ private:
}
if (DescribeAllTopicsWaiters.empty()) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Make full list SC request");
- SendSchemeCacheRequest(CurrentTopics, true, true, false, ctx);
+ SendSchemeCacheRequest(CurrentTopics, true, true, false, false, ctx);
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Store waiter");
DescribeAllTopicsWaiters.push(waiter);
@@ -262,6 +263,9 @@ private:
void HandleSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
auto& result = ev->Get()->Request;
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_METACACHE, "Handle SchemeCache response"
+ << ": result# " << result->ToString(*AppData()->TypeRegistry));
+
if (result->Instant == 0) {
for (const auto& entry : result->ResultSet) {
if (!entry.PQGroupInfo) {
@@ -296,10 +300,10 @@ private:
}
}
-
- void SendSchemeCacheRequest(
- const TVector<TString>& topics, bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, const TActorContext& ctx
- ) {
+ void SendSchemeCacheRequest(const TVector<TString>& topics,
+ bool addDefaultPathPrefix, bool isFullListingRequest, bool syncVersion, bool showPrivate,
+ const TActorContext& ctx)
+ {
auto instant = isFullListingRequest ? 0 : ++RequestId;
auto schemeCacheRequest = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(instant);
for (const auto& path : topics) {
@@ -311,7 +315,8 @@ private:
entry.Path.insert(entry.Path.end(), split.begin(), split.end());
entry.SyncVersion = syncVersion;
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
+ entry.ShowPrivatePath = showPrivate;
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
schemeCacheRequest->ResultSet.emplace_back(std::move(entry));
}
ctx.Send(SchemeCacheId, new TEvTxProxySchemeCache::TEvNavigateKeySet(schemeCacheRequest.Release()));
diff --git a/ydb/core/client/server/msgbus_server_pq_metacache.h b/ydb/core/client/server/msgbus_server_pq_metacache.h
index f1d8248ef3..74c9ac1418 100644
--- a/ydb/core/client/server/msgbus_server_pq_metacache.h
+++ b/ydb/core/client/server/msgbus_server_pq_metacache.h
@@ -66,18 +66,21 @@ struct TEvPqNewMetaCache {
TString PathPrefix;
TVector<TString> Topics;
bool SyncVersion;
+ bool ShowPrivate = false;
TEvDescribeTopicsRequest() = default;
- explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true)
+ explicit TEvDescribeTopicsRequest(const TVector<TString>& topics, bool syncVersion = true, bool showPrivate = false)
: Topics(topics)
, SyncVersion(syncVersion)
+ , ShowPrivate(showPrivate)
{}
- TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true)
+ TEvDescribeTopicsRequest(const TVector<TString>& topics, const TString& pathPrefix, bool syncVersion = true, bool showPrivate = false)
: PathPrefix(pathPrefix)
, Topics(topics)
, SyncVersion(syncVersion)
+ , ShowPrivate(showPrivate)
{}
};
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index 385b647c5e..6085b91483 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -1231,6 +1231,7 @@ message TModifyScheme {
optional EOperationType OperationType = 2;
optional bool Internal = 36 [default = false]; // internal operations are not generated directly by the user
optional bool FailOnExist = 50 [default = false]; // as a replacement for TEvModifySchemeTransaction.FailOnExist
+ optional bool AllowAccessToPrivatePaths = 53 [default = false];
optional TMkDir MkDir = 3;
optional TTableDescription CreateTable = 4;
diff --git a/ydb/core/protos/pqconfig.proto b/ydb/core/protos/pqconfig.proto
index 7c85927449..e0343e2140 100644
--- a/ydb/core/protos/pqconfig.proto
+++ b/ydb/core/protos/pqconfig.proto
@@ -691,10 +691,16 @@ message TYdsNextToken {
}
message TYdsShardIterator {
+ enum ETopicKind {
+ KIND_COMMON = 0;
+ KIND_CDC = 1;
+ }
+
required string StreamName = 1;
required string StreamArn = 2;
required uint32 ShardId = 3;
required uint64 ReadTimestampMs = 4;
required uint32 SequenceNumber = 5;
required uint64 CreationTimestampMs = 6;
+ optional ETopicKind Kind = 7;
}
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 9623365974..9b0c97c731 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -875,6 +875,11 @@ namespace Tests {
return Runtime->GetAppData().FunctionRegistry;
}
+ const NYdb::TDriver& TServer::GetDriver() const {
+ Y_VERIFY(Driver);
+ return *Driver;
+ }
+
TServer::~TServer() {
if (Runtime->GetAppData().Mon) {
Runtime->GetAppData().Mon->Stop();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index b3673907ae..30f3d13c11 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -240,6 +240,7 @@ namespace Tests {
const TServerSettings& GetSettings() const;
const NScheme::TTypeRegistry* GetTypeRegistry();
const NMiniKQL::IFunctionRegistry* GetFunctionRegistry();
+ const NYdb::TDriver& GetDriver() const;
ui32 StaticNodes() const {
return Settings->NodeCount;
diff --git a/ydb/core/tx/datashard/CMakeLists.txt b/ydb/core/tx/datashard/CMakeLists.txt
index 842cf3224b..2880f39156 100644
--- a/ydb/core/tx/datashard/CMakeLists.txt
+++ b/ydb/core/tx/datashard/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-tx-datashard PUBLIC
library-cpp-resource
cpp-actors-core
cpp-containers-flat_hash
+ cpp-digest-md5
cpp-html-pcdata
library-cpp-json
cpp-json-yson
@@ -44,6 +45,7 @@ target_link_libraries(core-tx-datashard PUBLIC
api-protos
lib-deprecated-kicli
dq-actors-compute
+ services-lib-sharding
)
target_sources(core-tx-datashard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/datashard/alter_cdc_stream_unit.cpp
@@ -251,6 +253,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
library-cpp-resource
cpp-actors-core
cpp-containers-flat_hash
+ cpp-digest-md5
cpp-html-pcdata
library-cpp-json
cpp-json-yson
@@ -277,6 +280,7 @@ target_link_libraries(core-tx-datashard.global PUBLIC
api-protos
lib-deprecated-kicli
dq-actors-compute
+ services-lib-sharding
)
target_sources(core-tx-datashard.global PRIVATE
${CMAKE_BINARY_DIR}/ydb/core/tx/datashard/eacf331f0ffc35d4b482f1d15a887d3b.cpp
diff --git a/ydb/core/tx/datashard/change_record.cpp b/ydb/core/tx/datashard/change_record.cpp
index cce147d67f..6f0f93e9f5 100644
--- a/ydb/core/tx/datashard/change_record.cpp
+++ b/ydb/core/tx/datashard/change_record.cpp
@@ -2,6 +2,7 @@
#include "export_common.h"
#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
#include <library/cpp/json/yson/json2yson.h>
#include <library/cpp/string_utils/base64/base64.h>
@@ -14,34 +15,6 @@
namespace NKikimr {
namespace NDataShard {
-i64 TChangeRecord::GetSeqNo() const {
- Y_VERIFY(Order <= Max<i64>());
- return static_cast<i64>(Order);
-}
-
-TConstArrayRef<TCell> TChangeRecord::GetKey() const {
- if (Key) {
- return *Key;
- }
-
- switch (Kind) {
- case EKind::AsyncIndex:
- case EKind::CdcDataChange: {
- NKikimrChangeExchange::TChangeRecord::TDataChange parsed;
- Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size()));
-
- TSerializedCellVec key;
- Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
-
- Key.ConstructInPlace(key.GetCells());
- break;
- }
- }
-
- Y_VERIFY(Key);
- return *Key;
-}
-
void TChangeRecord::SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const {
record.SetOrder(Order);
record.SetGroup(Group);
@@ -134,7 +107,7 @@ static NJson::TJsonValue ToJson(const TCell& cell, NScheme::TTypeId type) {
static void SerializeJsonKey(TUserTable::TCPtr schema, NJson::TJsonValue& key,
const NKikimrChangeExchange::TChangeRecord::TDataChange::TSerializedCells& in)
{
- Y_VERIFY(in.TagsSize() != schema->KeyColumnIds.size());
+ Y_VERIFY(in.TagsSize() == schema->KeyColumnIds.size());
for (size_t i = 0; i < schema->KeyColumnIds.size(); ++i) {
Y_VERIFY(in.GetTags(i) == schema->KeyColumnIds.at(i));
}
@@ -187,20 +160,25 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value
SerializeJsonValue(Schema, value["newImage"], body.GetNewImage());
}
- if (!body.HasOldImage() && !body.HasNewImage()) {
- switch (body.GetRowOperationCase()) {
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
+ const auto hasAnyImage = body.HasOldImage() || body.HasNewImage();
+ switch (body.GetRowOperationCase()) {
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kUpsert:
+ value["update"].SetType(NJson::EJsonValueType::JSON_MAP);
+ if (!hasAnyImage) {
SerializeJsonValue(Schema, value["update"], body.GetUpsert());
- break;
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
+ }
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kReset:
+ value["reset"].SetType(NJson::EJsonValueType::JSON_MAP);
+ if (!hasAnyImage) {
SerializeJsonValue(Schema, value["reset"], body.GetReset());
- break;
- case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
- value["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
- break;
- default:
- Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
- }
+ }
+ break;
+ case NKikimrChangeExchange::TChangeRecord::TDataChange::kErase:
+ value["erase"].SetType(NJson::EJsonValueType::JSON_MAP);
+ break;
+ default:
+ Y_FAIL_S("Unexpected row operation: " << static_cast<int>(body.GetRowOperationCase()));
}
break;
@@ -212,6 +190,62 @@ void TChangeRecord::SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value
}
}
+TConstArrayRef<TCell> TChangeRecord::GetKey() const {
+ if (Key) {
+ return *Key;
+ }
+
+ switch (Kind) {
+ case EKind::AsyncIndex:
+ case EKind::CdcDataChange: {
+ NKikimrChangeExchange::TChangeRecord::TDataChange parsed;
+ Y_VERIFY(parsed.ParseFromArray(Body.data(), Body.size()));
+
+ TSerializedCellVec key;
+ Y_VERIFY(TSerializedCellVec::TryParse(parsed.GetKey().GetData(), key));
+
+ Key.ConstructInPlace(key.GetCells());
+ break;
+ }
+ }
+
+ Y_VERIFY(Key);
+ return *Key;
+}
+
+i64 TChangeRecord::GetSeqNo() const {
+ Y_VERIFY(Order <= Max<i64>());
+ return static_cast<i64>(Order);
+}
+
+TString TChangeRecord::GetPartitionKey() const {
+ if (PartitionKey) {
+ return *PartitionKey;
+ }
+
+ switch (Kind) {
+ case EKind::CdcDataChange: {
+ Y_VERIFY(Schema);
+
+ NKikimrChangeExchange::TChangeRecord::TDataChange body;
+ Y_VERIFY(body.ParseFromArray(Body.data(), Body.size()));
+
+ NJson::TJsonValue key;
+ SerializeJsonKey(Schema, key, body.GetKey());
+
+ PartitionKey.ConstructInPlace(WriteJson(key, false));
+ break;
+ }
+
+ case EKind::AsyncIndex: {
+ Y_FAIL("Not supported");
+ }
+ }
+
+ Y_VERIFY(PartitionKey);
+ return *PartitionKey;
+}
+
TString TChangeRecord::ToString() const {
TString result;
TStringOutput out(result);
diff --git a/ydb/core/tx/datashard/change_record.h b/ydb/core/tx/datashard/change_record.h
index fae0f79373..1ee4259f1b 100644
--- a/ydb/core/tx/datashard/change_record.h
+++ b/ydb/core/tx/datashard/change_record.h
@@ -36,8 +36,6 @@ public:
const TPathId& GetPathId() const { return PathId; }
EKind GetKind() const { return Kind; }
const TString& GetBody() const { return Body; }
- i64 GetSeqNo() const;
- TConstArrayRef<TCell> GetKey() const;
const TPathId& GetTableId() const { return TableId; }
ui64 GetSchemaVersion() const { return SchemaVersion; }
@@ -45,6 +43,10 @@ public:
void SerializeTo(NKikimrChangeExchange::TChangeRecord& record) const;
void SerializeTo(NJson::TJsonValue& key, NJson::TJsonValue& value) const;
+ TConstArrayRef<TCell> GetKey() const;
+ i64 GetSeqNo() const;
+ TString GetPartitionKey() const;
+
TString ToString() const;
void Out(IOutputStream& out) const;
@@ -62,6 +64,7 @@ private:
TUserTable::TCPtr Schema;
mutable TMaybe<TOwnedCellVec> Key;
+ mutable TMaybe<TString> PartitionKey;
}; // TChangeRecord
diff --git a/ydb/core/tx/datashard/change_sender_async_index.cpp b/ydb/core/tx/datashard/change_sender_async_index.cpp
index 1bc6796dbf..bb2d356fb7 100644
--- a/ydb/core/tx/datashard/change_sender_async_index.cpp
+++ b/ydb/core/tx/datashard/change_sender_async_index.cpp
@@ -613,11 +613,11 @@ class TAsyncIndexChangeSenderMain: public TActorBootstrapped<TAsyncIndexChangeSe
return KeyDesc && KeyDesc->Partitions;
}
- ui64 GetPartitionId(TConstArrayRef<TCell> key) const override {
+ ui64 GetPartitionId(const TChangeRecord& record) const override {
Y_VERIFY(KeyDesc);
Y_VERIFY(KeyDesc->Partitions);
- const auto range = TTableRange(key);
+ const auto range = TTableRange(record.GetKey());
Y_VERIFY(range.Point);
TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 3533a4997d..a526fd7dbe 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -5,12 +5,14 @@
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <library/cpp/digest/md5/md5.h>
#include <library/cpp/json/json_writer.h>
#include <ydb/core/persqueue/partition_key_range/partition_key_range.h>
#include <ydb/core/persqueue/writer/source_id_encoding.h>
#include <ydb/core/persqueue/writer/writer.h>
#include <ydb/core/protos/grpc_pq_old.pb.h>
+#include <ydb/services/lib/sharding/sharding.h>
namespace NKikimr {
namespace NDataShard {
@@ -18,7 +20,7 @@ namespace NDataShard {
using namespace NPQ;
class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderPartition> {
- static constexpr auto CodecRaw = 1;
+ static constexpr auto CodecRaw = 0;
TStringBuf GetLogPrefix() const {
if (!LogPrefix) {
@@ -616,28 +618,43 @@ class TCdcChangeSenderMain: public TActorBootstrapped<TCdcChangeSenderMain>
return KeyDesc && KeyDesc->Partitions;
}
- ui64 GetPartitionId(TConstArrayRef<TCell> key) const override {
+ ui64 GetPartitionId(const TChangeRecord& record) const override {
Y_VERIFY(KeyDesc);
Y_VERIFY(KeyDesc->Partitions);
- const auto range = TTableRange(key);
- Y_VERIFY(range.Point);
-
- TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
- KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true,
- [&](const TKeyDesc::TPartitionInfo& partition, bool) {
- const int compares = CompareBorders<true, false>(
- partition.EndKeyPrefix.GetCells(), range.From,
- partition.IsInclusive || partition.IsPoint,
- range.InclusiveFrom || range.Point, KeyDesc->Schema
+ switch (Format) {
+ case NKikimrSchemeOp::ECdcStreamFormatProto: {
+ const auto range = TTableRange(record.GetKey());
+ Y_VERIFY(range.Point);
+
+ TVector<TKeyDesc::TPartitionInfo>::const_iterator it = LowerBound(
+ KeyDesc->Partitions.begin(), KeyDesc->Partitions.end(), true,
+ [&](const TKeyDesc::TPartitionInfo& partition, bool) {
+ const int compares = CompareBorders<true, false>(
+ partition.EndKeyPrefix.GetCells(), range.From,
+ partition.IsInclusive || partition.IsPoint,
+ range.InclusiveFrom || range.Point, KeyDesc->Schema
+ );
+
+ return (compares < 0);
+ }
);
- return (compares < 0);
+ Y_VERIFY(it != KeyDesc->Partitions.end());
+ return it->PartitionId;
+ }
+
+ case NKikimrSchemeOp::ECdcStreamFormatJson: {
+ using namespace NKikimr::NDataStreams::V1;
+ const auto hashKey = HexBytesToDecimal(MD5::Calc(record.GetPartitionKey()));
+ return ShardFromDecimal(hashKey, KeyDesc->Partitions.size());
}
- );
- Y_VERIFY(it != KeyDesc->Partitions.end());
- return it->PartitionId;
+ default: {
+ Y_FAIL_S("Unknown format"
+ << ": format# " << static_cast<int>(Format));
+ }
+ }
}
IActor* CreateSender(ui64 partitionId) override {
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.cpp b/ydb/core/tx/datashard/change_sender_common_ops.cpp
index ee8576248b..026c4033f2 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.cpp
+++ b/ydb/core/tx/datashard/change_sender_common_ops.cpp
@@ -115,7 +115,7 @@ void TBaseChangeSender::SendRecords() {
bool needToResolve = false;
while (it != PendingSent.end()) {
- const ui64 partitionId = Resolver->GetPartitionId(it->second.GetKey());
+ const ui64 partitionId = Resolver->GetPartitionId(it->second);
if (!Senders.contains(partitionId)) {
needToResolve = true;
++it;
diff --git a/ydb/core/tx/datashard/change_sender_common_ops.h b/ydb/core/tx/datashard/change_sender_common_ops.h
index e807a468aa..efff4a722e 100644
--- a/ydb/core/tx/datashard/change_sender_common_ops.h
+++ b/ydb/core/tx/datashard/change_sender_common_ops.h
@@ -75,7 +75,7 @@ public:
virtual void Resolve() = 0;
virtual bool IsResolving() const = 0;
virtual bool IsResolved() const = 0;
- virtual ui64 GetPartitionId(TConstArrayRef<TCell> key) const = 0;
+ virtual ui64 GetPartitionId(const TChangeRecord& record) const = 0;
};
class TBaseChangeSender: public IChangeSender {
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 3af5b4a2e5..5c98f3f605 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -1,5 +1,9 @@
#include "datashard_ut_common.h"
+#include <ydb/core/base/path.h>
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+
#include <util/string/strip.h>
namespace NKikimr {
@@ -567,6 +571,314 @@ Y_UNIT_TEST_SUITE(AsyncIndexChangeExchange) {
opts.SetChangesQueueBytesLimit(1);
});
}
-}
+
+} // AsyncIndexChangeExchange
+
+Y_UNIT_TEST_SUITE(Cdc) {
+ using namespace NYdb::NPersQueue;
+ using namespace NYdb::NDataStreams::V1;
+
+ using TCdcStream = TShardedTableOptions::TCdcStream;
+
+ template <typename TDerived, typename TClient>
+ class TTestEnv {
+ public:
+ explicit TTestEnv(
+ const TShardedTableOptions& tableDesc,
+ const TCdcStream& streamDesc,
+ const TString& root = "Root",
+ const TString& tableName = "Table")
+ {
+ auto settings = TServerSettings(PortManager.GetPort(2134), {}, DefaultPQConfig())
+ .SetDomainName(root)
+ .SetGrpcPort(PortManager.GetPort(2135));
+
+ Server = new TServer(settings);
+ Server->EnableGRpc(settings.GrpcPort);
+
+ const auto database = JoinPath({root});
+ auto& runtime = *Server->GetRuntime();
+ EdgeActor = runtime.AllocateEdgeActor();
+
+ SetupLogging(runtime);
+ InitRoot(Server, EdgeActor);
+
+ CreateShardedTable(Server, EdgeActor, database, tableName, tableDesc);
+ WaitTxNotification(Server, EdgeActor, AsyncAlterAddStream(Server, database, tableName, streamDesc));
+
+ Client = TDerived::MakeClient(Server->GetDriver(), database);
+ }
+
+ TServer::TPtr GetServer() {
+ UNIT_ASSERT(Server);
+ return Server;
+ }
+
+ TClient& GetClient() {
+ UNIT_ASSERT(Client);
+ return *Client;
+ }
+
+ const TActorId& GetEdgeActor() const {
+ return EdgeActor;
+ }
+
+ private:
+ static NKikimrPQ::TPQConfig DefaultPQConfig() {
+ NKikimrPQ::TPQConfig pqConfig;
+ pqConfig.SetEnabled(true);
+ pqConfig.SetEnableProtoSourceIdInfo(true);
+ pqConfig.SetRoundRobinPartitionMapping(true);
+ pqConfig.SetTopicsAreFirstClassCitizen(true);
+ pqConfig.SetMaxReadCookies(10);
+ pqConfig.AddClientServiceType()->SetName("data-streams");
+ return pqConfig;
+ }
+
+ static void SetupLogging(TTestActorRuntime& runtime) {
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_READ_PROXY, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::PQ_METACACHE, NLog::PRI_DEBUG);
+ }
+
+ private:
+ TPortManager PortManager;
+ TServer::TPtr Server;
+ TActorId EdgeActor;
+ THolder<TClient> Client;
+
+ }; // TTestEnv
+
+ class TTestPqEnv: public TTestEnv<TTestPqEnv, TPersQueueClient> {
+ public:
+ using TTestEnv<TTestPqEnv, TPersQueueClient>::TTestEnv;
+
+ static THolder<TPersQueueClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
+ return MakeHolder<TPersQueueClient>(driver, TPersQueueClientSettings().Database(database));
+ }
+ };
+
+ class TTestYdsEnv: public TTestEnv<TTestYdsEnv, TDataStreamsClient> {
+ public:
+ using TTestEnv<TTestYdsEnv, TDataStreamsClient>::TTestEnv;
+
+ static THolder<TDataStreamsClient> MakeClient(const NYdb::TDriver& driver, const TString& database) {
+ return MakeHolder<TDataStreamsClient>(driver, NYdb::TCommonClientSettings().Database(database));
+ }
+ };
+
+ TShardedTableOptions SimpleTable() {
+ return TShardedTableOptions()
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false},
+ });
+ }
+
+ TCdcStream KeysOnly(NKikimrSchemeOp::ECdcStreamFormat format) {
+ return TCdcStream{
+ .Name = "Stream",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeKeysOnly,
+ .Format = format,
+ };
+ }
+
+ TCdcStream Updates(NKikimrSchemeOp::ECdcStreamFormat format) {
+ return TCdcStream{
+ .Name = "Stream",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeUpdate,
+ .Format = format,
+ };
+ }
+
+ TCdcStream NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormat format) {
+ return TCdcStream{
+ .Name = "Stream",
+ .Mode = NKikimrSchemeOp::ECdcStreamModeNewAndOldImages,
+ .Format = format,
+ };
+ }
+
+ struct PqRunner {
+ static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
+ const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records)
+ {
+ TTestPqEnv env(tableDesc, streamDesc);
+
+ for (const auto& query : queries) {
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
+ }
+
+ auto& client = env.GetClient();
+
+ // add consumer
+ {
+ auto res = client.AddReadRule("/Root/Table/Stream",
+ TAddReadRuleSettings().ReadRule(TReadRuleSettings().ConsumerName("user"))).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ // get records
+ auto reader = client.CreateReadSession(TReadSessionSettings()
+ .AppendTopics(TString("/Root/Table/Stream"))
+ .ConsumerName("user")
+ .DisableClusterDiscovery(true)
+ );
+
+ ui32 reads = 0;
+ while (reads < records.size()) {
+ auto ev = reader->GetEvent(true);
+ UNIT_ASSERT(ev);
+
+ if (auto* data = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*ev)) {
+ for (const auto& item : data->GetMessages()) {
+ const auto& record = records.at(reads++);
+ UNIT_ASSERT_VALUES_EQUAL(record.first, item.GetPartitionKey());
+ UNIT_ASSERT_VALUES_EQUAL(record.second, item.GetData());
+ }
+ } else if (auto* create = std::get_if<TReadSessionEvent::TCreatePartitionStreamEvent>(&*ev)) {
+ create->Confirm();
+ } else if (auto* destroy = std::get_if<TReadSessionEvent::TDestroyPartitionStreamEvent>(&*ev)) {
+ destroy->Confirm();
+ } else if (std::get_if<TSessionClosedEvent>(&*ev)) {
+ break;
+ }
+ }
+
+ // remove consumer
+ {
+ auto res = client.RemoveReadRule("/Root/Table/Stream",
+ TRemoveReadRuleSettings().ConsumerName("user")).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+ }
+ };
+
+ struct YdsRunner {
+ static void Run(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
+ const TVector<TString>& queries, const TVector<std::pair<TString, TString>>& records)
+ {
+ TTestYdsEnv env(tableDesc, streamDesc);
+
+ for (const auto& query : queries) {
+ ExecSQL(env.GetServer(), env.GetEdgeActor(), query);
+ }
+
+ auto& client = env.GetClient();
+
+ // add consumer
+ {
+ auto res = client.RegisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ // get shards
+ TString shardId;
+ {
+ auto res = client.ListShards("/Root/Table/Stream", {}).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(res.GetResult().shards().size(), 1);
+ shardId = res.GetResult().shards().begin()->shard_id();
+ }
+
+ // get iterator
+ TString shardIt;
+ {
+ auto res = client.GetShardIterator("/Root/Table/Stream", shardId, Ydb::DataStreams::V1::ShardIteratorType::TRIM_HORIZON).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ shardIt = res.GetResult().shard_iterator();
+ }
+
+ // get records
+ {
+ auto res = client.GetRecords(shardIt).ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
+
+ for (ui32 i = 0; i < records.size(); ++i) {
+ const auto& actual = res.GetResult().records().at(i);
+ const auto& expected = records.at(i);
+ UNIT_ASSERT_VALUES_EQUAL(actual.partition_key(), expected.first);
+ UNIT_ASSERT_VALUES_EQUAL(actual.data(), expected.second);
+ }
+ }
+
+ // remove consumer
+ {
+ auto res = client.DeregisterStreamConsumer("/Root/Table/Stream", "user").ExtractValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+ }
+ };
+
+ #define Y_UNIT_TEST_TWIN(N, VAR1, VAR2) \
+ template<typename TRunner> void N(NUnitTest::TTestContext&); \
+ struct TTestRegistration##N { \
+ TTestRegistration##N() { \
+ TCurrentTest::AddTest(#N "[" #VAR1 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR1>), false); \
+ TCurrentTest::AddTest(#N "[" #VAR2 "]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<VAR2>), false); \
+ } \
+ }; \
+ static TTestRegistration##N testRegistration##N; \
+ template<typename TRunner> \
+ void N(NUnitTest::TTestContext&)
+
+ Y_UNIT_TEST_TWIN(KeysOnlyLog, PqRunner, YdsRunner) {
+ TRunner::Run(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ {"[1]", R"({"update":{}})"},
+ {"[2]", R"({"update":{}})"},
+ {"[3]", R"({"update":{}})"},
+ {"[1]", R"({"erase":{}})"},
+ });
+ }
+
+ Y_UNIT_TEST_TWIN(UpdatesLog, PqRunner, YdsRunner) {
+ TRunner::Run(SimpleTable(), Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ {"[1]", R"({"update":{"value":10}})"},
+ {"[2]", R"({"update":{"value":20}})"},
+ {"[3]", R"({"update":{"value":30}})"},
+ {"[1]", R"({"erase":{}})"},
+ });
+ }
+
+ Y_UNIT_TEST_TWIN(NewAndOldImagesLog, PqRunner, YdsRunner) {
+ TRunner::Run(SimpleTable(), NewAndOldImages(NKikimrSchemeOp::ECdcStreamFormatJson), {R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 10),
+ (2, 20),
+ (3, 30);
+ )", R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 100),
+ (2, 200),
+ (3, 300);
+ )", R"(
+ DELETE FROM `/Root/Table` WHERE key = 1;
+ )"}, {
+ {"[1]", R"({"update":{},"newImage":{"value":10}})"},
+ {"[2]", R"({"update":{},"newImage":{"value":20}})"},
+ {"[3]", R"({"update":{},"newImage":{"value":30}})"},
+ {"[1]", R"({"update":{},"newImage":{"value":100},"oldImage":{"value":10}})"},
+ {"[2]", R"({"update":{},"newImage":{"value":200},"oldImage":{"value":20}})"},
+ {"[3]", R"({"update":{},"newImage":{"value":300},"oldImage":{"value":30}})"},
+ {"[1]", R"({"erase":{},"oldImage":{"value":100}})"},
+ });
+ }
+
+} // Cdc
} // NKikimr
diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt
index 179d5251dc..8fabff31b6 100644
--- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt
+++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.darwin.txt
@@ -28,6 +28,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC
ydb-core-tx
udf-service-exception_policy
public-lib-yson_value
+ cpp-client-ydb_datastreams
+ cpp-client-ydb_persqueue_public
cpp-client-ydb_result
)
target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE
diff --git a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt
index abb32cd8af..267d3ce3fc 100644
--- a/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt
+++ b/ydb/core/tx/datashard/ut_change_exchange/CMakeLists.linux.txt
@@ -29,6 +29,8 @@ target_link_libraries(ydb-core-tx-datashard-ut_change_exchange PUBLIC
ydb-core-tx
udf-service-exception_policy
public-lib-yson_value
+ cpp-client-ydb_datastreams
+ cpp-client-ydb_persqueue_public
cpp-client-ydb_result
)
target_sources(ydb-core-tx-datashard-ut_change_exchange PRIVATE
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 708f62362f..cf0ffbe726 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -825,8 +825,6 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
case NKikimrSchemeOp::EPathTypeTableIndex:
return true;
- case NKikimrSchemeOp::EPathTypeCdcStream:
- return true;
default:
return false;
}
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.cpp b/ydb/core/tx/scheme_cache/scheme_cache.cpp
index 5cf9a0b0d5..2ccc4b31a1 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.cpp
+++ b/ydb/core/tx/scheme_cache/scheme_cache.cpp
@@ -72,6 +72,7 @@ TString TSchemeCacheNavigate::ToString(const NScheme::TTypeRegistry& typeRegistr
<< " ErrorCount: " << ErrorCount
<< " DatabaseName: " << DatabaseName
<< " DomainOwnerId: " << DomainOwnerId
+ << " Instant: " << Instant
<< " ResultSet [" << ResultSetToString(ResultSet, typeRegistry) << "]"
<< " }";
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
index 9263fbc929..9bf454440d 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_pq.cpp
@@ -421,8 +421,11 @@ public:
.IsResolved()
.NotDeleted()
.IsPQGroup()
- .NotUnderOperation()
- .IsCommonSensePath();
+ .NotUnderOperation();
+
+ if (!Transaction.GetAllowAccessToPrivatePaths()) {
+ checks.IsCommonSensePath();
+ }
if (!checks) {
TString explain = TStringBuilder() << "path fail checks"
@@ -452,7 +455,6 @@ public:
}
newTabletConfig = tabletConfig;
-
TPersQueueGroupInfo::TPtr alterData = ParseParams(context, &newTabletConfig, alter, errStr);
if (!alterData) {
result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
diff --git a/ydb/library/persqueue/topic_parser/topic_parser.h b/ydb/library/persqueue/topic_parser/topic_parser.h
index 6dd1cf10e1..758a7e9eb6 100644
--- a/ydb/library/persqueue/topic_parser/topic_parser.h
+++ b/ydb/library/persqueue/topic_parser/topic_parser.h
@@ -120,6 +120,10 @@ public:
CHECK_VALID_AND_RETURN(PrimaryFullPath);
}
+ void SetPrimaryPath(const TString& path) {
+ PrimaryFullPath = path;
+ }
+
TString GetSecondaryPath() {
Y_FAIL("UNIMPLEMENTED");
CHECK_VALID_AND_RETURN(SecondaryFullPath);
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 9987c8d4c0..f7b26e8440 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -285,11 +285,10 @@ namespace NKikimr::NDataStreams::V1 {
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
- Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);
TString error;
- if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error)) {
+ if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, Ydb::PersQueue::ErrorCode::BAD_REQUEST, error, ctx);
}
@@ -326,11 +325,10 @@ namespace NKikimr::NDataStreams::V1 {
const NKikimrSchemeOp::TPersQueueGroupDescription& pqGroupDescription,
const NKikimrSchemeOp::TDirEntry& selfInfo
) {
- Y_UNUSED(pqGroupDescription);
Y_UNUSED(selfInfo);
TString error;
- if (!ValidateShardsCount(*GetProtoRequest(), groupConfig, error)
+ if (!ValidateShardsCount(*GetProtoRequest(), pqGroupDescription, error)
|| !ValidateWriteSpeedLimit(*GetProtoRequest(), error, ctx)
|| !ValidateRetentionPeriod(*GetProtoRequest(), groupConfig, Nothing(), error))
{
@@ -868,8 +866,8 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------------
- class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest> {
- using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest>;
+ class TRegisterStreamConsumerActor : public TUpdateSchemeActor<TRegisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest, true> {
+ using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest, true>;
public:
TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request);
@@ -943,8 +941,8 @@ namespace NKikimr::NDataStreams::V1 {
//-----------------------------------------------------------------------------------------
- class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest> {
- using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest>;
+ class TDeregisterStreamConsumerActor : public TUpdateSchemeActor<TDeregisterStreamConsumerActor, NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest, true> {
+ using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest, true>;
public:
TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request);
@@ -1101,8 +1099,11 @@ namespace NKikimr::NDataStreams::V1 {
auto partitionId = partition.GetPartitionId();
TString shardName = GetShardName(partitionId);
if (shardName == ShardId) {
- TShardIterator it(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber);
- SendResponse(ctx, it);
+ if (topicInfo->ShowPrivatePath) {
+ SendResponse(ctx, TShardIterator::Cdc(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
+ } else {
+ SendResponse(ctx, TShardIterator::Common(StreamName, StreamName, partitionId, ReadTimestampMs, SequenceNumber));
+ }
return;
}
}
@@ -1187,7 +1188,7 @@ namespace NKikimr::NDataStreams::V1 {
TStringBuilder() << "Limit '" << Limit << "' is out of bounds [1; " << MAX_LIMIT << "]", ctx);
}
- SendDescribeProposeRequest(ctx);
+ SendDescribeProposeRequest(ctx, ShardIterator.IsCdcTopic());
Become(&TGetRecordsActor::StateWork);
}
@@ -1334,7 +1335,7 @@ namespace NKikimr::NDataStreams::V1 {
TShardIterator shardIterator(ShardIterator.GetStreamName(),
ShardIterator.GetStreamArn(),
ShardIterator.GetShardId(),
- timestamp, seqNo);
+ timestamp, seqNo, ShardIterator.GetKind());
result.set_next_shard_iterator(shardIterator.Serialize());
result.set_millis_behind_latest(millisBehindLatestMs);
diff --git a/ydb/services/datastreams/shard_iterator.h b/ydb/services/datastreams/shard_iterator.h
index 7b6ce9c5cd..626f5110b7 100644
--- a/ydb/services/datastreams/shard_iterator.h
+++ b/ydb/services/datastreams/shard_iterator.h
@@ -21,16 +21,29 @@ namespace NKikimr::NDataStreams::V1 {
}
TShardIterator(const TString& streamName, const TString& streamArn,
- ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) {
+ ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber,
+ NKikimrPQ::TYdsShardIterator::ETopicKind kind = NKikimrPQ::TYdsShardIterator::KIND_COMMON) {
Proto.SetStreamName(streamName);
Proto.SetStreamArn(streamArn);
Proto.SetShardId(shardId);
Proto.SetReadTimestampMs(readTimestamp);
Proto.SetSequenceNumber(sequenceNumber);
Proto.SetCreationTimestampMs(TInstant::Now().MilliSeconds());
+ Proto.SetKind(kind);
Valid = true;
}
+ static TShardIterator Common(const TString& streamName, const TString& streamArn,
+ ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) {
+ return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber);
+ }
+
+ static TShardIterator Cdc(const TString& streamName, const TString& streamArn,
+ ui32 shardId, ui64 readTimestamp, ui32 sequenceNumber) {
+ return TShardIterator(streamName, streamArn, shardId, readTimestamp, sequenceNumber,
+ NKikimrPQ::TYdsShardIterator::KIND_CDC);
+ }
+
TString Serialize() const {
TString data;
bool result = Proto.SerializeToString(&data);
@@ -65,6 +78,14 @@ namespace NKikimr::NDataStreams::V1 {
Proto.GetCreationTimestampMs() < LIFETIME_MS;
}
+ NKikimrPQ::TYdsShardIterator::ETopicKind GetKind() const {
+ return Proto.GetKind();
+ }
+
+ bool IsCdcTopic() const {
+ return Proto.GetKind() == NKikimrPQ::TYdsShardIterator::KIND_CDC;
+ }
+
bool IsValid() const {
return Valid;
}
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index b056a05e64..0b4214e7c8 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -58,22 +58,20 @@ namespace NKikimr::NGRpcProxy::V1 {
{
}
- void PrepareTopicPath(const NActors::TActorContext &ctx) { // ToDo !!
- TopicPath = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath);
- }
-
TString GetTopicPath(const NActors::TActorContext& ctx) {
- PrepareTopicPath(ctx);
- return TopicPath;
+ auto path = NPersQueue::GetFullTopicPath(ctx, this->Request_->GetDatabaseName(), TopicPath);
+ if (PrivateTopicName) {
+ path = JoinPath(ChildPath(NKikimr::SplitPath(path), *PrivateTopicName));
+ }
+ return path;
}
protected:
// TDerived must implement FillProposeRequest(TEvProposeTransaction&, const TActorContext& ctx, TString workingDir, TString name);
void SendProposeRequest(const NActors::TActorContext &ctx) {
- PrepareTopicPath(ctx);
std::pair <TString, TString> pathPair;
try {
- pathPair = NKikimr::NGRpcService::SplitPath(TopicPath);
+ pathPair = NKikimr::NGRpcService::SplitPath(GetTopicPath(ctx));
} catch (const std::exception &ex) {
this->Request_->RaiseIssue(NYql::ExceptionToIssue(ex));
return this->ReplyWithResult(Ydb::StatusIds::BAD_REQUEST, ctx);
@@ -103,15 +101,15 @@ namespace NKikimr::NGRpcProxy::V1 {
}
}
- void SendDescribeProposeRequest(const NActors::TActorContext& ctx) {
- PrepareTopicPath(ctx);
+ void SendDescribeProposeRequest(const NActors::TActorContext& ctx, bool showPrivate = false) {
auto navigateRequest = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
navigateRequest->DatabaseName = CanonizePath(this->Request_->GetDatabaseName().GetOrElse(""));
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
- entry.Path = NKikimr::SplitPath(TopicPath);
+ entry.Path = NKikimr::SplitPath(GetTopicPath(ctx));
entry.SyncVersion = true;
- entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTopic;
+ entry.ShowPrivatePath = showPrivate || PrivateTopicName.Defined();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
navigateRequest->ResultSet.emplace_back(entry);
if (this->Request_->GetInternalToken().empty()) {
@@ -157,7 +155,13 @@ namespace NKikimr::NGRpcProxy::V1 {
switch (response.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: {
- if (!result->ResultSet.front().PQGroupInfo) {
+ if (!response.PQGroupInfo) {
+ if (response.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) {
+ Y_VERIFY(response.ListNodeEntry->Children.size() == 1);
+ PrivateTopicName = response.ListNodeEntry->Children.at(0).Name;
+ return SendDescribeProposeRequest(ctx);
+ }
+
this->Request_->RaiseIssue(
FillIssue(
TStringBuilder() << "path '" << path << "' creation is not completed",
@@ -246,12 +250,13 @@ namespace NKikimr::NGRpcProxy::V1 {
private:
bool IsDead = false;
- TString TopicPath;
+ const TString TopicPath;
+ TMaybe<TString> PrivateTopicName;
};
//-----------------------------------------------------------------------------------
- template<class TDerived, class TRequest>
+ template<class TDerived, class TRequest, bool AllowAccessToPrivatePaths = false>
class TUpdateSchemeActor : public TPQGrpcSchemaBase<TDerived, TRequest> {
using TBase = TPQGrpcSchemaBase<TDerived, TRequest>;
@@ -269,12 +274,17 @@ namespace NKikimr::NGRpcProxy::V1 {
NKikimrSchemeOp::TModifyScheme& modifyScheme(*proposal.Record.MutableTransaction()->MutableModifyScheme());
modifyScheme.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup);
modifyScheme.SetWorkingDir(workingDir);
+ modifyScheme.SetAllowAccessToPrivatePaths(AllowAccessToPrivatePaths);
auto* config = modifyScheme.MutableAlterPersQueueGroup();
Y_VERIFY(response.Self);
Y_VERIFY(response.PQGroupInfo);
config->CopyFrom(response.PQGroupInfo->Description);
+ // keep previous values or set in ModifyPersqueueConfig
+ config->ClearTotalGroupCount();
+ config->MutablePQTabletConfig()->ClearPartitionKeySchema();
+
{
auto applyIf = modifyScheme.AddApplyIf();
applyIf->SetPathId(response.Self->Info.GetPathId());
diff --git a/ydb/services/persqueue_v1/grpc_pq_actor.h b/ydb/services/persqueue_v1/grpc_pq_actor.h
index 7c11732d8e..9372254485 100644
--- a/ydb/services/persqueue_v1/grpc_pq_actor.h
+++ b/ydb/services/persqueue_v1/grpc_pq_actor.h
@@ -708,7 +708,7 @@ private:
void CloseSession(const TString& errorReason, const PersQueue::ErrorCode::ErrorCode code, const TActorContext& ctx);
-
+ void DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate = false);
bool ProcessTopicSchemeCacheResponse(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry,
THashMap<TString, TTopicHolder>::iterator topicsIter, const TActorContext& ctx);
void HandleClientSchemeCacheResponse(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx);
@@ -1163,8 +1163,8 @@ public:
};
-class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest> {
- using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest>;
+class TAddReadRuleActor : public TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true> {
+ using TBase = TUpdateSchemeActor<TAddReadRuleActor, TEvPQAddReadRuleRequest, true>;
public:
TAddReadRuleActor(NKikimr::NGRpcService::TEvPQAddReadRuleRequest *request);
@@ -1176,8 +1176,8 @@ public:
const NKikimrSchemeOp::TDirEntry& selfInfo);
};
-class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest> {
- using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest>;
+class TRemoveReadRuleActor : public TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true> {
+ using TBase = TUpdateSchemeActor<TRemoveReadRuleActor, TEvPQRemoveReadRuleRequest, true>;
public:
TRemoveReadRuleActor(NKikimr::NGRpcService::TEvPQRemoveReadRuleRequest* request);
diff --git a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
index d7209ef46b..d83ae00434 100644
--- a/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
+++ b/ydb/services/persqueue_v1/grpc_pq_read_actor.cpp
@@ -17,6 +17,7 @@
#include <library/cpp/protobuf/util/repeated_field_utils.h>
#include <library/cpp/containers/disjoint_interval_tree/disjoint_interval_tree.h>
+#include <util/string/join.h>
#include <util/string/strip.h>
#include <util/charset/utf8.h>
@@ -2630,8 +2631,8 @@ TReadInitAndAuthActor::TReadInitAndAuthActor(
, Counters(counters)
, LocalCluster(localCluster)
{
- for (const auto& t : topics) {
- Topics[t.first].TopicNameConverter = t.second;
+ for (const auto& [path, converter] : topics) {
+ Topics[path].TopicNameConverter = converter;
}
}
@@ -2642,18 +2643,24 @@ TReadInitAndAuthActor::~TReadInitAndAuthActor() = default;
void TReadInitAndAuthActor::Bootstrap(const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth for : " << ClientId);
Become(&TThis::StateFunc);
+ DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token;
+ DescribeTopics(ctx);
+}
+
+void TReadInitAndAuthActor::DescribeTopics(const NActors::TActorContext& ctx, bool showPrivate) {
TVector<TString> topicNames;
- for (const auto& topic : Topics) {
- topicNames.emplace_back(topic.second.TopicNameConverter->GetPrimaryPath());
+ for (const auto& [_, holder] : Topics) {
+ topicNames.emplace_back(holder.TopicNameConverter->GetPrimaryPath());
}
- DoCheckACL = AppData(ctx)->PQConfig.GetCheckACL() && Token;
- ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true));
+
+ LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " describe topics: " << JoinSeq(", ", topicNames));
+ ctx.Send(MetaCacheId, new TEvDescribeTopicsRequest(topicNames, true, showPrivate));
}
void TReadInitAndAuthActor::Die(const TActorContext& ctx) {
- for (auto& t : Topics) {
- if (t.second.PipeClient)
- NTabletPipe::CloseClient(ctx, t.second.PipeClient);
+ for (auto& [_, holder] : Topics) {
+ if (holder.PipeClient)
+ NTabletPipe::CloseClient(ctx, holder.PipeClient);
}
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " auth is DEAD");
@@ -2695,12 +2702,26 @@ bool TReadInitAndAuthActor::ProcessTopicSchemeCacheResponse(
void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsResponse::TPtr& ev, const TActorContext& ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Handle describe topics response");
+
+ bool reDescribe = false;
for (const auto& entry : ev->Get()->Result->ResultSet) {
- auto processResult = ProcessMetaCacheTopicResponse(entry);
auto path = JoinPath(entry.Path);
auto it = Topics.find(path);
Y_VERIFY(it != Topics.end());
+ if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindCdcStream) {
+ Y_VERIFY(entry.ListNodeEntry->Children.size() == 1);
+ const auto& topic = entry.ListNodeEntry->Children.at(0);
+
+ it->second.TopicNameConverter->SetPrimaryPath(JoinPath(ChildPath(entry.Path, topic.Name)));
+ Topics[it->second.TopicNameConverter->GetPrimaryPath()] = it->second;
+ Topics.erase(it);
+
+ reDescribe = true;
+ continue;
+ }
+
+ auto processResult = ProcessMetaCacheTopicResponse(entry);
if (processResult.IsFatal) {
Topics.erase(it);
if (Topics.empty()) {
@@ -2711,14 +2732,21 @@ void TReadInitAndAuthActor::HandleTopicsDescribeResponse(TEvDescribeTopicsRespon
continue;
}
}
- if (!ProcessTopicSchemeCacheResponse(entry, it, ctx))
- return;
+ if (!ProcessTopicSchemeCacheResponse(entry, it, ctx)) {
+ return;
+ }
}
+
if (Topics.empty()) {
CloseSession("no topics found", PersQueue::ErrorCode::BAD_REQUEST, ctx);
return;
}
+
+ if (reDescribe) {
+ return DescribeTopics(ctx, true);
+ }
+
// ToDo[migration] - separate option - ?
bool doCheckClientAcl = DoCheckACL && !AppData(ctx)->PQConfig.GetTopicsAreFirstClassCitizen();
if (doCheckClientAcl) {
@@ -2812,9 +2840,9 @@ bool TReadInitAndAuthActor::CheckACLPermissionsForNavigate(
void TReadInitAndAuthActor::FinishInitialization(const TActorContext& ctx) {
TTopicTabletsPairs res;
- for (auto& t : Topics) {
+ for (auto& [_, holder] : Topics) {
res.emplace_back(decltype(res)::value_type({
- t.second.TopicNameConverter, t.second.TabletID, t.second.CloudId, t.second.DbId, t.second.FolderId
+ holder.TopicNameConverter, holder.TabletID, holder.CloudId, holder.DbId, holder.FolderId
}));
}
ctx.Send(ParentId, new TEvPQProxy::TEvAuthResultOk(std::move(res)));