diff options
author | ilnaz <ilnaz@ydb.tech> | 2023-01-25 17:08:09 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2023-01-25 17:08:09 +0300 |
commit | d5a7349e603909130b6d390ffb15225953b1a378 (patch) | |
tree | cfd74fa1eccc400b33235224bb1c227bd99d6947 | |
parent | cfbb1e03427298b62dc253a69fd8d631bf224a88 (diff) | |
download | ydb-d5a7349e603909130b6d390ffb15225953b1a378.tar.gz |
Public api, cpp sdk & cli
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_exec.cpp | 13 | ||||
-rw-r--r-- | ydb/core/ydb_convert/table_description.cpp | 21 | ||||
-rw-r--r-- | ydb/public/api/protos/ydb_table.proto | 3 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp | 5 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 31 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.h | 6 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table_enum.h | 8 |
7 files changed, 78 insertions, 9 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index c2d37a9412..60e158cbcb 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -975,16 +975,17 @@ public: return SyncError(); } } else if (name == "initial_scan") { - // TODO: handle initial_scan setting - ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()), - TStringBuilder() << name << " setting is not supported yet")); - return SyncError(); + auto value = TString( + setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value() + ); + + add_changefeed->set_initial_scan(FromString<bool>(to_lower(value))); } else if (name == "virtual_timestamps") { - auto vt = TString( + auto value = TString( setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value() ); - add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(vt))); + add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(value))); } else if (name == "retention_period") { YQL_ENSURE(setting.Value().Maybe<TCoInterval>()); const auto value = FromString<i64>( diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index 69dc8547ed..435ce40153 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -420,7 +420,6 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, auto changefeed = out.add_changefeeds(); changefeed->set_name(stream.GetName()); - changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_ENABLED); changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps()); switch (stream.GetMode()) { @@ -442,6 +441,17 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out, default: break; } + + switch (stream.GetState()) { + case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady: + changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_ENABLED); + case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled: + changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_DISABLED); + case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan: + changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_INITIAL_SCAN); + default: + break; + } } } @@ -475,6 +485,15 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out, return false; } + if (in.initial_scan()) { + if (!AppData()->FeatureFlags.GetEnableChangefeedInitialScan()) { + status = Ydb::StatusIds::UNSUPPORTED; + error = "Changefeed initial scan is not supported yet"; + return false; + } + out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan); + } + return true; } diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto index 12e0c8918d..8b41405f32 100644 --- a/ydb/public/api/protos/ydb_table.proto +++ b/ydb/public/api/protos/ydb_table.proto @@ -152,6 +152,8 @@ message Changefeed { google.protobuf.Duration retention_period = 4; // Emit virtual timestamps of changes along with data or not bool virtual_timestamps = 5; + // Initial scan will output the current state of the table first + bool initial_scan = 6; } message ChangefeedDescription { @@ -159,6 +161,7 @@ message ChangefeedDescription { STATE_UNSPECIFIED = 0; STATE_ENABLED = 1; STATE_DISABLED = 2; + STATE_INITIAL_SCAN = 3; } // Name of the feed diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index ed39b75b2c..c48fc04a25 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -343,7 +343,7 @@ namespace { return; } - TPrettyTable table({ "Name", "Mode", "Format", "VirtualTimestamps" }, + TPrettyTable table({ "Name", "Mode", "Format", "State", "VirtualTimestamps" }, TPrettyTableConfig().WithoutRowDelimiters()); for (const auto& changefeed : changefeeds) { @@ -351,7 +351,8 @@ namespace { .Column(0, changefeed.GetName()) .Column(1, changefeed.GetMode()) .Column(2, changefeed.GetFormat()) - .Column(3, changefeed.GetVirtualTimestamps() ? "on" : "off"); + .Column(3, changefeed.GetState()) + .Column(4, changefeed.GetVirtualTimestamps() ? "on" : "off"); } Cout << Endl << "Changefeeds:" << Endl << table; diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 6408230fde..8797a3332c 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -4407,6 +4407,11 @@ TChangefeedDescription& TChangefeedDescription::WithRetentionPeriod(const TDurat return *this; } +TChangefeedDescription& TChangefeedDescription::WithInitialScan() { + InitialScan_ = true; + return *this; +} + const TString& TChangefeedDescription::GetName() const { return Name_; } @@ -4419,10 +4424,18 @@ EChangefeedFormat TChangefeedDescription::GetFormat() const { return Format_; } +EChangefeedState TChangefeedDescription::GetState() const { + return State_; +} + bool TChangefeedDescription::GetVirtualTimestamps() const { return VirtualTimestamps_; } +bool TChangefeedDescription::GetInitialScan() const { + return InitialScan_; +} + template <typename TProto> TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { EChangefeedMode mode; @@ -4462,12 +4475,30 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) { ret.WithVirtualTimestamps(); } + if constexpr (std::is_same_v<TProto, Ydb::Table::ChangefeedDescription>) { + switch (proto.state()) { + case Ydb::Table::ChangefeedDescription::STATE_ENABLED: + ret.State_= EChangefeedState::Enabled; + break; + case Ydb::Table::ChangefeedDescription::STATE_DISABLED: + ret.State_ = EChangefeedState::Disabled; + break; + case Ydb::Table::ChangefeedDescription::STATE_INITIAL_SCAN: + ret.State_ = EChangefeedState::InitialScan; + break; + default: + ret.State_ = EChangefeedState::Unknown; + break; + } + } + return ret; } void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const { proto.set_name(Name_); proto.set_virtual_timestamps(VirtualTimestamps_); + proto.set_initial_scan(InitialScan_); switch (Mode_) { case EChangefeedMode::KeysOnly: diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h index 75c047bc87..dfc62d09e3 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table.h @@ -205,11 +205,15 @@ public: TChangefeedDescription& WithVirtualTimestamps(); // Customise retention period of underlying topic (24h by default). TChangefeedDescription& WithRetentionPeriod(const TDuration& value); + // Initial scan will output the current state of the table first + TChangefeedDescription& WithInitialScan(); const TString& GetName() const; EChangefeedMode GetMode() const; EChangefeedFormat GetFormat() const; + EChangefeedState GetState() const; bool GetVirtualTimestamps() const; + bool GetInitialScan() const; void SerializeTo(Ydb::Table::Changefeed& proto) const; TString ToString() const; @@ -226,8 +230,10 @@ private: TString Name_; EChangefeedMode Mode_; EChangefeedFormat Format_; + EChangefeedState State_ = EChangefeedState::Unknown; bool VirtualTimestamps_ = false; std::optional<TDuration> RetentionPeriod_; + bool InitialScan_ = false; }; bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs); diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h index 5664774bfc..7a96e160cb 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h +++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h @@ -45,5 +45,13 @@ enum class EChangefeedFormat { Unknown = std::numeric_limits<int>::max() }; +enum class EChangefeedState { + Enabled, + Disabled, + InitialScan, + + Unknown = std::numeric_limits<int>::max() +}; + } // namespace NTable } // namespace NYdb |