aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-23 19:40:14 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-23 19:40:14 +0300
commitee59ba5f5ea78424777da7c44610bad45b16bada (patch)
tree5155ec97dae685e4ccf3ef1d194163b1267c3237
parentb0c73be20527d0e9658d4fff0bfaed641b5c5f28 (diff)
downloadydb-ee59ba5f5ea78424777da7c44610bad45b16bada.tar.gz
Changefeed attrs core support
-rw-r--r--ydb/core/protos/flat_scheme_op.proto1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp9
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_element.cpp10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_element.h35
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream.cpp82
-rw-r--r--ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp41
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
12 files changed, 190 insertions, 3 deletions
diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto
index eb3ed2e7cb..e4c1743c1b 100644
--- a/ydb/core/protos/flat_scheme_op.proto
+++ b/ydb/core/protos/flat_scheme_op.proto
@@ -707,6 +707,7 @@ message TCdcStreamDescription {
optional NKikimrProto.TPathID PathId = 3;
optional ECdcStreamState State = 4;
optional uint64 SchemaVersion = 5;
+ repeated TUserAttribute UserAttributes = 8;
}
message TCreateCdcStream {
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
index 390da8bdb1..b63e3c58d2 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
@@ -82,6 +82,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC
contrib-libs-protobuf
cpp-deprecated-enum_codegen
cpp-html-pcdata
+ library-cpp-json
ydb-core-actorlib_impl
ydb-core-audit
ydb-core-base
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
index cbcd3fa759..dbe99b1f32 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
@@ -83,6 +83,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC
contrib-libs-protobuf
cpp-deprecated-enum_codegen
cpp-html-pcdata
+ library-cpp-json
ydb-core-actorlib_impl
ydb-core-audit
ydb-core-base
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
index cbcd3fa759..dbe99b1f32 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
@@ -83,6 +83,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC
contrib-libs-protobuf
cpp-deprecated-enum_codegen
cpp-html-pcdata
+ library-cpp-json
ydb-core-actorlib_impl
ydb-core-audit
ydb-core-base
diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
index 12108caa6f..8a5eeb775c 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
@@ -82,6 +82,7 @@ target_link_libraries(core-tx-schemeshard PUBLIC
contrib-libs-protobuf
cpp-deprecated-enum_codegen
cpp-html-pcdata
+ library-cpp-json
ydb-core-actorlib_impl
ydb-core-audit
ydb-core-base
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
index 31a6d5d6e7..9a38a3651b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_cdc_stream.cpp
@@ -200,6 +200,14 @@ public:
return result;
}
+ TUserAttributes::TPtr userAttrs = new TUserAttributes(1);
+ if (!userAttrs->ApplyPatch(EUserAttributesOp::CreateChangefeed, streamDesc.GetUserAttributes(), errStr) ||
+ !userAttrs->CheckLimits(errStr))
+ {
+ result->SetError(NKikimrScheme::StatusInvalidParameter, errStr);
+ return result;
+ }
+
auto stream = TCdcStreamInfo::Create(streamDesc);
Y_VERIFY(stream);
@@ -213,6 +221,7 @@ public:
context.DbChanges.PersistPath(pathId);
context.DbChanges.PersistPath(tablePath.Base()->PathId);
+ context.DbChanges.PersistApplyUserAttrs(pathId);
context.DbChanges.PersistAlterCdcStream(pathId);
context.DbChanges.PersistTxState(OperationId);
@@ -227,6 +236,7 @@ public:
streamPath.Base()->CreateTxId = OperationId.GetTxId();
streamPath.Base()->LastTxId = OperationId.GetTxId();
streamPath.Base()->PathType = TPathElement::EPathType::EPathTypeCdcStream;
+ streamPath.Base()->UserAttrs->AlterData = userAttrs;
context.SS->CdcStreams[pathId] = stream;
context.SS->IncrementPathDbRefCount(pathId);
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index e61f0c85e5..0d74dda4c7 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -1145,6 +1145,15 @@ void TSchemeShard::DescribeCdcStream(const TPathId& pathId, const TString& name,
PathIdFromPathId(pathId, desc.MutablePathId());
desc.SetState(info->State);
desc.SetSchemaVersion(info->AlterVersion);
+
+ Y_VERIFY(PathsById.contains(pathId));
+ auto path = PathsById.at(pathId);
+
+ for (const auto& [key, value] : path->UserAttrs->Attrs) {
+ auto& attr = *desc.AddUserAttributes();
+ attr.SetKey(key);
+ attr.SetValue(value);
+ }
}
void TSchemeShard::DescribeSequence(const TPathId& pathId, const TString& name,
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp
index 480c22777f..6cfc783fc1 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_element.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_element.cpp
@@ -279,6 +279,9 @@ void TPathElement::ApplySpecialAttributes() {
case EAttribute::DOCUMENT_API_VERSION:
HandleAttributeValue(item.second, DocumentApiVersion);
break;
+ case EAttribute::ASYNC_REPLICATION:
+ HandleAttributeValue(item.second, AsyncReplication);
+ break;
default:
break;
}
@@ -296,6 +299,13 @@ void TPathElement::HandleAttributeValue(const TString& value, ui64& target) {
}
}
+void TPathElement::HandleAttributeValue(const TString& value, NJson::TJsonValue& target) {
+ NJson::TJsonValue parsed;
+ if (NJson::ReadJsonTree(value, &parsed)) {
+ target = std::move(parsed);
+ }
+}
+
void TPathElement::ChangeVolumeSpaceBegin(TVolumeSpace newSpace, TVolumeSpace oldSpace) {
auto update = [](TVolumeSpaceLimits& limits, ui64 newValue, ui64 oldValue) {
if (newValue > oldValue) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_element.h b/ydb/core/tx/schemeshard/schemeshard_path_element.h
index fb09051616..fe63399206 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_element.h
+++ b/ydb/core/tx/schemeshard/schemeshard_path_element.h
@@ -5,14 +5,15 @@
#include "schemeshard_user_attr_limits.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <ydb/core/util/yverify_stream.h>
#include <ydb/library/aclib/aclib.h>
+#include <library/cpp/json/json_reader.h>
+
#include <util/generic/map.h>
#include <util/generic/ptr.h>
#include <util/string/cast.h>
-#include <ydb/core/util/yverify_stream.h>
-
namespace NKikimr {
namespace NSchemeShard {
@@ -26,6 +27,7 @@ constexpr TStringBuf ATTR_VOLUME_SPACE_LIMIT_SSD_NONREPL = "__volume_space_limit
constexpr TStringBuf ATTR_VOLUME_SPACE_LIMIT_SSD_SYSTEM = "__volume_space_limit_ssd_system";
constexpr TStringBuf ATTR_EXTRA_PATH_SYMBOLS_ALLOWED = "__extra_path_symbols_allowed";
constexpr TStringBuf ATTR_DOCUMENT_API_VERSION = "__document_api_version";
+constexpr TStringBuf ATTR_ASYNC_REPLICATION = "__async_replication";
inline bool WeakCheck(char c) {
// 33: ! " # $ % & ' ( ) * + , - . /
@@ -58,6 +60,7 @@ enum class EAttribute {
VOLUME_SPACE_LIMIT_SSD_NONREPL,
DOCUMENT_API_VERSION,
VOLUME_SPACE_LIMIT_SSD_SYSTEM,
+ ASYNC_REPLICATION,
};
struct TVolumeSpace {
@@ -81,6 +84,7 @@ enum class EUserAttributesOp {
CreateSubDomain,
CreateExtSubDomain,
SyncUpdateTenants,
+ CreateChangefeed,
};
struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
@@ -116,6 +120,7 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
HANDLE_ATTR(VOLUME_SPACE_LIMIT_SSD_SYSTEM);
HANDLE_ATTR(EXTRA_PATH_SYMBOLS_ALLOWED);
HANDLE_ATTR(DOCUMENT_API_VERSION);
+ HANDLE_ATTR(ASYNC_REPLICATION);
#undef HANDLE_ATTR
return EAttribute::UNKNOWN;
}
@@ -218,6 +223,12 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
return false;
}
return CheckAttributeUint64(name, value, errStr, /* minValue = */ 1);
+ case EAttribute::ASYNC_REPLICATION:
+ if (op != EUserAttributesOp::CreateChangefeed) {
+ errStr = Sprintf("UserAttributes: attribute '%s' can only be set during CreateChangefeed", name.c_str());
+ return false;
+ }
+ return CheckAttributeJson(name, value, errStr);
}
Y_UNREACHABLE();
@@ -248,6 +259,12 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
return false;
}
return true;
+ case EAttribute::ASYNC_REPLICATION:
+ if (op != EUserAttributesOp::CreateChangefeed) {
+ errStr = Sprintf("UserAttributes: attribute '%s' can only be set during CreateChangefeed", name.c_str());
+ return false;
+ }
+ return true;
}
Y_UNREACHABLE();
@@ -267,7 +284,7 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
if (!TryFromString(value, parsed)) {
errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s'",
name.c_str(), value.c_str());
- return false;
+ return false;
}
if (parsed < minValue) {
errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s' < %" PRIu64,
@@ -288,6 +305,16 @@ struct TUserAttributes: TSimpleRefCount<TUserAttributes> {
errStr = Sprintf("UserAttributes::CheckLimits: unsupported attribute '%s'", item.first.c_str());
return true;
}
+
+ static bool CheckAttributeJson(const TString& name, const TString& value, TString& errStr) {
+ NJson::TJsonValue unused;
+ if (!NJson::ReadJsonTree(value, &unused)) {
+ errStr = Sprintf("UserAttributes: attribute '%s' has invalid value '%s'",
+ name.c_str(), value.c_str());
+ return false;
+ }
+ return true;
+ }
};
struct TPathElement : TSimpleRefCount<TPathElement> {
@@ -334,6 +361,7 @@ struct TPathElement : TSimpleRefCount<TPathElement> {
TVolumeSpaceLimits VolumeSpaceSSDNonrepl;
TVolumeSpaceLimits VolumeSpaceSSDSystem;
ui64 DocumentApiVersion = 0;
+ NJson::TJsonValue AsyncReplication;
// Number of references to this path element in the database
size_t DbRefCount = 0;
@@ -399,6 +427,7 @@ public:
void ApplySpecialAttributes();
void HandleAttributeValue(const TString& value, TString& target);
void HandleAttributeValue(const TString& value, ui64& target);
+ void HandleAttributeValue(const TString& value, NJson::TJsonValue& target);
void ChangeVolumeSpaceBegin(TVolumeSpace newSpace, TVolumeSpace oldSpace);
void ChangeVolumeSpaceCommit(TVolumeSpace newSpace, TVolumeSpace oldSpace);
bool CheckVolumeSpaceChange(TVolumeSpace newSpace, TVolumeSpace oldSpace, TString& errStr);
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
index 619399646d..6e92c829e3 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream.cpp
@@ -4,6 +4,9 @@
#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
#include <library/cpp/json/json_reader.h>
+#include <library/cpp/json/json_writer.h>
+
+#include <util/string/escape.h>
#include <util/string/printf.h>
using namespace NKikimr;
@@ -140,6 +143,85 @@ Y_UNIT_TEST_SUITE(TCdcStreamTests) {
}
}
+ Y_UNIT_TEST(Attributes) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
+ ui64 txId = 100;
+
+ TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ // user attr
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream1"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ UserAttributes { Key: "key" Value: "value" }
+ }
+ )");
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePath(runtime, "/MyRoot/Table"), {
+ [=](const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ const auto& table = record.GetPathDescription().GetTable();
+ UNIT_ASSERT_VALUES_EQUAL(table.CdcStreamsSize(), 1);
+
+ const auto& stream = table.GetCdcStreams(0);
+ UNIT_ASSERT_VALUES_EQUAL(stream.UserAttributesSize(), 1);
+
+ const auto& attr = stream.GetUserAttributes(0);
+ UNIT_ASSERT_VALUES_EQUAL(attr.GetKey(), "key");
+ UNIT_ASSERT_VALUES_EQUAL(attr.GetValue(), "value");
+ }
+ });
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream1"), {
+ NLs::UserAttrsHas({
+ {"key", "value"},
+ })
+ });
+
+ // async replication attr
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream2"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ UserAttributes { Key: "__async_replication" Value: "value" }
+ }
+ )", {NKikimrScheme::StatusInvalidParameter});
+
+ NJson::TJsonValue json;
+ json["id"] = "some-id";
+ json["path"] = "/some/path";
+ const auto jsonString = NJson::WriteJson(json, false);
+
+ TestCreateCdcStream(runtime, ++txId, "/MyRoot", Sprintf(R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream2"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ UserAttributes { Key: "__async_replication" Value: "%s" }
+ }
+ )", EscapeC(jsonString).c_str()));
+ env.TestWaitNotification(runtime, txId);
+
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream2"), {
+ NLs::UserAttrsHas({
+ {"__async_replication", jsonString},
+ })
+ });
+ }
+
Y_UNIT_TEST(Negative) {
TTestBasicRuntime runtime;
TTestEnv env(runtime, TTestEnvOptions().EnableProtoSourceIdInfo(true));
diff --git a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
index a90a8b6705..62ba67cc9b 100644
--- a/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
+++ b/ydb/core/tx/schemeshard/ut_cdc_stream_reboots.cpp
@@ -302,4 +302,45 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
});
}
+ Y_UNIT_TEST(Attributes) {
+ TTestWithReboots t;
+ t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
+ {
+ TInactiveZone inactive(activeZone);
+ TestCreateTable(runtime, ++t.TxId, "/MyRoot", R"(
+ Name: "Table"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Uint64" }
+ KeyColumnNames: ["key"]
+ )");
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+ }
+
+ auto request = CreateCdcStreamRequest(++t.TxId, "/MyRoot", R"(
+ TableName: "Table"
+ StreamDescription {
+ Name: "Stream"
+ Mode: ECdcStreamModeKeysOnly
+ Format: ECdcStreamFormatProto
+ UserAttributes { Key: "key" Value: "value" }
+ }
+ )");
+ t.TestEnv->ReliablePropose(runtime, request, {
+ NKikimrScheme::StatusAccepted,
+ NKikimrScheme::StatusAlreadyExists,
+ NKikimrScheme::StatusMultipleModifications,
+ });
+ t.TestEnv->TestWaitNotification(runtime, t.TxId);
+
+ {
+ TInactiveZone inactive(activeZone);
+ TestDescribeResult(DescribePrivatePath(runtime, "/MyRoot/Table/Stream"), {
+ NLs::UserAttrsHas({
+ {"key", "value"},
+ })
+ });
+ }
+ });
+ }
+
} // TCdcStreamWithRebootsTests
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 816120aecb..4de0cbf7b3 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -233,6 +233,7 @@ PEERDIR(
contrib/libs/protobuf
library/cpp/deprecated/enum_codegen
library/cpp/html/pcdata
+ library/cpp/json
ydb/core/actorlib_impl
ydb/core/audit
ydb/core/base