diff options
author | nusratbek <nusratbek@yandex-team.ru> | 2022-02-10 16:52:24 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:24 +0300 |
commit | 9ef43d18a5630a61d0f746fdbd86ac344db72829 (patch) | |
tree | 3c0c7f84262a5dbdc4d112d177301af7c1a423d9 | |
parent | a815b0df01710081be7ee69fbb91ca234f0af5dc (diff) | |
download | ydb-9ef43d18a5630a61d0f746fdbd86ac344db72829.tar.gz |
Restoring authorship annotation for <nusratbek@yandex-team.ru>. Commit 1 of 2.
-rw-r--r-- | ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md | 58 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp | 72 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h | 96 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make | 30 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h | 520 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_extension/extension.cpp | 44 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_extension/extension.h | 44 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_table/table.cpp | 276 | ||||
-rw-r--r-- | ydb/services/ydb/ut/ya.make | 18 | ||||
-rw-r--r-- | ydb/services/ydb/ydb_stats_ut.cpp | 564 |
10 files changed, 861 insertions, 861 deletions
diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md b/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md index 869678d8dd7..ac6f9e0bd42 100644 --- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md +++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/README.md @@ -1,5 +1,5 @@ # YDB C++ SDK Metrics - + You can plug in YDB C++ SDK extension to monitor how your application interacts with YDB. In particular you can monitor: - Transport errors; - Per host messages in fligh; @@ -8,11 +8,11 @@ You can plug in YDB C++ SDK extension to monitor how your application interacts - Number of sessions per host; - Number of server endpoints available; - Query size, latency, etc. - + You can get these metrics via http server provided by YDB C++ SDK or implement your own MetricRegistry if it more convenient. ## Setting up Solomon Monitoring - + > This is Yandex specific section for setting up internal monitoring called Solomon ### Setup Solomon Environment @@ -20,45 +20,45 @@ TSolomonStatPullExtension class allows you to quickly setup you application moni [Create project, cluster and service, and connect them.](https://wiki.yandex-team.ru/solomon/howtostart/). Add hostnames which run your application to **Cluster hosts** (use hostnames without "http://"). Solomon's fetcher will use **Port** field for all added hosts. Fill in the following params: -- **Monitoring model** : **PULL**; -- **URL Path** : **/stats**; -- **Interval** : **10**; +- **Monitoring model** : **PULL**; +- **URL Path** : **/stats**; +- **Interval** : **10**; - **Port** : a port of http server YDB SDK runs. - + ### Setup TSolomonStatPullExtension in Your Application Code - + After creating NYdb::TDriver you need to add Solomon Monitoring extension. If you set up incorrect hostname or port **TSystemError** exception will be thrown. - + > **Important**: you must plug in monitoring before driver creation. -```cl +```cl #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h> - -... - -{ + +... + +{ auto config = NYdb::TDriverConfig(); - NYdb::TDriver driver(config); - try { - + NYdb::TDriver driver(config); + try { + const TString host = ... // use hostname without http:// - const ui64 port = ... + const ui64 port = ... const TString project = ... // Solomon's project id const TString service = ... // Solomon's service id const TString cluster = ... // Solomon's cluster id - NSolomonStatExtension::TSolomonStatPullExtension::TParams params(host, port, project, service, cluster); - driver.AddExtension<NSolomonStatExtension::TSolomonStatPullExtension>(params); - - } catch (TSystemError& error) { - ... - } -} - -``` - + NSolomonStatExtension::TSolomonStatPullExtension::TParams params(host, port, project, service, cluster); + driver.AddExtension<NSolomonStatExtension::TSolomonStatPullExtension>(params); + + } catch (TSystemError& error) { + ... + } +} + +``` + ## Setup Monitoring of Your Choice - + Implementing NMonitoring::IMetricRegistry provides more flexibility. You can deliver application metrics to Prometheus or any other system of your choice, just register your specific NMonitoring::IMetricRegistry implementation via AddMetricRegistry function. > **Important**: you must plug in monitoring before driver creation. diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp index ec7b0f0db68..bdad901dfd4 100644 --- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp +++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.cpp @@ -1,47 +1,47 @@ -#include "pull_client.h" - -namespace NSolomonStatExtension { - -TSolomonStatPullExtension::TParams::TParams(const TString& host - , ui16 port - , const TString& project - , const TString& service - , const TString& cluster - , const TVector<std::pair<TString, TString>>& labels) - : Host_(host), Port_(port), Labels_() +#include "pull_client.h" + +namespace NSolomonStatExtension { + +TSolomonStatPullExtension::TParams::TParams(const TString& host + , ui16 port + , const TString& project + , const TString& service + , const TString& cluster + , const TVector<std::pair<TString, TString>>& labels) + : Host_(host), Port_(port), Labels_() { Labels_.Add("project", project); Labels_.Add("service", service); Labels_.Add("cluster", cluster); for (const auto& label: labels) { Labels_.Add(label.first, label.second); - } + } } - + NMonitoring::TLabels TSolomonStatPullExtension::TParams::GetLabels() const { - return Labels_; -} - - -TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const TString& title, const TString& path, IApi* api) - : NMonitoring::IMonPage(title, path), Api_(api) - { } - -void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) { - request.Output() << NMonitoring::HTTPOKJSON; - auto json = NMonitoring::EncoderJson(&request.Output()); - Api_->Accept(json.Get()); -} - -TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api) + return Labels_; +} + + +TSolomonStatPullExtension::TSolomonStatPage::TSolomonStatPage(const TString& title, const TString& path, IApi* api) + : NMonitoring::IMonPage(title, path), Api_(api) + { } + +void TSolomonStatPullExtension::TSolomonStatPage::Output(NMonitoring::IMonHttpRequest& request) { + request.Output() << NMonitoring::HTTPOKJSON; + auto json = NMonitoring::EncoderJson(&request.Output()); + Api_->Accept(json.Get()); +} + +TSolomonStatPullExtension::TSolomonStatPullExtension(const TSolomonStatPullExtension::TParams& params, IApi* api) : MetricRegistry_(new NMonitoring::TMetricRegistry(params.GetLabels())) , MonService_(params.Port_, params.Host_, 0), Page_( new TSolomonStatPage("stats", "Statistics", api) ) { api->SetMetricRegistry(MetricRegistry_.get()); - MonService_.Register(Page_); - MonService_.StartOrThrow(); - } - -TSolomonStatPullExtension::~TSolomonStatPullExtension() - { } - -} // NSolomonStatExtension + MonService_.Register(Page_); + MonService_.StartOrThrow(); + } + +TSolomonStatPullExtension::~TSolomonStatPullExtension() + { } + +} // NSolomonStatExtension diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h index 8d410c2fa2e..ce1d9f994b9 100644 --- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h +++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/pull_client.h @@ -1,5 +1,5 @@ -#pragma once - +#pragma once + #include <ydb/public/sdk/cpp/client/ydb_extension/extension.h> #include <library/cpp/http/server/response.h> @@ -8,53 +8,53 @@ #include <library/cpp/monlib/metrics/metric_registry.h> #include <library/cpp/monlib/service/pages/mon_page.h> #include <library/cpp/monlib/service/monservice.h> - + #include <util/generic/vector.h> -namespace NSolomonStatExtension { - -class TSolomonStatPullExtension: public NYdb::IExtension { -public: - using IApi = NYdb::NSdkStats::IStatApi; - - class TParams { - friend class TSolomonStatPullExtension; - - public: - TParams(const TString& host - , ui16 port - , const TString& project - , const TString& service - , const TString& cluster - , const TVector<std::pair<TString, TString>>& labels = {}); - +namespace NSolomonStatExtension { + +class TSolomonStatPullExtension: public NYdb::IExtension { +public: + using IApi = NYdb::NSdkStats::IStatApi; + + class TParams { + friend class TSolomonStatPullExtension; + + public: + TParams(const TString& host + , ui16 port + , const TString& project + , const TString& service + , const TString& cluster + , const TVector<std::pair<TString, TString>>& labels = {}); + NMonitoring::TLabels GetLabels() const; - - private: - const TString Host_; - ui16 Port_; - NMonitoring::TLabels Labels_; - }; - - TSolomonStatPullExtension(const TParams& params, IApi* api); - ~TSolomonStatPullExtension(); - -private: - class TSolomonStatPage: public NMonitoring::IMonPage { - friend class TSolomonStatPullExtension; - public: - TSolomonStatPage(const TString& title, const TString& path, IApi* api); - - void Output(NMonitoring::IMonHttpRequest& request) override ; - - private: - IApi* Api_; - }; - -private: + + private: + const TString Host_; + ui16 Port_; + NMonitoring::TLabels Labels_; + }; + + TSolomonStatPullExtension(const TParams& params, IApi* api); + ~TSolomonStatPullExtension(); + +private: + class TSolomonStatPage: public NMonitoring::IMonPage { + friend class TSolomonStatPullExtension; + public: + TSolomonStatPage(const TString& title, const TString& path, IApi* api); + + void Output(NMonitoring::IMonHttpRequest& request) override ; + + private: + IApi* Api_; + }; + +private: std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry_; - NMonitoring::TMonService2 MonService_; - TIntrusivePtr<TSolomonStatPage> Page_; -}; - -} // namespace NSolomonStatExtension + NMonitoring::TMonService2 MonService_; + TIntrusivePtr<TSolomonStatPage> Page_; +}; + +} // namespace NSolomonStatExtension diff --git a/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make b/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make index 18fa0b584fd..1f37285d583 100644 --- a/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make +++ b/ydb/public/sdk/cpp/client/extensions/solomon_stats/ya.make @@ -1,21 +1,21 @@ -LIBRARY() - -OWNER( - dcherednik - g:kikimr -) - -SRCS( - pull_client.cpp +LIBRARY() + +OWNER( + dcherednik + g:kikimr +) + +SRCS( + pull_client.cpp pull_connector.cpp -) - -PEERDIR( +) + +PEERDIR( library/cpp/monlib/encode/json library/cpp/monlib/metrics library/cpp/monlib/service library/cpp/monlib/service/pages ydb/public/sdk/cpp/client/ydb_extension -) - -END() +) + +END() diff --git a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h index cae40996824..c9f31083ef4 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_stats/stats.h @@ -1,4 +1,4 @@ -#pragma once +#pragma once #include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/common/type_switcher.h> @@ -7,15 +7,15 @@ #include <library/cpp/monlib/metrics/metric_registry.h> #include <library/cpp/monlib/metrics/histogram_collector.h> -#include <util/string/builder.h> - +#include <util/string/builder.h> + #include <atomic> #include <memory> -namespace NYdb { - -namespace NSdkStats { - +namespace NYdb { + +namespace NSdkStats { + // works only for case normal (foo_bar) underscore inline TStringType UnderscoreToUpperCamel(const TStringType& in) { @@ -43,18 +43,18 @@ inline TStringType UnderscoreToUpperCamel(const TStringType& in) { return result; } -template<typename TPointer> -class TAtomicPointer { -public: - - TAtomicPointer(TPointer* pointer = nullptr) { - Set(pointer); - } - +template<typename TPointer> +class TAtomicPointer { +public: + + TAtomicPointer(TPointer* pointer = nullptr) { + Set(pointer); + } + TAtomicPointer(const TAtomicPointer& other) { Set(other.Get()); - } - + } + TAtomicPointer& operator=(const TAtomicPointer& other) { Set(other.Get()); return *this; @@ -64,202 +64,202 @@ public: return Pointer_.load(); } - void Set(TPointer* pointer) { + void Set(TPointer* pointer) { Pointer_.store(pointer); - } - -private: + } + +private: std::atomic<TPointer*> Pointer_; -}; - -template<typename TPointer> -class TAtomicCounter: public TAtomicPointer<TPointer> { - public: - void Add(ui64 value) { - if (auto counter = this->Get()) { - counter->Add(value); - } - } - - void Inc() { - if (auto counter = this->Get()) { - counter->Inc(); - } - } - - void Dec() { - if (auto counter = this->Get()) { - counter->Dec(); - } - } - - void SetValue(ui64 value) { - if (auto counter = this->Get()) { - counter->Set(value); - } - } -}; - -template<typename TCounter> -class FastLocalCounter { -public: - FastLocalCounter(TAtomicCounter<TCounter>& counter) - : Counter(counter), Value(0) - { } - - ~FastLocalCounter() { - Counter.Add(Value); - } - - FastLocalCounter<TCounter>& operator++ () { - ++Value; - return *this; - } - - TAtomicCounter<TCounter>& Counter; - ui64 Value; -}; - -template<typename TPointer> -class TAtomicHistogram: public TAtomicPointer<TPointer> { -public: - - void Record(i64 value) { - if (auto histogram = this->Get()) { - histogram->Record(value); - } - } - - bool IsCollecting() { - return this->Get() != nullptr; - } -}; - -// Sessions count for all clients -// Every client has 3 TSessionCounter for active, in session pool, in settler sessions -// TSessionCounters in different clients with same role share one sensor -class TSessionCounter: public TAtomicPointer<NMonitoring::TIntGauge> { -public: - - // Call with mutex - void Apply(i64 newValue) { - if (auto gauge = this->Get()) { - gauge->Add(newValue - oldValue); - oldValue = newValue; - } - } - - ~TSessionCounter() { - NMonitoring::TIntGauge* gauge = this->Get(); - if (gauge) { - gauge->Add(-oldValue); - } - } - -private: - i64 oldValue = 0; -}; - -struct TStatCollector { +}; + +template<typename TPointer> +class TAtomicCounter: public TAtomicPointer<TPointer> { + public: + void Add(ui64 value) { + if (auto counter = this->Get()) { + counter->Add(value); + } + } + + void Inc() { + if (auto counter = this->Get()) { + counter->Inc(); + } + } + + void Dec() { + if (auto counter = this->Get()) { + counter->Dec(); + } + } + + void SetValue(ui64 value) { + if (auto counter = this->Get()) { + counter->Set(value); + } + } +}; + +template<typename TCounter> +class FastLocalCounter { +public: + FastLocalCounter(TAtomicCounter<TCounter>& counter) + : Counter(counter), Value(0) + { } + + ~FastLocalCounter() { + Counter.Add(Value); + } + + FastLocalCounter<TCounter>& operator++ () { + ++Value; + return *this; + } + + TAtomicCounter<TCounter>& Counter; + ui64 Value; +}; + +template<typename TPointer> +class TAtomicHistogram: public TAtomicPointer<TPointer> { +public: + + void Record(i64 value) { + if (auto histogram = this->Get()) { + histogram->Record(value); + } + } + + bool IsCollecting() { + return this->Get() != nullptr; + } +}; + +// Sessions count for all clients +// Every client has 3 TSessionCounter for active, in session pool, in settler sessions +// TSessionCounters in different clients with same role share one sensor +class TSessionCounter: public TAtomicPointer<NMonitoring::TIntGauge> { +public: + + // Call with mutex + void Apply(i64 newValue) { + if (auto gauge = this->Get()) { + gauge->Add(newValue - oldValue); + oldValue = newValue; + } + } + + ~TSessionCounter() { + NMonitoring::TIntGauge* gauge = this->Get(); + if (gauge) { + gauge->Add(-oldValue); + } + } + +private: + i64 oldValue = 0; +}; + +struct TStatCollector { using TMetricRegistry = NMonitoring::TMetricRegistry; - -public: - - struct TEndpointElectorStatCollector { - - TEndpointElectorStatCollector(NMonitoring::TIntGauge* endpointCount = nullptr - , NMonitoring::TIntGauge* pessimizationRatio = nullptr - , NMonitoring::TIntGauge* activeEndpoints = nullptr) - : EndpointCount(endpointCount) - , PessimizationRatio(pessimizationRatio) - , EndpointActive(activeEndpoints) - { } - - NMonitoring::TIntGauge* EndpointCount; - NMonitoring::TIntGauge* PessimizationRatio; - NMonitoring::TIntGauge* EndpointActive; - }; - - struct TSessionPoolStatCollector { - - enum class EStatCollectorType: size_t { - SESSIONPOOL, - SETTLERPOOL - }; - - TSessionPoolStatCollector(NMonitoring::TIntGauge* activeSessions = nullptr - , NMonitoring::TIntGauge* inPoolSessions = nullptr + +public: + + struct TEndpointElectorStatCollector { + + TEndpointElectorStatCollector(NMonitoring::TIntGauge* endpointCount = nullptr + , NMonitoring::TIntGauge* pessimizationRatio = nullptr + , NMonitoring::TIntGauge* activeEndpoints = nullptr) + : EndpointCount(endpointCount) + , PessimizationRatio(pessimizationRatio) + , EndpointActive(activeEndpoints) + { } + + NMonitoring::TIntGauge* EndpointCount; + NMonitoring::TIntGauge* PessimizationRatio; + NMonitoring::TIntGauge* EndpointActive; + }; + + struct TSessionPoolStatCollector { + + enum class EStatCollectorType: size_t { + SESSIONPOOL, + SETTLERPOOL + }; + + TSessionPoolStatCollector(NMonitoring::TIntGauge* activeSessions = nullptr + , NMonitoring::TIntGauge* inPoolSessions = nullptr , NMonitoring::TRate* fakeSessions = nullptr) : ActiveSessions(activeSessions), InPoolSessions(inPoolSessions), FakeSessions(fakeSessions) - { } - - NMonitoring::TIntGauge* ActiveSessions; - NMonitoring::TIntGauge* InPoolSessions; + { } + + NMonitoring::TIntGauge* ActiveSessions; + NMonitoring::TIntGauge* InPoolSessions; NMonitoring::TRate* FakeSessions; - }; - - struct TClientRetryOperationStatCollector { - + }; + + struct TClientRetryOperationStatCollector { + TClientRetryOperationStatCollector() : MetricRegistry_(), Database_() {} - + TClientRetryOperationStatCollector(NMonitoring::TMetricRegistry* registry, const TStringType& database) : MetricRegistry_(registry), Database_(database) - { } - - void IncSyncRetryOperation(const EStatus& status) { + { } + + void IncSyncRetryOperation(const EStatus& status) { if (auto registry = MetricRegistry_.Get()) { TString statusName = TStringBuilder() << status; TString sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName); registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc(); - } - } - - void IncAsyncRetryOperation(const EStatus& status) { + } + } + + void IncAsyncRetryOperation(const EStatus& status) { if (auto registry = MetricRegistry_.Get()) { TString statusName = TStringBuilder() << status; TString sensor = TStringBuilder() << "RetryOperation/" << UnderscoreToUpperCamel(statusName); registry->Rate({ {"database", Database_}, {"sensor", sensor} })->Inc(); - } - } - - private: + } + } + + private: TAtomicPointer<NMonitoring::TMetricRegistry> MetricRegistry_; TStringType Database_; - }; - - struct TClientStatCollector { - + }; + + struct TClientStatCollector { + TClientStatCollector(NMonitoring::TRate* cacheMiss = nullptr - , NMonitoring::THistogram* querySize = nullptr - , NMonitoring::THistogram* paramsSize = nullptr + , NMonitoring::THistogram* querySize = nullptr + , NMonitoring::THistogram* paramsSize = nullptr , NMonitoring::TRate* sessionRemoved = nullptr , NMonitoring::TRate* requestMigrated = nullptr - , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) + , TClientRetryOperationStatCollector retryOperationStatCollector = TClientRetryOperationStatCollector()) : CacheMiss(cacheMiss) , QuerySize(querySize) , ParamsSize(paramsSize) , SessionRemovedDueBalancing(sessionRemoved) , RequestMigrated(requestMigrated) , RetryOperationStatCollector(retryOperationStatCollector) - { } - + { } + NMonitoring::TRate* CacheMiss; - NMonitoring::THistogram* QuerySize; - NMonitoring::THistogram* ParamsSize; + NMonitoring::THistogram* QuerySize; + NMonitoring::THistogram* ParamsSize; NMonitoring::TRate* SessionRemovedDueBalancing; NMonitoring::TRate* RequestMigrated; - TClientRetryOperationStatCollector RetryOperationStatCollector; - }; - + TClientRetryOperationStatCollector RetryOperationStatCollector; + }; + TStatCollector(const TStringType& database, TMetricRegistry* sensorsRegistry) : Database_(database) , DatabaseLabel_({"database", database}) - { - if (sensorsRegistry) { + { + if (sensorsRegistry) { SetMetricRegistry(sensorsRegistry); - } - } - + } + } + void SetMetricRegistry(TMetricRegistry* sensorsRegistry) { Y_VERIFY(sensorsRegistry, "TMetricRegistry is null in stats collector."); MetricRegistryPtr_.Set(sensorsRegistry); @@ -280,53 +280,53 @@ public: GRpcInFlight_.Set(sensorsRegistry->IntGauge({ DatabaseLabel_, {"sensor", "Grpc/InFlight"} })); RequestLatency_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/Latency"} }, - NMonitoring::ExponentialHistogram(20, 2, 1))); + NMonitoring::ExponentialHistogram(20, 2, 1))); QuerySize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/QuerySize"} }, - NMonitoring::ExponentialHistogram(20, 2, 32))); + NMonitoring::ExponentialHistogram(20, 2, 32))); ParamsSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ParamsSize"} }, - NMonitoring::ExponentialHistogram(10, 2, 32))); + NMonitoring::ExponentialHistogram(10, 2, 32))); ResultSize_.Set(sensorsRegistry->HistogramRate({ DatabaseLabel_, {"sensor", "Request/ResultSize"} }, - NMonitoring::ExponentialHistogram(20, 2, 32))); - } - - void IncDiscoveryDuePessimization() { - DiscoveryDuePessimization_.Inc(); - } - - void IncDiscoveryDueExpiration() { - DiscoveryDueExpiration_.Inc(); - } - - void IncDiscoveryFailDueTransportError() { - DiscoveryFailDueTransportError_.Inc(); - } - - void IncReqFailQueueOverflow() { - RequestFailDueQueueOverflow_.Inc(); - } - - void IncReqFailNoEndpoint() { - RequestFailDueNoEndpoint_.Inc(); - } - - void IncReqFailDueTransportError() { - RequestFailDueTransportError_.Inc(); - } - - void IncRequestLatency(TDuration duration) { - RequestLatency_.Record(duration.MilliSeconds()); - } - - void IncResultSize(const size_t& size) { - ResultSize_.Record(size); - } - + NMonitoring::ExponentialHistogram(20, 2, 32))); + } + + void IncDiscoveryDuePessimization() { + DiscoveryDuePessimization_.Inc(); + } + + void IncDiscoveryDueExpiration() { + DiscoveryDueExpiration_.Inc(); + } + + void IncDiscoveryFailDueTransportError() { + DiscoveryFailDueTransportError_.Inc(); + } + + void IncReqFailQueueOverflow() { + RequestFailDueQueueOverflow_.Inc(); + } + + void IncReqFailNoEndpoint() { + RequestFailDueNoEndpoint_.Inc(); + } + + void IncReqFailDueTransportError() { + RequestFailDueTransportError_.Inc(); + } + + void IncRequestLatency(TDuration duration) { + RequestLatency_.Record(duration.MilliSeconds()); + } + + void IncResultSize(const size_t& size) { + ResultSize_.Record(size); + } + void IncCounter(const TStringType& sensor) { if (auto registry = MetricRegistryPtr_.Get()) { - registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Inc(); - } - } - + registry->Counter({ {"database", Database_}, {"sensor", sensor} })->Inc(); + } + } + void SetSessionCV(ui32 cv) { SessionCV_.SetValue(cv); } @@ -339,47 +339,47 @@ public: GRpcInFlight_.Dec(); } - TEndpointElectorStatCollector GetEndpointElectorStatCollector() { + TEndpointElectorStatCollector GetEndpointElectorStatCollector() { if (auto registry = MetricRegistryPtr_.Get()) { auto endpointCoint = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Total"} }); auto pessimizationRatio = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/BadRatio"} }); auto activeEndpoints = registry->IntGauge({ {"database", Database_}, {"sensor", "Endpoints/Good"} }); - return TEndpointElectorStatCollector(endpointCoint, pessimizationRatio, activeEndpoints); - } else { - return TEndpointElectorStatCollector(); - } - } - - TSessionPoolStatCollector GetSessionPoolStatCollector(TSessionPoolStatCollector::EStatCollectorType type) { - if (!IsCollecting()) { - return TSessionPoolStatCollector(); - } - - switch (type) { + return TEndpointElectorStatCollector(endpointCoint, pessimizationRatio, activeEndpoints); + } else { + return TEndpointElectorStatCollector(); + } + } + + TSessionPoolStatCollector GetSessionPoolStatCollector(TSessionPoolStatCollector::EStatCollectorType type) { + if (!IsCollecting()) { + return TSessionPoolStatCollector(); + } + + switch (type) { case TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL: return TSessionPoolStatCollector(ActiveSessions_.Get(), InPoolSessions_.Get(), FakeSessions_.Get()); - case TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL: + case TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL: return TSessionPoolStatCollector(nullptr, SettlerSessions_.Get(), nullptr); - } + } return TSessionPoolStatCollector(); - } - - TClientStatCollector GetClientStatCollector() { - if (IsCollecting()) { + } + + TClientStatCollector GetClientStatCollector() { + if (IsCollecting()) { return TClientStatCollector(CacheMiss_.Get(), QuerySize_.Get(), ParamsSize_.Get(), SessionRemovedDueBalancing_.Get(), RequestMigrated_.Get(), TClientRetryOperationStatCollector(MetricRegistryPtr_.Get(), Database_)); - } else { - return TClientStatCollector(); - } - } - - bool IsCollecting() { + } else { + return TClientStatCollector(); + } + } + + bool IsCollecting() { return MetricRegistryPtr_.Get() != nullptr; - } - + } + void IncSessionsOnHost(const TStringType& host); void DecSessionsOnHost(const TStringType& host); @@ -389,7 +389,7 @@ public: void DecGRpcInFlightByHost(const TStringType& host); void DeleteHost(const TStringType& host); -private: +private: const TStringType Database_; const NMonitoring::TLabel DatabaseLabel_; TAtomicPointer<TMetricRegistry> MetricRegistryPtr_; @@ -399,21 +399,21 @@ private: TAtomicCounter<NMonitoring::TRate> RequestFailDueNoEndpoint_; TAtomicCounter<NMonitoring::TRate> RequestFailDueTransportError_; TAtomicCounter<NMonitoring::TRate> DiscoveryFailDueTransportError_; - TAtomicPointer<NMonitoring::TIntGauge> ActiveSessions_; - TAtomicPointer<NMonitoring::TIntGauge> InPoolSessions_; - TAtomicPointer<NMonitoring::TIntGauge> SettlerSessions_; + TAtomicPointer<NMonitoring::TIntGauge> ActiveSessions_; + TAtomicPointer<NMonitoring::TIntGauge> InPoolSessions_; + TAtomicPointer<NMonitoring::TIntGauge> SettlerSessions_; TAtomicCounter<NMonitoring::TIntGauge> SessionCV_; TAtomicCounter<NMonitoring::TRate> SessionRemovedDueBalancing_; TAtomicCounter<NMonitoring::TRate> RequestMigrated_; TAtomicCounter<NMonitoring::TRate> FakeSessions_; TAtomicCounter<NMonitoring::TRate> CacheMiss_; TAtomicCounter<NMonitoring::TIntGauge> GRpcInFlight_; - TAtomicHistogram<NMonitoring::THistogram> RequestLatency_; - TAtomicHistogram<NMonitoring::THistogram> QuerySize_; - TAtomicHistogram<NMonitoring::THistogram> ParamsSize_; - TAtomicHistogram<NMonitoring::THistogram> ResultSize_; -}; - -} // namespace NSdkStats - -} // namespace Nydb + TAtomicHistogram<NMonitoring::THistogram> RequestLatency_; + TAtomicHistogram<NMonitoring::THistogram> QuerySize_; + TAtomicHistogram<NMonitoring::THistogram> ParamsSize_; + TAtomicHistogram<NMonitoring::THistogram> ResultSize_; +}; + +} // namespace NSdkStats + +} // namespace Nydb diff --git a/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp b/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp index 4cb966a5ce8..9df96ed6fd3 100644 --- a/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp +++ b/ydb/public/sdk/cpp/client/ydb_extension/extension.cpp @@ -1,28 +1,28 @@ #include "extension.h" - -#define INCLUDE_YDB_INTERNAL_H + +#define INCLUDE_YDB_INTERNAL_H #include <ydb/public/sdk/cpp/client/impl/ydb_internal/grpc_connections/grpc_connections.h> #include <ydb/public/sdk/cpp/client/impl/ydb_internal/stats_extractor/extractor.h> -#undef INCLUDE_YDB_INTERNAL_H - -namespace NYdb { - -void IExtension::SelfRegister(TDriver driver) { - CreateInternalInterface(driver)->RegisterExtension(this); -} - -void IExtensionApi::SelfRegister(TDriver driver) { - CreateInternalInterface(driver)->RegisterExtensionApi(this); -} - -namespace NSdkStats { - +#undef INCLUDE_YDB_INTERNAL_H + +namespace NYdb { + +void IExtension::SelfRegister(TDriver driver) { + CreateInternalInterface(driver)->RegisterExtension(this); +} + +void IExtensionApi::SelfRegister(TDriver driver) { + CreateInternalInterface(driver)->RegisterExtensionApi(this); +} + +namespace NSdkStats { + IStatApi* IStatApi::Create(TDriver driver) { return new TStatsExtractor(CreateInternalInterface(driver)); -} - -} // namespace YSdkStats - +} + +} // namespace YSdkStats + class TDiscoveryMutator : public IDiscoveryMutatorApi { public: TDiscoveryMutator(std::shared_ptr<TGRpcConnectionsImpl> driverImpl) @@ -40,5 +40,5 @@ IDiscoveryMutatorApi* IDiscoveryMutatorApi::Create(TDriver driver) { return new TDiscoveryMutator(CreateInternalInterface(driver)); } -} // namespace NYdb - +} // namespace NYdb + diff --git a/ydb/public/sdk/cpp/client/ydb_extension/extension.h b/ydb/public/sdk/cpp/client/ydb_extension/extension.h index 3371bffddc5..68ff0a6ce8b 100644 --- a/ydb/public/sdk/cpp/client/ydb_extension/extension.h +++ b/ydb/public/sdk/cpp/client/ydb_extension/extension.h @@ -1,9 +1,9 @@ -#pragma once - +#pragma once + #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <library/cpp/monlib/metrics/metric_registry.h> - + namespace Ydb { namespace Discovery { @@ -12,8 +12,8 @@ class ListEndpointsResult; } } -namespace NYdb { - +namespace NYdb { + class IExtensionApi { public: friend class TDriver; @@ -31,20 +31,20 @@ private: void SelfRegister(TDriver driver); }; -namespace NSdkStats { - -class IStatApi: public IExtensionApi { -public: +namespace NSdkStats { + +class IStatApi: public IExtensionApi { +public: static IStatApi* Create(TDriver driver); -public: +public: virtual void SetMetricRegistry(NMonitoring::IMetricRegistry* sensorsRegistry) = 0; virtual void Accept(NMonitoring::IMetricConsumer* consumer) const = 0; -}; - -class DestroyedClientException: public yexception {}; - -} // namespace NSdkStats - +}; + +class DestroyedClientException: public yexception {}; + +} // namespace NSdkStats + class IDiscoveryMutatorApi: public IExtensionApi { public: @@ -56,13 +56,13 @@ public: }; -template<typename TExtension> +template<typename TExtension> void TDriver::AddExtension(typename TExtension::TParams params) { typename TExtension::IApi* api = TExtension::IApi::Create(*this); - auto extension = new TExtension(params, api); - extension->SelfRegister(*this); + auto extension = new TExtension(params, api); + extension->SelfRegister(*this); if (api) api->SelfRegister(*this); -} - -} // namespace NYdb +} + +} // namespace NYdb diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index 610fec38847..af4a21c8725 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -1319,7 +1319,7 @@ class TSessionPoolImpl { (std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); public: using TKeepAliveCmd = std::function<void(TSession session)>; - using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; + using TDeletePredicate = std::function<bool(TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount)>; TSessionPoolImpl(ui32 maxActiveSessions); // TAwareSessonProvider: // function is called if session pool is empty, @@ -1334,27 +1334,27 @@ public: bool DropSessionOnEndpoint(std::shared_ptr<TTableClient::TImpl> client, const TString& endpoint); // Returns true if session returned to pool successfully bool ReturnSession(TSession::TImpl* impl, bool active); - TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); + TPeriodicCb CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, TKeepAliveCmd&& cmd, TDeletePredicate&& predicate); i64 GetActiveSessions() const; i64 GetActiveSessionsLimit() const; i64 GetCurrentPoolSize() const; void DecrementActiveCounter(); void Drain(std::function<bool(std::unique_ptr<TSession::TImpl>&&)> cb, bool close); - void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); + void SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector collector); static void CreateFakeSession(NThreading::TPromise<TCreateSessionResult>& promise, std::shared_ptr<TTableClient::TImpl> client); private: - void UpdateStats(); - + void UpdateStats(); + mutable std::mutex Mtx_; TMultiMap<TInstant, std::unique_ptr<TSession::TImpl>> Sessions_; bool Closed_; i64 ActiveSessions_; const ui32 MaxActiveSessions_; - NSdkStats::TSessionCounter ActiveSessionsCounter_; - NSdkStats::TSessionCounter InPoolSessionsCounter_; + NSdkStats::TSessionCounter ActiveSessionsCounter_; + NSdkStats::TSessionCounter InPoolSessionsCounter_; NSdkStats::TAtomicCounter<NMonitoring::TRate> FakeSessionsCounter_; }; @@ -1471,19 +1471,19 @@ public: , Settings_(settings) , SessionPool_(Settings_.SessionPoolSettings_.MaxActiveSessions_) , SettlerPool_(0) - { - if (!DbDriverState_->StatCollector.IsCollecting()) { - return; - } - - SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector()); - SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( - NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL - )); - SettlerPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( - NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL - )); - } + { + if (!DbDriverState_->StatCollector.IsCollecting()) { + return; + } + + SetStatCollector(DbDriverState_->StatCollector.GetClientStatCollector()); + SessionPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( + NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SESSIONPOOL + )); + SettlerPool_.SetStatCollector(DbDriverState_->StatCollector.GetSessionPoolStatCollector( + NSdkStats::TStatCollector::TSessionPoolStatCollector::EStatCollectorType::SETTLERPOOL + )); + } ~TImpl() { RequestMigrator_.Wait(); @@ -1506,10 +1506,10 @@ public: promise.SetException("no more client"); return promise.GetFuture(); } - return strong->Drain(); + return strong->Drain(); }; - - DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP); + + DbDriverState_->AddCb(std::move(cb), TDbDriverState::ENotifyType::STOP); } NThreading::TFuture<void> Drain() { @@ -1557,21 +1557,21 @@ public: } void StartPeriodicSessionPoolTask() { - - auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) { - - const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_; - const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast(); - - if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { - if (sessionsCount > sessionPoolSettings.MinPoolSize_) { - return true; - } - } - - return false; - }; - + + auto deletePredicate = [](TSession::TImpl* session, TTableClient::TImpl* client, size_t sessionsCount) { + + const auto sessionPoolSettings = client->Settings_.SessionPoolSettings_; + const auto spentTime = session->GetTimeToTouchFast() - session->GetTimeInPastFast(); + + if (spentTime >= sessionPoolSettings.CloseIdleThreshold_) { + if (sessionsCount > sessionPoolSettings.MinPoolSize_) { + return true; + } + } + + return false; + }; + auto keepAliveCmd = [](TSession session) { Y_VERIFY(session.GetId()); @@ -1619,8 +1619,8 @@ public: Connections_->AddPeriodicTask( SessionPool_.CreatePeriodicTask( weak, - std::move(keepAliveCmd), - std::move(deletePredicate) + std::move(keepAliveCmd), + std::move(deletePredicate) ), PERIODIC_ACTION_INTERVAL); } @@ -1679,10 +1679,10 @@ public: void StartPeriodicSettlerTask() { - auto deletePredicate = [](TSession::TImpl* , TTableClient::TImpl* , size_t) { - return false; - }; - + auto deletePredicate = [](TSession::TImpl* , TTableClient::TImpl* , size_t) { + return false; + }; + auto ttl = Settings_.SettlerSessionPoolTTL_; auto keepAliveCmd = [ttl](TSession session) { Y_VERIFY(session.GetId()); @@ -1699,8 +1699,8 @@ public: Connections_->AddPeriodicTask( SettlerPool_.CreatePeriodicTask( weak, - std::move(keepAliveCmd), - std::move(deletePredicate) + std::move(keepAliveCmd), + std::move(deletePredicate) ), SETTLER_PERIODIC_ACTION_INTERVAL); } @@ -1976,8 +1976,8 @@ public: settings.ClientTimeout_, preferedLocation); - std::weak_ptr<TDbDriverState> state = DbDriverState_; - + std::weak_ptr<TDbDriverState> state = DbDriverState_; + return createSessionPromise.GetFuture(); } @@ -2149,8 +2149,8 @@ public: return ExecuteDataQuery(session, dataQuery, txControl, params, settings, true); } - CacheMissCounter.Inc(); - + CacheMissCounter.Inc(); + return InjectSessionStatusInterception(session.SessionImpl_, ExecuteDataQueryInternal(session, query, txControl, params, settings, false), true, GetMinTimeToTouch(Settings_.SessionPoolSettings_)); @@ -2207,8 +2207,8 @@ public: promise.SetValue(std::move(prepareQueryResult)); }; - CollectQuerySize(query, QuerySizeHistogram); - + CollectQuerySize(query, QuerySizeHistogram); + Connections_->RunDeferred<Ydb::Table::V1::TableService, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse>( std::move(request), extractor, @@ -2513,15 +2513,15 @@ public: return Settings_.SessionPoolSettings_.RetryLimit_; } - void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) { - CacheMissCounter.Set(collector.CacheMiss); - QuerySizeHistogram.Set(collector.QuerySize); - ParamsSizeHistogram.Set(collector.ParamsSize); - RetryOperationStatCollector = collector.RetryOperationStatCollector; + void SetStatCollector(const NSdkStats::TStatCollector::TClientStatCollector& collector) { + CacheMissCounter.Set(collector.CacheMiss); + QuerySizeHistogram.Set(collector.QuerySize); + ParamsSizeHistogram.Set(collector.ParamsSize); + RetryOperationStatCollector = collector.RetryOperationStatCollector; SessionRemovedDueBalancing.Set(collector.SessionRemovedDueBalancing); RequestMigrated.Set(collector.RequestMigrated); - } - + } + TAsyncBulkUpsertResult BulkUpsert(const TString& table, TValue&& rows, const TBulkUpsertSettings& settings) { auto request = MakeOperationRequest<Ydb::Table::BulkUpsertRequest>(settings); request.set_table(table); @@ -2675,42 +2675,42 @@ private: *request->mutable_parameters() = params; } - static void CollectParams( - ::google::protobuf::Map<TString, Ydb::TypedValue>* params, - NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram) - { - - if (params && histgoram.IsCollecting()) { - size_t size = 0; - for (auto& keyvalue: *params) { - size += keyvalue.second.ByteSizeLong(); - } - histgoram.Record(size); - } - } - - static void CollectParams( - const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, - NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram) - { - - if (histgoram.IsCollecting()) { - size_t size = 0; - for (auto& keyvalue: params) { - size += keyvalue.second.ByteSizeLong(); - } - histgoram.Record(size); - } - } - - static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>& querySizeHistogram) { - if (querySizeHistogram.IsCollecting()) { - querySizeHistogram.Record(query.size()); - } - } - - static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>&) {} - + static void CollectParams( + ::google::protobuf::Map<TString, Ydb::TypedValue>* params, + NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram) + { + + if (params && histgoram.IsCollecting()) { + size_t size = 0; + for (auto& keyvalue: *params) { + size += keyvalue.second.ByteSizeLong(); + } + histgoram.Record(size); + } + } + + static void CollectParams( + const ::google::protobuf::Map<TString, Ydb::TypedValue>& params, + NSdkStats::TAtomicHistogram<NMonitoring::THistogram> histgoram) + { + + if (histgoram.IsCollecting()) { + size_t size = 0; + for (auto& keyvalue: params) { + size += keyvalue.second.ByteSizeLong(); + } + histgoram.Record(size); + } + } + + static void CollectQuerySize(const TString& query, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>& querySizeHistogram) { + if (querySizeHistogram.IsCollecting()) { + querySizeHistogram.Record(query.size()); + } + } + + static void CollectQuerySize(const TDataQuery&, NSdkStats::TAtomicHistogram<NMonitoring::THistogram>&) {} + template <typename TQueryType, typename TParamsType> TAsyncDataQueryResult ExecuteDataQueryInternal(const TSession& session, const TQueryType& query, const TTxControl& txControl, TParamsType params, @@ -2729,10 +2729,10 @@ private: request.set_collect_stats(GetStatsCollectionMode(settings.CollectQueryStats_)); SetQuery(query, request.mutable_query()); - CollectQuerySize(query, QuerySizeHistogram); + CollectQuerySize(query, QuerySizeHistogram); SetParams(params, &request); - CollectParams(params, ParamsSizeHistogram); + CollectParams(params, ParamsSizeHistogram); SetQueryCachePolicy(query, settings, request.mutable_query_cache_policy()); @@ -2855,11 +2855,11 @@ private: std::shared_ptr<TTableClient::TImpl> client, const TCreateSessionSettings& settings); -public: +public: NSdkStats::TAtomicCounter<NMonitoring::TRate> CacheMissCounter; - NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; - NSdkStats::TAtomicHistogram<NMonitoring::THistogram> QuerySizeHistogram; - NSdkStats::TAtomicHistogram<NMonitoring::THistogram> ParamsSizeHistogram; + NSdkStats::TStatCollector::TClientRetryOperationStatCollector RetryOperationStatCollector; + NSdkStats::TAtomicHistogram<NMonitoring::THistogram> QuerySizeHistogram; + NSdkStats::TAtomicHistogram<NMonitoring::THistogram> ParamsSizeHistogram; NSdkStats::TAtomicCounter<NMonitoring::TRate> SessionRemovedDueBalancing; NSdkStats::TAtomicCounter<NMonitoring::TRate> RequestMigrated; @@ -2966,10 +2966,10 @@ TAsyncCreateSessionResult TSessionPoolImpl::GetSession( sessionImpl = std::move(it->second); Sessions_.erase(it); } - UpdateStats(); + UpdateStats(); } if (returnFakeSession) { - FakeSessionsCounter_.Inc(); + FakeSessionsCounter_.Inc(); CreateFakeSession(createSessionPromise, client); return createSessionPromise.GetFuture(); } else if (sessionImpl) { @@ -3028,7 +3028,7 @@ bool TSessionPoolImpl::ReturnSession(TSession::TImpl* impl, bool active) { ActiveSessions_--; impl->SetNeedUpdateActiveCounter(false); } - UpdateStats(); + UpdateStats(); } return true; } @@ -3053,9 +3053,9 @@ void TSessionPoolImpl::Drain(std::function<bool(std::unique_ptr<TSession::TImpl> } TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TImpl> weakClient, - TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate) + TKeepAliveCmd&& cmd, TDeletePredicate&& deletePredicate) { - auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) { + auto periodicCb = [this, weakClient, cmd=std::move(cmd), deletePredicate=std::move(deletePredicate)](NYql::TIssues&&, EStatus status) { if (status != EStatus::SUCCESS) { return false; } @@ -3069,8 +3069,8 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm auto keepAliveBatchSize = PERIODIC_ACTION_BATCH_SIZE; TVector<std::unique_ptr<TSession::TImpl>> sessionsToTouch; sessionsToTouch.reserve(keepAliveBatchSize); - TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; - sessionsToDelete.reserve(keepAliveBatchSize); + TVector<std::unique_ptr<TSession::TImpl>> sessionsToDelete; + sessionsToDelete.reserve(keepAliveBatchSize); auto now = TInstant::Now(); { std::lock_guard guard(Mtx_); @@ -3080,15 +3080,15 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm while (it != sessions.end() && keepAliveBatchSize--) { if (now < it->second->GetTimeToTouchFast()) break; - - if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { - sessionsToDelete.emplace_back(std::move(it->second)); - } else { - sessionsToTouch.emplace_back(std::move(it->second)); - } + + if (deletePredicate(it->second.get(), strongClient.get(), sessions.size())) { + sessionsToDelete.emplace_back(std::move(it->second)); + } else { + sessionsToTouch.emplace_back(std::move(it->second)); + } sessions.erase(it++); } - UpdateStats(); + UpdateStats(); } for (auto& sessionImpl : sessionsToTouch) { @@ -3101,14 +3101,14 @@ TPeriodicCb TSessionPoolImpl::CreatePeriodicTask(std::weak_ptr<TTableClient::TIm cmd(session); } } - - for (auto& sessionImpl : sessionsToDelete) { - if (sessionImpl) { - Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE || - sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED); + + for (auto& sessionImpl : sessionsToDelete) { + if (sessionImpl) { + Y_VERIFY(sessionImpl->GetState() == TSession::TImpl::S_IDLE || + sessionImpl->GetState() == TSession::TImpl::S_DISCONNECTED); TTableClient::TImpl::CloseAndDeleteSession(std::move(sessionImpl), strongClient); - } - } + } + } } return true; @@ -3171,17 +3171,17 @@ TSessionInspectorFn TSession::TImpl::GetSessionInspector( }; } -void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { - ActiveSessionsCounter_.Set(statCollector.ActiveSessions); - InPoolSessionsCounter_.Set(statCollector.InPoolSessions); - FakeSessionsCounter_.Set(statCollector.FakeSessions); -} - -void TSessionPoolImpl::UpdateStats() { - ActiveSessionsCounter_.Apply(ActiveSessions_); - InPoolSessionsCounter_.Apply(Sessions_.size()); -} - +void TSessionPoolImpl::SetStatCollector(NSdkStats::TStatCollector::TSessionPoolStatCollector statCollector) { + ActiveSessionsCounter_.Set(statCollector.ActiveSessions); + InPoolSessionsCounter_.Set(statCollector.InPoolSessions); + FakeSessionsCounter_.Set(statCollector.FakeSessions); +} + +void TSessionPoolImpl::UpdateStats() { + ActiveSessionsCounter_.Apply(ActiveSessions_); + InPoolSessionsCounter_.Apply(Sessions_.size()); +} + TTableClient::TTableClient(const TDriver& driver, const TClientSettings& settings) : Impl_(new TImpl(CreateInternalInterface(driver), settings)) { Impl_->StartPeriodicSessionPoolTask(); @@ -3265,7 +3265,7 @@ protected: , Promise(NThreading::NewPromise<TStatus>()) , RetryNumber(0) {} - + static void RunOp(TRetryContextPtr self) { self->Execute(); } @@ -3432,7 +3432,7 @@ TAsyncStatus TTableClient::RetryOperation(TOperationWithoutSessionFunc&& operati TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc& operationWrapper, const TRetryOperationSettings& settings) { TRetryState retryState; TMaybe<NYdb::TStatus> status; - + for (ui32 retryNumber = 0; retryNumber <= settings.MaxRetries_; ++retryNumber) { status = operationWrapper(retryState); @@ -3488,7 +3488,7 @@ TStatus TTableClient::RetryOperationSyncHelper(const TOperationWrapperSyncFunc& default: return *status; } - Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus()); + Impl_->RetryOperationStatCollector.IncSyncRetryOperation(status->GetStatus()); } return *status; @@ -3905,8 +3905,8 @@ TAsyncPrepareQueryResult TSession::PrepareDataQuery(const TString& query, const return MakeFuture(result); } - Client_->CacheMissCounter.Inc(); - + Client_->CacheMissCounter.Inc(); + return InjectSessionStatusInterception( SessionImpl_, Client_->PrepareDataQuery(*this, query, settings), diff --git a/ydb/services/ydb/ut/ya.make b/ydb/services/ydb/ut/ya.make index b7209faa5f9..3cab9f5607f 100644 --- a/ydb/services/ydb/ut/ya.make +++ b/ydb/services/ydb/ut/ya.make @@ -7,15 +7,15 @@ OWNER( FORK_SUBTESTS() -IF (SANITIZER_TYPE OR WITH_VALGRIND) +IF (SANITIZER_TYPE OR WITH_VALGRIND) SPLIT_FACTOR(60) - TIMEOUT(3600) - SIZE(LARGE) - TAG(ya:fat) -ELSE() - TIMEOUT(300) - SIZE(MEDIUM) -ENDIF() + TIMEOUT(3600) + SIZE(LARGE) + TAG(ya:fat) +ELSE() + TIMEOUT(300) + SIZE(MEDIUM) +ENDIF() SRCS( ydb_bulk_upsert_ut.cpp @@ -28,7 +28,7 @@ SRCS( ydb_scripting_ut.cpp ydb_table_ut.cpp ydb_table_split_ut.cpp - ydb_stats_ut.cpp + ydb_stats_ut.cpp ydb_long_tx_ut.cpp ydb_logstore_ut.cpp ydb_olapstore_ut.cpp diff --git a/ydb/services/ydb/ydb_stats_ut.cpp b/ydb/services/ydb/ydb_stats_ut.cpp index b40e22fc42d..16494823b80 100644 --- a/ydb/services/ydb/ydb_stats_ut.cpp +++ b/ydb/services/ydb/ydb_stats_ut.cpp @@ -10,329 +10,329 @@ #include <library/cpp/monlib/encode/json/json.h> #include <util/generic/ptr.h> -#include <util/system/valgrind.h> - -struct TStatCounters { - i64 EndpointCount = 0; - i64 EndpointActive = 0; - i64 PessimizationRatio = 0; - - ui64 DiscoveryDuePessimization = 0; - ui64 DiscoveryDueExpiration = 0; - ui64 DiscoveryDueTransportError = 0; - - ui64 RequestFailDueNoEndpoint = 0; - ui64 RequestFailDueQueueOverflow = 0; - ui64 RequestFailDueTransportError = 0; - - i64 ActiveSessionsRatio = 0; - i64 ReadySessionsRatio = 0; - ui64 FakeSessions = 0; - ui64 CacheMiss = 0; +#include <util/system/valgrind.h> + +struct TStatCounters { + i64 EndpointCount = 0; + i64 EndpointActive = 0; + i64 PessimizationRatio = 0; + + ui64 DiscoveryDuePessimization = 0; + ui64 DiscoveryDueExpiration = 0; + ui64 DiscoveryDueTransportError = 0; + + ui64 RequestFailDueNoEndpoint = 0; + ui64 RequestFailDueQueueOverflow = 0; + ui64 RequestFailDueTransportError = 0; + + i64 ActiveSessionsRatio = 0; + i64 ReadySessionsRatio = 0; + ui64 FakeSessions = 0; + ui64 CacheMiss = 0; ui64 RetryOperationDueAborted = 0; -}; - +}; + class TMetricEncoder: public NMonitoring::IMetricEncoder { -public: - - enum class ECounterType: size_t { - - ENDPOINTCOUNT, - ENDPOINTACTIVE, - ENDPOINTPESSIMIZATIONRATIO, - - ACTIVESESSIONSRATIO, - READYSESSIONSRATIO, - FAKESESSIONS, - CACHEMISS, +public: + + enum class ECounterType: size_t { + + ENDPOINTCOUNT, + ENDPOINTACTIVE, + ENDPOINTPESSIMIZATIONRATIO, + + ACTIVESESSIONSRATIO, + READYSESSIONSRATIO, + FAKESESSIONS, + CACHEMISS, RETRYOPERATIONDUEABORTED, - - DISCOVERYDUEPESSIMIZATION, - DISCOVERYDUEEXPIRATION, - DISCOVERYFAILDUETRANSPORTERROR, - - REQUESTFAILDUEQUEUEOVERFLOW, - REQUESTFAILDUENOENDPOINT, - REQUESTFAILDUETRANSPORTERROR, - REQUESTLATENCY, - - UNKNOWN - }; - - void Close() override { } - - void OnStreamBegin() override { } + + DISCOVERYDUEPESSIMIZATION, + DISCOVERYDUEEXPIRATION, + DISCOVERYFAILDUETRANSPORTERROR, + + REQUESTFAILDUEQUEUEOVERFLOW, + REQUESTFAILDUENOENDPOINT, + REQUESTFAILDUETRANSPORTERROR, + REQUESTLATENCY, + + UNKNOWN + }; + + void Close() override { } + + void OnStreamBegin() override { } void OnStreamEnd() override { } - - void OnCommonTime(TInstant) override { } - + + void OnCommonTime(TInstant) override { } + void OnMetricBegin(NMonitoring::EMetricType) override { } void OnMetricEnd() override { } - - void OnLabelsBegin() override { } - void OnLabelsEnd() override { } - + + void OnLabelsBegin() override { } + void OnLabelsEnd() override { } + void OnLabel(const TStringBuf name, const TStringBuf value) override { - if (name != "sensor") { - return; - } - + if (name != "sensor") { + return; + } + if (value == "Discovery/TooManyBadEndpoints") { - State = ECounterType::DISCOVERYDUEPESSIMIZATION; + State = ECounterType::DISCOVERYDUEPESSIMIZATION; } else if (value == "Discovery/Regular") { - State = ECounterType::DISCOVERYDUEEXPIRATION; + State = ECounterType::DISCOVERYDUEEXPIRATION; } else if (value == "Request/FailedDiscoveryQueueOverflow") { - State = ECounterType::REQUESTFAILDUEQUEUEOVERFLOW; + State = ECounterType::REQUESTFAILDUEQUEUEOVERFLOW; } else if (value == "Request/FailedNoEndpoint") { - State = ECounterType::REQUESTFAILDUENOENDPOINT; + State = ECounterType::REQUESTFAILDUENOENDPOINT; } else if (value == "Request/FailedTransportError") { - State = ECounterType::REQUESTFAILDUETRANSPORTERROR; + State = ECounterType::REQUESTFAILDUETRANSPORTERROR; } else if (value == "Discovery/FailedTransportError") { - State = ECounterType::DISCOVERYFAILDUETRANSPORTERROR; + State = ECounterType::DISCOVERYFAILDUETRANSPORTERROR; } else if (value == "Endpoints/Total") { - State = ECounterType::ENDPOINTCOUNT; + State = ECounterType::ENDPOINTCOUNT; } else if (value == "Endpoints/BadRatio") { - State = ECounterType::ENDPOINTPESSIMIZATIONRATIO; + State = ECounterType::ENDPOINTPESSIMIZATIONRATIO; } else if (value == "Endpoints/Good") { - State = ECounterType::ENDPOINTACTIVE; + State = ECounterType::ENDPOINTACTIVE; } else if (value == "Sessions/InUse") { - State = ECounterType::ACTIVESESSIONSRATIO; - } else if (value == "ready sessions ratio") { - State = ECounterType::READYSESSIONSRATIO; + State = ECounterType::ACTIVESESSIONSRATIO; + } else if (value == "ready sessions ratio") { + State = ECounterType::READYSESSIONSRATIO; } else if (value == "Sessions/SessionsLimitExceeded") { - State = ECounterType::FAKESESSIONS; + State = ECounterType::FAKESESSIONS; } else if (value == "Request/ClientQueryCacheMiss") { - State = ECounterType::CACHEMISS; + State = ECounterType::CACHEMISS; } else if (value == "RetryOperation/Aborted") { State = ECounterType::RETRYOPERATIONDUEABORTED; - } else if (value == "request latency") { - State = ECounterType::REQUESTLATENCY; - } else { - State = ECounterType::UNKNOWN; - } - } - - void OnDouble(TInstant, double) override { } - - void OnInt64(TInstant, i64 value) override { - switch (State) { - case ECounterType::ENDPOINTCOUNT: Counters.EndpointCount = value; break; - case ECounterType::ENDPOINTACTIVE: Counters.EndpointActive = value; break; - case ECounterType::ENDPOINTPESSIMIZATIONRATIO: Counters.PessimizationRatio = value; break; - - case ECounterType::ACTIVESESSIONSRATIO: Counters.ActiveSessionsRatio = value; break; - case ECounterType::READYSESSIONSRATIO: Counters.ReadySessionsRatio = value; break; - default: return; - } - } - - void OnUint64(TInstant, ui64 value) override { - switch (State) { - case ECounterType::DISCOVERYDUEPESSIMIZATION: Counters.DiscoveryDuePessimization = value; break; - case ECounterType::DISCOVERYDUEEXPIRATION: Counters.DiscoveryDueExpiration = value; break; - case ECounterType::DISCOVERYFAILDUETRANSPORTERROR: Counters.DiscoveryDueTransportError = value; break; - - case ECounterType::REQUESTFAILDUENOENDPOINT: Counters.RequestFailDueNoEndpoint = value; break; - case ECounterType::REQUESTFAILDUEQUEUEOVERFLOW: Counters.RequestFailDueQueueOverflow = value; break; - case ECounterType::REQUESTFAILDUETRANSPORTERROR: Counters.RequestFailDueTransportError = value; break; - - case ECounterType::FAKESESSIONS: Counters.FakeSessions = value; break; - case ECounterType::CACHEMISS: Counters.CacheMiss = value; break; + } else if (value == "request latency") { + State = ECounterType::REQUESTLATENCY; + } else { + State = ECounterType::UNKNOWN; + } + } + + void OnDouble(TInstant, double) override { } + + void OnInt64(TInstant, i64 value) override { + switch (State) { + case ECounterType::ENDPOINTCOUNT: Counters.EndpointCount = value; break; + case ECounterType::ENDPOINTACTIVE: Counters.EndpointActive = value; break; + case ECounterType::ENDPOINTPESSIMIZATIONRATIO: Counters.PessimizationRatio = value; break; + + case ECounterType::ACTIVESESSIONSRATIO: Counters.ActiveSessionsRatio = value; break; + case ECounterType::READYSESSIONSRATIO: Counters.ReadySessionsRatio = value; break; + default: return; + } + } + + void OnUint64(TInstant, ui64 value) override { + switch (State) { + case ECounterType::DISCOVERYDUEPESSIMIZATION: Counters.DiscoveryDuePessimization = value; break; + case ECounterType::DISCOVERYDUEEXPIRATION: Counters.DiscoveryDueExpiration = value; break; + case ECounterType::DISCOVERYFAILDUETRANSPORTERROR: Counters.DiscoveryDueTransportError = value; break; + + case ECounterType::REQUESTFAILDUENOENDPOINT: Counters.RequestFailDueNoEndpoint = value; break; + case ECounterType::REQUESTFAILDUEQUEUEOVERFLOW: Counters.RequestFailDueQueueOverflow = value; break; + case ECounterType::REQUESTFAILDUETRANSPORTERROR: Counters.RequestFailDueTransportError = value; break; + + case ECounterType::FAKESESSIONS: Counters.FakeSessions = value; break; + case ECounterType::CACHEMISS: Counters.CacheMiss = value; break; case ECounterType::RETRYOPERATIONDUEABORTED: Counters.RetryOperationDueAborted = value; break; - default: return; - } - } - - void OnHistogram(TInstant, NMonitoring::IHistogramSnapshotPtr) override { } - + default: return; + } + } + + void OnHistogram(TInstant, NMonitoring::IHistogramSnapshotPtr) override { } + void OnLogHistogram(TInstant, NMonitoring::TLogHistogramSnapshotPtr) override { } void OnSummaryDouble(TInstant, NMonitoring::ISummaryDoubleSnapshotPtr) override { } - ECounterType State = ECounterType::UNKNOWN; - TStatCounters Counters; -}; - -class TCountersExtractor; - -class TCountersExtractExtension: public NYdb::IExtension { -public: - - class TParams { - public: - - TParams& SetExtractor(TCountersExtractor* extractor) { - Extractor = extractor; - return *this; - } - - NMonitoring::TLabels GetApiParams() const { - return {}; - } - - TCountersExtractor* Extractor; - }; - - using IApi = NYdb::NSdkStats::IStatApi; - - TCountersExtractExtension(const TParams& params, IApi* api); - - TStatCounters Pull() { + ECounterType State = ECounterType::UNKNOWN; + TStatCounters Counters; +}; + +class TCountersExtractor; + +class TCountersExtractExtension: public NYdb::IExtension { +public: + + class TParams { + public: + + TParams& SetExtractor(TCountersExtractor* extractor) { + Extractor = extractor; + return *this; + } + + NMonitoring::TLabels GetApiParams() const { + return {}; + } + + TCountersExtractor* Extractor; + }; + + using IApi = NYdb::NSdkStats::IStatApi; + + TCountersExtractExtension(const TParams& params, IApi* api); + + TStatCounters Pull() { TMetricEncoder extractor; - Api_->Accept(&extractor); - return extractor.Counters; - } - -private: + Api_->Accept(&extractor); + return extractor.Counters; + } + +private: std::shared_ptr<NMonitoring::TMetricRegistry> MetricRegistry_; - IApi* Api_ = nullptr; -}; - -class TCountersExtractor { -public: - TStatCounters Extract() { - return Extension_->Pull(); - } - - void Register(TCountersExtractExtension* extension) { - Extension_ = extension; - } - -private: - TCountersExtractExtension* Extension_ = nullptr; -}; - -TCountersExtractExtension::TCountersExtractExtension(const TParams& params, IApi* api) + IApi* Api_ = nullptr; +}; + +class TCountersExtractor { +public: + TStatCounters Extract() { + return Extension_->Pull(); + } + + void Register(TCountersExtractExtension* extension) { + Extension_ = extension; + } + +private: + TCountersExtractExtension* Extension_ = nullptr; +}; + +TCountersExtractExtension::TCountersExtractExtension(const TParams& params, IApi* api) : MetricRegistry_(new NMonitoring::TMetricRegistry()) , Api_(api) { api->SetMetricRegistry(MetricRegistry_.get()); params.Extractor->Register(this); } - -using namespace NYdb::NTable; - -static const TDuration OPERATION_TIMEOUT = TDuration::Seconds(NValgrind::PlainOrUnderValgrind(5, 100)); - -static NYdb::TStatus SimpleSelect(TSession session, const TString& query) { - auto txControl = NYdb::NTable::TTxControl::BeginTx().CommitTx(); - auto settings = TExecDataQuerySettings().KeepInQueryCache(true).OperationTimeout(OPERATION_TIMEOUT); - auto result = session.ExecuteDataQuery(query, txControl, settings).GetValueSync(); - return result; -} - -Y_UNIT_TEST_SUITE(ClientStatsCollector) { - Y_UNIT_TEST(CounterCacheMiss) { - NYdb::TKikimrWithGrpcAndRootSchema server; - auto endpoint = TStringBuilder() << "localhost:" << server.GetPort(); - NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint)); - TCountersExtractor extractor; - driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor)); - NYdb::NTable::TTableClient client(driver); - - auto createSessionResult = client.GetSession(TCreateSessionSettings().ClientTimeout(OPERATION_TIMEOUT)).GetValueSync(); - UNIT_ASSERT(createSessionResult.IsSuccess()); - - auto session = createSessionResult.GetSession(); - UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess()); - - TStatCounters counters = extractor.Extract(); - UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1); - - UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess()); - - counters = extractor.Extract(); - UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1); - - UNIT_ASSERT(SimpleSelect(session, "SELECT 2;").IsSuccess()); - - counters = extractor.Extract(); - UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 2); - - driver.Stop(true); - } - - Y_UNIT_TEST(CounterRetryOperation) { - NYdb::TKikimrWithGrpcAndRootSchema server; - auto endpoint = TStringBuilder() << "localhost:" << server.GetPort(); - NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint)); - TCountersExtractor extractor; - driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor)); - NYdb::NTable::TTableClient client(driver); - - auto retrySettings = TRetryOperationSettings().GetSessionClientTimeout(OPERATION_TIMEOUT); - - UNIT_ASSERT(client.RetryOperationSync([](TSession session){ - auto desc = TTableBuilder() - .AddNullableColumn("id", NYdb::EPrimitiveType::Uint64) - .AddNullableColumn("name", NYdb::EPrimitiveType::Utf8) - .SetPrimaryKeyColumn("id") - .Build(); - - auto settings = NYdb::NTable::TCreateTableSettings().OperationTimeout(TDuration::Seconds(100)); - return session.CreateTable("Root/names", std::move(desc), settings).GetValueSync(); - }, retrySettings).IsSuccess()); - - auto settings = TExecDataQuerySettings().OperationTimeout(OPERATION_TIMEOUT); - auto txSettings = TBeginTxSettings().OperationTimeout(OPERATION_TIMEOUT); - - auto upsertOperation = [&client, settings, retrySettings] { - UNIT_ASSERT(client.RetryOperationSync([settings](TSession session){ - auto query = Sprintf(R"( - UPSERT into [Root/names] (id, name) VALUES (1, "Alex"); - )"); - return session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).GetValueSync(); - }, retrySettings).IsSuccess()); - }; - - const ui64 retriesCount = 2; - auto retrySelectSettings = TRetryOperationSettings().MaxRetries(retriesCount).GetSessionClientTimeout(OPERATION_TIMEOUT); - - UNIT_ASSERT(client.RetryOperationSync([&upsertOperation, settings, txSettings](TSession session){ - auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true); - - auto tx = beginResult.GetTransaction(); - auto query = Sprintf(R"( - SELECT * FROM [Root/names]; + +using namespace NYdb::NTable; + +static const TDuration OPERATION_TIMEOUT = TDuration::Seconds(NValgrind::PlainOrUnderValgrind(5, 100)); + +static NYdb::TStatus SimpleSelect(TSession session, const TString& query) { + auto txControl = NYdb::NTable::TTxControl::BeginTx().CommitTx(); + auto settings = TExecDataQuerySettings().KeepInQueryCache(true).OperationTimeout(OPERATION_TIMEOUT); + auto result = session.ExecuteDataQuery(query, txControl, settings).GetValueSync(); + return result; +} + +Y_UNIT_TEST_SUITE(ClientStatsCollector) { + Y_UNIT_TEST(CounterCacheMiss) { + NYdb::TKikimrWithGrpcAndRootSchema server; + auto endpoint = TStringBuilder() << "localhost:" << server.GetPort(); + NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint)); + TCountersExtractor extractor; + driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor)); + NYdb::NTable::TTableClient client(driver); + + auto createSessionResult = client.GetSession(TCreateSessionSettings().ClientTimeout(OPERATION_TIMEOUT)).GetValueSync(); + UNIT_ASSERT(createSessionResult.IsSuccess()); + + auto session = createSessionResult.GetSession(); + UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess()); + + TStatCounters counters = extractor.Extract(); + UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1); + + UNIT_ASSERT(SimpleSelect(session, "SELECT 1;").IsSuccess()); + + counters = extractor.Extract(); + UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 1); + + UNIT_ASSERT(SimpleSelect(session, "SELECT 2;").IsSuccess()); + + counters = extractor.Extract(); + UNIT_ASSERT_VALUES_EQUAL(counters.CacheMiss, 2); + + driver.Stop(true); + } + + Y_UNIT_TEST(CounterRetryOperation) { + NYdb::TKikimrWithGrpcAndRootSchema server; + auto endpoint = TStringBuilder() << "localhost:" << server.GetPort(); + NYdb::TDriver driver(NYdb::TDriverConfig().SetEndpoint(endpoint)); + TCountersExtractor extractor; + driver.AddExtension<TCountersExtractExtension>(TCountersExtractExtension::TParams().SetExtractor(&extractor)); + NYdb::NTable::TTableClient client(driver); + + auto retrySettings = TRetryOperationSettings().GetSessionClientTimeout(OPERATION_TIMEOUT); + + UNIT_ASSERT(client.RetryOperationSync([](TSession session){ + auto desc = TTableBuilder() + .AddNullableColumn("id", NYdb::EPrimitiveType::Uint64) + .AddNullableColumn("name", NYdb::EPrimitiveType::Utf8) + .SetPrimaryKeyColumn("id") + .Build(); + + auto settings = NYdb::NTable::TCreateTableSettings().OperationTimeout(TDuration::Seconds(100)); + return session.CreateTable("Root/names", std::move(desc), settings).GetValueSync(); + }, retrySettings).IsSuccess()); + + auto settings = TExecDataQuerySettings().OperationTimeout(OPERATION_TIMEOUT); + auto txSettings = TBeginTxSettings().OperationTimeout(OPERATION_TIMEOUT); + + auto upsertOperation = [&client, settings, retrySettings] { + UNIT_ASSERT(client.RetryOperationSync([settings](TSession session){ + auto query = Sprintf(R"( + UPSERT into [Root/names] (id, name) VALUES (1, "Alex"); + )"); + return session.ExecuteDataQuery(query, TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx(), settings).GetValueSync(); + }, retrySettings).IsSuccess()); + }; + + const ui64 retriesCount = 2; + auto retrySelectSettings = TRetryOperationSettings().MaxRetries(retriesCount).GetSessionClientTimeout(OPERATION_TIMEOUT); + + UNIT_ASSERT(client.RetryOperationSync([&upsertOperation, settings, txSettings](TSession session){ + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true); + + auto tx = beginResult.GetTransaction(); + auto query = Sprintf(R"( + SELECT * FROM [Root/names]; UPSERT INTO [Root/names] (id, name) VALUES (2, "Bob"); - )"); - auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true); - - upsertOperation(); - - return tx.Commit().GetValueSync(); - }, retrySelectSettings).IsSuccess() == false); - - TStatCounters counters = extractor.Extract(); + )"); + auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true); + + upsertOperation(); + + return tx.Commit().GetValueSync(); + }, retrySelectSettings).IsSuccess() == false); + + TStatCounters counters = extractor.Extract(); UNIT_ASSERT_VALUES_EQUAL(counters.RetryOperationDueAborted, retriesCount); - - UNIT_ASSERT(client.RetryOperation([&upsertOperation, settings, txSettings](TSession session){ - auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true); - - auto tx = beginResult.GetTransaction(); - auto query = Sprintf(R"( - SELECT * FROM [Root/names]; + + UNIT_ASSERT(client.RetryOperation([&upsertOperation, settings, txSettings](TSession session){ + auto beginResult = session.BeginTransaction(TTxSettings::SerializableRW(), txSettings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(beginResult.IsSuccess(), true); + + auto tx = beginResult.GetTransaction(); + auto query = Sprintf(R"( + SELECT * FROM [Root/names]; UPSERT INTO [Root/names] (id, name) VALUES (2, "Bob"); - )"); - auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true); - - upsertOperation(); - + )"); + auto queryResult = session.ExecuteDataQuery(query, TTxControl::Tx(tx), settings).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL(queryResult.IsSuccess(), true); + + upsertOperation(); + return tx.Commit().Apply([](const auto& future) { return NThreading::MakeFuture<NYdb::TStatus>(future.GetValue()); }); - }, retrySelectSettings).GetValueSync().IsSuccess() == false); - - counters = extractor.Extract(); + }, retrySelectSettings).GetValueSync().IsSuccess() == false); + + counters = extractor.Extract(); // cumulative counter UNIT_ASSERT_VALUES_EQUAL(counters.RetryOperationDueAborted, retriesCount * 2); - - driver.Stop(true); - } + + driver.Stop(true); + } Y_UNIT_TEST(ExternalMetricRegistryByRawPtr) { NMonitoring::TMetricRegistry sensorsRegistry; @@ -382,4 +382,4 @@ Y_UNIT_TEST_SUITE(ClientStatsCollector) { Y_UNIT_TEST(ExternalMetricRegistryStdSharedPtr) { TestExternalMetricRegistry<std::shared_ptr>(); } -} +} |