summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <[email protected]>2023-06-05 15:46:38 +0300
committerilnaz <[email protected]>2023-06-05 15:46:38 +0300
commit9f1fc93ef9ffd5d279175e263cd2a67845db6ad4 (patch)
tree98db7664393931980c0b5586e88329e0f839bd60
parent425a6cff6fa3ac1cabb21913ce488cf2001756a2 (diff)
api: Changefeed's resolved_timestamps_interval option
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp3
-rw-r--r--ydb/core/ydb_convert/table_description.cpp8
-rw-r--r--ydb/public/api/protos/ydb_table.proto4
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp28
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h4
5 files changed, 43 insertions, 4 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index 6dbb9a615c0..dbc7c6e7065 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -1361,7 +1361,8 @@ public:
}
const auto interval = TDuration::FromValue(value);
- Y_UNUSED(interval); // TODO(ilnaz): implement
+ auto& resolvedTimestamps = *add_changefeed->mutable_resolved_timestamps_interval();
+ resolvedTimestamps.set_seconds(interval.Seconds());
} else if (name == "retention_period") {
YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
const auto value = FromString<i64>(
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 191a88d8986..e59c11f10f1 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -463,6 +463,10 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
changefeed->set_aws_region(stream.GetAwsRegion());
+ if (const auto value = stream.GetResolvedTimestampsIntervalMs()) {
+ changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds());
+ }
+
switch (stream.GetMode()) {
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly:
case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate:
@@ -507,6 +511,10 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
out.SetVirtualTimestamps(in.virtual_timestamps());
out.SetAwsRegion(in.aws_region());
+ if (in.has_resolved_timestamps_interval()) {
+ out.SetResolvedTimestampsIntervalMs(TDuration::Seconds(in.resolved_timestamps_interval().seconds()).MilliSeconds());
+ }
+
switch (in.mode()) {
case Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY:
case Ydb::Table::ChangefeedMode::MODE_UPDATES:
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index ac18b22744d..f807c874591 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -161,6 +161,8 @@ message Changefeed {
map<string, string> attributes = 7 [(map_key).length.range = {min: 1, max: 100}, (length).range = {min: 1, max: 4096}];
// Value that will be emitted in the `awsRegion` field of the record in DYNAMODB_STREAMS_JSON format
string aws_region = 8 [(length).le = 128];
+ // Periodically emit resolved timestamps. If unspecified, resolved timestamps are not emitted.
+ google.protobuf.Duration resolved_timestamps_interval = 9;
}
message ChangefeedDescription {
@@ -193,6 +195,8 @@ message ChangefeedDescription {
map<string, string> attributes = 6;
// Value that will be emitted in the `awsRegion` field of the record in DYNAMODB_STREAMS_JSON format
string aws_region = 7;
+ // Interval of emitting of resolved timestamps. If unspecified, resolved timestamps are not emitted.
+ google.protobuf.Duration resolved_timestamps_interval = 8;
}
message StoragePool {
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 47954f56fd5..976f8c8b5bf 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -17,6 +17,8 @@
#include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
+#include <google/protobuf/util/time_util.h>
+
#include <library/cpp/cache/cache.h>
#include <util/generic/map.h>
@@ -4304,6 +4306,11 @@ TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() {
return *this;
}
+TChangefeedDescription& TChangefeedDescription::WithResolvedTimestamps(const TDuration& value) {
+ ResolvedTimestamps_ = value;
+ return *this;
+}
+
TChangefeedDescription& TChangefeedDescription::WithRetentionPeriod(const TDuration& value) {
RetentionPeriod_ = value;
return *this;
@@ -4354,6 +4361,10 @@ bool TChangefeedDescription::GetVirtualTimestamps() const {
return VirtualTimestamps_;
}
+const std::optional<TDuration>& TChangefeedDescription::GetResolvedTimestamps() const {
+ return ResolvedTimestamps_;
+}
+
bool TChangefeedDescription::GetInitialScan() const {
return InitialScan_;
}
@@ -4407,6 +4418,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
if (proto.virtual_timestamps()) {
ret.WithVirtualTimestamps();
}
+ if (proto.has_resolved_timestamps_interval()) {
+ ret.WithResolvedTimestamps(TDuration::MilliSeconds(
+ ::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval())));
+ }
if (!proto.aws_region().empty()) {
ret.WithAwsRegion(proto.aws_region());
}
@@ -4472,10 +4487,12 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
break;
}
+ if (ResolvedTimestamps_) {
+ SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval());
+ }
+
if (RetentionPeriod_) {
- auto& retention = *proto.mutable_retention_period();
- retention.set_seconds(RetentionPeriod_->Seconds());
- retention.set_nanos(RetentionPeriod_->NanoSecondsOfSecond());
+ SetDuration(*RetentionPeriod_, *proto.mutable_retention_period());
}
for (const auto& [key, value] : Attributes_) {
@@ -4496,6 +4513,10 @@ void TChangefeedDescription::Out(IOutputStream& o) const {
<< ", format: " << Format_ << ""
<< ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << "";
+ if (ResolvedTimestamps_) {
+ o << ", resolved_timestamps: " << *ResolvedTimestamps_;
+ }
+
if (RetentionPeriod_) {
o << ", retention_period: " << *RetentionPeriod_;
}
@@ -4512,6 +4533,7 @@ bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription&
&& lhs.GetMode() == rhs.GetMode()
&& lhs.GetFormat() == rhs.GetFormat()
&& lhs.GetVirtualTimestamps() == rhs.GetVirtualTimestamps()
+ && lhs.GetResolvedTimestamps() == rhs.GetResolvedTimestamps()
&& lhs.GetAwsRegion() == rhs.GetAwsRegion();
}
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 458dbe6a67c..b402fa1e15a 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -209,6 +209,8 @@ public:
// Enable virtual timestamps
TChangefeedDescription& WithVirtualTimestamps();
+ // Enable resolved timestamps
+ TChangefeedDescription& WithResolvedTimestamps(const TDuration& interval);
// Customise retention period of underlying topic (24h by default).
TChangefeedDescription& WithRetentionPeriod(const TDuration& value);
// Initial scan will output the current state of the table first
@@ -225,6 +227,7 @@ public:
EChangefeedFormat GetFormat() const;
EChangefeedState GetState() const;
bool GetVirtualTimestamps() const;
+ const std::optional<TDuration>& GetResolvedTimestamps() const;
bool GetInitialScan() const;
const THashMap<TString, TString>& GetAttributes() const;
const TString& GetAwsRegion() const;
@@ -246,6 +249,7 @@ private:
EChangefeedFormat Format_;
EChangefeedState State_ = EChangefeedState::Unknown;
bool VirtualTimestamps_ = false;
+ std::optional<TDuration> ResolvedTimestamps_;
std::optional<TDuration> RetentionPeriod_;
bool InitialScan_ = false;
THashMap<TString, TString> Attributes_;