diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-03-24 07:21:49 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-03-24 07:21:49 +0300 |
commit | c1b8a4c9133644d50c860cecf883ffd730198cc2 (patch) | |
tree | 8f83fd95b89e99e3adc8f1dedf2662a809d99e9b | |
parent | 3be02ee09a8438cf399fa05dc86e0f0e38f64328 (diff) | |
download | ydb-c1b8a4c9133644d50c860cecf883ffd730198cc2.tar.gz |
Changefeed attrs api
-rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 36 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 34 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 27 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.h | 6 |
5 files changed, 93 insertions, 14 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 6639b06a78..d4e0b1289e 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -2946,6 +2946,42 @@ Y_UNIT_TEST_SUITE(KqpScheme) { } } + Y_UNIT_TEST(ChangefeedAttributes) { + TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().ExtractValueSync().GetSession(); + + { + auto result = session.CreateTable("/Root/table", TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumn("Key") + .Build() + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json) + .AddAttribute("key", "value"); + + { + auto result = session.AlterTable("/Root/table", TAlterTableSettings() + .AppendAddChangefeeds(changefeed) + ).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = session.DescribeTable("/Root/table").ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + const auto& changefeeds = result.GetTableDescription().GetChangefeedDescriptions(); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0), changefeed); + UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0).GetAttributes(), changefeed.GetAttributes()); + } + } + Y_UNIT_TEST(CreatedAt) { TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig())); auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root")); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 3dd5129e03..7f74289944 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -429,6 +429,18 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out, return true; } +template <typename TOutProto, typename TInProto> +void FillAttributesImpl(TOutProto& out, const TInProto& in) { + if (!in.UserAttributesSize()) { + return; + } + + auto& outAttrs = *out.mutable_attributes(); + for (const auto& inAttr : in.GetUserAttributes()) { + outAttrs[inAttr.GetKey()] = inAttr.GetValue(); + } +} + void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TTableDescription& in) { @@ -467,6 +479,8 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, default: break; } + + FillAttributesImpl(*changefeed, stream); } } @@ -509,6 +523,12 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan); } + for (const auto& [key, value] : in.attributes()) { + auto& attr = *out.AddUserAttributes(); + attr.SetKey(key); + attr.SetValue(value); + } + return true; } @@ -693,20 +713,6 @@ void FillColumnFamilies(Ydb::Table::CreateTableRequest& out, FillColumnFamiliesImpl(out, in); } -template <typename TYdbProto> -void FillAttributesImpl(TYdbProto& out, - const NKikimrSchemeOp::TPathDescription& in) { - - if (!in.UserAttributesSize()) { - return; - } - - auto& outAttrs = *out.mutable_attributes(); - for (const auto& inAttr : in.GetUserAttributes()) { - outAttrs[inAttr.GetKey()] = inAttr.GetValue(); - } -} - void FillAttributes(Ydb::Table::DescribeTableResult& out, const NKikimrSchemeOp::TPathDescription& in) { FillAttributesImpl(out, in); diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 9405f11612..0f69b2d0fa 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -154,6 +154,8 @@ message Changefeed { bool virtual_timestamps = 5; // Initial scan will output the current state of the table first bool initial_scan = 6; + // Attributes. Total size is limited to 10 KB. + map<string, string> attributes = 7 [(map_key).length.range = {min: 1, max: 100}, (length).range = {min: 1, max: 4096}]; } message ChangefeedDescription { @@ -182,6 +184,8 @@ message ChangefeedDescription { State state = 4; // State of emitting of virtual timestamps along with data bool virtual_timestamps = 5; + // Attributes + map<string, string> attributes = 6; } message StoragePool { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 1336a6605d..7c27ca7ea3 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -4262,6 +4262,21 @@ TChangefeedDescription& TChangefeedDescription::WithInitialScan() { return *this; } +TChangefeedDescription& TChangefeedDescription::AddAttribute(const TString& key, const TString& value) { + Attributes_[key] = value; + return *this; +} + +TChangefeedDescription& TChangefeedDescription::SetAttributes(const THashMap<TString, TString>& attrs) { + Attributes_ = attrs; + return *this; +} + +TChangefeedDescription& TChangefeedDescription::SetAttributes(THashMap<TString, TString>&& attrs) { + Attributes_ = std::move(attrs); + return *this; +} + const TString& TChangefeedDescription::GetName() const { return Name_; } @@ -4286,6 +4301,10 @@ bool TChangefeedDescription::GetInitialScan() const { return InitialScan_; } +const THashMap<TString, TString>& TChangefeedDescription::GetAttributes() const { + return Attributes_; +} + template <typename TProto> TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { EChangefeedMode mode; @@ -4342,6 +4361,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { } } + for (const auto& [key, value] : proto.attributes()) { + ret.Attributes_[key] = value; + } + return ret; } @@ -4383,6 +4406,10 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { retention.set_seconds(RetentionPeriod_->Seconds()); retention.set_nanos(RetentionPeriod_->NanoSecondsOfSecond()); } + + for (const auto& [key, value] : Attributes_) { + (*proto.mutable_attributes())[key] = value; + } } TString TChangefeedDescription::ToString() const { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index d2d900ea45..74c1a3e753 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -213,6 +213,10 @@ public: TChangefeedDescription& WithRetentionPeriod(const TDuration& value); // Initial scan will output the current state of the table first TChangefeedDescription& WithInitialScan(); + // Attributes + TChangefeedDescription& AddAttribute(const TString& key, const TString& value); + TChangefeedDescription& SetAttributes(const THashMap<TString, TString>& attrs); + TChangefeedDescription& SetAttributes(THashMap<TString, TString>&& attrs); const TString& GetName() const; EChangefeedMode GetMode() const; @@ -220,6 +224,7 @@ public: EChangefeedState GetState() const; bool GetVirtualTimestamps() const; bool GetInitialScan() const; + const THashMap<TString, TString>& GetAttributes() const; void SerializeTo(Ydb::Table::Changefeed& proto) const; TString ToString() const; @@ -240,6 +245,7 @@ private: bool VirtualTimestamps_ = false; std::optional<TDuration> RetentionPeriod_; bool InitialScan_ = false; + THashMap<TString, TString> Attributes_; }; bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs); |