diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-23 19:40:14 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-23 19:40:14 +0300 |
commit | ee59ba5f5ea78424777da7c44610bad45b16bada (patch) | |
tree | 5155ec97dae685e4ccf3ef1d194163b1267c3237 | |
parent | b0c73be20527d0e9658d4fff0bfaed641b5c5f28 (diff) | |
download | ydb-ee59ba5f5ea78424777da7c44610bad45b16bada.tar.gz |
Changefeed attrs core support
-rw-r--r-- | ydb/core/protos/flat_scheme_op.proto | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_describer.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_element.cpp | 10 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_path_element.h | 35 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream.cpp | 82 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp | 41 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ya.make | 1 |
12 files changed, 190 insertions, 3 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index eb3ed2e7cb..e4c1743c1b 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -707,6 +707,7 @@ message TCdcStreamDescription { optional NKikimrProto.TPathID PathId = 3; optional ECdcStreamState State = 4; optional uint64 SchemaVersion = 5; + repeated TUserAttribute UserAttributes = 8; } message TCreateCdcStream { diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt index 390da8bdb1..b63e3c58d2 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt @@ -82,6 +82,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC contrib-libs-protobuf cpp-deprecated-enum_codegen cpp-html-pcdata + library-cpp-json ydb-core-actorlib_impl ydb-core-audit ydb-core-base diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt index cbcd3fa759..dbe99b1f32 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt @@ -83,6 +83,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC contrib-libs-protobuf cpp-deprecated-enum_codegen cpp-html-pcdata + library-cpp-json ydb-core-actorlib_impl ydb-core-audit ydb-core-base diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt index cbcd3fa759..dbe99b1f32 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt @@ -83,6 +83,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC contrib-libs-protobuf cpp-deprecated-enum_codegen cpp-html-pcdata + library-cpp-json ydb-core-actorlib_impl ydb-core-audit ydb-core-base diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt index 12108caa6f..8a5eeb775c 100644 --- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt @@ -82,6 +82,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC contrib-libs-protobuf cpp-deprecated-enum_codegen cpp-html-pcdata + library-cpp-json ydb-core-actorlib_impl ydb-core-audit ydb-core-base diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp index 31a6d5d6e7..9a38a3651b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp @@ -200,6 +200,14 @@ public: return result; } + TUserAttributes::TPtr userAttrs = new TUserAttributes(1); + if (!userAttrs->ApplyPatch(EUserAttributesOp::CreateChangefeed, streamDesc.GetUserAttributes(), errStr) || + !userAttrs->CheckLimits(errStr)) + { + result->SetError(NKikimrScheme::StatusInvalidParameter, errStr); + return result; + } + auto stream = TCdcStreamInfo::Create(streamDesc); Y_VERIFY(stream); @@ -213,6 +221,7 @@ public: context.DbChanges.PersistPath(pathId); context.DbChanges.PersistPath(tablePath.Base()->PathId); + context.DbChanges.PersistApplyUserAttrs(pathId); context.DbChanges.PersistAlterCdcStream(pathId); context.DbChanges.PersistTxState(OperationId); @@ -227,6 +236,7 @@ public: streamPath.Base()->CreateTxId = OperationId.GetTxId(); streamPath.Base()->LastTxId = OperationId.GetTxId(); streamPath.Base()->PathType = TPathElement::EPathType::EPathTypeCdcStream; + streamPath.Base()->UserAttrs->AlterData = userAttrs; context.SS->CdcStreams[pathId] = stream; context.SS->IncrementPathDbRefCount(pathId); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index e61f0c85e5..0d74dda4c7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -1145,6 +1145,15 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name, PathIdFromPathId(pathId, desc.MutablePathId()); desc.SetState(info->State); desc.SetSchemaVersion(info->AlterVersion); + + Y_VERIFY(PathsById.contains(pathId)); + auto path = PathsById.at(pathId); + + for (const auto& [key, value] : path->UserAttrs->Attrs) { + auto& attr = *desc.AddUserAttributes(); + attr.SetKey(key); + attr.SetValue(value); + } } void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name, diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp index 480c22777f..6cfc783fc1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp @@ -279,6 +279,9 @@ void TPathElement::ApplySpecialAttributes() { case EAttribute::DOCUMENT_API_VERSION: HandleAttributeValue(item.second, DocumentApiVersion); break; + case EAttribute::ASYNC_REPLICATION: + HandleAttributeValue(item.second, AsyncReplication); + break; default: break; } @@ -296,6 +299,13 @@ void TPathElement::HandleAttributeValue(const TString& value, ui64& target) { } } +void TPathElement::HandleAttributeValue(const TString& value, NJson::TJsonValue& target) { + NJson::TJsonValue parsed; + if (NJson::ReadJsonTree(value, &parsed)) { + target = std::move(parsed); + } +} + void TPathElement::ChangeVolumeSpaceBegin(TVolumeSpace newSpace, TVolumeSpace oldSpace) { auto update = [](TVolumeSpaceLimits& limits, ui64 newValue, ui64 oldValue) { if (newValue > oldValue) { diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.h b/ydb/core/tx/schemeshard/schemeshard_path_element.h index fb09051616..fe63399206 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_element.h +++ b/ydb/core/tx/schemeshard/schemeshard_path_element.h @@ -5,14 +5,15 @@ #include "schemeshard_user_attr_limits.h" #include <ydb/core/protos/flat_scheme_op.pb.h> +#include <ydb/core/util/yverify_stream.h> #include <ydb/library/aclib/aclib.h> +#include <library/cpp/json/json_reader.h> + #include <util/generic/map.h> #include <util/generic/ptr.h> #include <util/string/cast.h> -#include <ydb/core/util/yverify_stream.h> - namespace NKikimr { namespace NSchemeShard { @@ -26,6 +27,7 @@ constexpr TStringBuf ATTR_VOLUME_SPACE_LIMIT_SSD_NONREPL = "__volume_space_limit constexpr TStringBuf ATTR_VOLUME_SPACE_LIMIT_SSD_SYSTEM = "__volume_space_limit_ssd_system"; constexpr TStringBuf ATTR_EXTRA_PATH_SYMBOLS_ALLOWED = "__extra_path_symbols_allowed"; constexpr TStringBuf ATTR_DOCUMENT_API_VERSION = "__document_api_version"; +constexpr TStringBuf ATTR_ASYNC_REPLICATION = "__async_replication"; inline bool WeakCheck(char c) { // 33: ! " # $ % & ' ( ) * + , - . / @@ -58,6 +60,7 @@ enum class EAttribute { VOLUME_SPACE_LIMIT_SSD_NONREPL, DOCUMENT_API_VERSION, VOLUME_SPACE_LIMIT_SSD_SYSTEM, + ASYNC_REPLICATION, }; struct TVolumeSpace { @@ -81,6 +84,7 @@ enum class EUserAttributesOp { CreateSubDomain, CreateExtSubDomain, SyncUpdateTenants, + CreateChangefeed, }; struct TUserAttributes: TSimpleRefCount<TUserAttributes> { @@ -116,6 +120,7 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> { HANDLE_ATTR(VOLUME_SPACE_LIMIT_SSD_SYSTEM); HANDLE_ATTR(EXTRA_PATH_SYMBOLS_ALLOWED); HANDLE_ATTR(DOCUMENT_API_VERSION); + HANDLE_ATTR(ASYNC_REPLICATION); #undef HANDLE_ATTR return EAttribute::UNKNOWN; } @@ -218,6 +223,12 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> { return false; } return CheckAttributeUint64(name, value, errStr, /* minValue = */ 1); + case EAttribute::ASYNC_REPLICATION: + if (op != EUserAttributesOp::CreateChangefeed) { + errStr = Sprintf("UserAttributes: attribute '%s' can only be set during CreateChangefeed", name.c_str()); + return false; + } + return CheckAttributeJson(name, value, errStr); } Y_UNREACHABLE(); @@ -248,6 +259,12 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> { return false; } return true; + case EAttribute::ASYNC_REPLICATION: + if (op != EUserAttributesOp::CreateChangefeed) { + errStr = Sprintf("UserAttributes: attribute '%s' can only be set during CreateChangefeed", name.c_str()); + return false; + } + return true; } Y_UNREACHABLE(); @@ -267,7 +284,7 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> { if (!TryFromString(value, parsed)) { errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s'", name.c_str(), value.c_str()); - return false; + return false; } if (parsed < minValue) { errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s' < %" PRIu64, @@ -288,6 +305,16 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> { errStr = Sprintf("UserAttributes::CheckLimits: unsupported attribute '%s'", item.first.c_str()); return true; } + + static bool CheckAttributeJson(const TString& name, const TString& value, TString& errStr) { + NJson::TJsonValue unused; + if (!NJson::ReadJsonTree(value, &unused)) { + errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s'", + name.c_str(), value.c_str()); + return false; + } + return true; + } }; struct TPathElement : TSimpleRefCount<TPathElement> { @@ -334,6 +361,7 @@ struct TPathElement : TSimpleRefCount<TPathElement> { TVolumeSpaceLimits VolumeSpaceSSDNonrepl; TVolumeSpaceLimits VolumeSpaceSSDSystem; ui64 DocumentApiVersion = 0; + NJson::TJsonValue AsyncReplication; // Number of references to this path element in the database size_t DbRefCount = 0; @@ -399,6 +427,7 @@ public: void ApplySpecialAttributes(); void HandleAttributeValue(const TString& value, TString& target); void HandleAttributeValue(const TString& value, ui64& target); + void HandleAttributeValue(const TString& value, NJson::TJsonValue& target); void ChangeVolumeSpaceBegin(TVolumeSpace newSpace, TVolumeSpace oldSpace); void ChangeVolumeSpaceCommit(TVolumeSpace newSpace, TVolumeSpace oldSpace); bool CheckVolumeSpaceChange(TVolumeSpace newSpace, TVolumeSpace oldSpace, TString& errStr); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp index 619399646d..6e92c829e3 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp @@ -4,6 +4,9 @@ #include <ydb/core/tx/schemeshard/schemeshard_impl.h> #include <library/cpp/json/json_reader.h> +#include <library/cpp/json/json_writer.h> + +#include <util/string/escape.h> #include <util/string/printf.h> using namespace NKikimr; @@ -140,6 +143,85 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) { } } + Y_UNIT_TEST(Attributes) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); + ui64 txId = 100; + + TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + env.TestWaitNotification(runtime, txId); + + // user attr + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream1" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + UserAttributes { Key: "key" Value: "value" } + } + )"); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), { + [=](const NKikimrScheme::TEvDescribeSchemeResult& record) { + const auto& table = record.GetPathDescription().GetTable(); + UNIT_ASSERT_VALUES_EQUAL(table.CdcStreamsSize(), 1); + + const auto& stream = table.GetCdcStreams(0); + UNIT_ASSERT_VALUES_EQUAL(stream.UserAttributesSize(), 1); + + const auto& attr = stream.GetUserAttributes(0); + UNIT_ASSERT_VALUES_EQUAL(attr.GetKey(), "key"); + UNIT_ASSERT_VALUES_EQUAL(attr.GetValue(), "value"); + } + }); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream1"), { + NLs::UserAttrsHas({ + {"key", "value"}, + }) + }); + + // async replication attr + TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream2" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + UserAttributes { Key: "__async_replication" Value: "value" } + } + )", {NKikimrScheme::StatusInvalidParameter}); + + NJson::TJsonValue json; + json["id"] = "some-id"; + json["path"] = "/some/path"; + const auto jsonString = NJson::WriteJson(json, false); + + TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"( + TableName: "Table" + StreamDescription { + Name: "Stream2" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + UserAttributes { Key: "__async_replication" Value: "%s" } + } + )", EscapeC(jsonString).c_str())); + env.TestWaitNotification(runtime, txId); + + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream2"), { + NLs::UserAttrsHas({ + {"__async_replication", jsonString}, + }) + }); + } + Y_UNIT_TEST(Negative) { TTestBasicRuntime runtime; TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true)); diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp index a90a8b6705..62ba67cc9b 100644 --- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp +++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp @@ -302,4 +302,45 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) { }); } + Y_UNIT_TEST(Attributes) { + TTestWithReboots t; + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + { + TInactiveZone inactive(activeZone); + TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"( + Name: "Table" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Uint64" } + KeyColumnNames: ["key"] + )"); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + } + + auto request = CreateCdcStreamRequest(++t.TxId, "/MyRoot", R"( + TableName: "Table" + StreamDescription { + Name: "Stream" + Mode: ECdcStreamModeKeysOnly + Format: ECdcStreamFormatProto + UserAttributes { Key: "key" Value: "value" } + } + )"); + t.TestEnv->ReliablePropose(runtime, request, { + NKikimrScheme::StatusAccepted, + NKikimrScheme::StatusAlreadyExists, + NKikimrScheme::StatusMultipleModifications, + }); + t.TestEnv->TestWaitNotification(runtime, t.TxId); + + { + TInactiveZone inactive(activeZone); + TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), { + NLs::UserAttrsHas({ + {"key", "value"}, + }) + }); + } + }); + } + } // TCdcStreamWithRebootsTests diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 816120aecb..4de0cbf7b3 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -233,6 +233,7 @@ PEERDIR( contrib/libs/protobuf library/cpp/deprecated/enum_codegen library/cpp/html/pcdata + library/cpp/json ydb/core/actorlib_impl ydb/core/audit ydb/core/base |