diff options
author | robot-piglet <[email protected]> | 2025-06-23 10:49:41 +0300 |
---|---|---|
committer | robot-piglet <[email protected]> | 2025-06-23 11:05:58 +0300 |
commit | bb0b9384b505c7ce8664a29174f1a89f6e72710b (patch) | |
tree | 554e9a763304f304f1cbf5e9c3acc55e6033221b | |
parent | 6d759dd686592eca66abca40d9e4de3bebe5ad14 (diff) |
Intermediate changes
commit_hash:70eea6b3e17f4aafae5eadb49724ba9036a55372
10 files changed, 188 insertions, 21 deletions
diff --git a/yql/essentials/sql/v1/complete/name/cache/cache.h b/yql/essentials/sql/v1/complete/name/cache/cache.h index d65b41d5636..357989edc50 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cache.h +++ b/yql/essentials/sql/v1/complete/name/cache/cache.h @@ -30,7 +30,7 @@ namespace NSQLComplete { using TPtr = TIntrusivePtr<ICache>; struct TEntry { - TValue Value = {}; + TMaybe<TValue> Value = Nothing(); bool IsExpired = true; }; diff --git a/yql/essentials/sql/v1/complete/name/cache/cached.h b/yql/essentials/sql/v1/complete/name/cache/cached.h index b265f6fde13..80edb159e5a 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cached.h +++ b/yql/essentials/sql/v1/complete/name/cache/cached.h @@ -16,17 +16,47 @@ namespace NSQLComplete { } NThreading::TFuture<TValue> operator()(TKey key) const { - return Cache_->Get(key).Apply([cache = Cache_, - query = Query_, - key = std::move(key)](auto f) { - typename ICache<TKey, TValue>::TEntry entry = f.ExtractValue(); - if (entry.IsExpired) { - query(key).Apply([cache, key = std::move(key)](auto f) { - cache->Update(key, f.ExtractValue()); - }); + auto promise = NThreading::NewPromise<TValue>(); + Cache_->Get(key).Apply([cache = Cache_, + query = Query_, + key = std::move(key), + promise](auto f) mutable { + typename ICache<TKey, TValue>::TEntry entry; + try { + entry = f.ExtractValue(); + } catch (...) { + promise.SetException(std::current_exception()); + return; } - return std::move(entry.Value); + + if (!entry.IsExpired) { + Y_ENSURE(entry.Value.Defined()); + promise.SetValue(std::move(*entry.Value)); + return; + } + + bool isEmpty = entry.Value.Empty(); + if (!isEmpty) { + promise.SetValue(std::move(*entry.Value)); + } + + query(key).Apply([cache, key = std::move(key), isEmpty, promise](auto f) mutable { + TValue value; + try { + value = f.ExtractValue(); + } catch (...) { + promise.SetException(std::current_exception()); + return; + } + + if (isEmpty) { + promise.SetValue(value); + } + + cache->Update(key, std::move(value)); + }); }); + return promise; } private: diff --git a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp index 18607dea438..912b104d781 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp +++ b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp @@ -8,8 +8,45 @@ using namespace NSQLComplete; +class TFailableCache: public ICache<TString, TString> { +public: + NThreading::TFuture<TEntry> Get(const TString& key) const try { + if (IsGetFailing) { + ythrow yexception() << "O_O"; + } + return NThreading::MakeFuture<TEntry>({.Value = key, .IsExpired = IsExpired}); + } catch (...) { + return NThreading::MakeErrorFuture<TEntry>(std::current_exception()); + } + + NThreading::TFuture<void> Update(const TString& /* key */, TString /* value */) const try { + if (IsUpdateFailing) { + ythrow yexception() << "O_O"; + } + return NThreading::MakeFuture(); + } catch (...) { + return NThreading::MakeErrorFuture<void>(std::current_exception()); + } + + bool IsGetFailing = false; + bool IsExpired = false; + bool IsUpdateFailing = false; +}; + Y_UNIT_TEST_SUITE(CachedQueryTests) { + Y_UNIT_TEST(OnEmpty_WhenGet_ThenWaitUntilReceived) { + auto cache = MakeLocalCache<TString, TString>( + NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()}); + auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { + return NThreading::MakeFuture<TString>(key); + }); + + TString value = cached("1").GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL(value, "1"); + } + Y_UNIT_TEST(OnExpired_WhenApplied_ThenDefferedUpdateAndReturnOld) { size_t queried = 0; auto cache = MakeLocalCache<TString, TString>( @@ -27,4 +64,52 @@ Y_UNIT_TEST_SUITE(CachedQueryTests) { UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1"); } + Y_UNIT_TEST(OnQueryError_WhenApplied_ThenNoDeadlock) { + size_t queried = 0; + auto cache = MakeLocalCache<TString, TString>( + NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()}); + auto cached = TCachedQuery<TString, TString>(cache, [&](const TString&) { + queried += 1; + try { + ythrow yexception() << "T_T"; + } catch (...) { + return NThreading::MakeErrorFuture<TString>(std::current_exception()); + } + }); + + UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); + UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); + UNIT_ASSERT_VALUES_EQUAL(queried, 2); + } + + Y_UNIT_TEST(OnFailingCacheGet_WhenApplied_ThenNoDeadlock) { + size_t queried = 0; + auto cache = MakeIntrusive<TFailableCache>(); + auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { + queried += 1; + return NThreading::MakeFuture<TString>(key); + }); + + cache->IsGetFailing = true; + + UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); + UNIT_ASSERT_VALUES_EQUAL(queried, 0); + } + + Y_UNIT_TEST(OnFailingCacheUpdate_WhenApplied_ThenNoErrorAndNotCached) { + size_t queried = 0; + auto cache = MakeIntrusive<TFailableCache>(); + auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { + queried += 1; + return NThreading::MakeFuture<TString>(key); + }); + + cache->IsExpired = true; + cache->IsUpdateFailing = true; + + UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1"); + UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1"); + UNIT_ASSERT_VALUES_EQUAL(queried, 2); + } + } // Y_UNIT_TEST_SUITE(CachedQueryTests) diff --git a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp index 0823991e2d4..c518dadf271 100644 --- a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp +++ b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(LocalCacheTests) { auto entry = cache->Get("1").GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(entry.Value, ""); + UNIT_ASSERT_VALUES_EQUAL(entry.Value, Nothing()); UNIT_ASSERT_VALUES_EQUAL(entry.IsExpired, true); } diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp new file mode 100644 index 00000000000..9d7dbe28307 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp @@ -0,0 +1,32 @@ +#include "name_service.h" + +namespace NSQLComplete { + + namespace { + + class TNameService: public INameService { + public: + explicit TNameService(INameService::TPtr origin) + : Origin_(std::move(origin)) + { + } + + NThreading::TFuture<TNameResponse> Lookup(TNameRequest request) const override { + auto future = Origin_->Lookup(std::move(request)); + if (future.IsReady()) { + return future; + } + return NThreading::MakeFuture<TNameResponse>({}); + } + + private: + INameService::TPtr Origin_; + }; + + } // namespace + + INameService::TPtr MakeImpatientNameService(INameService::TPtr origin) { + return new TNameService(std::move(origin)); + } + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h new file mode 100644 index 00000000000..b6f02508118 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h @@ -0,0 +1,9 @@ +#pragma once + +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> + +namespace NSQLComplete { + + INameService::TPtr MakeImpatientNameService(INameService::TPtr origin); + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/ya.make b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make new file mode 100644 index 00000000000..96fc40ed74b --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + name_service.cpp +) + +PEERDIR( + yql/essentials/sql/v1/complete/name/service +) + +END() diff --git a/yql/essentials/sql/v1/complete/name/service/ya.make b/yql/essentials/sql/v1/complete/name/service/ya.make index 2f8118f882d..75a55e0c71f 100644 --- a/yql/essentials/sql/v1/complete/name/service/ya.make +++ b/yql/essentials/sql/v1/complete/name/service/ya.make @@ -14,6 +14,7 @@ END() RECURSE( binding cluster + impatient ranking schema static diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp index 6d6d91d0dbb..3440fd16c20 100644 --- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp @@ -10,6 +10,7 @@ #include <yql/essentials/sql/v1/complete/name/service/ranking/frequency.h> #include <yql/essentials/sql/v1/complete/name/service/ranking/ranking.h> #include <yql/essentials/sql/v1/complete/name/service/cluster/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/impatient/name_service.h> #include <yql/essentials/sql/v1/complete/name/service/schema/name_service.h> #include <yql/essentials/sql/v1/complete/name/service/static/name_service.h> #include <yql/essentials/sql/v1/complete/name/service/union/name_service.h> @@ -131,8 +132,13 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) { TVector<INameService::TPtr> children = { MakeStaticNameService(std::move(names), frequency), - MakeSchemaNameService(MakeSimpleSchema(MakeStaticSimpleSchema(clustersJson))), - MakeClusterNameService(MakeStaticClusterDiscovery(std::move(clusters))), + MakeImpatientNameService( + MakeSchemaNameService( + MakeSimpleSchema( + MakeStaticSimpleSchema(clustersJson)))), + MakeImpatientNameService( + MakeClusterNameService( + MakeStaticClusterDiscovery(std::move(clusters)))), }; INameService::TPtr service = MakeUnionNameService( std::move(children), MakeDefaultRanking(frequency)); @@ -1411,10 +1417,6 @@ JOIN yt:$cluster_name.test; TVector<TCandidate> aliceExpected = {{TableName, "`alice`"}}; TVector<TCandidate> petyaExpected = {{TableName, "`petya`"}}; - // Cache is empty - UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), empty); - UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), empty); - // Updates in backround UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), aliceExpected); UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), petyaExpected); @@ -1423,10 +1425,6 @@ JOIN yt:$cluster_name.test; TVector<TCandidate> aliceExpected = {{ColumnName, "alice"}}; TVector<TCandidate> petyaExpected = {{ColumnName, "petya"}}; - // Cache is empty - UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), empty); - UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), empty); - // Updates in backround UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), aliceExpected); UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), petyaExpected); diff --git a/yql/essentials/sql/v1/complete/ut/ya.make b/yql/essentials/sql/v1/complete/ut/ya.make index 0d2afc80d89..d2deebbc0ad 100644 --- a/yql/essentials/sql/v1/complete/ut/ya.make +++ b/yql/essentials/sql/v1/complete/ut/ya.make @@ -13,6 +13,7 @@ PEERDIR( yql/essentials/sql/v1/complete/name/object/simple/cached yql/essentials/sql/v1/complete/name/object/simple/static yql/essentials/sql/v1/complete/name/service/cluster + yql/essentials/sql/v1/complete/name/service/impatient yql/essentials/sql/v1/complete/name/service/schema yql/essentials/sql/v1/complete/name/service/static yql/essentials/sql/v1/complete/name/service/union |