diff options
| author | robot-piglet <[email protected]> | 2025-06-23 10:49:41 +0300 | 
|---|---|---|
| committer | robot-piglet <[email protected]> | 2025-06-23 11:05:58 +0300 | 
| commit | bb0b9384b505c7ce8664a29174f1a89f6e72710b (patch) | |
| tree | 554e9a763304f304f1cbf5e9c3acc55e6033221b /yql/essentials/sql/v1 | |
| parent | 6d759dd686592eca66abca40d9e4de3bebe5ad14 (diff) | |
Intermediate changes
commit_hash:70eea6b3e17f4aafae5eadb49724ba9036a55372
Diffstat (limited to 'yql/essentials/sql/v1')
10 files changed, 188 insertions, 21 deletions
diff --git a/yql/essentials/sql/v1/complete/name/cache/cache.h b/yql/essentials/sql/v1/complete/name/cache/cache.h index d65b41d5636..357989edc50 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cache.h +++ b/yql/essentials/sql/v1/complete/name/cache/cache.h @@ -30,7 +30,7 @@ namespace NSQLComplete {          using TPtr = TIntrusivePtr<ICache>;          struct TEntry { -            TValue Value = {}; +            TMaybe<TValue> Value = Nothing();              bool IsExpired = true;          }; diff --git a/yql/essentials/sql/v1/complete/name/cache/cached.h b/yql/essentials/sql/v1/complete/name/cache/cached.h index b265f6fde13..80edb159e5a 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cached.h +++ b/yql/essentials/sql/v1/complete/name/cache/cached.h @@ -16,17 +16,47 @@ namespace NSQLComplete {          }          NThreading::TFuture<TValue> operator()(TKey key) const { -            return Cache_->Get(key).Apply([cache = Cache_, -                                           query = Query_, -                                           key = std::move(key)](auto f) { -                typename ICache<TKey, TValue>::TEntry entry = f.ExtractValue(); -                if (entry.IsExpired) { -                    query(key).Apply([cache, key = std::move(key)](auto f) { -                        cache->Update(key, f.ExtractValue()); -                    }); +            auto promise = NThreading::NewPromise<TValue>(); +            Cache_->Get(key).Apply([cache = Cache_, +                                    query = Query_, +                                    key = std::move(key), +                                    promise](auto f) mutable { +                typename ICache<TKey, TValue>::TEntry entry; +                try { +                    entry = f.ExtractValue(); +                } catch (...) { +                    promise.SetException(std::current_exception()); +                    return;                  } -                return std::move(entry.Value); + +                if (!entry.IsExpired) { +                    Y_ENSURE(entry.Value.Defined()); +                    promise.SetValue(std::move(*entry.Value)); +                    return; +                } + +                bool isEmpty = entry.Value.Empty(); +                if (!isEmpty) { +                    promise.SetValue(std::move(*entry.Value)); +                } + +                query(key).Apply([cache, key = std::move(key), isEmpty, promise](auto f) mutable { +                    TValue value; +                    try { +                        value = f.ExtractValue(); +                    } catch (...) { +                        promise.SetException(std::current_exception()); +                        return; +                    } + +                    if (isEmpty) { +                        promise.SetValue(value); +                    } + +                    cache->Update(key, std::move(value)); +                });              }); +            return promise;          }      private: diff --git a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp index 18607dea438..912b104d781 100644 --- a/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp +++ b/yql/essentials/sql/v1/complete/name/cache/cached_ut.cpp @@ -8,8 +8,45 @@  using namespace NSQLComplete; +class TFailableCache: public ICache<TString, TString> { +public: +    NThreading::TFuture<TEntry> Get(const TString& key) const try { +        if (IsGetFailing) { +            ythrow yexception() << "O_O"; +        } +        return NThreading::MakeFuture<TEntry>({.Value = key, .IsExpired = IsExpired}); +    } catch (...) { +        return NThreading::MakeErrorFuture<TEntry>(std::current_exception()); +    } + +    NThreading::TFuture<void> Update(const TString& /* key */, TString /* value */) const try { +        if (IsUpdateFailing) { +            ythrow yexception() << "O_O"; +        } +        return NThreading::MakeFuture(); +    } catch (...) { +        return NThreading::MakeErrorFuture<void>(std::current_exception()); +    } + +    bool IsGetFailing = false; +    bool IsExpired = false; +    bool IsUpdateFailing = false; +}; +  Y_UNIT_TEST_SUITE(CachedQueryTests) { +    Y_UNIT_TEST(OnEmpty_WhenGet_ThenWaitUntilReceived) { +        auto cache = MakeLocalCache<TString, TString>( +            NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()}); +        auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { +            return NThreading::MakeFuture<TString>(key); +        }); + +        TString value = cached("1").GetValueSync(); + +        UNIT_ASSERT_VALUES_EQUAL(value, "1"); +    } +      Y_UNIT_TEST(OnExpired_WhenApplied_ThenDefferedUpdateAndReturnOld) {          size_t queried = 0;          auto cache = MakeLocalCache<TString, TString>( @@ -27,4 +64,52 @@ Y_UNIT_TEST_SUITE(CachedQueryTests) {          UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1");      } +    Y_UNIT_TEST(OnQueryError_WhenApplied_ThenNoDeadlock) { +        size_t queried = 0; +        auto cache = MakeLocalCache<TString, TString>( +            NMonotonic::CreateDefaultMonotonicTimeProvider(), {.TTL = TDuration::Zero()}); +        auto cached = TCachedQuery<TString, TString>(cache, [&](const TString&) { +            queried += 1; +            try { +                ythrow yexception() << "T_T"; +            } catch (...) { +                return NThreading::MakeErrorFuture<TString>(std::current_exception()); +            } +        }); + +        UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); +        UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); +        UNIT_ASSERT_VALUES_EQUAL(queried, 2); +    } + +    Y_UNIT_TEST(OnFailingCacheGet_WhenApplied_ThenNoDeadlock) { +        size_t queried = 0; +        auto cache = MakeIntrusive<TFailableCache>(); +        auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { +            queried += 1; +            return NThreading::MakeFuture<TString>(key); +        }); + +        cache->IsGetFailing = true; + +        UNIT_ASSERT_EXCEPTION(cached("1").GetValueSync(), yexception); +        UNIT_ASSERT_VALUES_EQUAL(queried, 0); +    } + +    Y_UNIT_TEST(OnFailingCacheUpdate_WhenApplied_ThenNoErrorAndNotCached) { +        size_t queried = 0; +        auto cache = MakeIntrusive<TFailableCache>(); +        auto cached = TCachedQuery<TString, TString>(cache, [&](const TString& key) { +            queried += 1; +            return NThreading::MakeFuture<TString>(key); +        }); + +        cache->IsExpired = true; +        cache->IsUpdateFailing = true; + +        UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1"); +        UNIT_ASSERT_VALUES_EQUAL(cached("1").GetValueSync(), "1"); +        UNIT_ASSERT_VALUES_EQUAL(queried, 2); +    } +  } // Y_UNIT_TEST_SUITE(CachedQueryTests) diff --git a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp index 0823991e2d4..c518dadf271 100644 --- a/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp +++ b/yql/essentials/sql/v1/complete/name/cache/local/cache_ut.cpp @@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(LocalCacheTests) {          auto entry = cache->Get("1").GetValueSync(); -        UNIT_ASSERT_VALUES_EQUAL(entry.Value, ""); +        UNIT_ASSERT_VALUES_EQUAL(entry.Value, Nothing());          UNIT_ASSERT_VALUES_EQUAL(entry.IsExpired, true);      } diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp new file mode 100644 index 00000000000..9d7dbe28307 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.cpp @@ -0,0 +1,32 @@ +#include "name_service.h" + +namespace NSQLComplete { + +    namespace { + +        class TNameService: public INameService { +        public: +            explicit TNameService(INameService::TPtr origin) +                : Origin_(std::move(origin)) +            { +            } + +            NThreading::TFuture<TNameResponse> Lookup(TNameRequest request) const override { +                auto future = Origin_->Lookup(std::move(request)); +                if (future.IsReady()) { +                    return future; +                } +                return NThreading::MakeFuture<TNameResponse>({}); +            } + +        private: +            INameService::TPtr Origin_; +        }; + +    } // namespace + +    INameService::TPtr MakeImpatientNameService(INameService::TPtr origin) { +        return new TNameService(std::move(origin)); +    } + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h new file mode 100644 index 00000000000..b6f02508118 --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/name_service.h @@ -0,0 +1,9 @@ +#pragma once + +#include <yql/essentials/sql/v1/complete/name/service/name_service.h> + +namespace NSQLComplete { + +    INameService::TPtr MakeImpatientNameService(INameService::TPtr origin); + +} // namespace NSQLComplete diff --git a/yql/essentials/sql/v1/complete/name/service/impatient/ya.make b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make new file mode 100644 index 00000000000..96fc40ed74b --- /dev/null +++ b/yql/essentials/sql/v1/complete/name/service/impatient/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( +    name_service.cpp +) + +PEERDIR( +    yql/essentials/sql/v1/complete/name/service +) + +END() diff --git a/yql/essentials/sql/v1/complete/name/service/ya.make b/yql/essentials/sql/v1/complete/name/service/ya.make index 2f8118f882d..75a55e0c71f 100644 --- a/yql/essentials/sql/v1/complete/name/service/ya.make +++ b/yql/essentials/sql/v1/complete/name/service/ya.make @@ -14,6 +14,7 @@ END()  RECURSE(      binding      cluster +    impatient      ranking      schema      static diff --git a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp index 6d6d91d0dbb..3440fd16c20 100644 --- a/yql/essentials/sql/v1/complete/sql_complete_ut.cpp +++ b/yql/essentials/sql/v1/complete/sql_complete_ut.cpp @@ -10,6 +10,7 @@  #include <yql/essentials/sql/v1/complete/name/service/ranking/frequency.h>  #include <yql/essentials/sql/v1/complete/name/service/ranking/ranking.h>  #include <yql/essentials/sql/v1/complete/name/service/cluster/name_service.h> +#include <yql/essentials/sql/v1/complete/name/service/impatient/name_service.h>  #include <yql/essentials/sql/v1/complete/name/service/schema/name_service.h>  #include <yql/essentials/sql/v1/complete/name/service/static/name_service.h>  #include <yql/essentials/sql/v1/complete/name/service/union/name_service.h> @@ -131,8 +132,13 @@ Y_UNIT_TEST_SUITE(SqlCompleteTests) {          TVector<INameService::TPtr> children = {              MakeStaticNameService(std::move(names), frequency), -            MakeSchemaNameService(MakeSimpleSchema(MakeStaticSimpleSchema(clustersJson))), -            MakeClusterNameService(MakeStaticClusterDiscovery(std::move(clusters))), +            MakeImpatientNameService( +                MakeSchemaNameService( +                    MakeSimpleSchema( +                        MakeStaticSimpleSchema(clustersJson)))), +            MakeImpatientNameService( +                MakeClusterNameService( +                    MakeStaticClusterDiscovery(std::move(clusters)))),          };          INameService::TPtr service = MakeUnionNameService(              std::move(children), MakeDefaultRanking(frequency)); @@ -1411,10 +1417,6 @@ JOIN yt:$cluster_name.test;              TVector<TCandidate> aliceExpected = {{TableName, "`alice`"}};              TVector<TCandidate> petyaExpected = {{TableName, "`petya`"}}; -            // Cache is empty -            UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), empty); -            UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), empty); -              // Updates in backround              UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT * FROM "), aliceExpected);              UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT * FROM "), petyaExpected); @@ -1423,10 +1425,6 @@ JOIN yt:$cluster_name.test;              TVector<TCandidate> aliceExpected = {{ColumnName, "alice"}};              TVector<TCandidate> petyaExpected = {{ColumnName, "petya"}}; -            // Cache is empty -            UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), empty); -            UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), empty); -              // Updates in backround              UNIT_ASSERT_VALUES_EQUAL(Complete(aliceEngine, "SELECT a# FROM alice"), aliceExpected);              UNIT_ASSERT_VALUES_EQUAL(Complete(petyaEngine, "SELECT p# FROM petya"), petyaExpected); diff --git a/yql/essentials/sql/v1/complete/ut/ya.make b/yql/essentials/sql/v1/complete/ut/ya.make index 0d2afc80d89..d2deebbc0ad 100644 --- a/yql/essentials/sql/v1/complete/ut/ya.make +++ b/yql/essentials/sql/v1/complete/ut/ya.make @@ -13,6 +13,7 @@ PEERDIR(      yql/essentials/sql/v1/complete/name/object/simple/cached      yql/essentials/sql/v1/complete/name/object/simple/static      yql/essentials/sql/v1/complete/name/service/cluster +    yql/essentials/sql/v1/complete/name/service/impatient      yql/essentials/sql/v1/complete/name/service/schema      yql/essentials/sql/v1/complete/name/service/static      yql/essentials/sql/v1/complete/name/service/union  | 
