diff options
author | ilnaz <[email protected]> | 2023-06-05 15:46:38 +0300 |
---|---|---|
committer | ilnaz <[email protected]> | 2023-06-05 15:46:38 +0300 |
commit | 9f1fc93ef9ffd5d279175e263cd2a67845db6ad4 (patch) | |
tree | 98db7664393931980c0b5586e88329e0f839bd60 | |
parent | 425a6cff6fa3ac1cabb21913ce488cf2001756a2 (diff) |
api: Changefeed's resolved_timestamps_interval option
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 3 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 8 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 4 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 28 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.h | 4 |
5 files changed, 43 insertions, 4 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 6dbb9a615c0..dbc7c6e7065 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -1361,7 +1361,8 @@ public: } const auto interval = TDuration::FromValue(value); - Y_UNUSED(interval); // TODO(ilnaz): implement + auto& resolvedTimestamps = *add_changefeed->mutable_resolved_timestamps_interval(); + resolvedTimestamps.set_seconds(interval.Seconds()); } else if (name == "retention_period") { YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); const auto value = FromString<i64>( diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 191a88d8986..e59c11f10f1 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -463,6 +463,10 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps()); changefeed->set_aws_region(stream.GetAwsRegion()); + if (const auto value = stream.GetResolvedTimestampsIntervalMs()) { + changefeed->mutable_resolved_timestamps_interval()->set_seconds(TDuration::MilliSeconds(value).Seconds()); + } + switch (stream.GetMode()) { case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeKeysOnly: case NKikimrSchemeOp::ECdcStreamMode::ECdcStreamModeUpdate: @@ -507,6 +511,10 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, out.SetVirtualTimestamps(in.virtual_timestamps()); out.SetAwsRegion(in.aws_region()); + if (in.has_resolved_timestamps_interval()) { + out.SetResolvedTimestampsIntervalMs(TDuration::Seconds(in.resolved_timestamps_interval().seconds()).MilliSeconds()); + } + switch (in.mode()) { case Ydb::Table::ChangefeedMode::MODE_KEYS_ONLY: case Ydb::Table::ChangefeedMode::MODE_UPDATES: diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index ac18b22744d..f807c874591 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -161,6 +161,8 @@ message Changefeed { map<string, string> attributes = 7 [(map_key).length.range = {min: 1, max: 100}, (length).range = {min: 1, max: 4096}]; // Value that will be emitted in the `awsRegion` field of the record in DYNAMODB_STREAMS_JSON format string aws_region = 8 [(length).le = 128]; + // Periodically emit resolved timestamps. If unspecified, resolved timestamps are not emitted. + google.protobuf.Duration resolved_timestamps_interval = 9; } message ChangefeedDescription { @@ -193,6 +195,8 @@ message ChangefeedDescription { map<string, string> attributes = 6; // Value that will be emitted in the `awsRegion` field of the record in DYNAMODB_STREAMS_JSON format string aws_region = 7; + // Interval of emitting of resolved timestamps. If unspecified, resolved timestamps are not emitted. + google.protobuf.Duration resolved_timestamps_interval = 8; } message StoragePool { diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 47954f56fd5..976f8c8b5bf 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -17,6 +17,8 @@ #include <ydb/public/sdk/cpp/client/ydb_table/impl/request_migrator.h> #include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> +#include <google/protobuf/util/time_util.h> + #include <library/cpp/cache/cache.h> #include <util/generic/map.h> @@ -4304,6 +4306,11 @@ TChangefeedDescription& TChangefeedDescription::WithVirtualTimestamps() { return *this; } +TChangefeedDescription& TChangefeedDescription::WithResolvedTimestamps(const TDuration& value) { + ResolvedTimestamps_ = value; + return *this; +} + TChangefeedDescription& TChangefeedDescription::WithRetentionPeriod(const TDuration& value) { RetentionPeriod_ = value; return *this; @@ -4354,6 +4361,10 @@ bool TChangefeedDescription::GetVirtualTimestamps() const { return VirtualTimestamps_; } +const std::optional<TDuration>& TChangefeedDescription::GetResolvedTimestamps() const { + return ResolvedTimestamps_; +} + bool TChangefeedDescription::GetInitialScan() const { return InitialScan_; } @@ -4407,6 +4418,10 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { if (proto.virtual_timestamps()) { ret.WithVirtualTimestamps(); } + if (proto.has_resolved_timestamps_interval()) { + ret.WithResolvedTimestamps(TDuration::MilliSeconds( + ::google::protobuf::util::TimeUtil::DurationToMilliseconds(proto.resolved_timestamps_interval()))); + } if (!proto.aws_region().empty()) { ret.WithAwsRegion(proto.aws_region()); } @@ -4472,10 +4487,12 @@ void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { break; } + if (ResolvedTimestamps_) { + SetDuration(*ResolvedTimestamps_, *proto.mutable_resolved_timestamps_interval()); + } + if (RetentionPeriod_) { - auto& retention = *proto.mutable_retention_period(); - retention.set_seconds(RetentionPeriod_->Seconds()); - retention.set_nanos(RetentionPeriod_->NanoSecondsOfSecond()); + SetDuration(*RetentionPeriod_, *proto.mutable_retention_period()); } for (const auto& [key, value] : Attributes_) { @@ -4496,6 +4513,10 @@ void TChangefeedDescription::Out(IOutputStream& o) const { << ", format: " << Format_ << "" << ", virtual_timestamps: " << (VirtualTimestamps_ ? "on": "off") << ""; + if (ResolvedTimestamps_) { + o << ", resolved_timestamps: " << *ResolvedTimestamps_; + } + if (RetentionPeriod_) { o << ", retention_period: " << *RetentionPeriod_; } @@ -4512,6 +4533,7 @@ bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& && lhs.GetMode() == rhs.GetMode() && lhs.GetFormat() == rhs.GetFormat() && lhs.GetVirtualTimestamps() == rhs.GetVirtualTimestamps() + && lhs.GetResolvedTimestamps() == rhs.GetResolvedTimestamps() && lhs.GetAwsRegion() == rhs.GetAwsRegion(); } diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 458dbe6a67c..b402fa1e15a 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -209,6 +209,8 @@ public: // Enable virtual timestamps TChangefeedDescription& WithVirtualTimestamps(); + // Enable resolved timestamps + TChangefeedDescription& WithResolvedTimestamps(const TDuration& interval); // Customise retention period of underlying topic (24h by default). TChangefeedDescription& WithRetentionPeriod(const TDuration& value); // Initial scan will output the current state of the table first @@ -225,6 +227,7 @@ public: EChangefeedFormat GetFormat() const; EChangefeedState GetState() const; bool GetVirtualTimestamps() const; + const std::optional<TDuration>& GetResolvedTimestamps() const; bool GetInitialScan() const; const THashMap<TString, TString>& GetAttributes() const; const TString& GetAwsRegion() const; @@ -246,6 +249,7 @@ private: EChangefeedFormat Format_; EChangefeedState State_ = EChangefeedState::Unknown; bool VirtualTimestamps_ = false; + std::optional<TDuration> ResolvedTimestamps_; std::optional<TDuration> RetentionPeriod_; bool InitialScan_ = false; THashMap<TString, TString> Attributes_; |