aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTony-Romanov <150126326+Tony-Romanov@users.noreply.github.com>2025-04-17 16:11:51 +0200
committerGitHub <noreply@github.com>2025-04-17 16:11:51 +0200
commita97c3d07114bc6de0225a84da5224858ac2928ce (patch)
tree47ab27d2cbd0234591695ca1ba12ecc33ec8819f
parentf309eb309a259c43e7d2df60537da344fe76435a (diff)
downloadydb-a97c3d07114bc6de0225a84da5224858ac2928ce.tar.gz
Return back table for current slice. (#17352)
-rw-r--r--ydb/apps/etcd_proxy/proxy.cpp8
-rw-r--r--ydb/apps/etcd_proxy/service/create.sql22
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_base_init.cpp2
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_impl.cpp45
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_impl.h2
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_watch.cpp9
6 files changed, 53 insertions, 35 deletions
diff --git a/ydb/apps/etcd_proxy/proxy.cpp b/ydb/apps/etcd_proxy/proxy.cpp
index 40bd51b513f..f08b4a8dd00 100644
--- a/ydb/apps/etcd_proxy/proxy.cpp
+++ b/ydb/apps/etcd_proxy/proxy.cpp
@@ -210,7 +210,13 @@ int TProxy::ImportDatabase() {
const auto driver = NYdb::TDriver(config);
auto client = NYdb::NTable::TTableClient(driver);
- if (const auto res = client.BulkUpsert(Database + Folder + "/content", std::move(value)).ExtractValueSync(); !res.IsSuccess()) {
+ if (const auto res = client.BulkUpsert(Database + Folder + "/current", std::move(value)).ExtractValueSync(); !res.IsSuccess()) {
+ std::cout << res.GetIssues().ToString() << std::endl;
+ return 1;
+ }
+
+ const auto& param = NYdb::TParamsBuilder().AddParam("$Prefix").String(ImportPrefix_).Build().Build();
+ if (const auto res = Stuff->Client->ExecuteQuery("insert into `history` select * from `current` where startswith(`key`,$Prefix);", NYdb::NQuery::TTxControl::NoTx(), param).ExtractValueSync(); !res.IsSuccess()) {
std::cout << res.GetIssues().ToString() << std::endl;
return 1;
}
diff --git a/ydb/apps/etcd_proxy/service/create.sql b/ydb/apps/etcd_proxy/service/create.sql
index 284d782fe95..dba80ea67c6 100644
--- a/ydb/apps/etcd_proxy/service/create.sql
+++ b/ydb/apps/etcd_proxy/service/create.sql
@@ -1,4 +1,16 @@
-CREATE TABLE content
+CREATE TABLE current
+(
+ `key` Bytes NOT NULL,
+ `created` Int64 NOT NULL,
+ `modified` Int64 NOT NULL,
+ `version` Int64 NOT NULL,
+ `value` Bytes NOT NULL,
+ `lease` Int64 NOT NULL,
+ PRIMARY KEY (`key`)
+)
+WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
+
+CREATE TABLE history
(
`key` Bytes NOT NULL,
`created` Int64 NOT NULL,
@@ -7,7 +19,8 @@ CREATE TABLE content
`value` Bytes NOT NULL,
`lease` Int64 NOT NULL,
PRIMARY KEY (`key`, `modified`)
-);
+)
+WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 101);
CREATE TABLE leases
(
@@ -16,6 +29,7 @@ CREATE TABLE leases
`created` Datetime NOT NULL,
`updated` Datetime NOT NULL,
PRIMARY KEY (`id`)
-);
+)
+WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11);
-ALTER TABLE content ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES");
+ALTER TABLE history ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES");
diff --git a/ydb/apps/etcd_proxy/service/etcd_base_init.cpp b/ydb/apps/etcd_proxy/service/etcd_base_init.cpp
index 196a27a3066..d591cc87d54 100644
--- a/ydb/apps/etcd_proxy/service/etcd_base_init.cpp
+++ b/ydb/apps/etcd_proxy/service/etcd_base_init.cpp
@@ -9,7 +9,7 @@ std::string GetCreateTablesSQL(const std::string& prefix) {
}
std::string GetLastRevisionSQL(const std::string& prefix) {
- return prefix + "select nvl(max(`modified`), 1L) from `content`; select nvl(max(`id`), 1L) from `leases`;";
+ return prefix + "select nvl(max(`modified`), 1L) from `history`; select nvl(max(`id`), 1L) from `leases`;";
}
}
diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.cpp b/ydb/apps/etcd_proxy/service/etcd_impl.cpp
index 4f613191cb3..a72f2200504 100644
--- a/ydb/apps/etcd_proxy/service/etcd_impl.cpp
+++ b/ydb/apps/etcd_proxy/service/etcd_impl.cpp
@@ -35,10 +35,13 @@ struct TOperation {
};
void MakeSlice(const std::string_view& where, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, const i64 revision = 0LL) {
- sql << "select * from (select max_by(TableRow(), `modified`) from `content`" << where;
- if (revision)
+ if (revision) {
+ sql << "select * from (select max_by(TableRow(), `modified`) from `history`" << where;
sql << " and " << AddParam("Rev", params, revision, paramsCounter) << " >= `modified`";
- sql << " group by `key`) flatten columns where 0L < `version`";
+ sql << " group by `key`) flatten columns where 0L < `version`";
+ } else {
+ sql << "select * from `current`" << where;
+ }
}
void MakeSlice(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter, const i64 revision = 0LL) {
@@ -252,7 +255,7 @@ struct TPut : public TOperation {
const auto& oldResultSetName = GetNameWithIndex("Old", resultsCounter);
const auto& newResultSetName = GetNameWithIndex("New", resultsCounter);
- sql << oldResultSetName << " = select * from (select * from `content`" << keyFilter << " order by `modified` desc limit 1UL) where 0L < `version`;" << std::endl;
+ sql << oldResultSetName << " = select * from `current` " << keyFilter << ';' << std::endl;
sql << newResultSetName << " = select" << std::endl;
sql << '\t' << keyParamName << " as `key`," << std::endl;
sql << '\t' << "if(`version` > 0L, `created`, $Revision) as `created`," << std::endl;
@@ -274,7 +277,8 @@ struct TPut : public TOperation {
sql << " where " << txnFilter << ')';
sql << ';' << std::endl;
- sql << "insert into `content` select * from " << newResultSetName << ';' << std::endl;
+ sql << (update ? "update `current` on" : "upsert into `current`") << " select * from " << newResultSetName << ';' << std::endl;
+ sql << "insert into `history` select * from " << newResultSetName << ';' << std::endl;
if (GetPrevious || NotifyWatchtower || update) {
if (resultsCounter)
@@ -376,7 +380,7 @@ struct TDeleteRange : public TOperation {
sql << " where " << txnFilter;
sql << ';' << std::endl;
- sql << "insert into `content`" << std::endl;
+ sql << "insert into `history`" << std::endl;
sql << "select `key`, `created`, $Revision as `modified`, 0L as `version`, `value`, `lease` from " << oldResultSetName << ';' << std::endl;
sql << "select count(*) from " << oldResultSetName << ';' << std::endl;
@@ -385,6 +389,10 @@ struct TDeleteRange : public TOperation {
++(*resultsCounter);
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from " << oldResultSetName << ';' << std::endl;
}
+ sql << "delete from `current`" << keyFilter;
+ if (!txnFilter.empty())
+ sql << " and " << txnFilter;
+ sql << ';' << std::endl;
}
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, size_t* resultsCounter = nullptr, const std::string_view& txnFilter = {}) {
@@ -1074,12 +1082,12 @@ private:
}
void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) final {
- sql << "$Trash = select c.key as key, c.modified as modified from `content` as c inner join (" << std::endl;
- sql << "select max_by((`key`, `modified`), `modified`) as pair from `content`" << std::endl;
+ sql << "$Trash = select c.key as key, c.modified as modified from `history` as c inner join (" << std::endl;
+ sql << "select max_by((`key`, `modified`), `modified`) as pair from `history`" << std::endl;
sql << "where `modified` < " << AddParam("Revision", params, KeyRevision) << " and 0L = `version` group by `key`" << std::endl;
sql << ") as keys on keys.pair.0 = c.key where c.modified <= keys.pair.1;" << std::endl;
sql << "select count(*) from $Trash;" << std::endl;
- sql << "delete from `content` on select * from $Trash;" << std::endl;
+ sql << "delete from `history` on select * from $Trash;" << std::endl;
}
void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) final {
@@ -1163,16 +1171,15 @@ private:
sql << "select count(*) > 0UL from `leases` where " << leaseParamName << " = `id`;" << std::endl;
- sql << "$Victims = ";
- MakeSimpleSlice(sql, params);
- sql << " and " << leaseParamName << " = `lease`;" << std::endl;
-
- sql << "insert into `content`" << std::endl;
- sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
+ sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` where " << leaseParamName << " = `lease`;" << std::endl;
if constexpr (NotifyWatchtower) {
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl;
}
+
+ sql << "insert into `history`" << std::endl;
+ sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
+ sql << "delete from `current` on select `key` from $Victims;" << std::endl;
sql << "delete from `leases` where " << leaseParamName << " = `id`;" << std::endl;
}
@@ -1237,9 +1244,7 @@ private:
sql << "select `ttl`, `ttl` - unwrap(cast(CurrentUtcDatetime(`id`) - `updated` as Int64) / 1000000L) as `granted` from `leases` where " << leaseParamName << " = `id`;" << std::endl;
if (Keys) {
- sql << "select `key` from (";
- MakeSimpleSlice(sql, params);
- sql << " and " << leaseParamName << " = `lease`);" << std::endl;
+ sql << "select `key` from `current` where " << leaseParamName << " = `lease`;" << std::endl;
}
}
@@ -1343,10 +1348,6 @@ std::string MakeSimplePredicate(const std::string_view& key, const std::string_v
return keyParamName;
}
-void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params) {
- MakeSlice(std::string_view(), sql, params);
-}
-
NActors::IActor* MakeRange(std::unique_ptr<NKikimr::NGRpcService::IRequestCtx> p, TSharedStuff::TPtr stuff) {
return new TRangeRequest(std::move(p), std::move(stuff));
}
diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.h b/ydb/apps/etcd_proxy/service/etcd_impl.h
index 4449d4b4007..36e1407c4ee 100644
--- a/ydb/apps/etcd_proxy/service/etcd_impl.h
+++ b/ydb/apps/etcd_proxy/service/etcd_impl.h
@@ -20,7 +20,5 @@ namespace NEtcd {
template<typename TValueType>
std::string AddParam(const std::string_view& name, NYdb::TParamsBuilder& params, const TValueType& value, size_t* counter = nullptr);
- void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params);
-
std::string MakeSimplePredicate(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr);
}
diff --git a/ydb/apps/etcd_proxy/service/etcd_watch.cpp b/ydb/apps/etcd_proxy/service/etcd_watch.cpp
index 4f9994e7ecb..9f5d35adf17 100644
--- a/ydb/apps/etcd_proxy/service/etcd_watch.cpp
+++ b/ydb/apps/etcd_proxy/service/etcd_watch.cpp
@@ -172,9 +172,9 @@ private:
std::ostringstream sql;
sql << Stuff->TablePrefix;
if (WithPrevious) {
- sql << "select * from (select max_by(TableRow(), `modified`) from `content` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl;
+ sql << "select * from (select max_by(TableRow(), `modified`) from `history` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl;
}
- sql << "select * from `content` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl;
+ sql << "select * from `history` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl;
// std::cout << std::endl << sql.view() << std::endl;
TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
@@ -600,12 +600,11 @@ private:
const auto& revName = AddParam("Revision", params, Revision);
sql << "$Leases = select 0L as `lease` union all select `id` as `lease` from `leases` where unwrap(interval('PT1S') * `ttl` + `updated`) > CurrentUtcDatetime(`id`);" << std::endl;
- sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from (";
- MakeSimpleSlice(sql, params);
- sql << ") as h left only join $Leases as l using(`lease`);" << std::endl;
+ sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` as h left only join $Leases as l using(`lease`);" << std::endl;
sql << "insert into `content`" << std::endl;
sql << "select `key`, `created`, " << revName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl;
+ sql << "delete from `current` on select `key` from $Victims;" << std::endl;
if constexpr (NotifyWatchtower) {
sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl;