aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-03-24 07:21:49 +0300
committerilnaz <ilnaz@ydb.tech>2023-03-24 07:21:49 +0300
commitc1b8a4c9133644d50c860cecf883ffd730198cc2 (patch)
tree8f83fd95b89e99e3adc8f1dedf2662a809d99e9b
parent3be02ee09a8438cf399fa05dc86e0f0e38f64328 (diff)
downloadydb-c1b8a4c9133644d50c860cecf883ffd730198cc2.tar.gz
Changefeed attrs api
-rw-r--r--ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp36
-rw-r--r--ydb/core/ydb_convert/table_description.cpp34
-rw-r--r--ydb/public/api/protos/ydb_table.proto4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp27
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h6
5 files changed, 93 insertions, 14 deletions
diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
index 6639b06a78..d4e0b1289e 100644
--- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
+++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
@@ -2946,6 +2946,42 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
}
}
+ Y_UNIT_TEST(ChangefeedAttributes) {
+ TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().ExtractValueSync().GetSession();
+
+ {
+ auto result = session.CreateTable("/Root/table", TTableBuilder()
+ .AddNullableColumn("Key", EPrimitiveType::Uint64)
+ .AddNullableColumn("Value", EPrimitiveType::String)
+ .SetPrimaryKeyColumn("Key")
+ .Build()
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ const auto changefeed = TChangefeedDescription("feed", EChangefeedMode::KeysOnly, EChangefeedFormat::Json)
+ .AddAttribute("key", "value");
+
+ {
+ auto result = session.AlterTable("/Root/table", TAlterTableSettings()
+ .AppendAddChangefeeds(changefeed)
+ ).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.DescribeTable("/Root/table").ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ const auto& changefeeds = result.GetTableDescription().GetChangefeedDescriptions();
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0), changefeed);
+ UNIT_ASSERT_VALUES_EQUAL(changefeeds.at(0).GetAttributes(), changefeed.GetAttributes());
+ }
+ }
+
Y_UNIT_TEST(CreatedAt) {
TKikimrRunner kikimr(TKikimrSettings().SetPQConfig(DefaultPQConfig()));
auto scheme = NYdb::NScheme::TSchemeClient(kikimr.GetDriver(), TCommonClientSettings().Database("/Root"));
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 3dd5129e03..7f74289944 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -429,6 +429,18 @@ bool FillIndexDescription(NKikimrSchemeOp::TIndexedTableCreationConfig& out,
return true;
}
+template <typename TOutProto, typename TInProto>
+void FillAttributesImpl(TOutProto& out, const TInProto& in) {
+ if (!in.UserAttributesSize()) {
+ return;
+ }
+
+ auto& outAttrs = *out.mutable_attributes();
+ for (const auto& inAttr : in.GetUserAttributes()) {
+ outAttrs[inAttr.GetKey()] = inAttr.GetValue();
+ }
+}
+
void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TTableDescription& in) {
@@ -467,6 +479,8 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
default:
break;
}
+
+ FillAttributesImpl(*changefeed, stream);
}
}
@@ -509,6 +523,12 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan);
}
+ for (const auto& [key, value] : in.attributes()) {
+ auto& attr = *out.AddUserAttributes();
+ attr.SetKey(key);
+ attr.SetValue(value);
+ }
+
return true;
}
@@ -693,20 +713,6 @@ void FillColumnFamilies(Ydb::Table::CreateTableRequest& out,
FillColumnFamiliesImpl(out, in);
}
-template <typename TYdbProto>
-void FillAttributesImpl(TYdbProto& out,
- const NKikimrSchemeOp::TPathDescription& in) {
-
- if (!in.UserAttributesSize()) {
- return;
- }
-
- auto& outAttrs = *out.mutable_attributes();
- for (const auto& inAttr : in.GetUserAttributes()) {
- outAttrs[inAttr.GetKey()] = inAttr.GetValue();
- }
-}
-
void FillAttributes(Ydb::Table::DescribeTableResult& out,
const NKikimrSchemeOp::TPathDescription& in) {
FillAttributesImpl(out, in);
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 9405f11612..0f69b2d0fa 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -154,6 +154,8 @@ message Changefeed {
bool virtual_timestamps = 5;
// Initial scan will output the current state of the table first
bool initial_scan = 6;
+ // Attributes. Total size is limited to 10 KB.
+ map<string, string> attributes = 7 [(map_key).length.range = {min: 1, max: 100}, (length).range = {min: 1, max: 4096}];
}
message ChangefeedDescription {
@@ -182,6 +184,8 @@ message ChangefeedDescription {
State state = 4;
// State of emitting of virtual timestamps along with data
bool virtual_timestamps = 5;
+ // Attributes
+ map<string, string> attributes = 6;
}
message StoragePool {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 1336a6605d..7c27ca7ea3 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -4262,6 +4262,21 @@ TChangefeedDescription& TChangefeedDescription::WithInitialScan() {
return *this;
}
+TChangefeedDescription& TChangefeedDescription::AddAttribute(const TString& key, const TString& value) {
+ Attributes_[key] = value;
+ return *this;
+}
+
+TChangefeedDescription& TChangefeedDescription::SetAttributes(const THashMap<TString, TString>& attrs) {
+ Attributes_ = attrs;
+ return *this;
+}
+
+TChangefeedDescription& TChangefeedDescription::SetAttributes(THashMap<TString, TString>&& attrs) {
+ Attributes_ = std::move(attrs);
+ return *this;
+}
+
const TString& TChangefeedDescription::GetName() const {
return Name_;
}
@@ -4286,6 +4301,10 @@ bool TChangefeedDescription::GetInitialScan() const {
return InitialScan_;
}
+const THashMap<TString, TString>& TChangefeedDescription::GetAttributes() const {
+ return Attributes_;
+}
+
template <typename TProto>
TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
EChangefeedMode mode;
@@ -4342,6 +4361,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
}
}
+ for (const auto& [key, value] : proto.attributes()) {
+ ret.Attributes_[key] = value;
+ }
+
return ret;
}
@@ -4383,6 +4406,10 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
retention.set_seconds(RetentionPeriod_->Seconds());
retention.set_nanos(RetentionPeriod_->NanoSecondsOfSecond());
}
+
+ for (const auto& [key, value] : Attributes_) {
+ (*proto.mutable_attributes())[key] = value;
+ }
}
TString TChangefeedDescription::ToString() const {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index d2d900ea45..74c1a3e753 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -213,6 +213,10 @@ public:
TChangefeedDescription& WithRetentionPeriod(const TDuration& value);
// Initial scan will output the current state of the table first
TChangefeedDescription& WithInitialScan();
+ // Attributes
+ TChangefeedDescription& AddAttribute(const TString& key, const TString& value);
+ TChangefeedDescription& SetAttributes(const THashMap<TString, TString>& attrs);
+ TChangefeedDescription& SetAttributes(THashMap<TString, TString>&& attrs);
const TString& GetName() const;
EChangefeedMode GetMode() const;
@@ -220,6 +224,7 @@ public:
EChangefeedState GetState() const;
bool GetVirtualTimestamps() const;
bool GetInitialScan() const;
+ const THashMap<TString, TString>& GetAttributes() const;
void SerializeTo(Ydb::Table::Changefeed& proto) const;
TString ToString() const;
@@ -240,6 +245,7 @@ private:
bool VirtualTimestamps_ = false;
std::optional<TDuration> RetentionPeriod_;
bool InitialScan_ = false;
+ THashMap<TString, TString> Attributes_;
};
bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs);