diff options
author | dcherednik <dcherednik@ydb.tech> | 2023-06-30 20:42:35 +0300 |
---|---|---|
committer | dcherednik <dcherednik@ydb.tech> | 2023-06-30 20:42:35 +0300 |
commit | 36f2687eb7163b6e23469896f7f19a2b958fa984 (patch) | |
tree | a9673eac796ad8d1b107d373cd26e5c2d6ffe92e | |
parent | fa133227cb17a256e3e20c774312e12b779761ee (diff) | |
download | ydb-36f2687eb7163b6e23469896f7f19a2b958fa984.tar.gz |
Remove extra include dependencies for kqp.
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_tasks_graph.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/host/kqp_host.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 1 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.h | 280 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h | 287 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.h | 3 |
6 files changed, 290 insertions, 283 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h index f26d411415f..d33fbc817c5 100644 --- a/ydb/core/kqp/executer_actor/kqp_tasks_graph.h +++ b/ydb/core/kqp/executer_actor/kqp_tasks_graph.h @@ -1,7 +1,6 @@ #pragma once #include <ydb/core/kqp/common/kqp_resolve.h> -#include <ydb/core/kqp/expr_nodes/kqp_expr_nodes.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/scheme/scheme_tabledefs.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index c82aed9c78f..6f5cdb0bae4 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -2,7 +2,6 @@ #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/kqp/provider/yql_kikimr_provider.h> -#include <ydb/core/kqp/provider/yql_kikimr_expr_nodes.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> namespace NKikimr { diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 43325c40bb3..7fed3c61e7c 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1,4 +1,5 @@ #include "kqp_proxy_service.h" +#include "kqp_proxy_service_impl.h" #include "kqp_script_executions.h" #include <ydb/core/base/appdata.h> diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index a19fcce2931..654617b6247 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -1,10 +1,6 @@ #pragma once #include <ydb/core/base/appdata.h> -#include <ydb/core/kqp/common/kqp.h> -#include <ydb/core/kqp/counters/kqp_counters.h> -#include <ydb/core/kqp/rm_service/kqp_rm_service.h> -#include <ydb/core/protos/kqp.pb.h> #include <library/cpp/actors/core/actorid.h> @@ -12,94 +8,8 @@ namespace NKikimr::NKqp { -struct TKqpProxyRequest { - TActorId Sender; - ui64 SenderCookie = 0; - TString TraceId; - ui32 EventType; - TString SessionId; - TKqpDbCountersPtr DbCounters; - - TKqpProxyRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, - ui32 eventType) - : Sender(sender) - , SenderCookie(senderCookie) - , TraceId(traceId) - , EventType(eventType) - , SessionId() - {} - - void SetSessionId(const TString& sessionId, TKqpDbCountersPtr dbCounters) { - SessionId = sessionId; - DbCounters = dbCounters; - } -}; - - -class TKqpProxyRequestTracker { - ui64 RequestId; - THashMap<ui64, TKqpProxyRequest> PendingRequests; - -public: - TKqpProxyRequestTracker() - : RequestId(1) - {} - - ui64 RegisterRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, ui32 eventType) { - ui64 NewRequestId = ++RequestId; - PendingRequests.emplace(NewRequestId, TKqpProxyRequest(sender, senderCookie, traceId, eventType)); - return NewRequestId; - } - - const TKqpProxyRequest* FindPtr(ui64 requestId) const { - return PendingRequests.FindPtr(requestId); - } - - void SetSessionId(ui64 requestId, const TString& sessionId, TKqpDbCountersPtr dbCounters) { - TKqpProxyRequest* ptr = PendingRequests.FindPtr(requestId); - ptr->SetSessionId(sessionId, dbCounters); - } - - void Erase(ui64 requestId) { - PendingRequests.erase(requestId); - } -}; - - -template<typename TValue> -struct TProcessResult { - Ydb::StatusIds::StatusCode YdbStatus; - TString Error; - TValue Value; - bool ResourceExhausted = false; -}; - - -struct TKqpSessionInfo { - TString SessionId; - TActorId WorkerId; - TString Database; - TKqpDbCountersPtr DbCounters; - TInstant ShutdownStartedAt; - std::vector<i32> ReadyPos; - NActors::TMonotonic IdleTimeout; - // position in the idle list. - std::list<TKqpSessionInfo*>::iterator IdlePos; - - TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos, - NActors::TMonotonic idleTimeout, std::list<TKqpSessionInfo*>::iterator idlePos) - : SessionId(sessionId) - , WorkerId(workerId) - , Database(database) - , DbCounters(dbCounters) - , ShutdownStartedAt() - , ReadyPos(std::move(pos)) - , IdleTimeout(idleTimeout) - , IdlePos(idlePos) - { - } -}; +class IQueryReplayBackendFactory; +struct TKqpProxySharedResources; struct TSimpleResourceStats { double Mean; @@ -131,192 +41,6 @@ struct TPeerStats { {} }; - -class TLocalSessionsRegistry { - THashMap<TString, TKqpSessionInfo> LocalSessions; - THashMap<TActorId, TString> TargetIdIndex; - THashSet<TString> ShutdownInFlightSessions; - THashMap<TString, ui32> SessionsCountPerDatabase; - std::vector<std::vector<TString>> ReadySessions; - TIntrusivePtr<IRandomProvider> RandomProvider; - std::list<TKqpSessionInfo*> IdleSessions; - -public: - TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) - : ReadySessions(2) - , RandomProvider(randomProvider) - {} - - TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, - const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing, - TDuration idleDuration) - { - std::vector<i32> pos(2, -1); - pos[0] = ReadySessions[0].size(); - ReadySessions[0].push_back(sessionId); - - if (supportsBalancing) { - pos[1] = ReadySessions[1].size(); - ReadySessions[1].push_back(sessionId); - } - - auto result = LocalSessions.emplace(sessionId, - TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos), - NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end())); - SessionsCountPerDatabase[database]++; - Y_VERIFY(result.second, "Duplicate session id!"); - TargetIdIndex.emplace(workerId, sessionId); - StartIdleCheck(&(result.first->second), idleDuration); - return &result.first->second; - } - - const THashSet<TString>& GetShutdownInFlight() const { - return ShutdownInFlightSessions; - } - - TKqpSessionInfo* StartShutdownSession(const TString& sessionId) { - ShutdownInFlightSessions.emplace(sessionId); - auto ptr = LocalSessions.FindPtr(sessionId); - ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); - RemoveSessionFromLists(ptr); - return ptr; - } - - bool IsSessionIdle(const TKqpSessionInfo* sessionInfo) const { - return sessionInfo->IdlePos != IdleSessions.end(); - } - - const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) { - if (IdleSessions.empty()) { - return nullptr; - } - - const TKqpSessionInfo* candidate = (*IdleSessions.begin()); - if (candidate->IdleTimeout > now) { - return nullptr; - } - - return candidate; - } - - void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) { - if (!sessionInfo) - return; - - TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); - - info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration; - if (info->IdlePos != IdleSessions.end()) { - IdleSessions.erase(info->IdlePos); - } - - info->IdlePos = IdleSessions.insert(IdleSessions.end(), info); - } - - void StopIdleCheck(const TKqpSessionInfo* sessionInfo) { - if (!sessionInfo) - return; - - TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); - if (info->IdlePos != IdleSessions.end()) { - IdleSessions.erase(info->IdlePos); - info->IdlePos = IdleSessions.end(); - } - } - - TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { - auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); - if (sessions.size() >= minReasonableToKick) { - ui64 idx = RandomProvider->GenRand() % sessions.size(); - return StartShutdownSession(sessions[idx]); - } - - return nullptr; - } - - THashMap<TString, TKqpSessionInfo>::const_iterator begin() const { - return LocalSessions.begin(); - } - - THashMap<TString, TKqpSessionInfo>::const_iterator end() const { - return LocalSessions.end(); - } - - size_t GetShutdownInFlightSize() const { - return ShutdownInFlightSessions.size(); - } - - void Erase(const TString& sessionId) { - auto it = LocalSessions.find(sessionId); - if (it != LocalSessions.end()) { - auto counter = SessionsCountPerDatabase.find(it->second.Database); - if (counter != SessionsCountPerDatabase.end()) { - counter->second--; - if (counter->second == 0) { - SessionsCountPerDatabase.erase(counter); - } - } - - StopIdleCheck(&(it->second)); - RemoveSessionFromLists(&(it->second)); - ShutdownInFlightSessions.erase(sessionId); - TargetIdIndex.erase(it->second.WorkerId); - LocalSessions.erase(it); - } - } - - bool IsPendingShutdown(const TString& sessionId) const { - return ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end(); - } - - bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { - auto it = SessionsCountPerDatabase.find(database); - if (it == SessionsCountPerDatabase.end()){ - return true; - } - - if (it->second + 1 <= databaseLimit) { - return true; - } - - return false; - } - - size_t size() const { - return LocalSessions.size(); - } - - const TKqpSessionInfo* FindPtr(const TString& sessionId) const { - return LocalSessions.FindPtr(sessionId); - } - - void Erase(const TActorId& targetId) { - auto it = TargetIdIndex.find(targetId); - if (it != TargetIdIndex.end()){ - Erase(it->second); - } - } - -private: - void RemoveSessionFromLists(TKqpSessionInfo* ptr) { - for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) { - i32& pos = ptr->ReadyPos.at(i); - auto& sessions = ReadySessions.at(i); - if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) { - auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i); - Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size())); - std::swap(sessions[pos], sessions[lastPos]); - lastPos = pos; - } - - if (pos != -1) { - sessions.pop_back(); - pos = -1; - } - } - } -}; - TSimpleResourceStats CalcPeerStats( const TVector<NKikimrKqp::TKqpProxyNodeResources>& data, const TString& selfDataCenterId, bool localDatacenterPolicy, std::function<double(const NKikimrKqp::TKqpProxyNodeResources& entry)> ExtractValue); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h new file mode 100644 index 00000000000..19f28f9a6cf --- /dev/null +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -0,0 +1,287 @@ +#pragma once + +#include <ydb/core/base/appdata.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/rm_service/kqp_rm_service.h> +#include <ydb/core/protos/kqp.pb.h> + +#include <library/cpp/actors/core/actorid.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NKqp { + +struct TKqpProxyRequest { + TActorId Sender; + ui64 SenderCookie = 0; + TString TraceId; + ui32 EventType; + TString SessionId; + TKqpDbCountersPtr DbCounters; + + TKqpProxyRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, + ui32 eventType) + : Sender(sender) + , SenderCookie(senderCookie) + , TraceId(traceId) + , EventType(eventType) + , SessionId() + {} + + void SetSessionId(const TString& sessionId, TKqpDbCountersPtr dbCounters) { + SessionId = sessionId; + DbCounters = dbCounters; + } +}; + + +class TKqpProxyRequestTracker { + ui64 RequestId; + THashMap<ui64, TKqpProxyRequest> PendingRequests; + +public: + TKqpProxyRequestTracker() + : RequestId(1) + {} + + ui64 RegisterRequest(const TActorId& sender, ui64 senderCookie, const TString& traceId, ui32 eventType) { + ui64 NewRequestId = ++RequestId; + PendingRequests.emplace(NewRequestId, TKqpProxyRequest(sender, senderCookie, traceId, eventType)); + return NewRequestId; + } + + const TKqpProxyRequest* FindPtr(ui64 requestId) const { + return PendingRequests.FindPtr(requestId); + } + + void SetSessionId(ui64 requestId, const TString& sessionId, TKqpDbCountersPtr dbCounters) { + TKqpProxyRequest* ptr = PendingRequests.FindPtr(requestId); + ptr->SetSessionId(sessionId, dbCounters); + } + + void Erase(ui64 requestId) { + PendingRequests.erase(requestId); + } +}; + +template<typename TValue> +struct TProcessResult { + Ydb::StatusIds::StatusCode YdbStatus; + TString Error; + TValue Value; + bool ResourceExhausted = false; +}; + +struct TKqpSessionInfo { + TString SessionId; + TActorId WorkerId; + TString Database; + TKqpDbCountersPtr DbCounters; + TInstant ShutdownStartedAt; + std::vector<i32> ReadyPos; + NActors::TMonotonic IdleTimeout; + // position in the idle list. + std::list<TKqpSessionInfo*>::iterator IdlePos; + + TKqpSessionInfo(const TString& sessionId, const TActorId& workerId, + const TString& database, TKqpDbCountersPtr dbCounters, std::vector<i32>&& pos, + NActors::TMonotonic idleTimeout, std::list<TKqpSessionInfo*>::iterator idlePos) + : SessionId(sessionId) + , WorkerId(workerId) + , Database(database) + , DbCounters(dbCounters) + , ShutdownStartedAt() + , ReadyPos(std::move(pos)) + , IdleTimeout(idleTimeout) + , IdlePos(idlePos) + { + } +}; + +class TLocalSessionsRegistry { + THashMap<TString, TKqpSessionInfo> LocalSessions; + THashMap<TActorId, TString> TargetIdIndex; + THashSet<TString> ShutdownInFlightSessions; + THashMap<TString, ui32> SessionsCountPerDatabase; + std::vector<std::vector<TString>> ReadySessions; + TIntrusivePtr<IRandomProvider> RandomProvider; + std::list<TKqpSessionInfo*> IdleSessions; + +public: + TLocalSessionsRegistry(TIntrusivePtr<IRandomProvider> randomProvider) + : ReadySessions(2) + , RandomProvider(randomProvider) + {} + + TKqpSessionInfo* Create(const TString& sessionId, const TActorId& workerId, + const TString& database, TKqpDbCountersPtr dbCounters, bool supportsBalancing, + TDuration idleDuration) + { + std::vector<i32> pos(2, -1); + pos[0] = ReadySessions[0].size(); + ReadySessions[0].push_back(sessionId); + + if (supportsBalancing) { + pos[1] = ReadySessions[1].size(); + ReadySessions[1].push_back(sessionId); + } + + auto result = LocalSessions.emplace(sessionId, + TKqpSessionInfo(sessionId, workerId, database, dbCounters, std::move(pos), + NActors::TActivationContext::Monotonic() + idleDuration, IdleSessions.end())); + SessionsCountPerDatabase[database]++; + Y_VERIFY(result.second, "Duplicate session id!"); + TargetIdIndex.emplace(workerId, sessionId); + StartIdleCheck(&(result.first->second), idleDuration); + return &result.first->second; + } + + const THashSet<TString>& GetShutdownInFlight() const { + return ShutdownInFlightSessions; + } + + TKqpSessionInfo* StartShutdownSession(const TString& sessionId) { + ShutdownInFlightSessions.emplace(sessionId); + auto ptr = LocalSessions.FindPtr(sessionId); + ptr->ShutdownStartedAt = TAppData::TimeProvider->Now(); + RemoveSessionFromLists(ptr); + return ptr; + } + + bool IsSessionIdle(const TKqpSessionInfo* sessionInfo) const { + return sessionInfo->IdlePos != IdleSessions.end(); + } + + const TKqpSessionInfo* GetIdleSession(const NActors::TMonotonic& now) { + if (IdleSessions.empty()) { + return nullptr; + } + + const TKqpSessionInfo* candidate = (*IdleSessions.begin()); + if (candidate->IdleTimeout > now) { + return nullptr; + } + + return candidate; + } + + void StartIdleCheck(const TKqpSessionInfo* sessionInfo, const TDuration idleDuration) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + + info->IdleTimeout = NActors::TActivationContext::Monotonic() + idleDuration; + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + } + + info->IdlePos = IdleSessions.insert(IdleSessions.end(), info); + } + + void StopIdleCheck(const TKqpSessionInfo* sessionInfo) { + if (!sessionInfo) + return; + + TKqpSessionInfo* info = const_cast<TKqpSessionInfo*>(sessionInfo); + if (info->IdlePos != IdleSessions.end()) { + IdleSessions.erase(info->IdlePos); + info->IdlePos = IdleSessions.end(); + } + } + + TKqpSessionInfo* PickSessionToShutdown(bool force, ui32 minReasonableToKick) { + auto& sessions = force ? ReadySessions.at(0) : ReadySessions.at(1); + if (sessions.size() >= minReasonableToKick) { + ui64 idx = RandomProvider->GenRand() % sessions.size(); + return StartShutdownSession(sessions[idx]); + } + + return nullptr; + } + + THashMap<TString, TKqpSessionInfo>::const_iterator begin() const { + return LocalSessions.begin(); + } + + THashMap<TString, TKqpSessionInfo>::const_iterator end() const { + return LocalSessions.end(); + } + + size_t GetShutdownInFlightSize() const { + return ShutdownInFlightSessions.size(); + } + + void Erase(const TString& sessionId) { + auto it = LocalSessions.find(sessionId); + if (it != LocalSessions.end()) { + auto counter = SessionsCountPerDatabase.find(it->second.Database); + if (counter != SessionsCountPerDatabase.end()) { + counter->second--; + if (counter->second == 0) { + SessionsCountPerDatabase.erase(counter); + } + } + + StopIdleCheck(&(it->second)); + RemoveSessionFromLists(&(it->second)); + ShutdownInFlightSessions.erase(sessionId); + TargetIdIndex.erase(it->second.WorkerId); + LocalSessions.erase(it); + } + } + + bool IsPendingShutdown(const TString& sessionId) const { + return ShutdownInFlightSessions.find(sessionId) != ShutdownInFlightSessions.end(); + } + + bool CheckDatabaseLimits(const TString& database, ui32 databaseLimit) { + auto it = SessionsCountPerDatabase.find(database); + if (it == SessionsCountPerDatabase.end()){ + return true; + } + + if (it->second + 1 <= databaseLimit) { + return true; + } + + return false; + } + + size_t size() const { + return LocalSessions.size(); + } + + const TKqpSessionInfo* FindPtr(const TString& sessionId) const { + return LocalSessions.FindPtr(sessionId); + } + + void Erase(const TActorId& targetId) { + auto it = TargetIdIndex.find(targetId); + if (it != TargetIdIndex.end()){ + Erase(it->second); + } + } + +private: + void RemoveSessionFromLists(TKqpSessionInfo* ptr) { + for(ui32 i = 0; i < ptr->ReadyPos.size(); ++i) { + i32& pos = ptr->ReadyPos.at(i); + auto& sessions = ReadySessions.at(i); + if (pos != -1 && pos + 1 != static_cast<i32>(sessions.size())) { + auto& lastPos = LocalSessions.at(sessions.back()).ReadyPos.at(i); + Y_VERIFY(lastPos + 1 == static_cast<i32>(sessions.size())); + std::swap(sessions[pos], sessions[lastPos]); + lastPos = pos; + } + + if (pos != -1) { + sessions.pop_back(); + pos = -1; + } + } + } +}; + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 33b2e3d0709..901c143e0ce 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -2,9 +2,6 @@ #include <ydb/core/kqp/common/events/script_executions.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/library/yql/public/issue/yql_issue.h> -#include <ydb/public/api/protos/draft/ydb_query.pb.h> -#include <ydb/public/api/protos/ydb_status_codes.pb.h> -#include <ydb/public/lib/operation_id/operation_id.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/actors/core/event_local.h> |