aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2023-01-25 17:08:09 +0300
committerilnaz <ilnaz@ydb.tech>2023-01-25 17:08:09 +0300
commitd5a7349e603909130b6d390ffb15225953b1a378 (patch)
treecfd74fa1eccc400b33235224bb1c227bd99d6947
parentcfbb1e03427298b62dc253a69fd8d631bf224a88 (diff)
downloadydb-d5a7349e603909130b6d390ffb15225953b1a378.tar.gz
Public api, cpp sdk & cli
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp13
-rw-r--r--ydb/core/ydb_convert/table_description.cpp21
-rw-r--r--ydb/public/api/protos/ydb_table.proto3
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.cpp31
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table.h6
-rw-r--r--ydb/public/sdk/cpp/client/ydb_table/table_enum.h8
7 files changed, 78 insertions, 9 deletions
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index c2d37a9412..60e158cbcb 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -975,16 +975,17 @@ public:
return SyncError();
}
} else if (name == "initial_scan") {
- // TODO: handle initial_scan setting
- ctx.AddError(TIssue(ctx.GetPosition(setting.Name().Pos()),
- TStringBuilder() << name << " setting is not supported yet"));
- return SyncError();
+ auto value = TString(
+ setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
+ );
+
+ add_changefeed->set_initial_scan(FromString<bool>(to_lower(value)));
} else if (name == "virtual_timestamps") {
- auto vt = TString(
+ auto value = TString(
setting.Value().Cast<TCoDataCtor>().Literal().Cast<TCoAtom>().Value()
);
- add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(vt)));
+ add_changefeed->set_virtual_timestamps(FromString<bool>(to_lower(value)));
} else if (name == "retention_period") {
YQL_ENSURE(setting.Value().Maybe<TCoInterval>());
const auto value = FromString<i64>(
diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp
index 69dc8547ed..435ce40153 100644
--- a/ydb/core/ydb_convert/table_description.cpp
+++ b/ydb/core/ydb_convert/table_description.cpp
@@ -420,7 +420,6 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
auto changefeed = out.add_changefeeds();
changefeed->set_name(stream.GetName());
- changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_ENABLED);
changefeed->set_virtual_timestamps(stream.GetVirtualTimestamps());
switch (stream.GetMode()) {
@@ -442,6 +441,17 @@ void FillChangefeedDescription(Ydb::Table::DescribeTableResult& out,
default:
break;
}
+
+ switch (stream.GetState()) {
+ case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateReady:
+ changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_ENABLED);
+ case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateDisabled:
+ changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_DISABLED);
+ case NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan:
+ changefeed->set_state(Ydb::Table::ChangefeedDescription::STATE_INITIAL_SCAN);
+ default:
+ break;
+ }
}
}
@@ -475,6 +485,15 @@ bool FillChangefeedDescription(NKikimrSchemeOp::TCdcStreamDescription& out,
return false;
}
+ if (in.initial_scan()) {
+ if (!AppData()->FeatureFlags.GetEnableChangefeedInitialScan()) {
+ status = Ydb::StatusIds::UNSUPPORTED;
+ error = "Changefeed initial scan is not supported yet";
+ return false;
+ }
+ out.SetState(NKikimrSchemeOp::ECdcStreamState::ECdcStreamStateScan);
+ }
+
return true;
}
diff --git a/ydb/public/api/protos/ydb_table.proto b/ydb/public/api/protos/ydb_table.proto
index 12e0c8918d..8b41405f32 100644
--- a/ydb/public/api/protos/ydb_table.proto
+++ b/ydb/public/api/protos/ydb_table.proto
@@ -152,6 +152,8 @@ message Changefeed {
google.protobuf.Duration retention_period = 4;
// Emit virtual timestamps of changes along with data or not
bool virtual_timestamps = 5;
+ // Initial scan will output the current state of the table first
+ bool initial_scan = 6;
}
message ChangefeedDescription {
@@ -159,6 +161,7 @@ message ChangefeedDescription {
STATE_UNSPECIFIED = 0;
STATE_ENABLED = 1;
STATE_DISABLED = 2;
+ STATE_INITIAL_SCAN = 3;
}
// Name of the feed
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
index ed39b75b2c..c48fc04a25 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
@@ -343,7 +343,7 @@ namespace {
return;
}
- TPrettyTable table({ "Name", "Mode", "Format", "VirtualTimestamps" },
+ TPrettyTable table({ "Name", "Mode", "Format", "State", "VirtualTimestamps" },
TPrettyTableConfig().WithoutRowDelimiters());
for (const auto& changefeed : changefeeds) {
@@ -351,7 +351,8 @@ namespace {
.Column(0, changefeed.GetName())
.Column(1, changefeed.GetMode())
.Column(2, changefeed.GetFormat())
- .Column(3, changefeed.GetVirtualTimestamps() ? "on" : "off");
+ .Column(3, changefeed.GetState())
+ .Column(4, changefeed.GetVirtualTimestamps() ? "on" : "off");
}
Cout << Endl << "Changefeeds:" << Endl << table;
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
index 6408230fde..8797a3332c 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp
@@ -4407,6 +4407,11 @@ TChangefeedDescription& TChangefeedDescription::WithRetentionPeriod(const TDurat
return *this;
}
+TChangefeedDescription& TChangefeedDescription::WithInitialScan() {
+ InitialScan_ = true;
+ return *this;
+}
+
const TString& TChangefeedDescription::GetName() const {
return Name_;
}
@@ -4419,10 +4424,18 @@ EChangefeedFormat TChangefeedDescription::GetFormat() const {
return Format_;
}
+EChangefeedState TChangefeedDescription::GetState() const {
+ return State_;
+}
+
bool TChangefeedDescription::GetVirtualTimestamps() const {
return VirtualTimestamps_;
}
+bool TChangefeedDescription::GetInitialScan() const {
+ return InitialScan_;
+}
+
template <typename TProto>
TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
EChangefeedMode mode;
@@ -4462,12 +4475,30 @@ TChangefeedDescription TChangefeedDescription::FromProto(const TProto& proto) {
ret.WithVirtualTimestamps();
}
+ if constexpr (std::is_same_v<TProto, Ydb::Table::ChangefeedDescription>) {
+ switch (proto.state()) {
+ case Ydb::Table::ChangefeedDescription::STATE_ENABLED:
+ ret.State_= EChangefeedState::Enabled;
+ break;
+ case Ydb::Table::ChangefeedDescription::STATE_DISABLED:
+ ret.State_ = EChangefeedState::Disabled;
+ break;
+ case Ydb::Table::ChangefeedDescription::STATE_INITIAL_SCAN:
+ ret.State_ = EChangefeedState::InitialScan;
+ break;
+ default:
+ ret.State_ = EChangefeedState::Unknown;
+ break;
+ }
+ }
+
return ret;
}
void TChangefeedDescription::SerializeTo(Ydb::Table::Changefeed& proto) const {
proto.set_name(Name_);
proto.set_virtual_timestamps(VirtualTimestamps_);
+ proto.set_initial_scan(InitialScan_);
switch (Mode_) {
case EChangefeedMode::KeysOnly:
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.h b/ydb/public/sdk/cpp/client/ydb_table/table.h
index 75c047bc87..dfc62d09e3 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table.h
@@ -205,11 +205,15 @@ public:
TChangefeedDescription& WithVirtualTimestamps();
// Customise retention period of underlying topic (24h by default).
TChangefeedDescription& WithRetentionPeriod(const TDuration& value);
+ // Initial scan will output the current state of the table first
+ TChangefeedDescription& WithInitialScan();
const TString& GetName() const;
EChangefeedMode GetMode() const;
EChangefeedFormat GetFormat() const;
+ EChangefeedState GetState() const;
bool GetVirtualTimestamps() const;
+ bool GetInitialScan() const;
void SerializeTo(Ydb::Table::Changefeed& proto) const;
TString ToString() const;
@@ -226,8 +230,10 @@ private:
TString Name_;
EChangefeedMode Mode_;
EChangefeedFormat Format_;
+ EChangefeedState State_ = EChangefeedState::Unknown;
bool VirtualTimestamps_ = false;
std::optional<TDuration> RetentionPeriod_;
+ bool InitialScan_ = false;
};
bool operator==(const TChangefeedDescription& lhs, const TChangefeedDescription& rhs);
diff --git a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
index 5664774bfc..7a96e160cb 100644
--- a/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
+++ b/ydb/public/sdk/cpp/client/ydb_table/table_enum.h
@@ -45,5 +45,13 @@ enum class EChangefeedFormat {
Unknown = std::numeric_limits<int>::max()
};
+enum class EChangefeedState {
+ Enabled,
+ Disabled,
+ InitialScan,
+
+ Unknown = std::numeric_limits<int>::max()
+};
+
} // namespace NTable
} // namespace NYdb