diff options
author | Tony-Romanov <150126326+Tony-Romanov@users.noreply.github.com> | 2025-04-15 11:54:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-15 11:54:40 +0200 |
commit | fec9f00552e1ce9db12dcdf32b31a0b38f273151 (patch) | |
tree | 38873d49affbd866f2fac7bc8251f636fea8f18d | |
parent | 5f24489529364ae871a5043450c4ef592a583e69 (diff) | |
download | ydb-fec9f00552e1ce9db12dcdf32b31a0b38f273151.tar.gz |
Add default retries. (#17201)
-rw-r--r-- | ydb/apps/etcd_proxy/readme.txt | 2 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/apps/etcd_proxy/service/etcd_watch.cpp | 13 |
3 files changed, 18 insertions, 5 deletions
diff --git a/ydb/apps/etcd_proxy/readme.txt b/ydb/apps/etcd_proxy/readme.txt index e4745faceac..6ba25734de2 100644 --- a/ydb/apps/etcd_proxy/readme.txt +++ b/ydb/apps/etcd_proxy/readme.txt @@ -9,7 +9,7 @@ To remove the restriction two tasks must be done: And other todo's: - Add merics. - Add logging. -- Implement retries on "Transaction lock invslideted" error. +- Add retry policies. - Implement compaction with control of a requested revision. - Implement the watches for ranges. (Now the watches work only with a single key or a prefix.) - Add unit tests for watches. diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.cpp b/ydb/apps/etcd_proxy/service/etcd_impl.cpp index e760672f020..bd250df6c16 100644 --- a/ydb/apps/etcd_proxy/service/etcd_impl.cpp +++ b/ydb/apps/etcd_proxy/service/etcd_impl.cpp @@ -12,6 +12,8 @@ namespace NEtcd { +using namespace NYdb::NQuery; + namespace { std::string GetNameWithIndex(const std::string_view& name, const size_t* counter) { @@ -820,9 +822,13 @@ private: this->MakeQueryWithParams(sql, params); sql << "-- " << GetRequestName() << " <<<<" << std::endl; // std::cout << std::endl << sql.view() << std::endl; + + TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult { + return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args); + }; const auto my = this->SelfId(); const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) { + Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) { if (const auto res = future.GetValueSync(); res.IsSuccess()) ass->Send(my, new NEtcd::TEvQueryResult(res.GetResultSets())); else diff --git a/ydb/apps/etcd_proxy/service/etcd_watch.cpp b/ydb/apps/etcd_proxy/service/etcd_watch.cpp index 878961658ef..09c060fc4a2 100644 --- a/ydb/apps/etcd_proxy/service/etcd_watch.cpp +++ b/ydb/apps/etcd_proxy/service/etcd_watch.cpp @@ -9,6 +9,7 @@ namespace NEtcd { using namespace NActors; +using namespace NYdb::NQuery; namespace { @@ -69,9 +70,12 @@ private: sql << "update `leases` set `updated` = CurrentUtcDatetime(`id`) where " << leasePraramName << " = `id`;" << std::endl; sql << "select `id`, `ttl` - unwrap(cast(CurrentUtcDatetime(`id`) - `updated` as Int64) / 1000000L) as `granted` from `leases` where " << leasePraramName << " = `id`;" << std::endl; + TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult { + return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args); + }; const auto my = this->SelfId(); const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) { + Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) { if (const auto res = future.GetValueSync(); res.IsSuccess()) ass->Send(my, new TEvQueryResult(res.GetResultSets())); else @@ -173,9 +177,12 @@ private: sql << "select * from `content` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl; // std::cout << std::endl << sql.view() << std::endl; + TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult { + return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args); + }; const auto my = this->SelfId(); const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) { + Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) { if (const auto res = future.GetValueSync(); res.IsSuccess()) ass->Send(my, new TEvQueryResult(res.GetResultSets())); else @@ -610,7 +617,7 @@ private: const auto my = this->SelfId(); const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem; - Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) { + Stuff->Client->ExecuteQuery(sql.str(), TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) { if (const auto res = future.GetValueSync(); res.IsSuccess()) ass->Send(my, new TEvQueryResult(res.GetResultSets())); else |