aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTony-Romanov <150126326+Tony-Romanov@users.noreply.github.com>2025-04-15 11:54:40 +0200
committerGitHub <noreply@github.com>2025-04-15 11:54:40 +0200
commitfec9f00552e1ce9db12dcdf32b31a0b38f273151 (patch)
tree38873d49affbd866f2fac7bc8251f636fea8f18d
parent5f24489529364ae871a5043450c4ef592a583e69 (diff)
downloadydb-fec9f00552e1ce9db12dcdf32b31a0b38f273151.tar.gz
Add default retries. (#17201)
-rw-r--r--ydb/apps/etcd_proxy/readme.txt2
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_impl.cpp8
-rw-r--r--ydb/apps/etcd_proxy/service/etcd_watch.cpp13
3 files changed, 18 insertions, 5 deletions
diff --git a/ydb/apps/etcd_proxy/readme.txt b/ydb/apps/etcd_proxy/readme.txt
index e4745faceac..6ba25734de2 100644
--- a/ydb/apps/etcd_proxy/readme.txt
+++ b/ydb/apps/etcd_proxy/readme.txt
@@ -9,7 +9,7 @@ To remove the restriction two tasks must be done:
And other todo's:
- Add merics.
- Add logging.
-- Implement retries on "Transaction lock invslideted" error.
+- Add retry policies.
- Implement compaction with control of a requested revision.
- Implement the watches for ranges. (Now the watches work only with a single key or a prefix.)
- Add unit tests for watches.
diff --git a/ydb/apps/etcd_proxy/service/etcd_impl.cpp b/ydb/apps/etcd_proxy/service/etcd_impl.cpp
index e760672f020..bd250df6c16 100644
--- a/ydb/apps/etcd_proxy/service/etcd_impl.cpp
+++ b/ydb/apps/etcd_proxy/service/etcd_impl.cpp
@@ -12,6 +12,8 @@
namespace NEtcd {
+using namespace NYdb::NQuery;
+
namespace {
std::string GetNameWithIndex(const std::string_view& name, const size_t* counter) {
@@ -820,9 +822,13 @@ private:
this->MakeQueryWithParams(sql, params);
sql << "-- " << GetRequestName() << " <<<<" << std::endl;
// std::cout << std::endl << sql.view() << std::endl;
+
+ TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
+ return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args);
+ };
const auto my = this->SelfId();
const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
- Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) {
+ Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) {
if (const auto res = future.GetValueSync(); res.IsSuccess())
ass->Send(my, new NEtcd::TEvQueryResult(res.GetResultSets()));
else
diff --git a/ydb/apps/etcd_proxy/service/etcd_watch.cpp b/ydb/apps/etcd_proxy/service/etcd_watch.cpp
index 878961658ef..09c060fc4a2 100644
--- a/ydb/apps/etcd_proxy/service/etcd_watch.cpp
+++ b/ydb/apps/etcd_proxy/service/etcd_watch.cpp
@@ -9,6 +9,7 @@
namespace NEtcd {
using namespace NActors;
+using namespace NYdb::NQuery;
namespace {
@@ -69,9 +70,12 @@ private:
sql << "update `leases` set `updated` = CurrentUtcDatetime(`id`) where " << leasePraramName << " = `id`;" << std::endl;
sql << "select `id`, `ttl` - unwrap(cast(CurrentUtcDatetime(`id`) - `updated` as Int64) / 1000000L) as `granted` from `leases` where " << leasePraramName << " = `id`;" << std::endl;
+ TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
+ return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args);
+ };
const auto my = this->SelfId();
const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
- Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) {
+ Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) {
if (const auto res = future.GetValueSync(); res.IsSuccess())
ass->Send(my, new TEvQueryResult(res.GetResultSets()));
else
@@ -173,9 +177,12 @@ private:
sql << "select * from `content` where " << revName << " <= `modified` and " << where.view() << " order by `modified` asc;" << std::endl;
// std::cout << std::endl << sql.view() << std::endl;
+ TQueryClient::TQueryResultFunc callback = [query = sql.str(), args = params.Build()](TQueryClient::TSession session) -> TAsyncExecuteQueryResult {
+ return session.ExecuteQuery(query, TTxControl::BeginTx().CommitTx(), args);
+ };
const auto my = this->SelfId();
const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
- Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) {
+ Stuff->Client->RetryQuery(std::move(callback)).Subscribe([my, ass](const auto& future) {
if (const auto res = future.GetValueSync(); res.IsSuccess())
ass->Send(my, new TEvQueryResult(res.GetResultSets()));
else
@@ -610,7 +617,7 @@ private:
const auto my = this->SelfId();
const auto ass = NActors::TlsActivationContext->ExecutorThread.ActorSystem;
- Stuff->Client->ExecuteQuery(sql.str(), NYdb::NQuery::TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) {
+ Stuff->Client->ExecuteQuery(sql.str(), TTxControl::BeginTx().CommitTx(), params.Build()).Subscribe([my, ass](const auto& future) {
if (const auto res = future.GetValueSync(); res.IsSuccess())
ass->Send(my, new TEvQueryResult(res.GetResultSets()));
else