diff options
author | Tony-Romanov <150126326+Tony-Romanov@users.noreply.github.com> | 2025-04-17 16:11:51 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-17 16:11:51 +0200 |
commit | a97c3d07114bc6de0225a84da5224858ac2928ce (patch) | |
tree | 47ab27d2cbd0234591695ca1ba12ecc33ec8819f | |
parent | f309eb309a259c43e7d2df60537da344fe76435a (diff) | |
download | ydb-a97c3d07114bc6de0225a84da5224858ac2928ce.tar.gz |
Return back table for current slice. (#17352)
-rw-r--r-- | ydb/apps/etcd_proxy/proxy.cpp | 8 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/create.sql | 22 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_base_init.cpp | 2 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_impl.cpp | 45 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_impl.h | 2 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_watch.cpp | 9 |
6 files changed, 53 insertions, 35 deletions
diff --git a/ydb/apps/etcd_proxy/proxy.cpp b/ydb/apps/etcd_proxy/proxy.cpp index 40bd51b513f..f08b4a8dd00 100644 --- a/ydb/apps/etcd_proxy/proxy.cpp +++ b/ydb/apps/etcd_proxy/proxy.cpp @@ -210,7 +210,13 @@ int TProxy::ImportDatabase() { const auto driver = NYdb::TDriver(config); auto client = NYdb::NTable::TTableClient(driver); - if (const auto res = client.BulkUpsert(Database + Folder + "/content", std::move(value)).ExtractValueSync(); !res.IsSuccess()) { + if (const auto res = client.BulkUpsert(Database + Folder + "/current", std::move(value)).ExtractValueSync(); !res.IsSuccess()) { + std::cout << res.GetIssues().ToString() << std::endl; + return 1; + } + + const auto& param = NYdb::TParamsBuilder().AddParam("$Prefix").String(ImportPrefix_).Build().Build(); + if (const auto res = Stuff->Client->ExecuteQuery("insert into `history` select * from `current` where startswith(`key`,$Prefix);", NYdb::NQuery::TTxControl::NoTx(), param).ExtractValueSync(); !res.IsSuccess()) { std::cout << res.GetIssues().ToString() << std::endl; return 1; } diff --git a/ydb/apps/etcd_proxy/service/create.sql b/ydb/apps/etcd_proxy/service/create.sql index 284d782fe95..dba80ea67c6 100644 --- a/ydb/apps/etcd_proxy/service/create.sql +++ b/ydb/apps/etcd_proxy/service/create.sql @@ -1,4 +1,16 @@ -CREATE TABLE content +CREATE TABLE current +( + `key` Bytes NOT NULL, + `created` Int64 NOT NULL, + `modified` Int64 NOT NULL, + `version` Int64 NOT NULL, + `value` Bytes NOT NULL, + `lease` Int64 NOT NULL, + PRIMARY KEY (`key`) +) +WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11); + +CREATE TABLE history ( `key` Bytes NOT NULL, `created` Int64 NOT NULL, @@ -7,7 +19,8 @@ CREATE TABLE content `value` Bytes NOT NULL, `lease` Int64 NOT NULL, PRIMARY KEY (`key`, `modified`) -); +) +WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 101); CREATE TABLE leases ( @@ -16,6 +29,7 @@ CREATE TABLE leases `created` Datetime NOT NULL, `updated` Datetime NOT NULL, PRIMARY KEY (`id`) -); +) +WITH (AUTO_PARTITIONING_BY_LOAD = ENABLED, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 23, AUTO_PARTITIONING_PARTITION_SIZE_MB = 11); -ALTER TABLE content ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES"); +ALTER TABLE history ADD CHANGEFEED changes WITH (format="JSON", mode="UPDATES"); diff --git a/ydb/apps/etcd_proxy/service/etcd_base_init.cpp b/ydb/apps/etcd_proxy/service/etcd_base_init.cpp index 196a27a3066..d591cc87d54 100644 --- a/ydb/apps/etcd_proxy/service/etcd_base_init.cpp +++ b/ydb/apps/etcd_proxy/service/etcd_base_init.cpp @@ -9,7 +9,7 @@ std::string GetCreateTablesSQL(const std::string& prefix) { } std::string GetLastRevisionSQL(const std::string& prefix) { - return prefix + "select nvl(max(`modified`), 1L) from `content`; select nvl(max(`id`), 1L) from `leases`;"; + return prefix + "select nvl(max(`modified`), 1L) from `history`; select nvl(max(`id`), 1L) from `leases`;"; } } diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.cpp b/ydb/apps/etcd_proxy/service/etcd_impl.cpp index 4f613191cb3..a72f2200504 100644 --- a/ydb/apps/etcd_proxy/service/etcd_impl.cpp +++ b/ydb/apps/etcd_proxy/service/etcd_impl.cpp @@ -35,10 +35,13 @@ struct TOperation { }; void MakeSlice(const std::string_view& where, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, const i64 revision = 0LL) { - sql << "select * from (select max_by(TableRow(), `modified`) from `content`" << where; - if (revision) + if (revision) { + sql << "select * from (select max_by(TableRow(), `modified`) from `history`" << where; sql << " and " << AddParam("Rev", params, revision, paramsCounter) << " >= `modified`"; - sql << " group by `key`) flatten columns where 0L < `version`"; + sql << " group by `key`) flatten columns where 0L < `version`"; + } else { + sql << "select * from `current`" << where; + } } void MakeSlice(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter, const i64 revision = 0LL) { @@ -252,7 +255,7 @@ struct TPut : public TOperation { const auto& oldResultSetName = GetNameWithIndex("Old", resultsCounter); const auto& newResultSetName = GetNameWithIndex("New", resultsCounter); - sql << oldResultSetName << " = select * from (select * from `content`" << keyFilter << " order by `modified` desc limit 1UL) where 0L < `version`;" << std::endl; + sql << oldResultSetName << " = select * from `current` " << keyFilter << ';' << std::endl; sql << newResultSetName << " = select" << std::endl; sql << '\t' << keyParamName << " as `key`," << std::endl; sql << '\t' << "if(`version` > 0L, `created`, $Revision) as `created`," << std::endl; @@ -274,7 +277,8 @@ struct TPut : public TOperation { sql << " where " << txnFilter << ')'; sql << ';' << std::endl; - sql << "insert into `content` select * from " << newResultSetName << ';' << std::endl; + sql << (update ? "update `current` on" : "upsert into `current`") << " select * from " << newResultSetName << ';' << std::endl; + sql << "insert into `history` select * from " << newResultSetName << ';' << std::endl; if (GetPrevious || NotifyWatchtower || update) { if (resultsCounter) @@ -376,7 +380,7 @@ struct TDeleteRange : public TOperation { sql << " where " << txnFilter; sql << ';' << std::endl; - sql << "insert into `content`" << std::endl; + sql << "insert into `history`" << std::endl; sql << "select `key`, `created`, $Revision as `modified`, 0L as `version`, `value`, `lease` from " << oldResultSetName << ';' << std::endl; sql << "select count(*) from " << oldResultSetName << ';' << std::endl; @@ -385,6 +389,10 @@ struct TDeleteRange : public TOperation { ++(*resultsCounter); sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from " << oldResultSetName << ';' << std::endl; } + sql << "delete from `current`" << keyFilter; + if (!txnFilter.empty()) + sql << " and " << txnFilter; + sql << ';' << std::endl; } void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr, size_t* resultsCounter = nullptr, const std::string_view& txnFilter = {}) { @@ -1074,12 +1082,12 @@ private: } void MakeQueryWithParams(std::ostream& sql, NYdb::TParamsBuilder& params) final { - sql << "$Trash = select c.key as key, c.modified as modified from `content` as c inner join (" << std::endl; - sql << "select max_by((`key`, `modified`), `modified`) as pair from `content`" << std::endl; + sql << "$Trash = select c.key as key, c.modified as modified from `history` as c inner join (" << std::endl; + sql << "select max_by((`key`, `modified`), `modified`) as pair from `history`" << std::endl; sql << "where `modified` < " << AddParam("Revision", params, KeyRevision) << " and 0L = `version` group by `key`" << std::endl; sql << ") as keys on keys.pair.0 = c.key where c.modified <= keys.pair.1;" << std::endl; sql << "select count(*) from $Trash;" << std::endl; - sql << "delete from `content` on select * from $Trash;" << std::endl; + sql << "delete from `history` on select * from $Trash;" << std::endl; } void ReplyWith(const NYdb::TResultSets& results, const TActorContext& ctx) final { @@ -1163,16 +1171,15 @@ private: sql << "select count(*) > 0UL from `leases` where " << leaseParamName << " = `id`;" << std::endl; - sql << "$Victims = "; - MakeSimpleSlice(sql, params); - sql << " and " << leaseParamName << " = `lease`;" << std::endl; - - sql << "insert into `content`" << std::endl; - sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl; + sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` where " << leaseParamName << " = `lease`;" << std::endl; if constexpr (NotifyWatchtower) { sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl; } + + sql << "insert into `history`" << std::endl; + sql << "select `key`, `created`, " << revisionParamName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl; + sql << "delete from `current` on select `key` from $Victims;" << std::endl; sql << "delete from `leases` where " << leaseParamName << " = `id`;" << std::endl; } @@ -1237,9 +1244,7 @@ private: sql << "select `ttl`, `ttl` - unwrap(cast(CurrentUtcDatetime(`id`) - `updated` as Int64) / 1000000L) as `granted` from `leases` where " << leaseParamName << " = `id`;" << std::endl; if (Keys) { - sql << "select `key` from ("; - MakeSimpleSlice(sql, params); - sql << " and " << leaseParamName << " = `lease`);" << std::endl; + sql << "select `key` from `current` where " << leaseParamName << " = `lease`;" << std::endl; } } @@ -1343,10 +1348,6 @@ std::string MakeSimplePredicate(const std::string_view& key, const std::string_v return keyParamName; } -void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params) { - MakeSlice(std::string_view(), sql, params); -} - NActors::IActor* MakeRange(std::unique_ptr<NKikimr::NGRpcService::IRequestCtx> p, TSharedStuff::TPtr stuff) { return new TRangeRequest(std::move(p), std::move(stuff)); } diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.h b/ydb/apps/etcd_proxy/service/etcd_impl.h index 4449d4b4007..36e1407c4ee 100644 --- a/ydb/apps/etcd_proxy/service/etcd_impl.h +++ b/ydb/apps/etcd_proxy/service/etcd_impl.h @@ -20,7 +20,5 @@ namespace NEtcd { template<typename TValueType> std::string AddParam(const std::string_view& name, NYdb::TParamsBuilder& params, const TValueType& value, size_t* counter = nullptr); - void MakeSimpleSlice(std::ostream& sql, NYdb::TParamsBuilder& params); - std::string MakeSimplePredicate(const std::string_view& key, const std::string_view& rangeEnd, std::ostream& sql, NYdb::TParamsBuilder& params, size_t* paramsCounter = nullptr); } diff --git a/ydb/apps/etcd_proxy/service/etcd_watch.cpp b/ydb/apps/etcd_proxy/service/etcd_watch.cpp index 4f9994e7ecb..9f5d35adf17 100644 --- a/ydb/apps/etcd_proxy/service/etcd_watch.cpp +++ b/ydb/apps/etcd_proxy/service/etcd_watch.cpp @@ -172,9 +172,9 @@ private: std::ostringstream sql; sql << Stuff->TablePrefix; if (WithPrevious) { - sql << "select * from (select max_by(TableRow(), `modified`) from `content` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl; + sql << "select * from (select max_by(TableRow(), `modified`) from `history` where " << revName << " > `modified` and " << where.view() << " group by `key`) flatten columns union all" << std::endl; } - sql << "select * from `content` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl; + sql << "select * from `history` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl; // std::cout << std::endl << sql.view() << std::endl; TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult { @@ -600,12 +600,11 @@ private: const auto& revName = AddParam("Revision", params, Revision); sql << "$Leases = select 0L as `lease` union all select `id` as `lease` from `leases` where unwrap(interval('PT1S') * `ttl` + `updated`) > CurrentUtcDatetime(`id`);" << std::endl; - sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from ("; - MakeSimpleSlice(sql, params); - sql << ") as h left only join $Leases as l using(`lease`);" << std::endl; + sql << "$Victims = select `key`, `value`, `created`, `modified`, `version`, `lease` from `current` as h left only join $Leases as l using(`lease`);" << std::endl; sql << "insert into `content`" << std::endl; sql << "select `key`, `created`, " << revName << " as `modified`, 0L as `version`, `value`, `lease` from $Victims;" << std::endl; + sql << "delete from `current` on select `key` from $Victims;" << std::endl; if constexpr (NotifyWatchtower) { sql << "select `key`, `value`, `created`, `modified`, `version`, `lease` from $Victims;" << std::endl; |