summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <[email protected]>2025-06-23 10:49:41 +0300
committerrobot-piglet <[email protected]>2025-06-23 11:05:58 +0300
commitbb0b9384b505c7ce8664a29174f1a89f6e72710b (patch)
tree554e9a763304f304f1cbf5e9c3acc55e6033221b
parent6d759dd686592eca66abca40d9e4de3bebe5ad14 (diff)
Intermediate changes
commit_hash:70eea6b3e17f4aafae5eadb49724ba9036a55372
-rw-r--r--yql/essentials/sql/v1/complete/name/cache/cache.h2
-rw-r--r--yql/essentials/sql/v1/complete/name/cache/cached.h48
-rw-r--r--yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp85
-rw-r--r--yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp2
-rw-r--r--yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp32
-rw-r--r--yql/essentials/sql/v1/complete/name/service/impatient/name_service.h9
-rw-r--r--yql/essentials/sql/v1/complete/name/service/impatient/ya.make11
-rw-r--r--yql/essentials/sql/v1/complete/name/service/ya.make1
-rw-r--r--yql/essentials/sql/v1/complete/sql_complete_ut.cpp18
-rw-r--r--yql/essentials/sql/v1/complete/ut/ya.make1
10 files changed, 188 insertions, 21 deletions
diff --git a/yql/essentials/sql/v1/complete/name/cache/cache.h b/yql/essentials/sql/v1/complete/name/cache/cache.h
index d65b41d5636..357989edc50 100644
--- a/yql/essentials/sql/v1/complete/name/cache/cache.h
+++ b/yql/essentials/sql/v1/complete/name/cache/cache.h
@@ -30,7 +30,7 @@ namespace NSQLComplete {
using TPtr = TIntrusivePtr<ICache>;
struct TEntry {
- TValue Value = {};
+ TMaybe<TValue> Value = Nothing();
bool IsExpired = true;
};
diff --git a/yql/essentials/sql/v1/complete/name/cache/cached.h b/yql/essentials/sql/v1/complete/name/cache/cached.h
index b265f6fde13..80edb159e5a 100644
--- a/yql/essentials/sql/v1/complete/name/cache/cached.h
+++ b/yql/essentials/sql/v1/complete/name/cache/cached.h
@@ -16,17 +16,47 @@ namespace NSQLComplete {
}
NThreading::TFuture<TValue> operator()(TKey key) const {
- return Cache_->Get(key).Apply([cache = Cache_,
- query = Query_,
- key = std::move(key)](auto f) {
- typename ICache<TKey, TValue>::TEntry entry = f.ExtractValue();
- if (entry.IsExpired) {
- query(key).Apply([cache, key = std::move(key)](auto f) {
- cache->Update(key, f.ExtractValue());
- });
+ auto promise = NThreading::NewPromise<TValue>();
+ Cache_->Get(key).Apply([cache = Cache_,
+ query = Query_,
+ key = std::move(key),
+ promise](auto f) mutable {
+ typename ICache<TKey, TValue>::TEntry entry;
+ try {
+ entry = f.ExtractValue();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
}
- return std::move(entry.Value);
+
+ if (!entry.IsExpired) {
+ Y_ENSURE(entry.Value.Defined());
+ promise.SetValue(std::move(*entry.Value));
+ return;
+ }
+
+ bool isEmpty = entry.Value.Empty();
+ if (!isEmpty) {
+ promise.SetValue(std::move(*entry.Value));
+ }
+
+ query(key).Apply([cache, key = std::move(key), isEmpty, promise](auto f) mutable {
+ TValue value;
+ try {
+ value = f.ExtractValue();
+ } catch (...) {
+ promise.SetException(std::current_exception());
+ return;
+ }
+
+ if (isEmpty) {
+ promise.SetValue(value);
+ }
+
+ cache->Update(key, std::move(value));
+ });
});
+ return promise;
}
private:
diff --git a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp
index 18607dea438..912b104d781 100644
--- a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp
+++ b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp
@@ -8,8 +8,45 @@
using namespace NSQLComplete;
+class TFailableCache: public ICache<TString, TString> {
+public:
+ NThreading::TFuture<TEntry> Get(const TString& key) const try {
+ if (IsGetFailing) {
+ ythrow yexception() << "O_O";
+ }
+ return NThreading::MakeFuture<TEntry>({.Value = key, .IsExpired = IsExpired});
+ } catch (...) {
+ return NThreading::MakeErrorFuture<TEntry>(std::current_exception());
+ }
+
+ NThreading::TFuture<void> Update(const TString& /* key */, TString /* value */) const try {
+ if (IsUpdateFailing) {
+ ythrow yexception() << "O_O";
+ }
+ return NThreading::MakeFuture();
+ } catch (...) {
+ return NThreading::MakeErrorFuture<void>(std::current_exception());
+ }
+
+ bool IsGetFailing = false;
+ bool IsExpired = false;
+ bool IsUpdateFailing = false;
+};
+
Y_UNIT_TEST_SUITE(CachedQueryTests) {
+ Y_UNIT_TEST(OnEmpty_WhenGet_ThenWaitUntilReceived) {
+ auto cache = MakeLocalCache<TString, TString>(
+ NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()});
+ auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
+ return NThreading::MakeFuture<TString>(key);
+ });
+
+ TString value = cached("1").GetValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL(value, "1");
+ }
+
Y_UNIT_TEST(OnExpired_WhenApplied_ThenDefferedUpdateAndReturnOld) {
size_t queried = 0;
auto cache = MakeLocalCache<TString, TString>(
@@ -27,4 +64,52 @@ Y_UNIT_TEST_SUITE(CachedQueryTests) {
UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
}
+ Y_UNIT_TEST(OnQueryError_WhenApplied_ThenNoDeadlock) {
+ size_t queried = 0;
+ auto cache = MakeLocalCache<TString, TString>(
+ NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()});
+ auto cached = TCachedQuery<TString, TString>(cache, [&](const TString&) {
+ queried += 1;
+ try {
+ ythrow yexception() << "T_T";
+ } catch (...) {
+ return NThreading::MakeErrorFuture<TString>(std::current_exception());
+ }
+ });
+
+ UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
+ UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
+ UNIT_ASSERT_VALUES_EQUAL(queried, 2);
+ }
+
+ Y_UNIT_TEST(OnFailingCacheGet_WhenApplied_ThenNoDeadlock) {
+ size_t queried = 0;
+ auto cache = MakeIntrusive<TFailableCache>();
+ auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
+ queried += 1;
+ return NThreading::MakeFuture<TString>(key);
+ });
+
+ cache->IsGetFailing = true;
+
+ UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception);
+ UNIT_ASSERT_VALUES_EQUAL(queried, 0);
+ }
+
+ Y_UNIT_TEST(OnFailingCacheUpdate_WhenApplied_ThenNoErrorAndNotCached) {
+ size_t queried = 0;
+ auto cache = MakeIntrusive<TFailableCache>();
+ auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) {
+ queried += 1;
+ return NThreading::MakeFuture<TString>(key);
+ });
+
+ cache->IsExpired = true;
+ cache->IsUpdateFailing = true;
+
+ UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");
+ UNIT_ASSERT_VALUES_EQUAL(queried, 2);
+ }
+
} // Y_UNIT_TEST_SUITE(CachedQueryTests)
diff --git a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp
index 0823991e2d4..c518dadf271 100644
--- a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp
+++ b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(LocalCacheTests) {
auto entry = cache->Get("1").GetValueSync();
- UNIT_ASSERT_VALUES_EQUAL(entry.Value, "");
+ UNIT_ASSERT_VALUES_EQUAL(entry.Value, Nothing());
UNIT_ASSERT_VALUES_EQUAL(entry.IsExpired, true);
}
diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp
new file mode 100644
index 00000000000..9d7dbe28307
--- /dev/null
+++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp
@@ -0,0 +1,32 @@
+#include "name_service.h"
+
+namespace NSQLComplete {
+
+ namespace {
+
+ class TNameService: public INameService {
+ public:
+ explicit TNameService(INameService::TPtr origin)
+ : Origin_(std::move(origin))
+ {
+ }
+
+ NThreading::TFuture<TNameResponse> Lookup(TNameRequest request) const override {
+ auto future = Origin_->Lookup(std::move(request));
+ if (future.IsReady()) {
+ return future;
+ }
+ return NThreading::MakeFuture<TNameResponse>({});
+ }
+
+ private:
+ INameService::TPtr Origin_;
+ };
+
+ } // namespace
+
+ INameService::TPtr MakeImpatientNameService(INameService::TPtr origin) {
+ return new TNameService(std::move(origin));
+ }
+
+} // namespace NSQLComplete
diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h
new file mode 100644
index 00000000000..b6f02508118
--- /dev/null
+++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <yql/essentials/sql/v1/complete/name/service/name_service.h>
+
+namespace NSQLComplete {
+
+ INameService::TPtr MakeImpatientNameService(INameService::TPtr origin);
+
+} // namespace NSQLComplete
diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/ya.make b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make
new file mode 100644
index 00000000000..96fc40ed74b
--- /dev/null
+++ b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make
@@ -0,0 +1,11 @@
+LIBRARY()
+
+SRCS(
+ name_service.cpp
+)
+
+PEERDIR(
+ yql/essentials/sql/v1/complete/name/service
+)
+
+END()
diff --git a/yql/essentials/sql/v1/complete/name/service/ya.make b/yql/essentials/sql/v1/complete/name/service/ya.make
index 2f8118f882d..75a55e0c71f 100644
--- a/yql/essentials/sql/v1/complete/name/service/ya.make
+++ b/yql/essentials/sql/v1/complete/name/service/ya.make
@@ -14,6 +14,7 @@ END()
RECURSE(
binding
cluster
+ impatient
ranking
schema
static
diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
index 6d6d91d0dbb..3440fd16c20 100644
--- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
+++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp
@@ -10,6 +10,7 @@
#include <yql/essentials/sql/v1/complete/name/service/ranking/frequency.h>
#include <yql/essentials/sql/v1/complete/name/service/ranking/ranking.h>
#include <yql/essentials/sql/v1/complete/name/service/cluster/name_service.h>
+#include <yql/essentials/sql/v1/complete/name/service/impatient/name_service.h>
#include <yql/essentials/sql/v1/complete/name/service/schema/name_service.h>
#include <yql/essentials/sql/v1/complete/name/service/static/name_service.h>
#include <yql/essentials/sql/v1/complete/name/service/union/name_service.h>
@@ -131,8 +132,13 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {
TVector<INameService::TPtr> children = {
MakeStaticNameService(std::move(names), frequency),
- MakeSchemaNameService(MakeSimpleSchema(MakeStaticSimpleSchema(clustersJson))),
- MakeClusterNameService(MakeStaticClusterDiscovery(std::move(clusters))),
+ MakeImpatientNameService(
+ MakeSchemaNameService(
+ MakeSimpleSchema(
+ MakeStaticSimpleSchema(clustersJson)))),
+ MakeImpatientNameService(
+ MakeClusterNameService(
+ MakeStaticClusterDiscovery(std::move(clusters)))),
};
INameService::TPtr service = MakeUnionNameService(
std::move(children), MakeDefaultRanking(frequency));
@@ -1411,10 +1417,6 @@ JOIN yt:$cluster_name.test;
TVector<TCandidate> aliceExpected = {{TableName, "`alice`"}};
TVector<TCandidate> petyaExpected = {{TableName, "`petya`"}};
- // Cache is empty
- UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), empty);
- UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), empty);
-
// Updates in backround
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), aliceExpected);
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), petyaExpected);
@@ -1423,10 +1425,6 @@ JOIN yt:$cluster_name.test;
TVector<TCandidate> aliceExpected = {{ColumnName, "alice"}};
TVector<TCandidate> petyaExpected = {{ColumnName, "petya"}};
- // Cache is empty
- UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), empty);
- UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), empty);
-
// Updates in backround
UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), aliceExpected);
UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), petyaExpected);
diff --git a/yql/essentials/sql/v1/complete/ut/ya.make b/yql/essentials/sql/v1/complete/ut/ya.make
index 0d2afc80d89..d2deebbc0ad 100644
--- a/yql/essentials/sql/v1/complete/ut/ya.make
+++ b/yql/essentials/sql/v1/complete/ut/ya.make
@@ -13,6 +13,7 @@ PEERDIR(
yql/essentials/sql/v1/complete/name/object/simple/cached
yql/essentials/sql/v1/complete/name/object/simple/static
yql/essentials/sql/v1/complete/name/service/cluster
+ yql/essentials/sql/v1/complete/name/service/impatient
yql/essentials/sql/v1/complete/name/service/schema
yql/essentials/sql/v1/complete/name/service/static
yql/essentials/sql/v1/complete/name/service/union