diff options
author | hor911 <hor911@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:44 +0300 |
commit | 9d76a2b677c8183292896b7d718af8cacbc6ba39 (patch) | |
tree | 74d39283ab3d7ee4ad00ec4b0930a0960357da0e | |
parent | ea46c401e7900b229add3e6074dbf89adc84ebfc (diff) | |
download | ydb-9d76a2b677c8183292896b7d718af8cacbc6ba39.tar.gz |
Restoring authorship annotation for <hor911@yandex-team.ru>. Commit 1 of 2.
77 files changed, 2586 insertions, 2586 deletions
diff --git a/library/cpp/actors/interconnect/interconnect.h b/library/cpp/actors/interconnect/interconnect.h index 225a5243fd..6c3286b76a 100644 --- a/library/cpp/actors/interconnect/interconnect.h +++ b/library/cpp/actors/interconnect/interconnect.h @@ -125,28 +125,28 @@ namespace NActors { TActorId GetNameserviceActorId(); - /** - * Const table-lookup based name service - */ - + /** + * Const table-lookup based name service + */ + IActor* CreateNameserverTable( const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 poolId = 0); /** - * Name service which can be paired with external discovery service. - * Copies information from setup on the start (table may be empty). - * Handles TEvNodesInfo to change list of known nodes. - * - * If PendingPeriod is not zero, wait for unknown nodeId - */ - - IActor* CreateDynamicNameserver( - const TIntrusivePtr<TTableNameserverSetup>& setup, - const TDuration& pendingPeriod = TDuration::Zero(), - ui32 poolId = 0); - - /** + * Name service which can be paired with external discovery service. + * Copies information from setup on the start (table may be empty). + * Handles TEvNodesInfo to change list of known nodes. + * + * If PendingPeriod is not zero, wait for unknown nodeId + */ + + IActor* CreateDynamicNameserver( + const TIntrusivePtr<TTableNameserverSetup>& setup, + const TDuration& pendingPeriod = TDuration::Zero(), + ui32 poolId = 0); + + /** * Creates an actor that resolves host/port and replies with either: * * - TEvLocalNodeInfo on success @@ -158,14 +158,14 @@ namespace NActors { const TString& host, ui16 port, ui32 nodeId, const TString& defaultAddress, const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline); - inline IActor* CreateResolveActor( - ui32 nodeId, const TTableNameserverSetup::TNodeInfo& nodeInfo, - const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline) - { - return CreateResolveActor(nodeInfo.ResolveHost, nodeInfo.Port, nodeId, nodeInfo.Address, - replyTo, replyFrom, deadline); - } - + inline IActor* CreateResolveActor( + ui32 nodeId, const TTableNameserverSetup::TNodeInfo& nodeInfo, + const TActorId& replyTo, const TActorId& replyFrom, TInstant deadline) + { + return CreateResolveActor(nodeInfo.ResolveHost, nodeInfo.Port, nodeId, nodeInfo.Address, + replyTo, replyFrom, deadline); + } + /** * Creates an actor that resolves host/port and replies with either: * diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_base.h b/library/cpp/actors/interconnect/interconnect_nameserver_base.h index df614f6c2b..6d4f370443 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_base.h +++ b/library/cpp/actors/interconnect/interconnect_nameserver_base.h @@ -1,83 +1,83 @@ -#include "interconnect.h" -#include "interconnect_impl.h" -#include "interconnect_address.h" -#include "events_local.h" - -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/memory_log/memlog.h> - -namespace NActors { - - template<typename TDerived> - class TInterconnectNameserverBase : public TActor<TDerived> { - protected: - const TMap<ui32, TTableNameserverSetup::TNodeInfo>& NodeTable; - - TInterconnectNameserverBase(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) - , const TMap<ui32, TTableNameserverSetup::TNodeInfo>& nodeTable) - : TActor<TDerived>(func) - , NodeTable(nodeTable) - { - } - public: - - void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev, - const TActorContext& ctx, - const TInstant&) { - auto reply = new TEvLocalNodeInfo; - reply->NodeId = ev->Get()->Record.GetNodeId(); - ctx.Send(ev->Sender, reply); - } - - void Handle(TEvInterconnect::TEvResolveNode::TPtr& ev, - const TActorContext& ctx) { - const TEvInterconnect::TEvResolveNode* request = ev->Get(); - auto& record = request->Record; - const ui32 nodeId = record.GetNodeId(); - const TInstant deadline = record.HasDeadline() ? TInstant::FromValue(record.GetDeadline()) : TInstant::Max(); - auto it = NodeTable.find(nodeId); - - if (it == NodeTable.end()) { - static_cast<TDerived*>(this)->HandleMissedNodeId(ev, ctx, deadline); - } else { - IActor::RegisterWithSameMailbox( - CreateResolveActor(nodeId, it->second, ev->Sender, this->SelfId(), deadline)); - } - } - - void Handle(TEvResolveAddress::TPtr& ev, - const TActorContext&) { - const TEvResolveAddress* request = ev->Get(); - - IActor::RegisterWithSameMailbox( - CreateResolveActor(request->Address, request->Port, ev->Sender, this->SelfId(), TInstant::Max())); - } - - void Handle(TEvInterconnect::TEvListNodes::TPtr& ev, - const TActorContext& ctx) { - THolder<TEvInterconnect::TEvNodesInfo> - reply(new TEvInterconnect::TEvNodesInfo()); - reply->Nodes.reserve(NodeTable.size()); - for (const auto& pr : NodeTable) { - reply->Nodes.emplace_back(pr.first, - pr.second.Address, pr.second.Host, pr.second.ResolveHost, - pr.second.Port, pr.second.Location); - } - ctx.Send(ev->Sender, reply.Release()); - } - - void Handle(TEvInterconnect::TEvGetNode::TPtr& ev, - const TActorContext& ctx) { - ui32 nodeId = ev->Get()->NodeId; - THolder<TEvInterconnect::TEvNodeInfo> - reply(new TEvInterconnect::TEvNodeInfo(nodeId)); - auto it = NodeTable.find(nodeId); - if (it != NodeTable.end()) { - reply->Node = MakeHolder<TEvInterconnect::TNodeInfo>(it->first, it->second.Address, - it->second.Host, it->second.ResolveHost, - it->second.Port, it->second.Location); - } - ctx.Send(ev->Sender, reply.Release()); - } - }; -} +#include "interconnect.h" +#include "interconnect_impl.h" +#include "interconnect_address.h" +#include "events_local.h" + +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/memory_log/memlog.h> + +namespace NActors { + + template<typename TDerived> + class TInterconnectNameserverBase : public TActor<TDerived> { + protected: + const TMap<ui32, TTableNameserverSetup::TNodeInfo>& NodeTable; + + TInterconnectNameserverBase(void (TDerived::*func)(TAutoPtr<IEventHandle>& ev, const TActorContext& ctx) + , const TMap<ui32, TTableNameserverSetup::TNodeInfo>& nodeTable) + : TActor<TDerived>(func) + , NodeTable(nodeTable) + { + } + public: + + void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev, + const TActorContext& ctx, + const TInstant&) { + auto reply = new TEvLocalNodeInfo; + reply->NodeId = ev->Get()->Record.GetNodeId(); + ctx.Send(ev->Sender, reply); + } + + void Handle(TEvInterconnect::TEvResolveNode::TPtr& ev, + const TActorContext& ctx) { + const TEvInterconnect::TEvResolveNode* request = ev->Get(); + auto& record = request->Record; + const ui32 nodeId = record.GetNodeId(); + const TInstant deadline = record.HasDeadline() ? TInstant::FromValue(record.GetDeadline()) : TInstant::Max(); + auto it = NodeTable.find(nodeId); + + if (it == NodeTable.end()) { + static_cast<TDerived*>(this)->HandleMissedNodeId(ev, ctx, deadline); + } else { + IActor::RegisterWithSameMailbox( + CreateResolveActor(nodeId, it->second, ev->Sender, this->SelfId(), deadline)); + } + } + + void Handle(TEvResolveAddress::TPtr& ev, + const TActorContext&) { + const TEvResolveAddress* request = ev->Get(); + + IActor::RegisterWithSameMailbox( + CreateResolveActor(request->Address, request->Port, ev->Sender, this->SelfId(), TInstant::Max())); + } + + void Handle(TEvInterconnect::TEvListNodes::TPtr& ev, + const TActorContext& ctx) { + THolder<TEvInterconnect::TEvNodesInfo> + reply(new TEvInterconnect::TEvNodesInfo()); + reply->Nodes.reserve(NodeTable.size()); + for (const auto& pr : NodeTable) { + reply->Nodes.emplace_back(pr.first, + pr.second.Address, pr.second.Host, pr.second.ResolveHost, + pr.second.Port, pr.second.Location); + } + ctx.Send(ev->Sender, reply.Release()); + } + + void Handle(TEvInterconnect::TEvGetNode::TPtr& ev, + const TActorContext& ctx) { + ui32 nodeId = ev->Get()->NodeId; + THolder<TEvInterconnect::TEvNodeInfo> + reply(new TEvInterconnect::TEvNodeInfo(nodeId)); + auto it = NodeTable.find(nodeId); + if (it != NodeTable.end()) { + reply->Node = MakeHolder<TEvInterconnect::TNodeInfo>(it->first, it->second.Address, + it->second.Host, it->second.ResolveHost, + it->second.Port, it->second.Location); + } + ctx.Send(ev->Sender, reply.Release()); + } + }; +} diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp index 5e48401b14..0b09285123 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp +++ b/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp @@ -1,178 +1,178 @@ -#include "interconnect.h" -#include "interconnect_impl.h" -#include "interconnect_address.h" -#include "interconnect_nameserver_base.h" -#include "events_local.h" -#include "logging.h" - -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/log.h> - -namespace NActors { - - class TInterconnectDynamicNameserver - : public TInterconnectNameserverBase<TInterconnectDynamicNameserver> - , public TInterconnectLoggingBase - { - struct TPendingRequest { - TEvInterconnect::TEvResolveNode::TPtr Request; - TInstant Deadline; - - TPendingRequest(TEvInterconnect::TEvResolveNode::TPtr request, const TInstant& deadline) - : Request(request), Deadline(deadline) - { - } - }; - - TMap<ui32, TTableNameserverSetup::TNodeInfo> NodeTable; - TVector<TPendingRequest> PendingRequests; - TDuration PendingPeriod; - - void PrintInfo() { - TString logMsg = TStringBuilder() << "Table size: " << NodeTable.size(); - for (const auto& [nodeId, node] : NodeTable) { - TString str = TStringBuilder() << "\n > Node " << nodeId << " `" << node.Address << "`:" << node.Port << ", host: " << node.Host << ", resolveHost: " << node.ResolveHost; - logMsg += str; - } - LOG_DEBUG_IC("ICN01", "%s", logMsg.c_str()); - } - - bool IsNodeUpdated(const ui32 nodeId, const TString& address, const ui32 port) { - bool printInfo = false; - auto it = NodeTable.find(nodeId); - if (it == NodeTable.end()) { - LOG_DEBUG_IC("ICN02", "New node %u `%s`: %u", - nodeId, address.c_str(), port); - printInfo = true; - } else if (it->second.Address != address || it->second.Port != port) { - LOG_DEBUG_IC("ICN03", "Updated node %u `%s`: %u (from `%s`: %u)", - nodeId, address.c_str(), port, it->second.Address.c_str(), it->second.Port); - printInfo = true; - Send(TActivationContext::InterconnectProxy(nodeId), new TEvInterconnect::TEvDisconnect); - } - return printInfo; - } - - void DiscardTimedOutRequests(const TActorContext& ctx, ui32 compactionCount = 0) { - - auto now = Now(); - - for (auto& pending : PendingRequests) { - if (pending.Deadline > now) { - LOG_ERROR_IC("ICN06", "Unknown nodeId: %u", pending.Request->Get()->Record.GetNodeId()); - auto reply = new TEvLocalNodeInfo; - reply->NodeId = pending.Request->Get()->Record.GetNodeId(); - ctx.Send(pending.Request->Sender, reply); - pending.Request.Reset(); - compactionCount++; - } - } - - if (compactionCount) { - TVector<TPendingRequest> requests; - if (compactionCount < PendingRequests.size()) { // sanity check - requests.reserve(PendingRequests.size() - compactionCount); - } - for (auto& pending : PendingRequests) { - if (pending.Request) { - requests.emplace_back(pending.Request, pending.Deadline); - } - } - PendingRequests.swap(requests); - } - } - - void SchedulePeriodic() { - Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup()); - } - - public: - static constexpr EActivityType ActorActivityType() { - return NAMESERVICE; - } - - TInterconnectDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, const TDuration& pendingPeriod, ui32 /*resolvePoolId*/ ) - : TInterconnectNameserverBase<TInterconnectDynamicNameserver>(&TInterconnectDynamicNameserver::StateFunc, NodeTable) - , NodeTable(setup->StaticNodeTable) - , PendingPeriod(pendingPeriod) - { - Y_VERIFY(setup->IsEntriesUnique()); - } - - STFUNC(StateFunc) { - try { - switch (ev->GetTypeRewrite()) { - HFunc(TEvInterconnect::TEvResolveNode, Handle); - HFunc(TEvResolveAddress, Handle); - HFunc(TEvInterconnect::TEvListNodes, Handle); - HFunc(TEvInterconnect::TEvGetNode, Handle); - HFunc(TEvInterconnect::TEvNodesInfo, HandleUpdate); - CFunc(TEvents::TEvWakeup::EventType, HandlePeriodic); - } - } catch (...) { - LOG_ERROR_IC("ICN09", "%s", CurrentExceptionMessage().c_str()); - } - } - - void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev, - const TActorContext& ctx, - const TInstant& deadline) { - if (PendingPeriod) { - if (PendingRequests.size() == 0) { - SchedulePeriodic(); - } - PendingRequests.emplace_back(std::move(ev), Min(deadline, Now() + PendingPeriod)); - } else { - LOG_ERROR_IC("ICN07", "Unknown nodeId: %u", ev->Get()->Record.GetNodeId()); - TInterconnectNameserverBase::HandleMissedNodeId(ev, ctx, deadline); - } - } - - void HandleUpdate(TEvInterconnect::TEvNodesInfo::TPtr& ev, - const TActorContext& ctx) { - - auto request = ev->Get(); - LOG_DEBUG_IC("ICN04", "Update TEvNodesInfo with sz: %lu ", request->Nodes.size()); - - bool printInfo = false; - ui32 compactionCount = 0; - - for (const auto& node : request->Nodes) { - printInfo |= IsNodeUpdated(node.NodeId, node.Address, node.Port); - - NodeTable[node.NodeId] = TTableNameserverSetup::TNodeInfo( - node.Address, node.Host, node.ResolveHost, node.Port, node.Location); - - for (auto& pending : PendingRequests) { - if (pending.Request->Get()->Record.GetNodeId() == node.NodeId) { - LOG_DEBUG_IC("ICN05", "Pending nodeId: %u discovered", node.NodeId); - RegisterWithSameMailbox( - CreateResolveActor(node.NodeId, NodeTable[node.NodeId], pending.Request->Sender, SelfId(), pending.Deadline)); - pending.Request.Reset(); - compactionCount++; - } - } - } - - if (printInfo) { - PrintInfo(); - } - - DiscardTimedOutRequests(ctx, compactionCount); - } - - void HandlePeriodic(const TActorContext& ctx) { - DiscardTimedOutRequests(ctx, 0); - if (PendingRequests.size()) { - SchedulePeriodic(); - } - } - }; - - IActor* CreateDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, - const TDuration& pendingPeriod, - ui32 poolId) { - return new TInterconnectDynamicNameserver(setup, pendingPeriod, poolId); - } - -} +#include "interconnect.h" +#include "interconnect_impl.h" +#include "interconnect_address.h" +#include "interconnect_nameserver_base.h" +#include "events_local.h" +#include "logging.h" + +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +namespace NActors { + + class TInterconnectDynamicNameserver + : public TInterconnectNameserverBase<TInterconnectDynamicNameserver> + , public TInterconnectLoggingBase + { + struct TPendingRequest { + TEvInterconnect::TEvResolveNode::TPtr Request; + TInstant Deadline; + + TPendingRequest(TEvInterconnect::TEvResolveNode::TPtr request, const TInstant& deadline) + : Request(request), Deadline(deadline) + { + } + }; + + TMap<ui32, TTableNameserverSetup::TNodeInfo> NodeTable; + TVector<TPendingRequest> PendingRequests; + TDuration PendingPeriod; + + void PrintInfo() { + TString logMsg = TStringBuilder() << "Table size: " << NodeTable.size(); + for (const auto& [nodeId, node] : NodeTable) { + TString str = TStringBuilder() << "\n > Node " << nodeId << " `" << node.Address << "`:" << node.Port << ", host: " << node.Host << ", resolveHost: " << node.ResolveHost; + logMsg += str; + } + LOG_DEBUG_IC("ICN01", "%s", logMsg.c_str()); + } + + bool IsNodeUpdated(const ui32 nodeId, const TString& address, const ui32 port) { + bool printInfo = false; + auto it = NodeTable.find(nodeId); + if (it == NodeTable.end()) { + LOG_DEBUG_IC("ICN02", "New node %u `%s`: %u", + nodeId, address.c_str(), port); + printInfo = true; + } else if (it->second.Address != address || it->second.Port != port) { + LOG_DEBUG_IC("ICN03", "Updated node %u `%s`: %u (from `%s`: %u)", + nodeId, address.c_str(), port, it->second.Address.c_str(), it->second.Port); + printInfo = true; + Send(TActivationContext::InterconnectProxy(nodeId), new TEvInterconnect::TEvDisconnect); + } + return printInfo; + } + + void DiscardTimedOutRequests(const TActorContext& ctx, ui32 compactionCount = 0) { + + auto now = Now(); + + for (auto& pending : PendingRequests) { + if (pending.Deadline > now) { + LOG_ERROR_IC("ICN06", "Unknown nodeId: %u", pending.Request->Get()->Record.GetNodeId()); + auto reply = new TEvLocalNodeInfo; + reply->NodeId = pending.Request->Get()->Record.GetNodeId(); + ctx.Send(pending.Request->Sender, reply); + pending.Request.Reset(); + compactionCount++; + } + } + + if (compactionCount) { + TVector<TPendingRequest> requests; + if (compactionCount < PendingRequests.size()) { // sanity check + requests.reserve(PendingRequests.size() - compactionCount); + } + for (auto& pending : PendingRequests) { + if (pending.Request) { + requests.emplace_back(pending.Request, pending.Deadline); + } + } + PendingRequests.swap(requests); + } + } + + void SchedulePeriodic() { + Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup()); + } + + public: + static constexpr EActivityType ActorActivityType() { + return NAMESERVICE; + } + + TInterconnectDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, const TDuration& pendingPeriod, ui32 /*resolvePoolId*/ ) + : TInterconnectNameserverBase<TInterconnectDynamicNameserver>(&TInterconnectDynamicNameserver::StateFunc, NodeTable) + , NodeTable(setup->StaticNodeTable) + , PendingPeriod(pendingPeriod) + { + Y_VERIFY(setup->IsEntriesUnique()); + } + + STFUNC(StateFunc) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvInterconnect::TEvResolveNode, Handle); + HFunc(TEvResolveAddress, Handle); + HFunc(TEvInterconnect::TEvListNodes, Handle); + HFunc(TEvInterconnect::TEvGetNode, Handle); + HFunc(TEvInterconnect::TEvNodesInfo, HandleUpdate); + CFunc(TEvents::TEvWakeup::EventType, HandlePeriodic); + } + } catch (...) { + LOG_ERROR_IC("ICN09", "%s", CurrentExceptionMessage().c_str()); + } + } + + void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev, + const TActorContext& ctx, + const TInstant& deadline) { + if (PendingPeriod) { + if (PendingRequests.size() == 0) { + SchedulePeriodic(); + } + PendingRequests.emplace_back(std::move(ev), Min(deadline, Now() + PendingPeriod)); + } else { + LOG_ERROR_IC("ICN07", "Unknown nodeId: %u", ev->Get()->Record.GetNodeId()); + TInterconnectNameserverBase::HandleMissedNodeId(ev, ctx, deadline); + } + } + + void HandleUpdate(TEvInterconnect::TEvNodesInfo::TPtr& ev, + const TActorContext& ctx) { + + auto request = ev->Get(); + LOG_DEBUG_IC("ICN04", "Update TEvNodesInfo with sz: %lu ", request->Nodes.size()); + + bool printInfo = false; + ui32 compactionCount = 0; + + for (const auto& node : request->Nodes) { + printInfo |= IsNodeUpdated(node.NodeId, node.Address, node.Port); + + NodeTable[node.NodeId] = TTableNameserverSetup::TNodeInfo( + node.Address, node.Host, node.ResolveHost, node.Port, node.Location); + + for (auto& pending : PendingRequests) { + if (pending.Request->Get()->Record.GetNodeId() == node.NodeId) { + LOG_DEBUG_IC("ICN05", "Pending nodeId: %u discovered", node.NodeId); + RegisterWithSameMailbox( + CreateResolveActor(node.NodeId, NodeTable[node.NodeId], pending.Request->Sender, SelfId(), pending.Deadline)); + pending.Request.Reset(); + compactionCount++; + } + } + } + + if (printInfo) { + PrintInfo(); + } + + DiscardTimedOutRequests(ctx, compactionCount); + } + + void HandlePeriodic(const TActorContext& ctx) { + DiscardTimedOutRequests(ctx, 0); + if (PendingRequests.size()) { + SchedulePeriodic(); + } + } + }; + + IActor* CreateDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, + const TDuration& pendingPeriod, + ui32 poolId) { + return new TInterconnectDynamicNameserver(setup, pendingPeriod, poolId); + } + +} diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp index 43419bf70d..b8eade113f 100644 --- a/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp +++ b/library/cpp/actors/interconnect/interconnect_nameserver_table.cpp @@ -1,15 +1,15 @@ #include "interconnect.h" #include "interconnect_impl.h" #include "interconnect_address.h" -#include "interconnect_nameserver_base.h" +#include "interconnect_nameserver_base.h" #include "events_local.h" #include <library/cpp/actors/core/hfunc.h> #include <library/cpp/actors/memory_log/memlog.h> namespace NActors { - - class TInterconnectNameserverTable: public TInterconnectNameserverBase<TInterconnectNameserverTable> { + + class TInterconnectNameserverTable: public TInterconnectNameserverBase<TInterconnectNameserverTable> { TIntrusivePtr<TTableNameserverSetup> Config; public: @@ -18,7 +18,7 @@ namespace NActors { } TInterconnectNameserverTable(const TIntrusivePtr<TTableNameserverSetup>& setup, ui32 /*resolvePoolId*/) - : TInterconnectNameserverBase<TInterconnectNameserverTable>(&TInterconnectNameserverTable::StateFunc, setup->StaticNodeTable) + : TInterconnectNameserverBase<TInterconnectNameserverTable>(&TInterconnectNameserverTable::StateFunc, setup->StaticNodeTable) , Config(setup) { Y_VERIFY(Config->IsEntriesUnique()); @@ -26,11 +26,11 @@ namespace NActors { STFUNC(StateFunc) { try { - switch (ev->GetTypeRewrite()) { - HFunc(TEvInterconnect::TEvResolveNode, Handle); - HFunc(TEvResolveAddress, Handle); - HFunc(TEvInterconnect::TEvListNodes, Handle); - HFunc(TEvInterconnect::TEvGetNode, Handle); + switch (ev->GetTypeRewrite()) { + HFunc(TEvInterconnect::TEvResolveNode, Handle); + HFunc(TEvResolveAddress, Handle); + HFunc(TEvInterconnect::TEvListNodes, Handle); + HFunc(TEvInterconnect::TEvGetNode, Handle); } } catch (...) { // on error - do nothing diff --git a/library/cpp/actors/interconnect/ya.make b/library/cpp/actors/interconnect/ya.make index 60d29b0fc0..f2b16af4b9 100644 --- a/library/cpp/actors/interconnect/ya.make +++ b/library/cpp/actors/interconnect/ya.make @@ -29,7 +29,7 @@ SRCS( interconnect_impl.h interconnect_mon.cpp interconnect_mon.h - interconnect_nameserver_dynamic.cpp + interconnect_nameserver_dynamic.cpp interconnect_nameserver_table.cpp interconnect_proxy_wrapper.cpp interconnect_proxy_wrapper.h diff --git a/ydb/core/base/ticket_parser.h b/ydb/core/base/ticket_parser.h index 62004b3a89..41b01fb087 100644 --- a/ydb/core/base/ticket_parser.h +++ b/ydb/core/base/ticket_parser.h @@ -162,14 +162,14 @@ template <> inline void Out<NKikimr::TEvTicketParser::TError>(IOutputStream& str, const NKikimr::TEvTicketParser::TError& error) { str << error.Message; } - -namespace NKikimr { -namespace NGRpcService { - -class ICheckerIface { -public: - virtual void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries) = 0; -}; - -} -} + +namespace NKikimr { +namespace NGRpcService { + +class ICheckerIface { +public: + virtual void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries) = 0; +}; + +} +} diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 819c1478d1..b88e8de4ab 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -577,23 +577,23 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s TActorSetupCmd(resolver, TMailboxType::HTSwap, systemPoolId)); IActor *nameservice; - - switch (nsConfig.GetType()) { - case NKikimrConfig::TStaticNameserviceConfig::NS_FIXED: - nameservice = NActors::CreateNameserverTable(table, appData->IOPoolId); - break; - case NKikimrConfig::TStaticNameserviceConfig::NS_DEFAULT: - case NKikimrConfig::TStaticNameserviceConfig::NS_NODE_BROKER: - if (Config.GetDynamicNodeConfig().HasNodeInfo()) { - auto& info = Config.GetDynamicNodeConfig().GetNodeInfo(); - nameservice = NNodeBroker::CreateDynamicNameserver(table, info, *appData->DomainsInfo, appData->IOPoolId); - } else { - nameservice = NNodeBroker::CreateDynamicNameserver(table, appData->IOPoolId); - } - break; - case NKikimrConfig::TStaticNameserviceConfig::NS_EXTERNAL: - nameservice = NActors::CreateDynamicNameserver(table, TDuration::Seconds(3), appData->IOPoolId); - break; + + switch (nsConfig.GetType()) { + case NKikimrConfig::TStaticNameserviceConfig::NS_FIXED: + nameservice = NActors::CreateNameserverTable(table, appData->IOPoolId); + break; + case NKikimrConfig::TStaticNameserviceConfig::NS_DEFAULT: + case NKikimrConfig::TStaticNameserviceConfig::NS_NODE_BROKER: + if (Config.GetDynamicNodeConfig().HasNodeInfo()) { + auto& info = Config.GetDynamicNodeConfig().GetNodeInfo(); + nameservice = NNodeBroker::CreateDynamicNameserver(table, info, *appData->DomainsInfo, appData->IOPoolId); + } else { + nameservice = NNodeBroker::CreateDynamicNameserver(table, appData->IOPoolId); + } + break; + case NKikimrConfig::TStaticNameserviceConfig::NS_EXTERNAL: + nameservice = NActors::CreateDynamicNameserver(table, TDuration::Seconds(3), appData->IOPoolId); + break; } setup->LocalServices.emplace_back( @@ -716,7 +716,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s setup->Interconnect.ProxyActors[destId] = TActorSetupCmd(new TInterconnectProxyTCP(destId, icCommon), TMailboxType::ReadAsFilled, interconnectPoolId); } else { - TYandexQueryInitializer::SetIcPort(node.second.second); + TYandexQueryInitializer::SetIcPort(node.second.second); icCommon->TechnicalSelfHostName = node.second.Host; TString address = "::"; //bind ipv6 interfaces by default if (node.second.first) diff --git a/ydb/core/grpc_services/rpc_analytics_internal.cpp b/ydb/core/grpc_services/rpc_analytics_internal.cpp index 48227a57fd..c120c1bf34 100644 --- a/ydb/core/grpc_services/rpc_analytics_internal.cpp +++ b/ydb/core/grpc_services/rpc_analytics_internal.cpp @@ -26,7 +26,7 @@ namespace { template <typename TEv, typename TReq> void SendResponse(const TEv& ev, TReq& req) { if (!ev->Get()->Record) { - req.RaiseIssues(ev->Get()->Issues); + req.RaiseIssues(ev->Get()->Issues); req.ReplyWithYdbStatus(ev->Get()->Status); } else { req.SendResult(*ev->Get()->Record, ev->Get()->Status); diff --git a/ydb/core/grpc_services/rpc_yq.cpp b/ydb/core/grpc_services/rpc_yq.cpp index 6fe5f868d6..66b430b343 100644 --- a/ydb/core/grpc_services/rpc_yq.cpp +++ b/ydb/core/grpc_services/rpc_yq.cpp @@ -1,7 +1,7 @@ #include "rpc_common.h" #include "rpc_deferrable.h" -#include <ydb/core/grpc_services/service_yq.h> +#include <ydb/core/grpc_services/service_yq.h> #include <ydb/core/yq/libs/audit/events/events.h> #include <ydb/core/yq/libs/audit/yq_audit_service.h> #include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> @@ -20,20 +20,20 @@ namespace NGRpcService { using namespace Ydb; -template <typename RpcRequestType, typename EvRequestType, typename EvResponseType> -class TYandexQueryRequestRPC : public TRpcOperationRequestActor< - TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> { - -public: - using TBase = TRpcOperationRequestActor< - TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, - RpcRequestType>; - using TBase::Become; - using TBase::Send; - using TBase::PassAway; - using TBase::Request_; - using TBase::GetProtoRequest; - +template <typename RpcRequestType, typename EvRequestType, typename EvResponseType> +class TYandexQueryRequestRPC : public TRpcOperationRequestActor< + TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> { + +public: + using TBase = TRpcOperationRequestActor< + TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, + RpcRequestType>; + using TBase::Become; + using TBase::Send; + using TBase::PassAway; + using TBase::Request_; + using TBase::GetProtoRequest; + protected: TString Token; TString FolderId; @@ -43,38 +43,38 @@ protected: TString RequestId; public: - TYandexQueryRequestRPC(IRequestOpCtx* request) - : TBase(request) {} + TYandexQueryRequestRPC(IRequestOpCtx* request) + : TBase(request) {} void Bootstrap() { - auto requestCtx = Request_.get(); - - auto request = dynamic_cast<RpcRequestType*>(requestCtx); - Y_VERIFY(request); - - auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx); - Y_VERIFY(proxyCtx); - - PeerName = Request_->GetPeerName(); - UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty"); - RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString()); - - TMaybe<TString> authToken = proxyCtx->GetYdbToken(); + auto requestCtx = Request_.get(); + + auto request = dynamic_cast<RpcRequestType*>(requestCtx); + Y_VERIFY(request); + + auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx); + Y_VERIFY(proxyCtx); + + PeerName = Request_->GetPeerName(); + UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty"); + RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString()); + + TMaybe<TString> authToken = proxyCtx->GetYdbToken(); if (!authToken) { ReplyWithStatus("Token is empty", StatusIds::BAD_REQUEST); return; } Token = *authToken; - const TString scope = Request_->GetPeerMetaValues("x-yq-scope").GetOrElse(""); + const TString scope = Request_->GetPeerMetaValues("x-yq-scope").GetOrElse(""); if (!scope.StartsWith("yandexcloud://")) { - ReplyWithStatus("x-yq-scope should start with yandexcloud:// but got " + scope, StatusIds::BAD_REQUEST); + ReplyWithStatus("x-yq-scope should start with yandexcloud:// but got " + scope, StatusIds::BAD_REQUEST); return; } const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); - if (path.size() != 2) { - ReplyWithStatus("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope, StatusIds::BAD_REQUEST); + if (path.size() != 2) { + ReplyWithStatus("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope, StatusIds::BAD_REQUEST); return; } @@ -89,12 +89,12 @@ public: return; } - const TString& internalToken = proxyCtx->GetInternalToken(); + const TString& internalToken = proxyCtx->GetInternalToken(); TVector<TString> permissions; if (internalToken) { NACLib::TUserToken userToken(internalToken); User = userToken.GetUserSID(); - for (const auto& sid: request->Sids) { + for (const auto& sid: request->Sids) { if (userToken.IsExist(sid)) { permissions.push_back(sid); } @@ -102,44 +102,44 @@ public: } if (!User) { - ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED); + ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED); return; } const auto* req = GetProtoRequest(); - auto ev = MakeHolder<EvRequestType>(FolderId, *req, User, Token, permissions); - Send(NYq::ControlPlaneProxyActorId(), ev.Release()); - Become(&TYandexQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc); + auto ev = MakeHolder<EvRequestType>(FolderId, *req, User, Token, permissions); + Send(NYq::ControlPlaneProxyActorId(), ev.Release()); + Become(&TYandexQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc); } protected: void ReplyWithStatus(const TString& issueMessage, StatusIds::StatusCode status) { - Request_->RaiseIssue(NYql::TIssue(issueMessage)); + Request_->RaiseIssue(NYql::TIssue(issueMessage)); Request_->ReplyWithYdbStatus(status); PassAway(); } - + STRICT_STFUNC(StateFunc, - hFunc(EvResponseType, Handle); + hFunc(EvResponseType, Handle); ) - template <typename TResponse, typename TReq> - void SendResponse(const TResponse& response, TReq& req) { + template <typename TResponse, typename TReq> + void SendResponse(const TResponse& response, TReq& req) { if (response.Issues) { - req.RaiseIssues(response.Issues); - req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); } else { - req.SendResult(response.Result, StatusIds::SUCCESS); + req.SendResult(response.Result, StatusIds::SUCCESS); } } - template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; } - void SendResponse(const TResponse& response, TReq& req) { + template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; } + void SendResponse(const TResponse& response, TReq& req) { if (response.Issues) { - req.RaiseIssues(response.Issues); - req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); } else { - req.SendResult(response.Result, StatusIds::SUCCESS); + req.SendResult(response.Result, StatusIds::SUCCESS); } NYq::TEvAuditService::TExtraInfo extraInfo{ @@ -158,199 +158,199 @@ protected: response.AuditDetails)); } - void Handle(typename EvResponseType::TPtr& ev) { - SendResponse(*ev->Get(), *Request_); + void Handle(typename EvResponseType::TPtr& ev) { + SendResponse(*ev->Get(), *Request_); PassAway(); } }; -using TYandexQueryCreateQueryRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse>, - NYq::TEvControlPlaneProxy::TEvCreateQueryRequest, - NYq::TEvControlPlaneProxy::TEvCreateQueryResponse>; +using TYandexQueryCreateQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse>, + NYq::TEvControlPlaneProxy::TEvCreateQueryRequest, + NYq::TEvControlPlaneProxy::TEvCreateQueryResponse>; -void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryCreateQueryRPC(p.release())); +void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateQueryRPC(p.release())); } -using TYandexQueryListQueriesRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse>, - NYq::TEvControlPlaneProxy::TEvListQueriesRequest, - NYq::TEvControlPlaneProxy::TEvListQueriesResponse>; +using TYandexQueryListQueriesRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse>, + NYq::TEvControlPlaneProxy::TEvListQueriesRequest, + NYq::TEvControlPlaneProxy::TEvListQueriesResponse>; -void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryListQueriesRPC(p.release())); +void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListQueriesRPC(p.release())); } -using TYandexQueryDescribeQueryRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse>, - NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest, - NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse>; +using TYandexQueryDescribeQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest, + NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse>; -void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDescribeQueryRPC(p.release())); +void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeQueryRPC(p.release())); } -using TYandexQueryGetQueryStatusRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse>, - NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest, - NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse>; +using TYandexQueryGetQueryStatusRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse>, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse>; -void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryGetQueryStatusRPC(p.release())); +void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryGetQueryStatusRPC(p.release())); } -using TYandexQueryModifyQueryRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse>, - NYq::TEvControlPlaneProxy::TEvModifyQueryRequest, - NYq::TEvControlPlaneProxy::TEvModifyQueryResponse>; +using TYandexQueryModifyQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse>, + NYq::TEvControlPlaneProxy::TEvModifyQueryRequest, + NYq::TEvControlPlaneProxy::TEvModifyQueryResponse>; -void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryModifyQueryRPC(p.release())); +void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyQueryRPC(p.release())); } -using TYandexQueryDeleteQueryRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse>, - NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest, - NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse>; +using TYandexQueryDeleteQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest, + NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse>; -void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDeleteQueryRPC(p.release())); +void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteQueryRPC(p.release())); } -using TYandexQueryControlQueryRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse>, - NYq::TEvControlPlaneProxy::TEvControlQueryRequest, - NYq::TEvControlPlaneProxy::TEvControlQueryResponse>; +using TYandexQueryControlQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse>, + NYq::TEvControlPlaneProxy::TEvControlQueryRequest, + NYq::TEvControlPlaneProxy::TEvControlQueryResponse>; -void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryControlQueryRPC(p.release())); +void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryControlQueryRPC(p.release())); } -using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse>, - NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, - NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>; +using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse>, + NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, + NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>; -void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release())); +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release())); } -using TYandexQueryListJobsRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse>, - NYq::TEvControlPlaneProxy::TEvListJobsRequest, - NYq::TEvControlPlaneProxy::TEvListJobsResponse>; +using TYandexQueryListJobsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse>, + NYq::TEvControlPlaneProxy::TEvListJobsRequest, + NYq::TEvControlPlaneProxy::TEvListJobsResponse>; -void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release())); +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release())); } -using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse>, - NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, - NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>; +using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, + NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>; -void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release())); +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release())); } -using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse>, - NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, - NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>; +using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, + NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>; -void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release())); +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release())); } -using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse>, - NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, - NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>; +using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse>, + NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, + NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>; -void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release())); +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release())); } -using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse>, - NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>; +using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>; -void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release())); +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release())); } -using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse>, - NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, - NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>; +using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, + NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>; -void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release())); +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release())); } -using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse>, - NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>; +using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>; -void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release())); +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release())); } -using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse>, - NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, - NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>; +using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, + NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>; -void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release())); +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release())); } -using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse>, - NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, - NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>; +using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse>, + NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, + NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>; -void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { - TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release())); +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release())); } -using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse>, - NYq::TEvControlPlaneProxy::TEvListBindingsRequest, - NYq::TEvControlPlaneProxy::TEvListBindingsResponse>; +using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse>, + NYq::TEvControlPlaneProxy::TEvListBindingsRequest, + NYq::TEvControlPlaneProxy::TEvListBindingsResponse>; -void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release())); +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release())); } -using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse>, - NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, - NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>; +using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, + NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>; -void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release())); +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release())); } -using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse>, - NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, - NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>; +using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse>, + NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, + NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>; -void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release())); +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release())); } -using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC< - TGrpcYqRequestOperationCall<YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse>, - NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, - NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>; +using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, + NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>; -void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release())); +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/service_yq.h b/ydb/core/grpc_services/service_yq.h index 8a9f96cdf2..dfa0a60193 100644 --- a/ydb/core/grpc_services/service_yq.h +++ b/ydb/core/grpc_services/service_yq.h @@ -1,92 +1,92 @@ -#pragma once - -#include <algorithm> -#include <memory> - -#include <ydb/core/base/ticket_parser.h> -#include <ydb/core/yq/libs/control_plane_proxy/utils.h> - -namespace NKikimr { -namespace NGRpcService { - -class IRequestOpCtx; -class IFacilityProvider; - -template <typename TReq, typename TResp> -class TGrpcYqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> { - -public: - using TBase = TGrpcRequestOperationCall<TReq, TResp>; - using TBase::GetProtoRequest; - using TBase::GetPeerMetaValues; - - const TVector<TString>& Permissions; - TVector<TString> Sids; - - TGrpcYqRequestOperationCall(NGrpc::IRequestContextBase* ctx, - void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&), - const TVector<TString>& permissions) - : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) { - } - - bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override { - - const TString scope = GetPeerMetaValues("x-yq-scope").GetOrElse(""); - if (scope.StartsWith("yandexcloud://")) { - const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); - if (path.size() == 2) { - const TString& folderId = path.back(); - TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ - Permissions, - { - {"folder_id", folderId}, - {"database_id", "db"} - } - }}; - std::transform(Permissions.begin(), Permissions.end(), std::back_inserter(Sids), - [](const TString& s) -> TString { return s + "@as"; }); - - auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest()); - if (serviceAccountId) { - entries.push_back({ - {"iam.serviceAccounts.use"}, - { - {"service_account_id", serviceAccountId}, - {"database_id", "db"} - }}); - Sids.push_back("iam.serviceAccounts.use@as"); - } - - iface->SetEntries(entries); - return true; - } - } - - return false; - } -}; - -void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); - -} // namespace NGRpcService -} // namespace NKikimr +#pragma once + +#include <algorithm> +#include <memory> + +#include <ydb/core/base/ticket_parser.h> +#include <ydb/core/yq/libs/control_plane_proxy/utils.h> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +template <typename TReq, typename TResp> +class TGrpcYqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> { + +public: + using TBase = TGrpcRequestOperationCall<TReq, TResp>; + using TBase::GetProtoRequest; + using TBase::GetPeerMetaValues; + + const TVector<TString>& Permissions; + TVector<TString> Sids; + + TGrpcYqRequestOperationCall(NGrpc::IRequestContextBase* ctx, + void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&), + const TVector<TString>& permissions) + : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) { + } + + bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override { + + const TString scope = GetPeerMetaValues("x-yq-scope").GetOrElse(""); + if (scope.StartsWith("yandexcloud://")) { + const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); + if (path.size() == 2) { + const TString& folderId = path.back(); + TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ + Permissions, + { + {"folder_id", folderId}, + {"database_id", "db"} + } + }}; + std::transform(Permissions.begin(), Permissions.end(), std::back_inserter(Sids), + [](const TString& s) -> TString { return s + "@as"; }); + + auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest()); + if (serviceAccountId) { + entries.push_back({ + {"iam.serviceAccounts.use"}, + { + {"service_account_id", serviceAccountId}, + {"database_id", "db"} + }}); + Sids.push_back("iam.serviceAccounts.use@as"); + } + + iface->SetEntries(entries); + return true; + } + } + + return false; + } +}; + +void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 750207da8e..7731d8f486 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -303,8 +303,8 @@ private: Ydb::StatusIds::StatusCode status = ev->Get()->Record.GetStatus(); IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); - State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(status, issues); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(status, issues); } private: diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f8e3cd0c77..679e0a7b04 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -239,8 +239,8 @@ private: << PendingScanData.size() << " pending messages," << " stream will be terminated"); - State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(Ydb::StatusIds::OVERLOADED, TIssues({issue})); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(Ydb::StatusIds::OVERLOADED, TIssues({issue})); } void HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) { @@ -499,8 +499,8 @@ private: return; } - State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(status, issues); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(status, issues); return; } diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index d64169d4fc..3207beee63 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -111,14 +111,14 @@ message TActorSystemConfig { } message TStaticNameserviceConfig { - - enum ENameserviceType { - NS_DEFAULT = 0; // default (nodebroker) - NS_FIXED = 1; // static table lookup - NS_NODE_BROKER = 2; // nodebroker based - NS_EXTERNAL = 3; // may be paired with external discovery - } - + + enum ENameserviceType { + NS_DEFAULT = 0; // default (nodebroker) + NS_FIXED = 1; // static table lookup + NS_NODE_BROKER = 2; // nodebroker based + NS_EXTERNAL = 3; // may be paired with external discovery + } + message TEndpoint { optional string Name = 1; optional string Address = 2; @@ -145,7 +145,7 @@ message TStaticNameserviceConfig { optional string ClusterUUID = 2; repeated string AcceptUUID = 3; optional bool SuppressVersionCheck = 4; - optional ENameserviceType Type = 5; + optional ENameserviceType Type = 5; } message TDynamicNameserviceConfig { diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index c17c8a7dc3..1dd5cec5ee 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -284,7 +284,7 @@ enum EServiceKikimr { STREAMS_SERVICE = 1012; STREAMS_STORAGE_SERVICE = 1013; STREAMS_SCHEDULER_SERVICE = 1014; - STREAMS_RESOURCE_SERVICE = 1015; + STREAMS_RESOURCE_SERVICE = 1015; STREAMS_CHECKPOINT_COORDINATOR = 1016; STREAMS_CONTROL_PLANE_SERVICE = 1017; STREAMS_GRAND_LEADER_SERVICE = 1018; diff --git a/ydb/core/yq/libs/actors/logging/log.h b/ydb/core/yq/libs/actors/logging/log.h index a5a938f014..8af35e71f9 100644 --- a/ydb/core/yq/libs/actors/logging/log.h +++ b/ydb/core/yq/libs/actors/logging/log.h @@ -54,17 +54,17 @@ #define LOG_STREAMS_SCHEDULER_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_SCHEDULER_SERVICE, logRecordStream) #define LOG_STREAMS_SCHEDULER_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_SCHEDULER_SERVICE, logRecordStream) #define LOG_STREAMS_SCHEDULER_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_SCHEDULER_SERVICE, logRecordStream) - -// Component: STREAMS_RESOURCE_SERVICE. -#define LOG_STREAMS_RESOURCE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_RESOURCE_SERVICE, logRecordStream) -#define LOG_STREAMS_RESOURCE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_RESOURCE_SERVICE, logRecordStream) + +// Component: STREAMS_RESOURCE_SERVICE. +#define LOG_STREAMS_RESOURCE_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_CRIT(logRecordStream) LOG_STREAMS_IMPL(CRIT, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_NOTICE(logRecordStream) LOG_STREAMS_IMPL(NOTICE, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_RESOURCE_SERVICE, logRecordStream) +#define LOG_STREAMS_RESOURCE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_RESOURCE_SERVICE, logRecordStream) // Component: STREAMS_CHECKPOINT_COORDINATOR. #define LOG_STREAMS_CHECKPOINT_COORDINATOR_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_CHECKPOINT_COORDINATOR, logRecordStream) @@ -109,7 +109,7 @@ #define LOG_STREAMS_META_STORAGE_SERVICE_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, STREAMS_META_STORAGE_SERVICE, logRecordStream) #define LOG_STREAMS_META_STORAGE_SERVICE_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, STREAMS_META_STORAGE_SERVICE, logRecordStream) #define LOG_STREAMS_META_STORAGE_SERVICE_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, STREAMS_META_STORAGE_SERVICE, logRecordStream) - + // Component: STREAMS_GRAPH_LEADER. #define LOG_STREAMS_GRAPH_LEADER_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_GRAPH_LEADER, logRecordStream) #define LOG_STREAMS_GRAPH_LEADER_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_GRAPH_LEADER, logRecordStream) diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index 62d92aec0f..35e7abc5ae 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -57,20 +57,20 @@ public: const NDqs::TWorkerManagerCounters& workerManagerCounters, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, - const ::NYq::NCommon::TServiceCounters& serviceCounters, + const ::NYq::NCommon::TServiceCounters& serviceCounters, const NConfig::TPrivateApiConfig& privateApiConfig, const ui32& icPort, const TString& address, - const TString& tenant, + const TString& tenant, ui64 mkqlInitialMemoryLimit, const NMonitoring::TDynamicCounterPtr& clientCounters) : WorkerManagerCounters(workerManagerCounters) , TimeProvider(timeProvider) , RandomProvider(randomProvider) - , ServiceCounters(serviceCounters, "node_manager") + , ServiceCounters(serviceCounters, "node_manager") , PrivateApiConfig(privateApiConfig) , Tenant(tenant) - , MkqlInitialMemoryLimit(mkqlInitialMemoryLimit) + , MkqlInitialMemoryLimit(mkqlInitialMemoryLimit) , YqSharedResources(yqSharedResources) , IcPort(icPort) , Address(address) @@ -88,50 +88,50 @@ public: static constexpr char ActorName[] = "YQ_NODES_MANAGER"; void PassAway() final { - LOG_I("PassAway STOPPED"); + LOG_I("PassAway STOPPED"); NActors::IActor::PassAway(); } void Bootstrap(const TActorContext&) { Become(&TYqlNodesManagerActor::StateFunc); - ServiceCounters.Counters->GetCounter("EvBootstrap", true)->Inc(); - LOG_I("Bootstrap STARTED"); - NodesHealthCheck(); + ServiceCounters.Counters->GetCounter("EvBootstrap", true)->Inc(); + LOG_I("Bootstrap STARTED"); + NodesHealthCheck(); } private: - void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) { - ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc(); + void Handle(NDqs::TEvAllocateWorkersRequest::TPtr& ev) { + ServiceCounters.Counters->GetCounter("EvAllocateWorkersRequest", true)->Inc(); const auto &rec = ev->Get()->Record; const auto count = rec.GetCount(); Y_ASSERT(count != 0); auto resourceId = rec.GetResourceId(); if (!resourceId) { - resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); + resourceId = (ui64(++ResourceIdPart) << 32) | SelfId().NodeId(); } TVector<TPeer> nodes; for (ui32 i = 0; i < count; ++i) { - TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; + TPeer node = {SelfId().NodeId(), InstanceId + "," + HostName(), 0, 0, 0}; if (!Peers.empty()) { - auto FirstPeer = NextPeer; - while (true) { - if (NextPeer >= Peers.size()) { - NextPeer = 0; - } - - auto& nextNode = Peers[NextPeer]; - ++NextPeer; - - if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then - || nextNode.MemoryLimit == 0 // not limit defined for the node - || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough - ) { - // adjust allocated size to place next tasks correctly, will be reset after next health check - nextNode.MemoryAllocated += MkqlInitialMemoryLimit; - node = nextNode; - break; - } + auto FirstPeer = NextPeer; + while (true) { + if (NextPeer >= Peers.size()) { + NextPeer = 0; + } + + auto& nextNode = Peers[NextPeer]; + ++NextPeer; + + if (NextPeer == FirstPeer // we closed loop w/o success, fallback to round robin then + || nextNode.MemoryLimit == 0 // not limit defined for the node + || nextNode.MemoryLimit > nextNode.MemoryAllocated + MkqlInitialMemoryLimit // memory is enough + ) { + // adjust allocated size to place next tasks correctly, will be reset after next health check + nextNode.MemoryAllocated += MkqlInitialMemoryLimit; + node = nextNode; + break; + } } } nodes.push_back(node); @@ -146,50 +146,50 @@ private: *worker->MutableGuid() = node.InstanceId; worker->SetNodeId(node.NodeId); } - LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString()); + LOG_D("TEvAllocateWorkersResponse " << req->Record.DebugString()); Send(ev->Sender, req.Release()); } - void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) { - ServiceCounters.Counters->GetCounter("EvFreeWorkersNotify", true)->Inc(); + void Handle(NDqs::TEvFreeWorkersNotify::TPtr&) { + ServiceCounters.Counters->GetCounter("EvFreeWorkersNotify", true)->Inc(); } STRICT_STFUNC( StateFunc, - hFunc(NActors::TEvents::TEvWakeup, HandleWakeup) - hFunc(NDqs::TEvAllocateWorkersRequest, Handle) - hFunc(NDqs::TEvFreeWorkersNotify, Handle) - hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) + hFunc(NActors::TEvents::TEvWakeup, HandleWakeup) + hFunc(NDqs::TEvAllocateWorkersRequest, Handle) + hFunc(NDqs::TEvFreeWorkersNotify, Handle) + hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) hFunc(TEvHealthNodesResponse, HandleResponse) ) - void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { - ServiceCounters.Counters->GetCounter("EvWakeup", true)->Inc(); + void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { + ServiceCounters.Counters->GetCounter("EvWakeup", true)->Inc(); auto tag = ev->Get()->Tag; switch (tag) { case WU_NodesHealthCheck: - NodesHealthCheck(); + NodesHealthCheck(); break; } } - void NodesHealthCheck() { + void NodesHealthCheck() { const TDuration ttl = TDuration::Seconds(5); - Schedule(ttl, new NActors::TEvents::TEvWakeup(WU_NodesHealthCheck)); + Schedule(ttl, new NActors::TEvents::TEvWakeup(WU_NodesHealthCheck)); - ServiceCounters.Counters->GetCounter("NodesHealthCheck", true)->Inc(); + ServiceCounters.Counters->GetCounter("NodesHealthCheck", true)->Inc(); Yq::Private::NodesHealthCheckRequest request; request.set_tenant(Tenant); - auto& node = *request.mutable_node(); - node.set_node_id(SelfId().NodeId()); - node.set_instance_id(InstanceId); - node.set_hostname(HostName()); - node.set_active_workers(AtomicGet(WorkerManagerCounters.ActiveWorkers->GetAtomic())); - node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic())); - node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic())); + auto& node = *request.mutable_node(); + node.set_node_id(SelfId().NodeId()); + node.set_instance_id(InstanceId); + node.set_hostname(HostName()); + node.set_active_workers(AtomicGet(WorkerManagerCounters.ActiveWorkers->GetAtomic())); + node.set_memory_limit(AtomicGet(WorkerManagerCounters.MkqlMemoryLimit->GetAtomic())); + node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic())); node.set_interconnect_port(IcPort); node.set_node_address(Address); const auto actorSystem = NActors::TActivationContext::ActorSystem(); @@ -206,9 +206,9 @@ private: }); } - void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { + void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { LOG_E("TYqlNodesManagerActor::OnUndelivered"); - ServiceCounters.Counters->GetCounter("OnUndelivered", true)->Inc(); + ServiceCounters.Counters->GetCounter("OnUndelivered", true)->Inc(); } void HandleResponse(TEvHealthNodesResponse::TPtr& ev) { @@ -225,8 +225,8 @@ private: Peers.clear(); for (const auto& node : res.nodes()) { - Peers.push_back({node.node_id(), node.instance_id() + "," + node.hostname(), - node.active_workers(), node.memory_limit(), node.memory_allocated()}); + Peers.push_back({node.node_id(), node.instance_id() + "," + node.hostname(), + node.active_workers(), node.memory_limit(), node.memory_allocated()}); if (node.interconnect_port()) { nodesInfo.emplace_back(TEvInterconnect::TNodeInfo{ @@ -239,8 +239,8 @@ private: } } - ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size()); - ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc(); + ServiceCounters.Counters->GetCounter("PeerCount", false)->Set(Peers.size()); + ServiceCounters.Counters->GetCounter("NodesHealthCheckOk", true)->Inc(); LOG_D("Send NodeInfo with size: " << nodesInfo.size() << " to DynamicNameserver"); if (!nodesInfo.empty()) { @@ -248,7 +248,7 @@ private: } } catch (yexception &e) { LOG_E(e.what()); - ServiceCounters.Counters->GetCounter("NodesHealthCheckFail", true)->Inc(); + ServiceCounters.Counters->GetCounter("NodesHealthCheckFail", true)->Inc(); } } @@ -256,10 +256,10 @@ private: NDqs::TWorkerManagerCounters WorkerManagerCounters; TIntrusivePtr<ITimeProvider> TimeProvider; TIntrusivePtr<IRandomProvider> RandomProvider; - ::NYq::NCommon::TServiceCounters ServiceCounters; + ::NYq::NCommon::TServiceCounters ServiceCounters; NConfig::TPrivateApiConfig PrivateApiConfig; TString Tenant; - ui64 MkqlInitialMemoryLimit; + ui64 MkqlInitialMemoryLimit; NYq::TYqSharedResources::TPtr YqSharedResources; @@ -271,9 +271,9 @@ private: struct TPeer { ui32 NodeId; TString InstanceId; - ui64 ActiveWorkers; - ui64 MemoryLimit; - ui64 MemoryAllocated; + ui64 ActiveWorkers; + ui64 MemoryLimit; + ui64 MemoryAllocated; }; TVector<TPeer> Peers; ui32 ResourceIdPart = 0; @@ -295,12 +295,12 @@ IActor* CreateYqlNodesManager( const NDqs::TWorkerManagerCounters& workerManagerCounters, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, - const ::NYq::NCommon::TServiceCounters& serviceCounters, + const ::NYq::NCommon::TServiceCounters& serviceCounters, const NConfig::TPrivateApiConfig& privateApiConfig, const NYq::TYqSharedResources::TPtr& yqSharedResources, const ui32& icPort, const TString& address, - const TString& tenant, + const TString& tenant, ui64 mkqlInitialMemoryLimit, const NMonitoring::TDynamicCounterPtr& clientCounters) { return new TYqlNodesManagerActor(yqSharedResources, workerManagerCounters, diff --git a/ydb/core/yq/libs/actors/nodes_manager.h b/ydb/core/yq/libs/actors/nodes_manager.h index 7301219b58..b6c70c2e76 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.h +++ b/ydb/core/yq/libs/actors/nodes_manager.h @@ -25,12 +25,12 @@ IActor* CreateYqlNodesManager( const NYql::NDqs::TWorkerManagerCounters& workerManagerCounters, TIntrusivePtr<ITimeProvider> timeProvider, TIntrusivePtr<IRandomProvider> randomProvider, - const ::NYq::NCommon::TServiceCounters& serviceCounters, + const ::NYq::NCommon::TServiceCounters& serviceCounters, const NConfig::TPrivateApiConfig& privateApiConfig, const NYq::TYqSharedResources::TPtr& yqSharedResources, const ui32& icPort, const TString& address, - const TString& tenant = "", + const TString& tenant = "", ui64 mkqlInitialMemoryLimit = 0, const NMonitoring::TDynamicCounterPtr& clientCounters = MakeIntrusive<NMonitoring::TDynamicCounters>()); diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 0095963c8f..7b39b181fb 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -121,7 +121,7 @@ public: , TimeProvider(timeProvider) , RandomProvider(randomProvider) , DqCompFactory(dqCompFactory) - , ServiceCounters(serviceCounters, "pending_fetcher") + , ServiceCounters(serviceCounters, "pending_fetcher") , CredentialsFactory(credentialsFactory) , S3Gateway(s3Gateway) , PqCmConnections(std::move(pqCmConnections)) @@ -237,7 +237,7 @@ private: YqSharedResources->YdbDriver, S3Gateway, FunctionRegistry, RandomProvider, ModuleResolver, ModuleResolver->GetNextUniqueId(), - DqCompFactory, PqCmConnections, + DqCompFactory, PqCmConnections, CommonConfig, CheckpointCoordinatorConfig, PrivateApiConfig, GatewaysConfig, PingerConfig, task.text(), task.scope(), task.user_token(), @@ -336,7 +336,7 @@ NActors::IActor* CreatePendingFetcher( timeProvider, randomProvider, dqCompFactory, - serviceCounters, + serviceCounters, credentialsFactory, s3Gateway, std::move(pqCmConnections), diff --git a/ydb/core/yq/libs/actors/pinger.cpp b/ydb/core/yq/libs/actors/pinger.cpp index 55fc80d6a0..f2aedd53fb 100644 --- a/ydb/core/yq/libs/actors/pinger.cpp +++ b/ydb/core/yq/libs/actors/pinger.cpp @@ -314,7 +314,7 @@ private: LOG_D("Ping response success: " << ev->Get()->Result.GetResult()); StartLeaseTime = now; auto action = ev->Get()->Action; - if (action != YandexQuery::QUERY_ACTION_UNSPECIFIED && !Finishing) { + if (action != YandexQuery::QUERY_ACTION_UNSPECIFIED && !Finishing) { LOG_D("Query action: " << YandexQuery::QueryAction_Name(action)); SendQueryAction(action); } diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h index 442d29ac44..4cc5d9a313 100644 --- a/ydb/core/yq/libs/actors/proxy.h +++ b/ydb/core/yq/libs/actors/proxy.h @@ -21,7 +21,7 @@ #include <library/cpp/random_provider/random_provider.h> #include <ydb/core/yq/libs/common/service_counters.h> - + namespace NKikimr { namespace NMiniKQL { class IFunctionRegistry; @@ -51,10 +51,10 @@ NActors::IActor* CreatePendingFetcher( const NMonitoring::TDynamicCounterPtr& clientCounters ); -NActors::IActor* CreateRunActor( - const ::NYq::NCommon::TServiceCounters& serviceCounters, +NActors::IActor* CreateRunActor( + const ::NYq::NCommon::TServiceCounters& serviceCounters, TRunActorParams&& params - ); + ); struct TResultId { TString Id; @@ -66,7 +66,7 @@ struct TResultId { NActors::IActor* CreateResultWriter( const NYdb::TDriver& driver, - const NActors::TActorId& executerId, + const NActors::TActorId& executerId, const TString& resultType, const NConfig::TPrivateApiConfig& privateApiConfig, const TResultId& resultId, diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index d51b8aae9f..27039ecf08 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -37,7 +37,7 @@ class TResultWriter : public NActors::TActorBootstrapped<TResultWriter> { public: TResultWriter( const NYdb::TDriver& driver, - const NActors::TActorId& executerId, + const NActors::TActorId& executerId, const TString& resultType, const NConfig::TPrivateApiConfig& privateApiConfig, const TResultId& resultId, @@ -45,7 +45,7 @@ public: const TString& traceId, const TInstant& deadline, const NMonitoring::TDynamicCounterPtr& clientCounters) - : ExecuterId(executerId) + : ExecuterId(executerId) , ResultBuilder(MakeHolder<TProtoBuilder>(resultType, columns)) , ResultId({resultId}) , TraceId(traceId) @@ -85,7 +85,7 @@ private: void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&, const NActors::TActorContext& ) { auto req = MakeHolder<TEvDqFailure>(TIssue("Undelivered").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, /*needFallback=*/false); - Send(ExecuterId, req.Release()); + Send(ExecuterId, req.Release()); HasError = true; } @@ -98,24 +98,24 @@ private: Send(ExecuterId, req.Release()); } - void MaybeFinish() { + void MaybeFinish() { if (Finished && Requests.empty()) { - Send(ExecuterId, new TEvGraphFinished()); + Send(ExecuterId, new TEvGraphFinished()); } } void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { Finished = true; NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record); - + *queryResult.MutableYson() = ResultBuilder->BuildYson(Head); - if (!Issues.Empty()) { - IssuesToMessage(Issues, queryResult.MutableIssues()); + if (!Issues.Empty()) { + IssuesToMessage(Issues, queryResult.MutableIssues()); } - queryResult.SetTruncated(Truncated); - queryResult.SetRowsCount(Rows); - - Send(ExecuterId, new TEvQueryResponse(std::move(queryResult))); + queryResult.SetTruncated(Truncated); + queryResult.SetRowsCount(Rows); + + Send(ExecuterId, new TEvQueryResponse(std::move(queryResult))); } void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { } @@ -131,7 +131,7 @@ private: if (it == Requests.end()) { HasError = true; auto req = MakeHolder<TEvDqFailure>(TIssue("Unknown RequestId").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, /*needFallback=*/false); - Send(ExecuterId, req.Release()); + Send(ExecuterId, req.Release()); return; } auto& request = it->second; @@ -309,7 +309,7 @@ private: LOG_E(CurrentExceptionMessage()); auto req = MakeHolder<TEvDqFailure>(TIssue("Internal error on data write").SetCode(NYql::DEFAULT_ERROR, TSeverityIds::S_ERROR), /*retriable=*/ false, /*needFallback=*/false); - Send(ExecuterId, req.Release()); + Send(ExecuterId, req.Release()); HasError = true; } } @@ -330,7 +330,7 @@ private: ui64 Size = 0; ui64 Rows = 0; - const TActorId ExecuterId; + const TActorId ExecuterId; THolder<TProtoBuilder> ResultBuilder; const TResultId ResultId; const TString TraceId; @@ -356,7 +356,7 @@ private: NActors::IActor* CreateResultWriter( const NYdb::TDriver& driver, - const NActors::TActorId& executerId, + const NActors::TActorId& executerId, const TString& resultType, const NConfig::TPrivateApiConfig& privateApiConfig, const TResultId& resultId, diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 5549f8f254..27acf655b0 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -64,19 +64,19 @@ #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <google/protobuf/util/time_util.h> -#include <util/string/split.h> +#include <util/string/split.h> #include <ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h> #include <ydb/core/yq/libs/checkpointing_common/defs.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> #include <ydb/core/yq/libs/db_resolver/db_async_resolver_impl.h> #include <ydb/core/yq/libs/common/database_token_builder.h> #include <ydb/core/yq/libs/private_client/private_client.h> - + #define LOG_E(stream) \ - LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Params.QueryId << " RunActor : " << stream) + LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Params.QueryId << " RunActor : " << stream) #define LOG_D(stream) \ - LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Params.QueryId << " RunActor : " << stream) + LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, Params.QueryId << " RunActor : " << stream) namespace NYq { @@ -84,57 +84,57 @@ using namespace NActors; using namespace NYql; using namespace NDqs; -class TDeferredCountersCleanupActor : public NActors::TActorBootstrapped<TDeferredCountersCleanupActor> { -public: - TDeferredCountersCleanupActor( +class TDeferredCountersCleanupActor : public NActors::TActorBootstrapped<TDeferredCountersCleanupActor> { +public: + TDeferredCountersCleanupActor( const NMonitoring::TDynamicCounterPtr& rootCountersParent, const NMonitoring::TDynamicCounterPtr& publicCountersParent, const TString& queryId) : RootCountersParent(rootCountersParent) - , PublicCountersParent(publicCountersParent) - , QueryId(queryId) - { - } - + , PublicCountersParent(publicCountersParent) + , QueryId(queryId) + { + } + static constexpr char ActorName[] = "YQ_DEFERRED_COUNTERS_CLEANUP"; void Bootstrap() { Become(&TDeferredCountersCleanupActor::StateFunc, TDuration::Seconds(60), new NActors::TEvents::TEvWakeup()); - } - - STRICT_STFUNC(StateFunc, - hFunc(NActors::TEvents::TEvWakeup, Handle) - ) - - void Handle(NActors::TEvents::TEvWakeup::TPtr&) { - if (RootCountersParent) { - RootCountersParent->RemoveSubgroup("query_id", QueryId); - } - if (PublicCountersParent) { - PublicCountersParent->RemoveSubgroup("query_id", QueryId); - } - PassAway(); - } - -private: - const NMonitoring::TDynamicCounterPtr RootCountersParent; - const NMonitoring::TDynamicCounterPtr PublicCountersParent; - const TString QueryId; -}; - + } + + STRICT_STFUNC(StateFunc, + hFunc(NActors::TEvents::TEvWakeup, Handle) + ) + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + if (RootCountersParent) { + RootCountersParent->RemoveSubgroup("query_id", QueryId); + } + if (PublicCountersParent) { + PublicCountersParent->RemoveSubgroup("query_id", QueryId); + } + PassAway(); + } + +private: + const NMonitoring::TDynamicCounterPtr RootCountersParent; + const NMonitoring::TDynamicCounterPtr PublicCountersParent; + const TString QueryId; +}; + class TRunActor : public NActors::TActorBootstrapped<TRunActor> { public: - explicit TRunActor( - const ::NYq::NCommon::TServiceCounters& serviceCounters + explicit TRunActor( + const ::NYq::NCommon::TServiceCounters& serviceCounters , TRunActorParams&& params) : Params(std::move(params)) , CreatedAt(TInstant::Now()) - , ServiceCounters(serviceCounters) - , QueryCounters(serviceCounters) + , ServiceCounters(serviceCounters) + , QueryCounters(serviceCounters) , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled()) , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40) - { - } + { + } static constexpr char ActorName[] = "YQ_RUN_ACTOR"; @@ -145,7 +145,7 @@ public: CreatePingerActor( Params.Scope, Params.UserId, - Params.QueryId, + Params.QueryId, Params.Owner, TPrivateClient( Params.Driver, @@ -160,29 +160,29 @@ public: Params.Deadline )); Become(&TRunActor::StateFuncWrapper<&TRunActor::StateFunc>); - try { - Run(); - } catch (const std::exception&) { - FailOnException(); - } + try { + Run(); + } catch (const std::exception&) { + FailOnException(); + } } private: template <void (TRunActor::* DelegatedStateFunc)(STFUNC_SIG)> - STFUNC(StateFuncWrapper) { - try { + STFUNC(StateFuncWrapper) { + try { (this->*DelegatedStateFunc)(ev, ctx); } catch (...) { - FailOnException(); - } - } - - STRICT_STFUNC(StateFunc, - HFunc(TEvents::TEvAsyncContinue, Handle); + FailOnException(); + } + } + + STRICT_STFUNC(StateFunc, + HFunc(TEvents::TEvAsyncContinue, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(NYq::TEvents::TEvGraphParams, Handle); hFunc(NYq::TEvents::TEvDataStreamsReadRulesCreationResult, Handle); - hFunc(NYql::NDqs::TEvQueryResponse, Handle); + hFunc(NYql::NDqs::TEvQueryResponse, Handle); hFunc(TEvents::TEvQueryActionResult, Handle); hFunc(TEvents::TEvForwardPingResponse, Handle); hFunc(TEvCheckpointCoordinator::TEvZeroCheckpointDone, Handle); @@ -236,70 +236,70 @@ private: } void PassAway() override { - if (!Params.Automatic) { - // Cleanup non-automatic counters only - Register(new TDeferredCountersCleanupActor(RootCountersParent, PublicCountersParent, Params.QueryId)); - } + if (!Params.Automatic) { + // Cleanup non-automatic counters only + Register(new TDeferredCountersCleanupActor(RootCountersParent, PublicCountersParent, Params.QueryId)); + } KillChildrenActors(); NActors::TActorBootstrapped<TRunActor>::PassAway(); } - void Run() { + void Run() { if (!Params.DqGraphs.empty() && Params.Status != YandexQuery::QueryMeta::STARTING) { FillDqGraphParams(); } - switch (Params.Status) { - case YandexQuery::QueryMeta::ABORTING_BY_USER: - case YandexQuery::QueryMeta::ABORTING_BY_SYSTEM: + switch (Params.Status) { + case YandexQuery::QueryMeta::ABORTING_BY_USER: + case YandexQuery::QueryMeta::ABORTING_BY_SYSTEM: case YandexQuery::QueryMeta::FAILING: case YandexQuery::QueryMeta::COMPLETING: FinalizingStatusIsWritten = true; Finish(GetFinalStatusFromFinalizingStatus(Params.Status)); - break; - case YandexQuery::QueryMeta::STARTING: + break; + case YandexQuery::QueryMeta::STARTING: HandleConnections(); RunProgram(); - break; - case YandexQuery::QueryMeta::RESUMING: - case YandexQuery::QueryMeta::RUNNING: - ReRunQuery(); - break; - default: + break; + case YandexQuery::QueryMeta::RESUMING: + case YandexQuery::QueryMeta::RUNNING: + ReRunQuery(); + break; + default: Abort("Fail to start query from unexpected status " + YandexQuery::QueryMeta::ComputeStatus_Name(Params.Status), YandexQuery::QueryMeta::FAILED); - break; + break; } } void HandleConnections() { - LOG_D("HandleConnections"); - + LOG_D("HandleConnections"); + THashMap<std::pair<TString, DatabaseType>, TEvents::TDatabaseAuth> databaseIds; - for (const auto& connection : Params.Connections) { - if (!connection.content().name()) { + for (const auto& connection : Params.Connections) { + if (!connection.content().name()) { LOG_D("Connection with empty name " << connection.meta().id()); - continue; - } + continue; + } Connections[connection.content().name()] = connection; // Necessary for TDatabaseAsyncResolverWithMeta - YqConnections.emplace(connection.meta().id(), connection); - } + YqConnections.emplace(connection.meta().id(), connection); + } } - void RunProgram() { + void RunProgram() { LOG_D("RunProgram"); - if (!CompileQuery()) { + if (!CompileQuery()) { Abort("Failed to compile query", YandexQuery::QueryMeta::FAILED); - } - } - + } + } + void FailOnException() { Fail(CurrentExceptionMessage()); } void Fail(const TString& errorMessage) { - LOG_E("Fail for query " << Params.QueryId << ", finishing: " << Finishing << ", details: " << errorMessage); + LOG_E("Fail for query " << Params.QueryId << ", finishing: " << Finishing << ", details: " << errorMessage); if (YqConnections.empty()) { Issues.AddIssue("YqConnections array is empty"); @@ -329,25 +329,25 @@ private: } void Handle(TEvents::TEvQueryActionResult::TPtr& ev) { - Action = ev->Get()->Action; + Action = ev->Get()->Action; LOG_D("New query action received: " << YandexQuery::QueryAction_Name(Action)); - switch (Action) { - case YandexQuery::ABORT: + switch (Action) { + case YandexQuery::ABORT: case YandexQuery::ABORT_GRACEFULLY: // not fully implemented Abort("Aborted by user", YandexQuery::QueryMeta::ABORTED_BY_USER); - break; + break; case YandexQuery::PAUSE: // not implemented case YandexQuery::PAUSE_GRACEFULLY: // not implemented case YandexQuery::RESUME: // not implemented Abort(TStringBuilder() << "Unsupported query action: " << YandexQuery::QueryAction_Name(Action), YandexQuery::QueryMeta::FAILED); break; - default: + default: Abort(TStringBuilder() << "Unknown query action: " << YandexQuery::QueryAction_Name(Action), YandexQuery::QueryMeta::FAILED); - break; + break; } } - void CheckForConsumers() { + void CheckForConsumers() { struct TTopicIndependentConsumers { struct TTopicIndependentConsumer { TString ConsumerName; @@ -395,26 +395,26 @@ private: }; THashMap<TString, TTopicIndependentConsumers> topicToIndependentConsumers; - ui32 graphIndex = 0; - for (auto& graphParams : DqGraphParams) { + ui32 graphIndex = 0; + for (auto& graphParams : DqGraphParams) { LOG_D("Graph " << graphIndex); - graphIndex++; - const TString consumerNamePrefix = graphIndex == 1 ? Params.QueryId : TStringBuilder() << Params.QueryId << '-' << graphIndex; // Simple name in simple case - const auto& secureParams = graphParams.GetSecureParams(); - for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) { - for (NYql::NDqProto::TTaskInput& taskInput : *task.MutableInputs()) { - if (taskInput.GetTypeCase() == NYql::NDqProto::TTaskInput::kSource && taskInput.GetSource().GetType() == "PqSource") { - google::protobuf::Any& settingsAny = *taskInput.MutableSource()->MutableSettings(); + graphIndex++; + const TString consumerNamePrefix = graphIndex == 1 ? Params.QueryId : TStringBuilder() << Params.QueryId << '-' << graphIndex; // Simple name in simple case + const auto& secureParams = graphParams.GetSecureParams(); + for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) { + for (NYql::NDqProto::TTaskInput& taskInput : *task.MutableInputs()) { + if (taskInput.GetTypeCase() == NYql::NDqProto::TTaskInput::kSource && taskInput.GetSource().GetType() == "PqSource") { + google::protobuf::Any& settingsAny = *taskInput.MutableSource()->MutableSettings(); YQL_ENSURE(settingsAny.Is<NYql::NPq::NProto::TDqPqTopicSource>()); NYql::NPq::NProto::TDqPqTopicSource srcDesc; - YQL_ENSURE(settingsAny.UnpackTo(&srcDesc)); + YQL_ENSURE(settingsAny.UnpackTo(&srcDesc)); - if (!srcDesc.GetConsumerName()) { - const auto [consumerName, isNewConsumer] = - topicToIndependentConsumers[srcDesc.GetTopicPath()] + if (!srcDesc.GetConsumerName()) { + const auto [consumerName, isNewConsumer] = + topicToIndependentConsumers[srcDesc.GetTopicPath()] .AddPartitionsSet(NYql::NPq::GetTopicPartitionsSet(task.GetMeta()), consumerNamePrefix); - srcDesc.SetConsumerName(consumerName); - settingsAny.PackFrom(srcDesc); + srcDesc.SetConsumerName(consumerName); + settingsAny.PackFrom(srcDesc); if (isNewConsumer) { auto s = consumerName; LOG_D("Create consumer \"" << s << "\" for topic \"" << srcDesc.GetTopicPath() << "\""); @@ -428,7 +428,7 @@ private: } TopicsForConsumersCreation.emplace_back(std::move(srcDesc)); - } + } } } } @@ -443,20 +443,20 @@ private: return; } - if (ev->Cookie == SaveQueryInfoCookie) { - if (TopicsForConsumersCreation.size()) { - ReadRulesCreatorId = Register( - ::NYq::MakeReadRuleCreatorActor( - SelfId(), - Params.QueryId, + if (ev->Cookie == SaveQueryInfoCookie) { + if (TopicsForConsumersCreation.size()) { + ReadRulesCreatorId = Register( + ::NYq::MakeReadRuleCreatorActor( + SelfId(), + Params.QueryId, Params.Driver, - std::move(TopicsForConsumersCreation), - std::move(CredentialsForConsumersCreation) - ) - ); - } else { - RunDqGraphs(); - } + std::move(TopicsForConsumersCreation), + std::move(CredentialsForConsumersCreation) + ) + ); + } else { + RunDqGraphs(); + } } else if (ev->Cookie == SetLoadFromCheckpointModeCookie) { Send(CheckpointCoordinatorId, new TEvCheckpointCoordinator::TEvRunGraph()); } @@ -499,57 +499,57 @@ private: void Handle(NYq::TEvents::TEvGraphParams::TPtr& ev) { LOG_D("Graph params with tasks: " << ev->Get()->GraphParams.TasksSize()); DqGraphParams.push_back(ev->Get()->GraphParams); - } - + } + void Handle(TEvCheckpointCoordinator::TEvZeroCheckpointDone::TPtr&) { LOG_D("Coordinator saved zero checkpoint"); Y_VERIFY(CheckpointCoordinatorId); SetLoadFromCheckpointMode(); } - i32 UpdateResultIndices() { - i32 count = 0; - for (const auto& graphParams : DqGraphParams) { - DqGrapResultIndices.push_back(graphParams.GetResultType() ? count++ : -1); - } - return count; - } - - void PrepareGraphs() { + i32 UpdateResultIndices() { + i32 count = 0; + for (const auto& graphParams : DqGraphParams) { + DqGrapResultIndices.push_back(graphParams.GetResultType() ? count++ : -1); + } + return count; + } + + void PrepareGraphs() { if (AbortOnExceedingDqGraphsLimits()) { return; } Yq::Private::PingTaskRequest request; - - TStringStream exprOut; - TStringStream planOut; - Program->Print(&exprOut, &planOut); - const auto planStr = NJson2Yson::ConvertYson2Json(planOut.Str()); - request.set_ast(exprOut.Str()); - request.set_plan(planStr); - - request.set_result_set_count(UpdateResultIndices()); - QueryStateUpdateRequest.set_result_set_count(UpdateResultIndices()); - for (const auto& graphParams : DqGraphParams) { - if (graphParams.GetResultType()) { + + TStringStream exprOut; + TStringStream planOut; + Program->Print(&exprOut, &planOut); + const auto planStr = NJson2Yson::ConvertYson2Json(planOut.Str()); + request.set_ast(exprOut.Str()); + request.set_plan(planStr); + + request.set_result_set_count(UpdateResultIndices()); + QueryStateUpdateRequest.set_result_set_count(UpdateResultIndices()); + for (const auto& graphParams : DqGraphParams) { + if (graphParams.GetResultType()) { TProtoBuilder builder(graphParams.GetResultType(), {graphParams.GetColumns().begin(), graphParams.GetColumns().end()}); const auto emptyResultSet = builder.BuildResultSet({}); - auto* header = QueryStateUpdateRequest.add_result_set_meta(); + auto* header = QueryStateUpdateRequest.add_result_set_meta(); (*header->mutable_column()) = emptyResultSet.columns(); - } - } - *request.mutable_result_set_meta() = QueryStateUpdateRequest.result_set_meta(); - - CheckForConsumers(); - + } + } + *request.mutable_result_set_meta() = QueryStateUpdateRequest.result_set_meta(); + + CheckForConsumers(); + Params.CreatedTopicConsumers.clear(); Params.CreatedTopicConsumers.reserve(TopicsForConsumersCreation.size()); for (const NYql::NPq::NProto::TDqPqTopicSource& src : TopicsForConsumersCreation) { - auto& consumer = *request.add_created_topic_consumers(); - consumer.set_database_id(src.GetDatabaseId()); - consumer.set_database(src.GetDatabase()); - consumer.set_topic_path(src.GetTopicPath()); - consumer.set_consumer_name(src.GetConsumerName()); + auto& consumer = *request.add_created_topic_consumers(); + consumer.set_database_id(src.GetDatabaseId()); + consumer.set_database(src.GetDatabase()); + consumer.set_topic_path(src.GetTopicPath()); + consumer.set_consumer_name(src.GetConsumerName()); consumer.set_cluster_endpoint(src.GetEndpoint()); consumer.set_use_ssl(src.GetUseSsl()); consumer.set_token_name(src.GetToken().GetName()); @@ -557,15 +557,15 @@ private: // Save for deletion Params.CreatedTopicConsumers.push_back(consumer); - } - - for (const auto& graphParams : DqGraphParams) { - request.add_dq_graph(graphParams.SerializeAsString()); - } - - Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SaveQueryInfoCookie); - } - + } + + for (const auto& graphParams : DqGraphParams) { + request.add_dq_graph(graphParams.SerializeAsString()); + } + + Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SaveQueryInfoCookie); + } + void SetLoadFromCheckpointMode() { Yq::Private::PingTaskRequest request; request.set_state_load_mode(YandexQuery::FROM_LAST_CHECKPOINT); @@ -574,102 +574,102 @@ private: Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SetLoadFromCheckpointModeCookie); } - TString BuildNormalizedStatistics(const NDqProto::TQueryResponse& response) { - - struct TStatisticsNode { - std::map<TString, TStatisticsNode> Children; - i64 Avg; - i64 Count; - i64 Min; - i64 Max; - i64 Sum; + TString BuildNormalizedStatistics(const NDqProto::TQueryResponse& response) { + + struct TStatisticsNode { + std::map<TString, TStatisticsNode> Children; + i64 Avg; + i64 Count; + i64 Min; + i64 Max; + i64 Sum; void Write(NYson::TYsonWriter& writer) { - writer.OnBeginMap(); - if (Children.empty()) { - writer.OnKeyedItem("sum"); - writer.OnInt64Scalar(Sum); - writer.OnKeyedItem("count"); - writer.OnInt64Scalar(Count); - writer.OnKeyedItem("avg"); - writer.OnInt64Scalar(Avg); - writer.OnKeyedItem("max"); - writer.OnInt64Scalar(Max); - writer.OnKeyedItem("min"); - writer.OnInt64Scalar(Min); - } else { - for (auto& [name, child]: Children) { - writer.OnKeyedItem(name); + writer.OnBeginMap(); + if (Children.empty()) { + writer.OnKeyedItem("sum"); + writer.OnInt64Scalar(Sum); + writer.OnKeyedItem("count"); + writer.OnInt64Scalar(Count); + writer.OnKeyedItem("avg"); + writer.OnInt64Scalar(Avg); + writer.OnKeyedItem("max"); + writer.OnInt64Scalar(Max); + writer.OnKeyedItem("min"); + writer.OnInt64Scalar(Min); + } else { + for (auto& [name, child]: Children) { + writer.OnKeyedItem(name); child.Write(writer); - } - } - writer.OnEndMap(); - } - }; - - TStringStream out; - - TStatisticsNode statistics; - for (const auto& metric : response.GetMetric()) { - auto longName = metric.GetName(); - TString prefix; - TString name; - std::map<TString, TString> labels; + } + } + writer.OnEndMap(); + } + }; + + TStringStream out; + + TStatisticsNode statistics; + for (const auto& metric : response.GetMetric()) { + auto longName = metric.GetName(); + TString prefix; + TString name; + std::map<TString, TString> labels; if (!NYql::NCommon::ParseCounterName(&prefix, &labels, &name, longName)) { - prefix = ""; - name = longName; - labels.clear(); - } - - TStatisticsNode* node = &statistics; - - if (prefix) { - node = &node->Children[prefix]; - } - - for (const auto& [k, v] : labels) { - node = &node->Children[k + "=" + v]; - } - - node = &node->Children[name]; - - node->Sum = metric.GetSum(); - node->Count = metric.GetCount(); - node->Avg = metric.GetAvg(); - node->Max = metric.GetMax(); - node->Min = metric.GetMin(); - } - - NYson::TYsonWriter writer(&out); + prefix = ""; + name = longName; + labels.clear(); + } + + TStatisticsNode* node = &statistics; + + if (prefix) { + node = &node->Children[prefix]; + } + + for (const auto& [k, v] : labels) { + node = &node->Children[k + "=" + v]; + } + + node = &node->Children[name]; + + node->Sum = metric.GetSum(); + node->Count = metric.GetCount(); + node->Avg = metric.GetAvg(); + node->Max = metric.GetMax(); + node->Min = metric.GetMin(); + } + + NYson::TYsonWriter writer(&out); statistics.Write(writer); - - return out.Str(); - } - + + return out.Str(); + } + void SaveStatistics(const NYql::NDqProto::TQueryResponse& result) { - // Yson routines are very strict, so it's better to try-catch them - try { - Statistics.push_back(BuildNormalizedStatistics(result)); - TStringStream out; - NYson::TYsonWriter writer(&out); - writer.OnBeginMap(); - ui32 graphIndex = 0; - for (const auto& s : Statistics) { - writer.OnKeyedItem("Graph=" + ToString(++graphIndex)); - writer.OnRaw(s); - } - writer.OnEndMap(); - QueryStateUpdateRequest.set_statistics(NJson2Yson::ConvertYson2Json(out.Str())); - } catch (NYson::TYsonException& ex) { - LOG_E(ex.what()); - } + // Yson routines are very strict, so it's better to try-catch them + try { + Statistics.push_back(BuildNormalizedStatistics(result)); + TStringStream out; + NYson::TYsonWriter writer(&out); + writer.OnBeginMap(); + ui32 graphIndex = 0; + for (const auto& s : Statistics) { + writer.OnKeyedItem("Graph=" + ToString(++graphIndex)); + writer.OnRaw(s); + } + writer.OnEndMap(); + QueryStateUpdateRequest.set_statistics(NJson2Yson::ConvertYson2Json(out.Str())); + } catch (NYson::TYsonException& ex) { + LOG_E(ex.what()); + } } - + void AddIssues(const google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage>& issuesProto) { TIssues issues; IssuesFromMessage(issuesProto, issues); Issues.AddIssues(issues); } - + void SaveQueryResponse(NYql::NDqs::TEvQueryResponse::TPtr& ev) { auto& result = ev->Get()->Record; LOG_D("Query response. Retryable: " << result.GetRetriable() @@ -712,15 +712,15 @@ private: Finish(GetFinishStatus(!failure)); return; - } + } // Continue with the next graph QueryStateUpdateRequest.set_dq_graph_index(++DqGraphIndex); RunNextDqGraph(); LOG_D("Send save query response request to pinger"); Send(Pinger, new TEvents::TEvForwardPingRequest(QueryStateUpdateRequest)); - } - + } + void HandleFinish(NYql::NDqs::TEvQueryResponse::TPtr& ev) { // In this case we can have race between normal finishing of running query and aborting it. // If query is finished with success error code or failure != abort, we override abort with this result. @@ -740,11 +740,11 @@ private: AddIssueWithSubIssues("Problems with read rules creation", ev->Get()->Issues); LOG_D(Issues.ToOneLineString()); Finish(YandexQuery::QueryMeta::FAILED); - } else { - RunDqGraphs(); - } - } - + } else { + RunDqGraphs(); + } + } + void HandleFinish(NYq::TEvents::TEvDataStreamsReadRulesCreationResult::TPtr& ev) { ReadRulesCreatorId = {}; if (ev->Get()->Issues) { @@ -791,7 +791,7 @@ private: Register( ::NYq::MakeReadRuleDeleterActor( SelfId(), - Params.QueryId, + Params.QueryId, Params.Driver, Params.CreatedTopicConsumers, std::move(credentials) @@ -799,7 +799,7 @@ private: ); } - void RunDqGraphs() { + void RunDqGraphs() { if (DqGraphParams.empty()) { *QueryStateUpdateRequest.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(CreatedAt.MilliSeconds()); QueryStateUpdateRequest.set_resign_query(false); @@ -808,81 +808,81 @@ private: return; } - { - Params.Status = YandexQuery::QueryMeta::RUNNING; + { + Params.Status = YandexQuery::QueryMeta::RUNNING; Yq::Private::PingTaskRequest request; - request.set_status(YandexQuery::QueryMeta::RUNNING); - *request.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(Now().MilliSeconds()); - Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, UpdateQueryInfoCookie); - } - -/* - { // Failure test -- keep it until integrational tests work + request.set_status(YandexQuery::QueryMeta::RUNNING); + *request.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(Now().MilliSeconds()); + Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, UpdateQueryInfoCookie); + } + +/* + { // Failure test -- keep it until integrational tests work Yq::Private::PingTaskRequest request; - request.set_status(YandexQuery::QueryMeta::RUNNING); - request.set_resign_query(true); - Send(Pinger, new TEvents::TEvForwardPingRequest(request, true), 0, UpdateQueryInfoCookie); - PassAway(); - return; - } -*/ - PrepareQueryCounters(); - RunNextDqGraph(); - } - - void PrepareQueryCounters() { + request.set_status(YandexQuery::QueryMeta::RUNNING); + request.set_resign_query(true); + Send(Pinger, new TEvents::TEvForwardPingRequest(request, true), 0, UpdateQueryInfoCookie); + PassAway(); + return; + } +*/ + PrepareQueryCounters(); + RunNextDqGraph(); + } + + void PrepareQueryCounters() { const TVector<TString> path = StringSplitter(Params.Scope.ToString()).Split('/').SkipEmpty(); // yandexcloud://{folder_id} const TString folderId = path.size() == 2 && path.front().StartsWith(NYdb::NYq::TScope::YandexCloudScopeSchema) ? path.back() : TString{}; + - - QueryCounters = ServiceCounters; + QueryCounters = ServiceCounters; PublicCountersParent = ServiceCounters.PublicCounters; - + if (Params.CloudId && folderId) { PublicCountersParent = PublicCountersParent->GetSubgroup("cloud_id", Params.CloudId)->GetSubgroup("folder_id", folderId); - } - QueryCounters.PublicCounters = PublicCountersParent->GetSubgroup("query_id", - Params.Automatic ? (Params.QueryName ? Params.QueryName : "automatic") : Params.QueryId); - - RootCountersParent = ServiceCounters.RootCounters; - QueryCounters.RootCounters = RootCountersParent->GetSubgroup("query_id", - Params.Automatic ? (folderId ? "automatic_" + folderId : "automatic") : Params.QueryId); - QueryCounters.Counters = QueryCounters.RootCounters; - } - - void RunNextDqGraph() { + } + QueryCounters.PublicCounters = PublicCountersParent->GetSubgroup("query_id", + Params.Automatic ? (Params.QueryName ? Params.QueryName : "automatic") : Params.QueryId); + + RootCountersParent = ServiceCounters.RootCounters; + QueryCounters.RootCounters = RootCountersParent->GetSubgroup("query_id", + Params.Automatic ? (folderId ? "automatic_" + folderId : "automatic") : Params.QueryId); + QueryCounters.Counters = QueryCounters.RootCounters; + } + + void RunNextDqGraph() { auto& dqGraphParams = DqGraphParams.at(DqGraphIndex); - TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); + TDqConfiguration::TPtr dqConfiguration = MakeIntrusive<TDqConfiguration>(); dqConfiguration->Dispatch(dqGraphParams.GetSettings()); - dqConfiguration->FreezeDefaults(); + dqConfiguration->FreezeDefaults(); dqConfiguration->FallbackPolicy = "never"; - + ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeYqlNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, ServiceCounters.Counters, TInstant::Now(), EnableCheckpointCoordinator)); - + NActors::TActorId resultId; - if (dqGraphParams.GetResultType()) { + if (dqGraphParams.GetResultType()) { TResultId writerResultId; - { - writerResultId.HistoryId = Params.QueryId; - writerResultId.Id = Params.ResultId; - writerResultId.Owner = Params.Owner; + { + writerResultId.HistoryId = Params.QueryId; + writerResultId.Id = Params.ResultId; + writerResultId.Owner = Params.Owner; writerResultId.SetId = DqGrapResultIndices.at(DqGraphIndex); - } - TVector<TString> columns; - for (const auto& column : dqGraphParams.GetColumns()) { - columns.emplace_back(column); - } - resultId = NActors::TActivationContext::Register( + } + TVector<TString> columns; + for (const auto& column : dqGraphParams.GetColumns()) { + columns.emplace_back(column); + } + resultId = NActors::TActivationContext::Register( CreateResultWriter( Params.Driver, ExecuterId, dqGraphParams.GetResultType(), Params.PrivateApiConfig, writerResultId, columns, dqGraphParams.GetSession(), Params.Deadline, Params.ClientCounters)); - } else { - LOG_D("ResultWriter was NOT CREATED since ResultType is empty"); - resultId = ExecuterId; - } - - ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); + } else { + LOG_D("ResultWriter was NOT CREATED since ResultType is empty"); + resultId = ExecuterId; + } + + ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); if (EnableCheckpointCoordinator) { CheckpointCoordinatorId = NActors::TActivationContext::Register(MakeCheckpointCoordinator( ::NYq::TCoordinatorId(Params.QueryId + "-" + ToString(DqGraphIndex), Params.PreviousQueryRevision), @@ -894,20 +894,20 @@ private: dqGraphParams, Params.StateLoadMode, Params.StreamingDisposition).Release()); - } - - Yql::DqsProto::ExecuteGraphRequest request; - request.SetSourceId(dqGraphParams.GetSourceId()); - request.SetResultType(dqGraphParams.GetResultType()); - request.SetSession(dqGraphParams.GetSession()); - *request.MutableSettings() = dqGraphParams.GetSettings(); - *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); - *request.MutableColumns() = dqGraphParams.GetColumns(); + } + + Yql::DqsProto::ExecuteGraphRequest request; + request.SetSourceId(dqGraphParams.GetSourceId()); + request.SetResultType(dqGraphParams.GetResultType()); + request.SetSession(dqGraphParams.GetSession()); + *request.MutableSettings() = dqGraphParams.GetSettings(); + *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); + *request.MutableColumns() = dqGraphParams.GetColumns(); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId))); LOG_D("Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId << ", CheckPointCoordinatior " << CheckpointCoordinatorId); - } - + } + void SetupDqSettings(::google::protobuf::RepeatedPtrField< ::NYql::TAttr>& dqSettings) const { auto attr = dqSettings.Add(); attr->SetName("MaxTasksPerStage"); @@ -924,7 +924,7 @@ private: // Streaming queries: // - turn off timeout; // - turn on check that query has one graph. - if (Params.QueryType == YandexQuery::QueryContent::STREAMING) { + if (Params.QueryType == YandexQuery::QueryContent::STREAMING) { attr = dqSettings.Add(); attr->SetName("_TableTimeout"); attr->SetValue("0"); @@ -977,7 +977,7 @@ private: case YandexQuery::QueryAction_INT_MAX_SENTINEL_DO_NOT_USE_: return YandexQuery::QueryMeta::FAILED; } - } + } YandexQuery::QueryMeta::ComputeStatus GetFinalizingStatus() { // Status before final. "*ING" one. switch (FinalQueryStatus) { @@ -1043,23 +1043,23 @@ private: QueryStateUpdateRequest.set_status(FinalQueryStatus); // Can be changed later. *QueryStateUpdateRequest.mutable_finished_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(TInstant::Now().MilliSeconds()); Become(&TRunActor::StateFuncWrapper<&TRunActor::FinishStateFunc>); - + if (!FinalizingStatusIsWritten) { WriteFinalizingStatus(); } - + CancelRunningQuery(); ContinueFinish(); } - + void ContinueFinish() { if (NeedDeleteReadRules() && !ConsumersAreDeleted) { if (CanRunReadRulesDeletionActor()) { RunReadRulesDeletionActor(); - } + } return; } - + SendPingAndPassAway(); } @@ -1080,36 +1080,36 @@ private: Send(Pinger, new TEvents::TEvForwardPingRequest(QueryStateUpdateRequest, true)); - PassAway(); - } - + PassAway(); + } + void Abort(const TString& message, YandexQuery::QueryMeta::ComputeStatus status, const NYql::TIssues& issues = {}) { AddIssueWithSubIssues(message, issues); Finish(status); } void FillDqGraphParams() { - for (const auto& s : Params.DqGraphs) { - NYq::NProto::TGraphParams dqGraphParams; + for (const auto& s : Params.DqGraphs) { + NYq::NProto::TGraphParams dqGraphParams; Y_VERIFY(dqGraphParams.ParseFromString(s)); - DqGraphParams.emplace_back(std::move(dqGraphParams)); - } + DqGraphParams.emplace_back(std::move(dqGraphParams)); + } } void ReRunQuery() { if (AbortOnExceedingDqGraphsLimits()) { return; } - for (const auto& m : Params.ResultSetMetas) { - *QueryStateUpdateRequest.add_result_set_meta() = m; - } - DqGraphIndex = Params.DqGraphIndex; - UpdateResultIndices(); - PrepareQueryCounters(); - RunNextDqGraph(); - } - - bool CompileQuery() { + for (const auto& m : Params.ResultSetMetas) { + *QueryStateUpdateRequest.add_result_set_meta() = m; + } + DqGraphIndex = Params.DqGraphIndex; + UpdateResultIndices(); + PrepareQueryCounters(); + RunNextDqGraph(); + } + + bool CompileQuery() { LOG_D("Compiling query ..."); NYql::TGatewaysConfig gatewaysConfig; SetupDqSettings(*gatewaysConfig.MutableDq()->MutableDefaultSettings()); @@ -1132,10 +1132,10 @@ private: const auto dbResolver = std::make_shared<TDatabaseAsyncResolverWithMeta>(TDatabaseAsyncResolverWithMeta(NActors::TActivationContext::ActorSystem(), Params.DatabaseResolver, Params.CommonConfig.GetYdbMvpCloudEndpoint(), Params.CommonConfig.GetMdbGateway(), Params.CommonConfig.GetMdbTransformHost(), Params.QueryId, Params.AuthToken, Params.AccountIdSignatures, Connections)); { - // TBD: move init to better place - QueryStateUpdateRequest.set_scope(Params.Scope.ToString()); - QueryStateUpdateRequest.mutable_query_id()->set_value(Params.QueryId); - QueryStateUpdateRequest.set_owner_id(Params.Owner); + // TBD: move init to better place + QueryStateUpdateRequest.set_scope(Params.Scope.ToString()); + QueryStateUpdateRequest.mutable_query_id()->set_value(Params.QueryId); + QueryStateUpdateRequest.set_owner_id(Params.Owner); dataProvidersInit.push_back(GetDqDataProviderInitializer(&CreateInMemoryExecTransformer, NYq::CreateEmptyGateway(SelfId()), Params.DqCompFactory, {}, nullptr)); } @@ -1174,14 +1174,14 @@ private: progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(Params.FunctionRegistry, nullptr)); progFactory.SetGatewaysConfig(&gatewaysConfig); - SessionId = TStringBuilder() - << Params.QueryId << '#' - << Params.ResultId << '#' + SessionId = TStringBuilder() + << Params.QueryId << '#' + << Params.ResultId << '#' << Params.Scope.ToString() << '#' << Params.Owner << '#' << Params.CloudId; - Program = progFactory.Create("-stdin-", Params.Sql, SessionId); + Program = progFactory.Create("-stdin-", Params.Sql, SessionId); Program->EnableResultPosition(); NSQLTranslation::TTranslationSettings sqlSettings; @@ -1238,7 +1238,7 @@ private: return false; } - futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { + futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); }); return true; @@ -1246,11 +1246,11 @@ private: void Handle(TEvents::TEvAsyncContinue::TPtr& ev, const TActorContext& ctx) { LOG_D("Compiling finished"); - NYql::TProgram::TStatus status = TProgram::TStatus::Error; - + NYql::TProgram::TStatus status = TProgram::TStatus::Error; + const auto& f = ev->Get()->Future; try { - status = f.GetValue(); + status = f.GetValue(); if (status == TProgram::TStatus::Async) { auto futureStatus = Program->ContinueAsync(); auto actorSystem = ctx.ActorSystem(); @@ -1260,13 +1260,13 @@ private: }); return; } - } catch (const std::exception& err) { - Issues.AddIssue(ExceptionToIssue(err)); - } + } catch (const std::exception& err) { + Issues.AddIssue(ExceptionToIssue(err)); + } - if (status == TProgram::TStatus::Ok || (DqGraphParams.size() > 0 && !DqGraphParams[0].GetResultType())) { - PrepareGraphs(); - } else { + if (status == TProgram::TStatus::Ok || (DqGraphParams.size() > 0 && !DqGraphParams[0].GetResultType())) { + PrepareGraphs(); + } else { Abort(TStringBuilder() << "Run query failed: " << ToString(status), YandexQuery::QueryMeta::FAILED, Program->Issues()); } } @@ -1327,19 +1327,19 @@ private: TActorId Pinger; TInstant CreatedAt; YandexQuery::QueryAction Action = YandexQuery::QueryAction::QUERY_ACTION_UNSPECIFIED; - std::vector<NYq::NProto::TGraphParams> DqGraphParams; - std::vector<i32> DqGrapResultIndices; - i32 DqGraphIndex = 0; - NMonitoring::TDynamicCounterPtr RootCountersParent; - NMonitoring::TDynamicCounterPtr PublicCountersParent; - NActors::TActorId ExecuterId; - NActors::TActorId ControlId; + std::vector<NYq::NProto::TGraphParams> DqGraphParams; + std::vector<i32> DqGrapResultIndices; + i32 DqGraphIndex = 0; + NMonitoring::TDynamicCounterPtr RootCountersParent; + NMonitoring::TDynamicCounterPtr PublicCountersParent; + NActors::TActorId ExecuterId; + NActors::TActorId ControlId; NActors::TActorId CheckpointCoordinatorId; - TString SessionId; - ::NYq::NCommon::TServiceCounters ServiceCounters; - ::NYq::NCommon::TServiceCounters QueryCounters; + TString SessionId; + ::NYq::NCommon::TServiceCounters ServiceCounters; + ::NYq::NCommon::TServiceCounters QueryCounters; bool EnableCheckpointCoordinator = false; - bool RetryNeeded = false; + bool RetryNeeded = false; Yq::Private::PingTaskRequest QueryStateUpdateRequest; THashMap<TString, YandexQuery::Connection> Connections; // Necessary for DbAsyncResolver @@ -1348,7 +1348,7 @@ private: // Consumers creation TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation; TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> CredentialsForConsumersCreation; - TVector<TString> Statistics; + TVector<TString> Statistics; NActors::TActorId ReadRulesCreatorId; // Finish @@ -1360,7 +1360,7 @@ private: // Cookies for pings enum : ui64 { - SaveQueryInfoCookie = 1, + SaveQueryInfoCookie = 1, UpdateQueryInfoCookie, SaveFinalizingStatusCookie, SetLoadFromCheckpointModeCookie, @@ -1368,10 +1368,10 @@ private: }; -IActor* CreateRunActor( - const ::NYq::NCommon::TServiceCounters& serviceCounters, +IActor* CreateRunActor( + const ::NYq::NCommon::TServiceCounters& serviceCounters, TRunActorParams&& params -) { +) { return new TRunActor(serviceCounters, std::move(params)); } diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 1446cae1bb..0a56a7d6fa 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -22,7 +22,7 @@ TRunActorParams::TRunActorParams( const TScope& scope, const TString& authToken, const TActorId& databaseResolver, - const TString& queryId, + const TString& queryId, const TString& userId, const TString& owner, const int64_t previousQueryRevision, @@ -33,12 +33,12 @@ TRunActorParams::TRunActorParams( YandexQuery::QueryContent::QueryType queryType, YandexQuery::ExecuteMode executeMode, const TString& resultId, - const YandexQuery::StateLoadMode stateLoadMode, + const YandexQuery::StateLoadMode stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition, YandexQuery::QueryMeta::ComputeStatus status, - const TString& cloudId, - TVector<YandexQuery::ResultSetMeta> resultSetMetas, - TVector<TString> dqGraphs, + const TString& cloudId, + TVector<YandexQuery::ResultSetMeta> resultSetMetas, + TVector<TString> dqGraphs, int32_t dqGraphIndex, TVector<Yq::Private::TopicConsumer> createdTopicConsumers, bool automatic, @@ -63,7 +63,7 @@ TRunActorParams::TRunActorParams( , Scope(scope) , AuthToken(authToken) , DatabaseResolver(databaseResolver) - , QueryId(queryId) + , QueryId(queryId) , UserId(userId) , Owner(owner) , PreviousQueryRevision(previousQueryRevision) @@ -76,11 +76,11 @@ TRunActorParams::TRunActorParams( , ResultId(resultId) , StateLoadMode(stateLoadMode) , StreamingDisposition(streamingDisposition) - , Status(status) + , Status(status) , CloudId(cloudId) - , ResultSetMetas(std::move(resultSetMetas)) - , DqGraphs(std::move(dqGraphs)) - , DqGraphIndex(dqGraphIndex) + , ResultSetMetas(std::move(resultSetMetas)) + , DqGraphs(std::move(dqGraphs)) + , DqGraphIndex(dqGraphIndex) , CreatedTopicConsumers(std::move(createdTopicConsumers)) , Automatic(automatic) , QueryName(queryName) diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index bad5c3c32b..3f5015eef2 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -36,7 +36,7 @@ struct TRunActorParams { // TODO2 : Change name const TScope& scope, const TString& authToken, const NActors::TActorId& databaseResolver, - const TString& queryId, + const TString& queryId, const TString& userId, const TString& owner, const int64_t previousQueryRevision, @@ -47,11 +47,11 @@ struct TRunActorParams { // TODO2 : Change name YandexQuery::QueryContent::QueryType queryType, YandexQuery::ExecuteMode executeMode, const TString& resultId, - const YandexQuery::StateLoadMode stateLoadMode, + const YandexQuery::StateLoadMode stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition, YandexQuery::QueryMeta::ComputeStatus status, - const TString& cloudId, - TVector<YandexQuery::ResultSetMeta> resultSetMetas, + const TString& cloudId, + TVector<YandexQuery::ResultSetMeta> resultSetMetas, TVector<TString> dqGraphs, int32_t dqGraphIndex, TVector<Yq::Private::TopicConsumer> createdTopicConsumers, @@ -82,7 +82,7 @@ struct TRunActorParams { // TODO2 : Change name const TScope Scope; const TString AuthToken; const NActors::TActorId DatabaseResolver; - const TString QueryId; + const TString QueryId; const TString UserId; const TString Owner; const int64_t PreviousQueryRevision; @@ -97,9 +97,9 @@ struct TRunActorParams { // TODO2 : Change name const YandexQuery::StreamingDisposition StreamingDisposition; YandexQuery::QueryMeta::ComputeStatus Status; const TString CloudId; - const TVector<YandexQuery::ResultSetMeta> ResultSetMetas; - const TVector<TString> DqGraphs; - const int32_t DqGraphIndex; + const TVector<YandexQuery::ResultSetMeta> ResultSetMetas; + const TVector<TString> DqGraphs; + const int32_t DqGraphIndex; TVector<Yq::Private::TopicConsumer> CreatedTopicConsumers; bool Automatic = false; diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp index 29410dccd1..1284b8d56f 100644 --- a/ydb/core/yq/libs/actors/task_ping.cpp +++ b/ydb/core/yq/libs/actors/task_ping.cpp @@ -120,14 +120,14 @@ private: Issues.AddIssues(reqIssues); event->Issues = Issues; } - if (!req.transient_issues().empty()) { - NYql::TIssues transientIssues; + if (!req.transient_issues().empty()) { + NYql::TIssues transientIssues; for (const auto& issue : req.transient_issues()) { transientIssuesByteSize += issue.ByteSize(); } - NYql::IssuesFromMessage(req.transient_issues(), transientIssues); - event->TransientIssues = transientIssues; - } + NYql::IssuesFromMessage(req.transient_issues(), transientIssues); + event->TransientIssues = transientIssues; + } if (req.statistics()) { event->Statistics = req.statistics(); } @@ -144,7 +144,7 @@ private: for (const auto& rsMeta : req.result_set_meta()) { resultSetMetaByteSize += rsMeta.ByteSize(); } - event->ResultSetMetas = {req.result_set_meta().begin(), req.result_set_meta().end()}; + event->ResultSetMetas = {req.result_set_meta().begin(), req.result_set_meta().end()}; } if (req.has_started_at()) { event->StartedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.started_at())); @@ -152,7 +152,7 @@ private: if (req.has_finished_at()) { event->FinishedAt = TInstant::FromValue(google::protobuf::util::TimeUtil::TimestampToMicroseconds(req.finished_at())); } - event->ResignQuery = req.resign_query(); + event->ResignQuery = req.resign_query(); event->CreatedTopicConsumers.reserve(req.created_topic_consumers_size()); for (const auto& topicConsumerProto : req.created_topic_consumers()) { @@ -166,13 +166,13 @@ private: topicConsumer.TokenName = topicConsumerProto.token_name(); topicConsumer.AddBearerToToken = topicConsumerProto.add_bearer_to_token(); } - - event->DqGraphs.reserve(req.dq_graph_size()); - for (const auto& g : req.dq_graph()) { + + event->DqGraphs.reserve(req.dq_graph_size()); + for (const auto& g : req.dq_graph()) { dqGraphBytesSize += g.size(); - event->DqGraphs.emplace_back(g); - } - + event->DqGraphs.emplace_back(g); + } + if (req.state_load_mode()) { event->StateLoadMode = req.state_load_mode(); } @@ -191,8 +191,8 @@ private: << "Issues size: " << issuesByteSize << " bytes, " << "Transient issues size: " << transientIssuesByteSize << " bytes"); - event->DqGraphIndex = req.dq_graph_index(); - + event->DqGraphIndex = req.dq_graph_index(); + return std::move(event); } diff --git a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp index 7799b52daa..06dd9d3667 100644 --- a/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp +++ b/ydb/core/yq/libs/checkpoint_storage/ydb_checkpoint_storage.cpp @@ -725,7 +725,7 @@ TFuture<TIssues> TCheckpointStorage::RegisterGraphCoordinator(const TCoordinator coordinator.GraphId, coordinator.Generation); - return RegisterCheckGeneration(context); + return RegisterCheckGeneration(context); }); return StatusToIssues(future); diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp index d9d354c2e5..6d15f3da76 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.cpp @@ -1,5 +1,5 @@ -#include "utils.h" - +#include "utils.h" + #include "checkpoint_coordinator.h" #include <ydb/core/yq/libs/actors/logging/log.h> @@ -28,7 +28,7 @@ namespace NYq { TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, - const TActorId& storageProxy, + const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, @@ -51,22 +51,22 @@ TCheckpointCoordinator::TCheckpointCoordinator(TCoordinatorId coordinatorId, void TCheckpointCoordinator::Bootstrap() { Become(&TThis::DispatchEvent); CC_LOG_D("Bootstrapped with streaming disposition " << StreamingDisposition << " and state load mode " << YandexQuery::StateLoadMode_Name(StateLoadMode)); -} - -void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { - const auto& tasks = ev->Get()->Record.GetTask(); - const auto& actorIds = ev->Get()->Record.GetActorId(); - Y_VERIFY(tasks.size() == actorIds.size()); - - for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { - auto& task = tasks[i]; +} + +void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { + const auto& tasks = ev->Get()->Record.GetTask(); + const auto& actorIds = ev->Get()->Record.GetActorId(); + Y_VERIFY(tasks.size() == actorIds.size()); + + for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { + auto& task = tasks[i]; auto& actorId = TaskIdToActor[task.GetId()]; if (actorId) { Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::InternalError(TStringBuilder() << "Duplicate task id: " << task.GetId())); return; } actorId = ActorIdFromProto(actorIds[i]); - + TComputeActorTransportStuff::TPtr transport = AllActors[actorId] = MakeIntrusive<TComputeActorTransportStuff>(); transport->EventsQueue.Init(CoordinatorId.ToString(), SelfId(), SelfId(), task.GetId()); transport->EventsQueue.OnNewRecipientId(actorId); @@ -84,10 +84,10 @@ void TCheckpointCoordinator::Handle(const NYql::NDqs::TEvReadyState::TPtr& ev) { ActorsToWaitFor[actorId] = transport; ActorsToWaitForSet.insert(actorId); } - } + } AllActorsSet.insert(actorId); - } - + } + PendingInit = std::make_unique<TPendingInitCoordinator>(AllActors.size()); CC_LOG_D("Send TEvRegisterCoordinatorRequest"); @@ -555,17 +555,17 @@ void TCheckpointCoordinator::Handle(NActors::TEvInterconnect::TEvNodeConnected:: } } -void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) { +void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) { CC_LOG_D("Got TEvPoison"); - Send(ev->Sender, new NActors::TEvents::TEvPoisonTaken(), 0, ev->Cookie); - PassAway(); -} - + Send(ev->Sender, new NActors::TEvents::TEvPoisonTaken(), 0, ev->Cookie); + PassAway(); +} + void TCheckpointCoordinator::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { TStringStream message; message << "Got TEvUndelivered; reason: " << ev->Get()->Reason << ", sourceType: " << ev->Get()->SourceType; CC_LOG_D(message.Str()); - Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str())); + Send(TaskControllerId, NYql::NDq::TEvDq::TEvAbortExecution::Unavailable(message.Str())); PassAway(); } @@ -583,6 +583,6 @@ void TCheckpointCoordinator::PassAway() { THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition) { return MakeHolder<TCheckpointCoordinator>(coordinatorId, taskControllerId, storageProxy, runActorId, settings, counters, graphParams, stateLoadMode, streamingDisposition); -} - +} + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h index 0bb76685e1..ccf4c006f8 100644 --- a/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/yq/libs/checkpointing/checkpoint_coordinator.h @@ -25,7 +25,7 @@ class TCheckpointCoordinator : public TActorBootstrapped<TCheckpointCoordinator> public: TCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& taskControllerId, - const TActorId& storageProxy, + const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, @@ -33,7 +33,7 @@ public: const YandexQuery::StateLoadMode& stateLoadMode, const YandexQuery::StreamingDisposition& streamingDisposition); - void Handle(const NYql::NDqs::TEvReadyState::TPtr&); + void Handle(const NYql::NDqs::TEvReadyState::TPtr&); void Handle(const TEvCheckpointStorage::TEvRegisterCoordinatorResponse::TPtr&); void Handle(const NYql::NDq::TEvDqCompute::TEvNewCheckpointCoordinatorAck::TPtr&); void Handle(const TEvCheckpointStorage::TEvGetCheckpointsMetadataResponse::TPtr&); @@ -46,7 +46,7 @@ public: void Handle(const TEvCheckpointStorage::TEvCompleteCheckpointResponse::TPtr&); void Handle(const TEvCheckpointStorage::TEvAbortCheckpointResponse::TPtr&); void Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvRetry::TPtr& ev); - void Handle(NActors::TEvents::TEvPoison::TPtr&); + void Handle(NActors::TEvents::TEvPoison::TPtr&); void Handle(NActors::TEvents::TEvUndelivered::TPtr&); void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr& ev); void Handle(NActors::TEvInterconnect::TEvNodeConnected::TPtr& ev); @@ -148,7 +148,7 @@ private: const TActorId StorageProxy; const TActorId RunActorId; std::unique_ptr<TCheckpointIdGenerator> CheckpointIdGenerator; - TCheckpointCoordinatorConfig Settings; + TCheckpointCoordinatorConfig Settings; const TDuration CheckpointingPeriod; const NProto::TGraphParams GraphParams; TString GraphDescId; @@ -176,5 +176,5 @@ private: }; THolder<NActors::IActor> MakeCheckpointCoordinator(TCoordinatorId coordinatorId, const TActorId& executerId, const TActorId& storageProxy, const TActorId& runActorId, const TCheckpointCoordinatorConfig& settings, const NMonitoring::TDynamicCounterPtr& counters, const NProto::TGraphParams& graphParams, const YandexQuery::StateLoadMode& stateLoadMode = YandexQuery::StateLoadMode::FROM_LAST_CHECKPOINT, const YandexQuery::StreamingDisposition& streamingDisposition = {}); - + } // namespace NYq diff --git a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index ceb0a9faf5..c60a31d9d2 100644 --- a/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/yq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -17,10 +17,10 @@ enum ETestGraphFlags : ui64 { SourceWithChannelInOneTask = 2, }; -NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { - - NYql::NDqProto::TReadyState result; +NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { + NYql::NDqProto::TReadyState result; + auto* ingress = result.AddTask(); ingress->SetId(1); auto* ingressOutput = ingress->AddOutputs(); @@ -50,7 +50,7 @@ NYql::NDqProto::TReadyState BuildTestGraph(ui64 flags = 0) { } struct TTestBootstrap : public TTestActorRuntime { - NYql::NDqProto::TReadyState GraphState; + NYql::NDqProto::TReadyState GraphState; NConfig::TCheckpointCoordinatorConfig Settings; NActors::TActorId StorageProxy; NActors::TActorId CheckpointCoordinator; @@ -66,7 +66,7 @@ struct TTestBootstrap : public TTestActorRuntime { explicit TTestBootstrap(ui64 graphFlags = 0) : TTestActorRuntime(true) - , GraphState(BuildTestGraph(graphFlags)) + , GraphState(BuildTestGraph(graphFlags)) { TAutoPtr<TAppPrepare> app = new TAppPrepare(); Initialize(app->Unwrap()); @@ -76,13 +76,13 @@ struct TTestBootstrap : public TTestActorRuntime { MapActor = AllocateEdgeActor(); EgressActor = AllocateEdgeActor(); - ActorIdToProto(IngressActor, GraphState.AddActorId()); - ActorIdToProto(MapActor, GraphState.AddActorId()); - ActorIdToProto(EgressActor, GraphState.AddActorId()); + ActorIdToProto(IngressActor, GraphState.AddActorId()); + ActorIdToProto(MapActor, GraphState.AddActorId()); + ActorIdToProto(EgressActor, GraphState.AddActorId()); - ActorToTask[IngressActor] = GraphState.GetTask()[0].GetId(); - ActorToTask[MapActor] = GraphState.GetTask()[1].GetId(); - ActorToTask[EgressActor] = GraphState.GetTask()[2].GetId(); + ActorToTask[IngressActor] = GraphState.GetTask()[0].GetId(); + ActorToTask[MapActor] = GraphState.GetTask()[1].GetId(); + ActorToTask[EgressActor] = GraphState.GetTask()[2].GetId(); Settings = NConfig::TCheckpointCoordinatorConfig(); Settings.SetEnabled(true); @@ -92,17 +92,17 @@ struct TTestBootstrap : public TTestActorRuntime { SetLogPriority(NKikimrServices::STREAMS_CHECKPOINT_COORDINATOR, NLog::PRI_DEBUG); CheckpointCoordinator = Register(MakeCheckpointCoordinator(TCoordinatorId("my-graph-id", 42), {}, StorageProxy, RunActor, Settings, Counters, NProto::TGraphParams()).Release()); - WaitForBootstrap(); - Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState)))); - + WaitForBootstrap(); + Send(new IEventHandle(CheckpointCoordinator, {}, new NYql::NDqs::TEvReadyState(std::move(GraphState)))); + EnableScheduleForActor(CheckpointCoordinator); } - - void WaitForBootstrap() { - NActors::TDispatchOptions options; - options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1); - DispatchEvents(options); - } + + void WaitForBootstrap() { + NActors::TDispatchOptions options; + options.FinalEvents.emplace_back(NActors::TEvents::TSystem::Bootstrap, 1); + DispatchEvents(options); + } }; } // namespace diff --git a/ydb/core/yq/libs/common/service_counters.h b/ydb/core/yq/libs/common/service_counters.h index 594cbf04fc..db6b8d570a 100644 --- a/ydb/core/yq/libs/common/service_counters.h +++ b/ydb/core/yq/libs/common/service_counters.h @@ -1,39 +1,39 @@ -#pragma once - -#include <util/generic/string.h> -#include <library/cpp/monlib/dynamic_counters/counters.h> - -namespace NYq { -namespace NCommon { - -struct TServiceCounters { - NMonitoring::TDynamicCounterPtr RootCounters; // "counters/counters=yq" - root counters for service metrics - NMonitoring::TDynamicCounterPtr PublicCounters; // "counters/counters=public" - root counters for cloud user metrics - NMonitoring::TDynamicCounterPtr Counters; // "counters/counters=yq/subsystem=smth" - subsystem part, may match to RootCounters if subsystem name is empty - - explicit TServiceCounters(const NMonitoring::TDynamicCounterPtr& rootCounters, const NMonitoring::TDynamicCounterPtr& publicCounters, const TString& subsystemName = "") - : RootCounters(rootCounters) - , PublicCounters(publicCounters) - , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : RootCounters) - { - } - - explicit TServiceCounters(const NMonitoring::TDynamicCounterPtr& baseCounters, const TString& subsystemName = "") - : RootCounters(baseCounters->GetSubgroup("counters", "yq")) - , PublicCounters(baseCounters->GetSubgroup("counters", "yq_public")) - , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : RootCounters) - { - } - - explicit TServiceCounters(const TServiceCounters& serviceCounters, const TString& subsystemName = "") - : RootCounters(serviceCounters.RootCounters) - , PublicCounters(serviceCounters.PublicCounters) - , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : serviceCounters.Counters) - { - } +#pragma once + +#include <util/generic/string.h> +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NYq { +namespace NCommon { + +struct TServiceCounters { + NMonitoring::TDynamicCounterPtr RootCounters; // "counters/counters=yq" - root counters for service metrics + NMonitoring::TDynamicCounterPtr PublicCounters; // "counters/counters=public" - root counters for cloud user metrics + NMonitoring::TDynamicCounterPtr Counters; // "counters/counters=yq/subsystem=smth" - subsystem part, may match to RootCounters if subsystem name is empty + + explicit TServiceCounters(const NMonitoring::TDynamicCounterPtr& rootCounters, const NMonitoring::TDynamicCounterPtr& publicCounters, const TString& subsystemName = "") + : RootCounters(rootCounters) + , PublicCounters(publicCounters) + , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : RootCounters) + { + } + + explicit TServiceCounters(const NMonitoring::TDynamicCounterPtr& baseCounters, const TString& subsystemName = "") + : RootCounters(baseCounters->GetSubgroup("counters", "yq")) + , PublicCounters(baseCounters->GetSubgroup("counters", "yq_public")) + , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : RootCounters) + { + } + + explicit TServiceCounters(const TServiceCounters& serviceCounters, const TString& subsystemName = "") + : RootCounters(serviceCounters.RootCounters) + , PublicCounters(serviceCounters.PublicCounters) + , Counters(subsystemName ? RootCounters->GetSubgroup("subsystem", subsystemName) : serviceCounters.Counters) + { + } TServiceCounters& operator=(const TServiceCounters& serviceCounters) = default; -}; - -} // namespace NCommon -} // namespace NYq +}; + +} // namespace NCommon +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_proxy/utils.h b/ydb/core/yq/libs/control_plane_proxy/utils.h index 96e901b5f8..ba33dad6fb 100644 --- a/ydb/core/yq/libs/control_plane_proxy/utils.h +++ b/ydb/core/yq/libs/control_plane_proxy/utils.h @@ -39,17 +39,17 @@ inline TString ExtractServiceAccountId(const YandexQuery::TestConnectionRequest& return ExtractServiceAccountIdImpl(c.setting()); } -inline TString ExtractServiceAccountId(const YandexQuery::CreateConnectionRequest& c) { +inline TString ExtractServiceAccountId(const YandexQuery::CreateConnectionRequest& c) { return ExtractServiceAccountIdImpl(c.content().setting()); } -inline TString ExtractServiceAccountId(const YandexQuery::ModifyConnectionRequest& c) { - return ExtractServiceAccountIdImpl(c.content().setting()); -} - -template<typename T> -TString ExtractServiceAccountId(const T&) { - return {}; -} - +inline TString ExtractServiceAccountId(const YandexQuery::ModifyConnectionRequest& c) { + return ExtractServiceAccountIdImpl(c.content().setting()); +} + +template<typename T> +TString ExtractServiceAccountId(const T&) { + return {}; +} + } // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index 8c98e790d7..8cafdde3dc 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -1041,15 +1041,15 @@ struct TEvControlPlaneStorage { TMaybe<NYql::TIssues> Issues; TMaybe<NYql::TIssues> TransientIssues; TMaybe<TString> Statistics; - TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; + TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; TMaybe<TString> Ast; TMaybe<TString> Plan; TMaybe<TInstant> StartedAt; TMaybe<TInstant> FinishedAt; - bool ResignQuery = false; + bool ResignQuery = false; TVector<TTopicConsumer> CreatedTopicConsumers; - TVector<TString> DqGraphs; - i32 DqGraphIndex = 0; + TVector<TString> DqGraphs; + i32 DqGraphIndex = 0; YandexQuery::StateLoadMode StateLoadMode = YandexQuery::STATE_LOAD_MODE_UNSPECIFIED; TMaybe<YandexQuery::StreamingDisposition> StreamingDisposition; }; diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp index eb25dac4b1..8946e4cf8d 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp @@ -12,13 +12,13 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth const auto& request = ev->Get()->Request; const TString tenant = request.tenant(); - const auto& node = request.node(); - const ui32 nodeId = node.node_id(); - const TString instanceId = node.instance_id(); - const TString hostName = node.hostname(); - const ui64 activeWorkers = node.active_workers(); - const ui64 memoryLimit = node.memory_limit(); - const ui64 memoryAllocated = node.memory_allocated(); + const auto& node = request.node(); + const ui32 nodeId = node.node_id(); + const TString instanceId = node.instance_id(); + const TString hostName = node.hostname(); + const ui64 activeWorkers = node.active_workers(); + const ui64 memoryLimit = node.memory_limit(); + const ui64 memoryAllocated = node.memory_allocated(); const ui32 icPort = node.interconnect_port(); const TString nodeAddress = node.node_address(); const auto ttl = TDuration::Seconds(5); @@ -36,16 +36,16 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth } std::shared_ptr<Yq::Private::NodesHealthCheckResult> response = std::make_shared<Yq::Private::NodesHealthCheckResult>(); - { - auto* node = response->add_nodes(); - node->set_node_id(nodeId); - node->set_instance_id(instanceId); - node->set_hostname(hostName); - node->set_active_workers(activeWorkers); - node->set_memory_limit(memoryLimit); - node->set_memory_allocated(memoryAllocated); + { + auto* node = response->add_nodes(); + node->set_node_id(nodeId); + node->set_instance_id(instanceId); + node->set_hostname(hostName); + node->set_active_workers(activeWorkers); + node->set_memory_limit(memoryLimit); + node->set_memory_allocated(memoryAllocated); node->set_node_address(nodeAddress); - } + } TSqlQueryBuilder readQueryBuilder(YdbConnection->TablePathPrefix, "NodesHealthCheck(read)"); readQueryBuilder.AddTimestamp("now", TInstant::Now()); @@ -60,18 +60,18 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth for (const auto& resultSet : resultSets) { TResultSetParser parser(resultSet); while (parser.TryNextRow()) { - auto nid = *parser.ColumnParser(NODE_ID_COLUMN_NAME).GetOptionalUint32(); - if (nid != nodeId) { - auto* node = response->add_nodes(); - node->set_node_id(nid); - node->set_instance_id(*parser.ColumnParser(INSTANCE_ID_COLUMN_NAME).GetOptionalString()); - node->set_hostname(*parser.ColumnParser(HOST_NAME_COLUMN_NAME).GetOptionalString()); - node->set_active_workers(*parser.ColumnParser(ACTIVE_WORKERS_COLUMN_NAME).GetOptionalUint64()); - node->set_memory_limit(*parser.ColumnParser(MEMORY_LIMIT_COLUMN_NAME).GetOptionalUint64()); - node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64()); + auto nid = *parser.ColumnParser(NODE_ID_COLUMN_NAME).GetOptionalUint32(); + if (nid != nodeId) { + auto* node = response->add_nodes(); + node->set_node_id(nid); + node->set_instance_id(*parser.ColumnParser(INSTANCE_ID_COLUMN_NAME).GetOptionalString()); + node->set_hostname(*parser.ColumnParser(HOST_NAME_COLUMN_NAME).GetOptionalString()); + node->set_active_workers(*parser.ColumnParser(ACTIVE_WORKERS_COLUMN_NAME).GetOptionalUint64()); + node->set_memory_limit(*parser.ColumnParser(MEMORY_LIMIT_COLUMN_NAME).GetOptionalUint64()); + node->set_memory_allocated(*parser.ColumnParser(MEMORY_ALLOCATED_COLUMN_NAME).GetOptionalUint64()); node->set_interconnect_port(parser.ColumnParser(INTERCONNECT_PORT_COLUMN_NAME).GetOptionalUint32().GetOrElse(0)); node->set_node_address(*parser.ColumnParser(NODE_ADDRESS_COLUMN_NAME).GetOptionalString()); - } + } } } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp index db945dcca9..d2b4bb6e95 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp @@ -55,7 +55,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult queryBuilder.AddText( "UPSERT INTO `" RESULT_SETS_TABLE_NAME "`\n" - "SELECT $result_id as result_id, $result_set_id as result_set_id,\n" + "SELECT $result_id as result_id, $result_set_id as result_set_id,\n" " T.*, $expire_at as expire_at FROM as_table($items) AS T;\n" ); diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 75ee6f1f9f..320b4d9cac 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -998,15 +998,15 @@ class TPingTaskBuilder { TMaybe<NYql::TIssues> Issues; TMaybe<NYql::TIssues> TransientIssues; TMaybe<TString> Statistics; - TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; + TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; TMaybe<TString> Ast; TMaybe<TString> Plan; TMaybe<TInstant> StartedAt; TMaybe<TInstant> FinishedAt; - bool ResignQuery = false; + bool ResignQuery = false; TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers; - TVector<TString> DqGraphs; - i32 DqGraphIndex = 0; + TVector<TString> DqGraphs; + i32 DqGraphIndex = 0; public: TPingTaskBuilder() @@ -1068,9 +1068,9 @@ public: return *this; } - TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas) + TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas) { - ResultSetMetas = resultSetMetas; + ResultSetMetas = resultSetMetas; return *this; } @@ -1099,29 +1099,29 @@ public: } TPingTaskBuilder& SetResignQuery(bool resignQuery = true) - { - ResignQuery = resignQuery; - return *this; - } - + { + ResignQuery = resignQuery; + return *this; + } + TPingTaskBuilder& AddCreatedConsumer(const TString& databaseId, const TString& database, const TString& topicPath, const TString& consumerName, const TString& clusterEndpoint, bool useSsl) { CreatedTopicConsumers.emplace_back(NYq::TEvControlPlaneStorage::TTopicConsumer{databaseId, database, topicPath, consumerName, clusterEndpoint, useSsl, "", false}); return *this; } - TPingTaskBuilder& AddDqGraph(const TString& dqGraph) - { - DqGraphs.push_back(dqGraph); - return *this; - } - - TPingTaskBuilder& SetDqGraphIndex(i32 dqGraphIndex) - { - DqGraphIndex = dqGraphIndex; - return *this; - } - + TPingTaskBuilder& AddDqGraph(const TString& dqGraph) + { + DqGraphs.push_back(dqGraph); + return *this; + } + + TPingTaskBuilder& SetDqGraphIndex(i32 dqGraphIndex) + { + DqGraphIndex = dqGraphIndex; + return *this; + } + std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build() { auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(Scope, QueryId, Owner, Deadline, ResultId); @@ -1129,15 +1129,15 @@ public: request->Issues = Issues; request->TransientIssues = TransientIssues; request->Statistics = Statistics; - request->ResultSetMetas = ResultSetMetas; + request->ResultSetMetas = ResultSetMetas; request->Ast = Ast; request->Plan = Plan; request->StartedAt = StartedAt; request->FinishedAt = FinishedAt; - request->ResignQuery = ResignQuery; + request->ResignQuery = ResignQuery; request->CreatedTopicConsumers = CreatedTopicConsumers; - request->DqGraphs = DqGraphs; - request->DqGraphIndex = DqGraphIndex; + request->DqGraphs = DqGraphs; + request->DqGraphIndex = DqGraphIndex; return request; } }; @@ -1167,33 +1167,33 @@ public: return *this; } - TNodesHealthCheckBuilder& SetHostName(const TString& hostName) - { - HostName = hostName; - return *this; - } - - TNodesHealthCheckBuilder& SetInstanceId(const TString& instanceId) - { - InstanceId = instanceId; - return *this; - } - + TNodesHealthCheckBuilder& SetHostName(const TString& hostName) + { + HostName = hostName; + return *this; + } + + TNodesHealthCheckBuilder& SetInstanceId(const TString& instanceId) + { + InstanceId = instanceId; + return *this; + } + TNodesHealthCheckBuilder& SetActiveWorkers(const ui64& activeWorkers) { ActiveWorkers = activeWorkers; return *this; } - TNodesHealthCheckBuilder& SetMemoryLimit(const ui64& memoryLimit) + TNodesHealthCheckBuilder& SetMemoryLimit(const ui64& memoryLimit) { - MemoryLimit = memoryLimit; + MemoryLimit = memoryLimit; return *this; } - TNodesHealthCheckBuilder& SetMemoryAllocated(const ui64& memoryAllocated) + TNodesHealthCheckBuilder& SetMemoryAllocated(const ui64& memoryAllocated) { - MemoryAllocated = memoryAllocated; + MemoryAllocated = memoryAllocated; return *this; } @@ -1201,15 +1201,15 @@ public: { Yq::Private::NodesHealthCheckRequest request; request.set_tenant(Tenant); - auto& node = *request.mutable_node(); - node.set_node_id(NodeId); - node.set_instance_id(InstanceId); - node.set_hostname(HostName); - node.set_active_workers(ActiveWorkers); - node.set_memory_limit(MemoryLimit); - node.set_memory_allocated(MemoryAllocated); + auto& node = *request.mutable_node(); + node.set_node_id(NodeId); + node.set_instance_id(InstanceId); + node.set_hostname(HostName); + node.set_active_workers(ActiveWorkers); + node.set_memory_limit(MemoryLimit); + node.set_memory_allocated(MemoryAllocated); return std::make_unique<TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request)); } }; -} +} diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto index e6ebff0c84..c594ec1b98 100644 --- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto @@ -16,19 +16,19 @@ message Consumer { message QueryInternal { string token = 1; - repeated NYql.NDqProto.TDqTask task = 2; // deprected and should not be used, will be removed in future + repeated NYql.NDqProto.TDqTask task = 2; // deprected and should not be used, will be removed in future repeated Yql.DqsProto.TAttr settings = 3; repeated Consumer consumer = 4; repeated YandexQuery.Connection connection = 5; repeated YandexQuery.Binding binding = 6; YandexQuery.QueryAction action = 7; - string ast = 8; // deprected and should not be used, will be removed in future + string ast = 8; // deprected and should not be used, will be removed in future ExecuteMode execute_mode = 9; StateLoadMode state_load_mode = 10; string cloud_id = 11; repeated Yq.Private.TopicConsumer created_topic_consumers = 12; - repeated bytes dq_graph = 13; - int32 dq_graph_index = 14; + repeated bytes dq_graph = 13; + int32 dq_graph_index = 14; StreamingDisposition disposition = 15; } diff --git a/ydb/core/yq/libs/control_plane_storage/schema.h b/ydb/core/yq/libs/control_plane_storage/schema.h index 613eb7bcc1..7434bbfdef 100644 --- a/ydb/core/yq/libs/control_plane_storage/schema.h +++ b/ydb/core/yq/libs/control_plane_storage/schema.h @@ -58,8 +58,8 @@ namespace NYq { #define INSTANCE_ID_COLUMN_NAME "instance_id" #define NODE_ID_COLUMN_NAME "node_id" #define ACTIVE_WORKERS_COLUMN_NAME "active_workers" -#define MEMORY_LIMIT_COLUMN_NAME "memory_limit" -#define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated" +#define MEMORY_LIMIT_COLUMN_NAME "memory_limit" +#define MEMORY_ALLOCATED_COLUMN_NAME "memory_allocated" #define INTERCONNECT_PORT_COLUMN_NAME "interconnect_port" #define NODE_ADDRESS_COLUMN_NAME "node_address" @@ -67,4 +67,4 @@ namespace NYq { #define OWNER_COLUMN_NAME "owner" #define LAST_SEEN_AT_COLUMN_NAME "last_seen_at" -} // namespace NYq +} // namespace NYq diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index 288f25a1cb..53aafef259 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -99,13 +99,13 @@ bool DoesPingTaskUpdateQueriesTable(const TEvControlPlaneStorage::TEvPingTaskReq request->Issues || request->TransientIssues || request->Statistics || - request->ResultSetMetas || + request->ResultSetMetas || request->Ast || request->Plan || request->StartedAt || request->FinishedAt || - !request->CreatedTopicConsumers.empty() || - !request->DqGraphs.empty() || + !request->CreatedTopicConsumers.empty() || + !request->DqGraphs.empty() || request->DqGraphIndex || request->StateLoadMode || request->StreamingDisposition; diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp index 222ebb6e8e..85d995c25f 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp @@ -230,8 +230,8 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateNodesTable(TActorSystem* as) .AddNullableColumn(NODE_ID_COLUMN_NAME, EPrimitiveType::Uint32) .AddNullableColumn(HOST_NAME_COLUMN_NAME, EPrimitiveType::String) .AddNullableColumn(ACTIVE_WORKERS_COLUMN_NAME, EPrimitiveType::Uint64) - .AddNullableColumn(MEMORY_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64) - .AddNullableColumn(MEMORY_ALLOCATED_COLUMN_NAME, EPrimitiveType::Uint64) + .AddNullableColumn(MEMORY_LIMIT_COLUMN_NAME, EPrimitiveType::Uint64) + .AddNullableColumn(MEMORY_ALLOCATED_COLUMN_NAME, EPrimitiveType::Uint64) .AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp) .AddNullableColumn(INTERCONNECT_PORT_COLUMN_NAME, EPrimitiveType::Uint32) .AddNullableColumn(NODE_ADDRESS_COLUMN_NAME, EPrimitiveType::String) diff --git a/ydb/core/yq/libs/gateway/empty_gateway.cpp b/ydb/core/yq/libs/gateway/empty_gateway.cpp index 01bfd9bfcb..15be4854fe 100644 --- a/ydb/core/yq/libs/gateway/empty_gateway.cpp +++ b/ydb/core/yq/libs/gateway/empty_gateway.cpp @@ -1,95 +1,95 @@ -#include "empty_gateway.h" - +#include "empty_gateway.h" + #include <ydb/core/yq/libs/graph_params/proto/graph_params.pb.h> #include <ydb/core/yq/libs/events/events.h> #include <ydb/core/yq/libs/tasks_packer/tasks_packer.h> -#include <library/cpp/actors/core/actor.h> - -namespace NYq { - -class TEmptyGateway : public NYql::IDqGateway { -public: - explicit TEmptyGateway(NActors::TActorId runActorId) : RunActorId(runActorId) { - } - - NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { - Y_UNUSED(sessionId); - Y_UNUSED(username); - auto result = NThreading::NewPromise<void>(); - result.SetValue(); - return result; - } - - void CloseSession(const TString& action) override { - Y_UNUSED(action); - } - - NThreading::TFuture<TResult> ExecutePlan( - const TString& sessionId, - NYql::NDqs::IDqsExecutionPlanner& plan, - const TVector<TString>& columns, - const THashMap<TString, TString>& secureParams, +#include <library/cpp/actors/core/actor.h> + +namespace NYq { + +class TEmptyGateway : public NYql::IDqGateway { +public: + explicit TEmptyGateway(NActors::TActorId runActorId) : RunActorId(runActorId) { + } + + NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override { + Y_UNUSED(sessionId); + Y_UNUSED(username); + auto result = NThreading::NewPromise<void>(); + result.SetValue(); + return result; + } + + void CloseSession(const TString& action) override { + Y_UNUSED(action); + } + + NThreading::TFuture<TResult> ExecutePlan( + const TString& sessionId, + NYql::NDqs::IDqsExecutionPlanner& plan, + const TVector<TString>& columns, + const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& queryParams, - const NYql::TDqSettings::TPtr& settings, - const TDqProgressWriter& progressWriter, - const THashMap<TString, TString>& modulesMapping, + const NYql::TDqSettings::TPtr& settings, + const TDqProgressWriter& progressWriter, + const THashMap<TString, TString>& modulesMapping, bool discard) override - { - Y_UNUSED(progressWriter); - Y_UNUSED(modulesMapping); // TODO: support. + { + Y_UNUSED(progressWriter); + Y_UNUSED(modulesMapping); // TODO: support. Y_UNUSED(discard); Y_UNUSED(queryParams); - - NProto::TGraphParams params; + + NProto::TGraphParams params; THashMap<i64, TString> stagePrograms; NTasksPacker::Pack(plan.GetTasks(), stagePrograms); - for (auto&& task : plan.GetTasks()) { - *params.AddTasks() = std::move(task); - } + for (auto&& task : plan.GetTasks()) { + *params.AddTasks() = std::move(task); + } for (const auto& [id, program]: stagePrograms) { (*params.MutableStageProgram())[id] = program; } - for (auto&& col : columns) { - *params.AddColumns() = col; - } - for (auto&& [k, v] : secureParams) { - (*params.MutableSecureParams())[k] = v; - } - settings->Save(params); - if (plan.GetSourceID()) { - params.SetSourceId(plan.GetSourceID().NodeId() - 1); - params.SetResultType(plan.GetResultType()); - } - params.SetSession(sessionId); - + for (auto&& col : columns) { + *params.AddColumns() = col; + } + for (auto&& [k, v] : secureParams) { + (*params.MutableSecureParams())[k] = v; + } + settings->Save(params); + if (plan.GetSourceID()) { + params.SetSourceId(plan.GetSourceID().NodeId() - 1); + params.SetResultType(plan.GetResultType()); + } + params.SetSession(sessionId); + NActors::TActivationContext::Send(new NActors::IEventHandle(RunActorId, {}, new TEvents::TEvGraphParams(params))); - - auto result = NThreading::NewPromise<NYql::IDqGateway::TResult>(); - NYql::IDqGateway::TResult gatewayResult; - // fake it till you make it - // generate dummy result for YQL facade now, remove this gateway completely - // when top-level YQL facade call like Preprocess() is implemented - if (plan.GetResultType()) { - // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only) - gatewayResult.SetSuccess(); - gatewayResult.Data = "[[\001\0021]]"; - gatewayResult.Truncated = true; - gatewayResult.RowsCount = 0; - } else { - // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?) - gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)}); - } - result.SetValue(gatewayResult); - return result; - } - -private: - NActors::TActorId RunActorId; -}; - -TIntrusivePtr<NYql::IDqGateway> CreateEmptyGateway(NActors::TActorId runActorId) { - return MakeIntrusive<TEmptyGateway>(runActorId); -} - -} // namespace NYq + + auto result = NThreading::NewPromise<NYql::IDqGateway::TResult>(); + NYql::IDqGateway::TResult gatewayResult; + // fake it till you make it + // generate dummy result for YQL facade now, remove this gateway completely + // when top-level YQL facade call like Preprocess() is implemented + if (plan.GetResultType()) { + // for resultable graphs return dummy "select 1" result (it is not used and is required to satisfy YQL facade only) + gatewayResult.SetSuccess(); + gatewayResult.Data = "[[\001\0021]]"; + gatewayResult.Truncated = true; + gatewayResult.RowsCount = 0; + } else { + // for resultless results expect infinite INSERT FROM SELECT and fail YQL facade (with well known secret code?) + gatewayResult.Issues.AddIssues({NYql::TIssue("MAGIC BREAK").SetCode(555, NYql::TSeverityIds::S_ERROR)}); + } + result.SetValue(gatewayResult); + return result; + } + +private: + NActors::TActorId RunActorId; +}; + +TIntrusivePtr<NYql::IDqGateway> CreateEmptyGateway(NActors::TActorId runActorId) { + return MakeIntrusive<TEmptyGateway>(runActorId); +} + +} // namespace NYq diff --git a/ydb/core/yq/libs/gateway/empty_gateway.h b/ydb/core/yq/libs/gateway/empty_gateway.h index d18f5c2c0e..f9c95f9e15 100644 --- a/ydb/core/yq/libs/gateway/empty_gateway.h +++ b/ydb/core/yq/libs/gateway/empty_gateway.h @@ -1,10 +1,10 @@ -#pragma once +#pragma once #include <ydb/library/yql/providers/dq/provider/yql_dq_gateway.h> - -#include <library/cpp/actors/core/actorsystem.h> - -namespace NYq { - -TIntrusivePtr<NYql::IDqGateway> CreateEmptyGateway(NActors::TActorId runActorId); - -} // namespace NYq + +#include <library/cpp/actors/core/actorsystem.h> + +namespace NYq { + +TIntrusivePtr<NYql::IDqGateway> CreateEmptyGateway(NActors::TActorId runActorId); + +} // namespace NYq diff --git a/ydb/core/yq/libs/gateway/ya.make b/ydb/core/yq/libs/gateway/ya.make index 503e9b691b..16e315281f 100644 --- a/ydb/core/yq/libs/gateway/ya.make +++ b/ydb/core/yq/libs/gateway/ya.make @@ -3,7 +3,7 @@ OWNER(g:yq) LIBRARY() SRCS( - empty_gateway.cpp + empty_gateway.cpp ) PEERDIR( diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index d66ac202bf..802b69c9f2 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -186,12 +186,12 @@ void Init( RegisterDqPqWriteActorFactory(*sinkActorFactory, yqSharedResources->YdbDriver, credentialsFactory); RegisterDQSolomonWriteActorFactory(*sinkActorFactory, credentialsFactory); - } - - ui64 mkqlInitialMemoryLimit = 8_GB; + } + ui64 mkqlInitialMemoryLimit = 8_GB; + if (protoConfig.GetResourceManager().GetEnabled()) { - mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit(); + mkqlInitialMemoryLimit = protoConfig.GetResourceManager().GetMkqlInitialMemoryLimit(); if (!mkqlInitialMemoryLimit) { mkqlInitialMemoryLimit = 8_GB; } @@ -227,7 +227,7 @@ void Init( yqSharedResources, icPort, std::get<1>(localAddr), - tenant, + tenant, mkqlInitialMemoryLimit, clientCounters); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h index 84eeda2edb..f54c2646bb 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor.h @@ -18,7 +18,7 @@ namespace NDq { struct TEvDqCompute { struct TEvState : public NActors::TEventPB<TEvState, NDqProto::TEvComputeActorState, TDqComputeEvents::EvState> {}; - struct TEvStateRequest : public NActors::TEventPB<TEvState, NDqProto::TEvComputeStateRequest, TDqComputeEvents::EvStateRequest> {}; + struct TEvStateRequest : public NActors::TEventPB<TEvState, NDqProto::TEvComputeStateRequest, TDqComputeEvents::EvStateRequest> {}; struct TEvResumeExecution : public NActors::TEventLocal<TEvResumeExecution, TDqComputeEvents::EvResumeExecution> {}; @@ -233,19 +233,19 @@ struct TComputeRuntimeSettings { TMaybe<NDqProto::TRlPath> RlPath; }; -using TAllocateMemoryCallback = std::function<bool(const TTxId& txId, ui64 taskId, ui64 memory)>; -using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, ui64 memory)>; - +using TAllocateMemoryCallback = std::function<bool(const TTxId& txId, ui64 taskId, ui64 memory)>; +using TFreeMemoryCallback = std::function<void(const TTxId& txId, ui64 taskId, ui64 memory)>; + struct TComputeMemoryLimits { ui64 ChannelBufferSize = 0; ui64 ScanBufferSize = 0; // TODO: drop it ui64 MkqlLightProgramMemoryLimit = 0; ui64 MkqlHeavyProgramMemoryLimit = 0; - TAllocateMemoryCallback AllocateMemoryFn = nullptr; - TFreeMemoryCallback FreeMemoryFn = nullptr; - ui64 MinMemAllocSize = 30_MB; - ui64 MinMemFreeSize = 30_MB; + TAllocateMemoryCallback AllocateMemoryFn = nullptr; + TFreeMemoryCallback FreeMemoryFn = nullptr; + ui64 MinMemAllocSize = 30_MB; + ui64 MinMemFreeSize = 30_MB; }; using TTaskRunnerFactory = std::function< diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index deaadefb42..6a51d5efff 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -118,7 +118,7 @@ public: protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) : ExecuterId(executerId) , TxId(txId) , Task(std::move(task)) @@ -128,8 +128,8 @@ protected: , SourceActorFactory(std::move(sourceActorFactory)) , SinkActorFactory(std::move(sinkActorFactory)) , CheckpointingMode(GetTaskCheckpointingMode(Task)) - , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) - , Running(!Task.GetCreateSuspended()) + , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) + , Running(!Task.GetCreateSuspended()) { if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); @@ -142,7 +142,7 @@ protected: TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory, - const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) + const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits) : ExecuterId(executerId) , TxId(txId) , Task(task) @@ -151,8 +151,8 @@ protected: , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) , SourceActorFactory(std::move(sourceActorFactory)) , SinkActorFactory(std::move(sinkActorFactory)) - , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) - , Running(!Task.GetCreateSuspended()) + , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) + , Running(!Task.GetCreateSuspended()) { if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { BasicStats = std::make_unique<TBasicStats>(); @@ -193,7 +193,7 @@ protected: FFunc(TEvDqCompute::TEvChannelData::EventType, Channels->Receive); FFunc(TEvDqCompute::TEvChannelDataAck::EventType, Channels->Receive); hFunc(TEvDqCompute::TEvRun, HandleExecuteBase); - hFunc(TEvDqCompute::TEvStateRequest, HandleExecuteBase); + hFunc(TEvDqCompute::TEvStateRequest, HandleExecuteBase); hFunc(TEvDqCompute::TEvNewCheckpointCoordinator, HandleExecuteBase); FFunc(TEvDqCompute::TEvInjectCheckpoint::EventType, Checkpoints->Receive); FFunc(TEvDqCompute::TEvCommitState::EventType, Checkpoints->Receive); @@ -230,18 +230,18 @@ protected: DoExecuteImpl(); } - if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) { + if (alloc->GetAllocated() - alloc->GetUsed() > MemoryLimits.MinMemFreeSize) { alloc->ReleaseFreePages(); - if (MemoryLimits.FreeMemoryFn) { - auto newLimit = std::max(alloc->GetAllocated(), CalcMkqlMemoryLimit()); - if (MkqlMemoryLimit > newLimit) { - auto freedSize = MkqlMemoryLimit - newLimit; - MkqlMemoryLimit = newLimit; - alloc->SetLimit(newLimit); - MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), freedSize); - CA_LOG_I("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit); - } - } + if (MemoryLimits.FreeMemoryFn) { + auto newLimit = std::max(alloc->GetAllocated(), CalcMkqlMemoryLimit()); + if (MkqlMemoryLimit > newLimit) { + auto freedSize = MkqlMemoryLimit - newLimit; + MkqlMemoryLimit = newLimit; + alloc->SetLimit(newLimit); + MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), freedSize); + CA_LOG_I("[Mem] memory shrinked, new limit: " << MkqlMemoryLimit); + } + } } auto now = TInstant::Now(); @@ -378,9 +378,9 @@ protected: return; } if (Channels->CheckInFlight("Tasks execution finished")) { - State = NDqProto::COMPUTE_STATE_FINISHED; + State = NDqProto::COMPUTE_STATE_FINISHED; CA_LOG_D("Compute state finished. All channels finished"); - ReportStateAndMaybeDie(TIssue("success")); + ReportStateAndMaybeDie(TIssue("success")); } } } @@ -388,12 +388,12 @@ protected: protected: void Terminate(bool success, const TString& message) { - - if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) { - MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), MkqlMemoryLimit); - MkqlMemoryLimit = 0; - } - + + if (MkqlMemoryLimit && MemoryLimits.FreeMemoryFn) { + MemoryLimits.FreeMemoryFn(TxId, Task.GetId(), MkqlMemoryLimit); + MkqlMemoryLimit = 0; + } + if (Channels) { TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Channels->SelfId(), this->SelfId(), new NActors::TEvents::TEvPoison); @@ -431,11 +431,11 @@ protected: this->PassAway(); } - void ReportStateAndMaybeDie(TIssue&& issue) { - ReportStateAndMaybeDie( - State == NDqProto::COMPUTE_STATE_FINISHED ? - Ydb::StatusIds::STATUS_CODE_UNSPECIFIED - : Ydb::StatusIds::ABORTED, TIssues({issue})); + void ReportStateAndMaybeDie(TIssue&& issue) { + ReportStateAndMaybeDie( + State == NDqProto::COMPUTE_STATE_FINISHED ? + Ydb::StatusIds::STATUS_CODE_UNSPECIFIED + : Ydb::StatusIds::ABORTED, TIssues({issue})); } void ReportStateAndDie(NDqProto::EComputeState state, TIssue&& issue) { @@ -463,12 +463,12 @@ protected: Terminate(state == NDqProto::COMPUTE_STATE_FINISHED, issue.Message); } - void ReportStateAndMaybeDie(Ydb::StatusIds::StatusCode status, const TIssues& issues) + void ReportStateAndMaybeDie(Ydb::StatusIds::StatusCode status, const TIssues& issues) { auto execEv = MakeHolder<TEvDqCompute::TEvState>(); auto& record = execEv->Record; - record.SetState(State); + record.SetState(State); record.SetStatus(status); record.SetTaskId(Task.GetId()); if (RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_BASIC) { @@ -478,13 +478,13 @@ protected: this->Send(ExecuterId, execEv.Release()); - if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { - // checkpointed CAs must not self-destroy - return; - } - - TerminateSources(NDqProto::EComputeState_Name(State), State == NDqProto::COMPUTE_STATE_FINISHED); - Terminate(State == NDqProto::COMPUTE_STATE_FINISHED, NDqProto::EComputeState_Name(State)); + if (Checkpoints && State == NDqProto::COMPUTE_STATE_FINISHED) { + // checkpointed CAs must not self-destroy + return; + } + + TerminateSources(NDqProto::EComputeState_Name(State), State == NDqProto::COMPUTE_STATE_FINISHED); + Terminate(State == NDqProto::COMPUTE_STATE_FINISHED, NDqProto::EComputeState_Name(State)); } void InternalError(TIssuesIds::EIssueCode issueCode, const TString& message) { @@ -492,8 +492,8 @@ protected: TIssue issue(message); SetIssueCode(issueCode, issue); std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); - State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(std::move(issue)); + State = NDqProto::COMPUTE_STATE_FAILURE; + ReportStateAndMaybeDie(std::move(issue)); } void ContinueExecute() { @@ -657,13 +657,13 @@ protected: void Start() override { Running = true; - State = NDqProto::COMPUTE_STATE_EXECUTING; + State = NDqProto::COMPUTE_STATE_EXECUTING; ContinueExecute(); } void Stop() override { Running = false; - State = NDqProto::COMPUTE_STATE_UNKNOWN; + State = NDqProto::COMPUTE_STATE_UNKNOWN; } protected: @@ -874,8 +874,8 @@ protected: } } - void HandleExecuteBase(TEvDqCompute::TEvRun::TPtr& ev) { - CA_LOG_D("Got TEvRun from actor " << ev->Sender); + void HandleExecuteBase(TEvDqCompute::TEvRun::TPtr& ev) { + CA_LOG_D("Got TEvRun from actor " << ev->Sender); Start(); // Event from coordinator should be processed to confirm seq no. @@ -885,16 +885,16 @@ protected: } } - void HandleExecuteBase(TEvDqCompute::TEvStateRequest::TPtr& ev) { - CA_LOG_D("Got TEvStateRequest from actor " << ev->Sender << " TaskId: " << Task.GetId() << " PingCookie: " << ev->Cookie); - auto evState = MakeHolder<TEvDqCompute::TEvState>(); - evState->Record.SetState(NDqProto::COMPUTE_STATE_EXECUTING); - evState->Record.SetStatus(Ydb::StatusIds::SUCCESS); - evState->Record.SetTaskId(Task.GetId()); + void HandleExecuteBase(TEvDqCompute::TEvStateRequest::TPtr& ev) { + CA_LOG_D("Got TEvStateRequest from actor " << ev->Sender << " TaskId: " << Task.GetId() << " PingCookie: " << ev->Cookie); + auto evState = MakeHolder<TEvDqCompute::TEvState>(); + evState->Record.SetState(NDqProto::COMPUTE_STATE_EXECUTING); + evState->Record.SetStatus(Ydb::StatusIds::SUCCESS); + evState->Record.SetTaskId(Task.GetId()); FillStats(evState->Record.MutableStats(), /* last */ false); - this->Send(ev->Sender, evState.Release(), NActors::IEventHandle::FlagTrackDelivery, ev->Cookie); - } - + this->Send(ev->Sender, evState.Release(), NActors::IEventHandle::FlagTrackDelivery, ev->Cookie); + } + void HandleExecuteBase(TEvDqCompute::TEvNewCheckpointCoordinator::TPtr& ev) { if (!Checkpoints) { Checkpoints = new TDqComputeActorCheckpoints(TxId, Task, this); @@ -1312,14 +1312,14 @@ private: MkqlMemoryLimit = CalcMkqlMemoryLimit(); } - static ui64 AlignMemorySizeToMbBoundary(ui64 memory) { - // allocate memory in 1_MB (2^20B) chunks, so requested value is rounded up to MB boundary - constexpr ui64 alignMask = 1_MB - 1; - return (memory + alignMask) & ~alignMask; - } - + static ui64 AlignMemorySizeToMbBoundary(ui64 memory) { + // allocate memory in 1_MB (2^20B) chunks, so requested value is rounded up to MB boundary + constexpr ui64 alignMask = 1_MB - 1; + return (memory + alignMask) & ~alignMask; + } + void RequestExtraMemory(ui64 memory, NKikimr::NMiniKQL::TScopedAlloc* alloc) { - memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize); + memory = std::max(AlignMemorySizeToMbBoundary(memory), MemoryLimits.MinMemAllocSize); CA_LOG_I("not enough memory, request +" << memory); @@ -1354,8 +1354,8 @@ private: dst->SetMkqlMaxMemoryUsage(ProfileStats->MkqlMaxUsedMemory); dst->SetMkqlExtraMemoryBytes(ProfileStats->MkqlExtraMemoryBytes); dst->SetMkqlExtraMemoryRequests(ProfileStats->MkqlExtraMemoryRequests); - } - + } + if (TaskRunner) { TaskRunner->UpdateStats(); @@ -1474,7 +1474,7 @@ protected: THashMap<ui64, TSinkInfo> SinksMap; // Output index -> Sink info ui64 MkqlMemoryLimit = 0; bool ResumeEventScheduled = false; - NDqProto::EComputeState State; + NDqProto::EComputeState State; struct TBasicStats { TDuration CpuTime; diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h index 698377a393..19125041d3 100644 --- a/ydb/library/yql/dq/actors/dq_events_ids.h +++ b/ydb/library/yql/dq/actors/dq_events_ids.h @@ -46,7 +46,7 @@ struct TDqComputeEvents { EvRestoreFromCheckpointResult, EvGetTaskState, EvGetTaskStateResult, - EvStateRequest, + EvStateRequest, EvNewCheckpointCoordinatorAck, // place all new events here diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 0e38ab3bbc..8b05b8c3e2 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -56,9 +56,9 @@ message TEvComputeActorState { optional TDqComputeActorStats Stats = 6; }; -message TEvComputeStateRequest { -} - +message TEvComputeStateRequest { +} + message TEvComputeChannelData { optional TChannelData ChannelData = 1; optional uint64 SendTime = 2; diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index 71cc7be6cb..fefcd9fa91 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -55,7 +55,7 @@ struct TBaseDqExecuterEvents { ES_PULL_RESULT, ES_RESULT_SET, ES_DQ_FAILURE, - ES_GRAPH, + ES_GRAPH, ES_GRAPH_FINISHED, ES_GRAPH_EXECUTION_EVENT, }; diff --git a/ydb/library/yql/dq/common/dq_resource_quoter.h b/ydb/library/yql/dq/common/dq_resource_quoter.h index e087e090a2..4635c2b00b 100644 --- a/ydb/library/yql/dq/common/dq_resource_quoter.h +++ b/ydb/library/yql/dq/common/dq_resource_quoter.h @@ -1,147 +1,147 @@ -#pragma once - -#include <util/generic/hash.h> -#include <util/system/mutex.h> -#include <util/system/types.h> - -namespace NYql::NDq { - -using TAllocateCallback = std::function<void(const NDq::TTxId& txId, ui64 taskId, ui64 size, bool success)>; -using TNotifyCallback = std::function<void(const ui64 limit, ui64 allocated)>; - -struct TTransactionTaskInfo { - NDq::TTxId TxId; - ui64 TaskId = 0; - ui64 Size = 0; -}; - -class TResourceQuoter { - -public: - TResourceQuoter(ui64 limit, bool strict = false) : Limit(limit), Strict(strict) { - } - - ui64 GetLimit() const { - return Limit; - } - - ui64 GetAllocatedTotal() const { - TGuard<TMutex> lock(Mutex); - return Allocated; - } - - ui64 GetFreeTotal() const { - TGuard<TMutex> lock(Mutex); - return Limit > Allocated ? Limit - Allocated : 0; - } - - bool Allocate(const NDq::TTxId& txId, ui64 taskId, ui64 size) { - TGuard<TMutex> lock(Mutex); - if (Limit && Allocated + size > Limit) { - return false; - } - Allocated += size; - - auto& txMap = ResourceMap[txId]; - auto itt = txMap.find(taskId); - if (itt == txMap.end()) { - auto& info = txMap[taskId]; - info.TxId = txId; - info.TaskId = taskId; - info.Size = size; - } else { - itt->second.Size += size; - } - - Notify(); - return true; - } - - void Free(const NDq::TTxId& txId, ui64 taskId, ui64 size) { - TGuard<TMutex> lock(Mutex); - auto itx = ResourceMap.find(txId); - - if (Strict) { - Y_VERIFY(itx != ResourceMap.end()); - } else { - if(itx == ResourceMap.end()) { - return; - } - } - - auto itt = itx->second.find(taskId); - Y_VERIFY(itt != itx->second.end()); - - if (Strict) { - Y_VERIFY(itt->second.Size >= size); - } else { - if (size > itt->second.Size) { - size = itt->second.Size; - } - } - - Y_VERIFY(Allocated >= size); - - itt->second.Size -= size; - if (itt->second.Size == 0) { - itx->second.erase(itt); - if (itx->second.size() == 0) { - ResourceMap.erase(itx); - } - } - - Allocated -= size; - Notify(); - } - - ui64 GetAllocated(const NDq::TTxId& txId, ui64 taskId) const { - TGuard<TMutex> lock(Mutex); - auto itx = ResourceMap.find(txId); - if (itx != ResourceMap.end()) { - auto itt = itx->second.find(taskId); - if (itt != itx->second.end()) { - return itt->second.Size; - } - } - return 0; - } - - void Free(const NDq::TTxId& txId, ui64 taskId) { - TGuard<TMutex> lock(Mutex); - auto itx = ResourceMap.find(txId); - if (itx != ResourceMap.end()) { - auto itt = itx->second.find(taskId); - if (itt != itx->second.end()) { - Y_VERIFY(Allocated >= itt->second.Size); - Allocated -= itt->second.Size; - itx->second.erase(itt); - if (itx->second.size() == 0) { - ResourceMap.erase(itx); - } - Notify(); - } - } - } - - TNotifyCallback SetNotifier(const TNotifyCallback& newCb) { - TNotifyCallback result = std::move(Cb); - Cb = newCb; - return result; - } - -private: - ui64 Limit; - bool Strict; - ui64 Allocated = 0; - TMutex Mutex; - THashMap<NDq::TTxId, THashMap<ui64, TTransactionTaskInfo>> ResourceMap; - TNotifyCallback Cb; - - void Notify() const { - if (Cb) { - Cb(Limit, Allocated); - } - } -}; - -} // namespace NYql::NDq +#pragma once + +#include <util/generic/hash.h> +#include <util/system/mutex.h> +#include <util/system/types.h> + +namespace NYql::NDq { + +using TAllocateCallback = std::function<void(const NDq::TTxId& txId, ui64 taskId, ui64 size, bool success)>; +using TNotifyCallback = std::function<void(const ui64 limit, ui64 allocated)>; + +struct TTransactionTaskInfo { + NDq::TTxId TxId; + ui64 TaskId = 0; + ui64 Size = 0; +}; + +class TResourceQuoter { + +public: + TResourceQuoter(ui64 limit, bool strict = false) : Limit(limit), Strict(strict) { + } + + ui64 GetLimit() const { + return Limit; + } + + ui64 GetAllocatedTotal() const { + TGuard<TMutex> lock(Mutex); + return Allocated; + } + + ui64 GetFreeTotal() const { + TGuard<TMutex> lock(Mutex); + return Limit > Allocated ? Limit - Allocated : 0; + } + + bool Allocate(const NDq::TTxId& txId, ui64 taskId, ui64 size) { + TGuard<TMutex> lock(Mutex); + if (Limit && Allocated + size > Limit) { + return false; + } + Allocated += size; + + auto& txMap = ResourceMap[txId]; + auto itt = txMap.find(taskId); + if (itt == txMap.end()) { + auto& info = txMap[taskId]; + info.TxId = txId; + info.TaskId = taskId; + info.Size = size; + } else { + itt->second.Size += size; + } + + Notify(); + return true; + } + + void Free(const NDq::TTxId& txId, ui64 taskId, ui64 size) { + TGuard<TMutex> lock(Mutex); + auto itx = ResourceMap.find(txId); + + if (Strict) { + Y_VERIFY(itx != ResourceMap.end()); + } else { + if(itx == ResourceMap.end()) { + return; + } + } + + auto itt = itx->second.find(taskId); + Y_VERIFY(itt != itx->second.end()); + + if (Strict) { + Y_VERIFY(itt->second.Size >= size); + } else { + if (size > itt->second.Size) { + size = itt->second.Size; + } + } + + Y_VERIFY(Allocated >= size); + + itt->second.Size -= size; + if (itt->second.Size == 0) { + itx->second.erase(itt); + if (itx->second.size() == 0) { + ResourceMap.erase(itx); + } + } + + Allocated -= size; + Notify(); + } + + ui64 GetAllocated(const NDq::TTxId& txId, ui64 taskId) const { + TGuard<TMutex> lock(Mutex); + auto itx = ResourceMap.find(txId); + if (itx != ResourceMap.end()) { + auto itt = itx->second.find(taskId); + if (itt != itx->second.end()) { + return itt->second.Size; + } + } + return 0; + } + + void Free(const NDq::TTxId& txId, ui64 taskId) { + TGuard<TMutex> lock(Mutex); + auto itx = ResourceMap.find(txId); + if (itx != ResourceMap.end()) { + auto itt = itx->second.find(taskId); + if (itt != itx->second.end()) { + Y_VERIFY(Allocated >= itt->second.Size); + Allocated -= itt->second.Size; + itx->second.erase(itt); + if (itx->second.size() == 0) { + ResourceMap.erase(itx); + } + Notify(); + } + } + } + + TNotifyCallback SetNotifier(const TNotifyCallback& newCb) { + TNotifyCallback result = std::move(Cb); + Cb = newCb; + return result; + } + +private: + ui64 Limit; + bool Strict; + ui64 Allocated = 0; + TMutex Mutex; + THashMap<NDq::TTxId, THashMap<ui64, TTransactionTaskInfo>> ResourceMap; + TNotifyCallback Cb; + + void Notify() const { + if (Cb) { + Cb(Limit, Allocated); + } + } +}; + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/common/ya.make b/ydb/library/yql/dq/common/ya.make index a47ae59c37..d79d3712fa 100644 --- a/ydb/library/yql/dq/common/ya.make +++ b/ydb/library/yql/dq/common/ya.make @@ -12,7 +12,7 @@ PEERDIR( SRCS( dq_common.cpp - dq_resource_quoter.h + dq_resource_quoter.h dq_value.cpp ) diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index e29e64ef2c..50ecf65fd2 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -148,6 +148,6 @@ message TDqTask { repeated TTaskInput Inputs = 5; repeated TTaskOutput Outputs = 6; google.protobuf.Any Meta = 7; - bool CreateSuspended = 8; + bool CreateSuspended = 8; optional TDqTransform OutputTransform = 12; } diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 10900352c3..ac39ee61fa 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -212,13 +212,13 @@ public: , CollectBasicStats(Settings.CollectBasicStats) , CollectProfileStats(Settings.CollectProfileStats) { - if (CollectBasicStats) { + if (CollectBasicStats) { Stats = std::make_unique<TDqTaskRunnerStats>(); - if (Y_UNLIKELY(CollectProfileStats)) { + if (Y_UNLIKELY(CollectProfileStats)) { Stats->ComputeCpuTimeByRun = NMonitoring::ExponentialHistogram(6, 10, 10); - } - } else { - YQL_ENSURE(!CollectProfileStats, "CollectProfileStats requires CollectBasicStats to be set as well"); + } + } else { + YQL_ENSURE(!CollectProfileStats, "CollectProfileStats requires CollectBasicStats to be set as well"); } if (!Context.Alloc) { @@ -521,7 +521,7 @@ public: Stats->RunStatusTimeMetrics.SetCurrentStatus(runStatus, RunComputeTime); } - if (Y_UNLIKELY(CollectProfileStats)) { + if (Y_UNLIKELY(CollectProfileStats)) { Stats->ComputeCpuTimeByRun->Collect(RunComputeTime.MilliSeconds()); if (ProgramParsed.StatsRegistry) { @@ -530,7 +530,7 @@ public: Stats->MkqlStats.emplace_back(TMkqlStat{key, value}); }); } - } + } if (runStatus == ERunStatus::Finished) { if (Stats) { @@ -719,7 +719,7 @@ private: bool CollectBasicStats = false; bool CollectProfileStats = false; std::unique_ptr<TDqTaskRunnerStats> Stats; - TDuration RunComputeTime; + TDuration RunComputeTime; private: // statistics support diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index a0cc816e91..cc61b9cb08 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -14,8 +14,8 @@ #include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/mkql_node_visitor.h> -#include <library/cpp/monlib/metrics/histogram_collector.h> - +#include <library/cpp/monlib/metrics/histogram_collector.h> + #include <util/generic/size_literals.h> #include <util/system/types.h> @@ -79,7 +79,7 @@ struct TDqTaskRunnerStats { TDuration WaitOutputTime; NMonitoring::IHistogramCollectorPtr ComputeCpuTimeByRun; // in millis - + THashMap<ui64, const TDqInputChannelStats*> InputChannels; // Channel id -> Channel stats THashMap<ui64, const TDqSourceStats*> Sources; // Input index -> Source stats THashMap<ui64, const TDqOutputChannelStats*> OutputChannels; // Channel id -> Channel stats diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp index a2ae923d16..923ad38522 100644 --- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp @@ -19,7 +19,7 @@ IActor* CreateComputeActor( NDq::TAllocateMemoryCallback allocateMemoryFn, NDq::TFreeMemoryCallback freeMemoryFn, const TActorId& executerId, - const TString& operationId, + const TString& operationId, NYql::NDqProto::TDqTask&& task, const TString& computeActorType, const NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory) @@ -27,17 +27,17 @@ IActor* CreateComputeActor( auto memoryLimits = NDq::TComputeMemoryLimits(); memoryLimits.ChannelBufferSize = 1000000; memoryLimits.ScanBufferSize = 16_MB; - // light == heavy since we allow extra allocation + // light == heavy since we allow extra allocation memoryLimits.MkqlLightProgramMemoryLimit = options.MkqlInitialMemoryLimit; memoryLimits.MkqlHeavyProgramMemoryLimit = options.MkqlInitialMemoryLimit; - memoryLimits.AllocateMemoryFn = allocateMemoryFn; - memoryLimits.FreeMemoryFn = freeMemoryFn; - // min alloc size == min free size to simplify api + memoryLimits.AllocateMemoryFn = allocateMemoryFn; + memoryLimits.FreeMemoryFn = freeMemoryFn; + // min alloc size == min free size to simplify api memoryLimits.MinMemAllocSize = options.MkqlMinAllocSize; memoryLimits.MinMemFreeSize = options.MkqlMinAllocSize; auto computeRuntimeSettings = NDq::TComputeRuntimeSettings(); - computeRuntimeSettings.ExtraMemoryAllocationPool = 3; + computeRuntimeSettings.ExtraMemoryAllocationPool = 3; computeRuntimeSettings.FailOnUndelivery = false; computeRuntimeSettings.StatsMode = NDqProto::DQ_STATS_MODE_PROFILE; diff --git a/ydb/library/yql/providers/dq/actors/events.cpp b/ydb/library/yql/providers/dq/actors/events.cpp index 892c5cccd7..12f55ce24b 100644 --- a/ydb/library/yql/providers/dq/actors/events.cpp +++ b/ydb/library/yql/providers/dq/actors/events.cpp @@ -1,14 +1,14 @@ #include "events.h" #include <ydb/library/yql/public/issue/yql_issue_message.h> - + namespace NYql::NDqs { TEvDqTask::TEvDqTask(NDqProto::TDqTask task) { *Record.MutableTask() = std::move(task); } - TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback) { - IssuesToMessage({issue}, Record.MutableIssues()); + TEvDqFailure::TEvDqFailure(const TIssue& issue, bool retriable, bool needFallback) { + IssuesToMessage({issue}, Record.MutableIssues()); Record.SetRetriable(retriable); Record.SetNeedFallback(needFallback); } @@ -17,14 +17,14 @@ namespace NYql::NDqs { Record = std::move(queryResult); } - TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId) + TEvGraphRequest::TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId) { *Record.MutableRequest() = request; - NActors::ActorIdToProto(controlId, Record.MutableControlId()); + NActors::ActorIdToProto(controlId, Record.MutableControlId()); NActors::ActorIdToProto(resultId, Record.MutableResultId()); - if (checkPointCoordinatorId) { - NActors::ActorIdToProto(checkPointCoordinatorId, Record.MutableCheckPointCoordinatorId()); - } + if (checkPointCoordinatorId) { + NActors::ActorIdToProto(checkPointCoordinatorId, Record.MutableCheckPointCoordinatorId()); + } } TEvReadyState::TEvReadyState(NActors::TActorId sourceId, TString type) { @@ -32,10 +32,10 @@ namespace NYql::NDqs { *Record.MutableResultType() = std::move(type); } - TEvReadyState::TEvReadyState(NDqProto::TReadyState&& proto) { - Record = std::move(proto); - } - + TEvReadyState::TEvReadyState(NDqProto::TReadyState&& proto) { + Record = std::move(proto); + } + TEvGraphExecutionEvent::TEvGraphExecutionEvent(NDqProto::TGraphExecutionEvent& evt) { Record = evt; } diff --git a/ydb/library/yql/providers/dq/actors/events.h b/ydb/library/yql/providers/dq/actors/events.h index 594921560a..7af23886bd 100644 --- a/ydb/library/yql/providers/dq/actors/events.h +++ b/ydb/library/yql/providers/dq/actors/events.h @@ -21,7 +21,7 @@ namespace NYql::NDqs { struct TEvDqFailure : NActors::TEventPB<TEvDqFailure, NDqProto::TDqFailure, TDqExecuterEvents::ES_DQ_FAILURE> { TEvDqFailure() = default; - explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false); + explicit TEvDqFailure(const TIssue& issue, bool retriable = false, bool needFallback = false); }; struct TEvQueryResponse @@ -32,13 +32,13 @@ namespace NYql::NDqs { struct TEvGraphRequest : NActors::TEventPB<TEvGraphRequest, NDqProto::TGraphRequest, TDqExecuterEvents::ES_GRAPH> { TEvGraphRequest() = default; - TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {}); + TEvGraphRequest(const Yql::DqsProto::ExecuteGraphRequest& request, NActors::TActorId controlId, NActors::TActorId resultId, NActors::TActorId checkPointCoordinatorId = {}); }; struct TEvReadyState : NActors::TEventPB<TEvReadyState, NDqProto::TReadyState, TDqExecuterEvents::ES_READY_TO_PULL> { TEvReadyState() = default; TEvReadyState(NActors::TActorId sourceId, TString type); - explicit TEvReadyState(NDqProto::TReadyState&& proto); + explicit TEvReadyState(NDqProto::TReadyState&& proto); }; struct TEvPullResult : NActors::TEventBase<TEvPullResult, TDqExecuterEvents::ES_PULL_RESULT> { @@ -74,7 +74,7 @@ namespace NYql::NDqs { : NActors::TEventPB<TEvPingResponse, NYql::NDqProto::TPingResponse, TDqDataEvents::ES_PING_RESPONSE> { TEvPingResponse() = default; }; - + struct TEvFullResultWriterStatusRequest : NActors::TEventPB<TEvFullResultWriterStatusRequest, NYql::NDqProto::TFullResultWriterStatusRequest, TDqDataEvents::ES_FULL_RESULT_WRITER_STATUS_REQUEST> { @@ -88,7 +88,7 @@ namespace NYql::NDqs { explicit TEvFullResultWriterStatusResponse(NDqProto::TFullResultWriterStatusResponse& data); }; - struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> { - DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); - }; + struct TEvGraphFinished : NActors::TEventBase<TEvGraphFinished, TDqExecuterEvents::ES_GRAPH_FINISHED> { + DEFINE_SIMPLE_NONLOCAL_EVENT(TEvGraphFinished, ""); + }; } diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 91f1caee74..d34a4a4479 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -38,16 +38,16 @@ public: static constexpr char ActorName[] = "YQL_DQ_EXECUTER"; TDqExecuter( - const NActors::TActorId& gwmActorId, - const NActors::TActorId& printerId, + const NActors::TActorId& gwmActorId, + const NActors::TActorId& printerId, const TString& traceId, const TString& username, const TDqConfiguration::TPtr& settings, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - TInstant requestStartTime, - bool createTaskSuspended) + TInstant requestStartTime, + bool createTaskSuspended) : TRichActor<TDqExecuter>(&TDqExecuter::Handler) - , GwmActorId(gwmActorId) - , PrinterId(printerId) + , GwmActorId(gwmActorId) + , PrinterId(printerId) , Settings(settings) , TraceId(traceId) , Username(username) @@ -58,7 +58,7 @@ public: , ExecutionHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("ExecutionTime", ExponentialHistogram(10, 3, 1))) , AllocationHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("WorkersAllocationTime", ExponentialHistogram(10, 2, 1))) , TasksHistogram(Counters->GetSubgroup("component", "ServiceProxyActorHistograms")->GetHistogram("TasksCount", ExponentialHistogram(10, 2, 1))) - , CreateTaskSuspended(createTaskSuspended) + , CreateTaskSuspended(createTaskSuspended) { } ~TDqExecuter() { @@ -75,10 +75,10 @@ private: HFunc(TEvGraphRequest, OnGraph); HFunc(TEvAllocateWorkersResponse, OnAllocateWorkersResponse); cFunc(TEvents::TEvPoison::EventType, PassAway); - hFunc(NActors::TEvents::TEvPoisonTaken, Handle); - HFunc(TEvDqFailure, OnFailure); - HFunc(TEvGraphFinished, OnGraphFinished); - HFunc(TEvQueryResponse, OnQueryResponse); + hFunc(NActors::TEvents::TEvPoisonTaken, Handle); + HFunc(TEvDqFailure, OnFailure); + HFunc(TEvGraphFinished, OnGraphFinished); + HFunc(TEvQueryResponse, OnQueryResponse); // execution timeout cFunc(TEvents::TEvBootstrap::EventType, [this]() { YQL_LOG_CTX_SCOPE(TraceId); @@ -87,7 +87,7 @@ private: issue.SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR); Issues.AddIssues({issue}); *ExecutionTimeoutCounter += 1; - Finish(/*retriable=*/ false, /*needFallback=*/ true); + Finish(/*retriable=*/ false, /*needFallback=*/ true); }) cFunc(TEvents::TEvWakeup::EventType, OnWakeup) }) @@ -129,12 +129,12 @@ private: void OnGraph(TEvGraphRequest::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); - Y_VERIFY(!ControlId); + Y_VERIFY(!ControlId); Y_VERIFY(!ResultId); YQL_LOG(DEBUG) << "TDqExecuter::OnGraph"; - ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId()); + ControlId = NActors::ActorIdFromProto(ev->Get()->Record.GetControlId()); ResultId = NActors::ActorIdFromProto(ev->Get()->Record.GetResultId()); - CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId()); + CheckPointCoordinatorId = NActors::ActorIdFromProto(ev->Get()->Record.GetCheckPointCoordinatorId()); // These actors will be killed at exit. AddChild(ControlId); AddChild(ResultId); @@ -160,7 +160,7 @@ private: Settings->Save(taskMeta); task.MutableMeta()->PackFrom(taskMeta); - task.SetCreateSuspended(CreateTaskSuspended); + task.SetCreateSuspended(CreateTaskSuspended); tasks.emplace_back(task); } @@ -177,7 +177,7 @@ private: const TString computeActorType = Settings->ComputeActorType.Get().GetOrElse("old"); auto resourceAllocator = RegisterChild(CreateResourceAllocator( - GwmActorId, SelfId(), ControlId, workerCount, + GwmActorId, SelfId(), ControlId, workerCount, TraceId, Settings, Counters, enableComputeActor ? tasks : TVector<NYql::NDqProto::TDqTask>(), @@ -187,7 +187,7 @@ private: allocateRequest->Record.SetCreateComputeActor(enableComputeActor); allocateRequest->Record.SetComputeActorType(computeActorType); if (enableComputeActor) { - ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId()); + ActorIdToProto(ControlId, allocateRequest->Record.MutableResultActorId()); } for (const auto& [_, f] : files) { *allocateRequest->Record.AddFiles() = f; @@ -217,7 +217,7 @@ private: StartCounter("AllocateWorkers"); TActivationContext::Send(new IEventHandle( - GwmActorId, + GwmActorId, resourceAllocator, allocateRequest.Release(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession)); @@ -236,67 +236,67 @@ private: } } - void Finish(bool retriable, bool needFallback = false) + void Finish(bool retriable, bool needFallback = false) { YQL_LOG(DEBUG) << __FUNCTION__ << ", retriable=" << retriable << ", needFallback=" << needFallback; - if (Finished) { - YQL_LOG(WARN) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback; - } else { - FlushCounter("ExecutionTime"); - TQueryResponse result; - IssuesToMessage(Issues, result.MutableIssues()); - result.SetRetriable(retriable); - result.SetNeedFallback(needFallback); - FlushCounters(result); - Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result))); - Finished = true; - } + if (Finished) { + YQL_LOG(WARN) << "Re-Finish IGNORED with Retriable=" << retriable << ", NeedFallback=" << needFallback; + } else { + FlushCounter("ExecutionTime"); + TQueryResponse result; + IssuesToMessage(Issues, result.MutableIssues()); + result.SetRetriable(retriable); + result.SetNeedFallback(needFallback); + FlushCounters(result); + Send(ControlId, MakeHolder<TEvQueryResponse>(std::move(result))); + Finished = true; + } } void OnFailure(TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { - if (!Finished) { - YQL_LOG_CTX_SCOPE(TraceId); + if (!Finished) { + YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; - AddCounters(ev->Get()->Record); - bool retriable = ev->Get()->Record.GetRetriable(); - bool fallback = ev->Get()->Record.GetNeedFallback(); - if (ev->Get()->Record.IssuesSize()) { - TIssues issues; - IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); - Issues.AddIssues(issues); - } + AddCounters(ev->Get()->Record); + bool retriable = ev->Get()->Record.GetRetriable(); + bool fallback = ev->Get()->Record.GetNeedFallback(); + if (ev->Get()->Record.IssuesSize()) { + TIssues issues; + IssuesFromMessage(ev->Get()->Record.GetIssues(), issues); + Issues.AddIssues(issues); + } Finish(retriable, fallback); } } - void OnGraphFinished(TEvGraphFinished::TPtr&, const NActors::TActorContext&) { + void OnGraphFinished(TEvGraphFinished::TPtr&, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; - if (!Finished) { - try { - TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; }); - Finish(false); - } catch (...) { - YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage(); - Issues.AddIssue(TIssue("FailureInjection")); - Finish(true); - } + if (!Finished) { + try { + TFailureInjector::Reach("dq_fail_on_finish", [] { throw yexception() << "dq_fail_on_finish"; }); + Finish(false); + } catch (...) { + YQL_LOG(ERROR) << " FailureInjector " << CurrentExceptionMessage(); + Issues.AddIssue(TIssue("FailureInjection")); + Finish(true); + } } - } - - // TBD: wait for PoisonTaken from CheckPointCoordinator before send TEvQueryResponse to PrinterId - - void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) { + } + + // TBD: wait for PoisonTaken from CheckPointCoordinator before send TEvQueryResponse to PrinterId + + void OnQueryResponse(TEvQueryResponse::TPtr& ev, const TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << __FUNCTION__; - Send(PrinterId, ev->Release().Release()); - PassAway(); - } - - void Handle(NActors::TEvents::TEvPoisonTaken::TPtr&) { - // ignore ack from checkpoint coordinator now - } - + Send(PrinterId, ev->Release().Release()); + PassAway(); + } + + void Handle(NActors::TEvents::TEvPoisonTaken::TPtr&) { + // ignore ack from checkpoint coordinator now + } + void OnAllocateWorkersResponse(TEvAllocateWorkersResponse::TPtr& ev, const NActors::TActorContext&) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "TDqExecuter::TEvAllocateWorkersResponse"; @@ -313,7 +313,7 @@ private: << ev->Get()->Record.GetError().GetMessage() << ":" << static_cast<int>(ev->Get()->Record.GetError().GetErrorCode()); Issues.AddIssue(TIssue(ev->Get()->Record.GetError().GetMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_ERROR)); - Finish(/*retriable = */ true, /*fallback = */ true); + Finish(/*retriable = */ true, /*fallback = */ true); return; } case TAllocateWorkersResponse::kNodes: @@ -355,7 +355,7 @@ private: YQL_LOG(INFO) << workers.size() << " workers allocated"; YQL_ENSURE(workers.size() == tasks.size()); - + auto res = MakeHolder<TEvReadyState>(ExecutionPlanner->GetSourceID(), ExecutionPlanner->GetResultType()); if (Settings->EnableComputeActor.Get().GetOrElse(false) == false) { @@ -385,15 +385,15 @@ private: AllocationHistogram->Collect((ExecutionStart-StartTime).Seconds()); - auto readyState1 = res->Record; - auto readyState2 = res->Record; - Send(ControlId, res.Release()); - if (ResultId != SelfId()) { - Send(ResultId, new TEvReadyState(std::move(readyState1))); - } - if (CheckPointCoordinatorId) { - Send(CheckPointCoordinatorId, new TEvReadyState(std::move(readyState2))); - } + auto readyState1 = res->Record; + auto readyState2 = res->Record; + Send(ControlId, res.Release()); + if (ResultId != SelfId()) { + Send(ResultId, new TEvReadyState(std::move(readyState1))); + } + if (CheckPointCoordinatorId) { + Send(CheckPointCoordinatorId, new TEvReadyState(std::move(readyState2))); + } if (Timeout) { ExecutionTimeoutCookieHolder.Reset(ISchedulerCookie::Make2Way()); @@ -442,13 +442,13 @@ private: } } - NActors::TActorId GwmActorId; - NActors::TActorId PrinterId; + NActors::TActorId GwmActorId; + NActors::TActorId PrinterId; TDqConfiguration::TPtr Settings; - NActors::TActorId ControlId; + NActors::TActorId ControlId; NActors::TActorId ResultId; - NActors::TActorId CheckPointCoordinatorId; + NActors::TActorId CheckPointCoordinatorId; TExprNode::TPtr ExprRoot; THolder<IDqsExecutionPlanner> ExecutionPlanner; ui64 ResourceId = 0; @@ -472,20 +472,20 @@ private: THistogramPtr TasksHistogram; TIssues Issues; - bool CreateTaskSuspended; - bool Finished = false; + bool CreateTaskSuspended; + bool Finished = false; }; NActors::IActor* MakeDqExecuter( - const NActors::TActorId& gwmActorId, - const NActors::TActorId& printerId, + const NActors::TActorId& gwmActorId, + const NActors::TActorId& printerId, const TString& traceId, const TString& username, const TDqConfiguration::TPtr& settings, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - TInstant requestStartTime, - bool createTaskSuspended + TInstant requestStartTime, + bool createTaskSuspended ) { - return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId); + return new TLogWrapReceive(new TDqExecuter(gwmActorId, printerId, traceId, username, settings, counters, requestStartTime, createTaskSuspended), traceId); } } // namespace NDq diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.h b/ydb/library/yql/providers/dq/actors/executer_actor.h index fae7d1cdbf..a86a698858 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.h +++ b/ydb/library/yql/providers/dq/actors/executer_actor.h @@ -9,13 +9,13 @@ namespace NYql { namespace NDq { NActors::IActor* MakeDqExecuter( - const NActors::TActorId& gwmActorId, - const NActors::TActorId& printerId, + const NActors::TActorId& gwmActorId, + const NActors::TActorId& printerId, const TString& traceId, const TString& username, const TDqConfiguration::TPtr& settings, const TIntrusivePtr<NMonitoring::TDynamicCounters>& counters, - TInstant requestStartTime = TInstant::Now(), - bool createTaskSuspended = false + TInstant requestStartTime = TInstant::Now(), + bool createTaskSuspended = false ); } // namespace NDq diff --git a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp index 6acfb1277e..80a3c80fe3 100644 --- a/ydb/library/yql/providers/dq/actors/resource_allocator.cpp +++ b/ydb/library/yql/providers/dq/actors/resource_allocator.cpp @@ -44,7 +44,7 @@ public: TResourceAllocator( TActorId gwmActor, TActorId senderId, - TActorId controlId, + TActorId controlId, ui32 workerCount, const TString& traceId, const TDqConfiguration::TPtr settings, @@ -54,7 +54,7 @@ public: : TRichActor<TResourceAllocator>(&TResourceAllocator::Handle) , GwmActor(gwmActor) , SenderId(senderId) - , ControlId(controlId) + , ControlId(controlId) , RequestedCount(workerCount) , TraceId(traceId) , Settings(settings) @@ -243,7 +243,7 @@ private: if (!Tasks.empty()) { request->Record.SetCreateComputeActor(true); request->Record.SetComputeActorType(ComputeActorType); - ActorIdToProto(ControlId, request->Record.MutableResultActorId()); + ActorIdToProto(ControlId, request->Record.MutableResultActorId()); *request->Record.AddTask() = node.Task; } YQL_LOG(WARN) << "Send TEvAllocateWorkersRequest to " << NDqs::NExecutionHelpers::PrettyPrintWorkerInfo(node.WorkerInfo, 0); @@ -298,7 +298,7 @@ private: const TActorId GwmActor; const TActorId SenderId; - const TActorId ControlId; + const TActorId ControlId; THashMap<ui64, TRequestInfo> RequestedNodes; ui32 RequestedCount; @@ -329,7 +329,7 @@ private: NActors::IActor* CreateResourceAllocator( TActorId gwmActor, TActorId senderId, - TActorId controlId, + TActorId controlId, ui32 size, const TString& traceId, const TDqConfiguration::TPtr& settings, diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp index 59ae95c3af..6fd9480efa 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.cpp +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.cpp @@ -1,5 +1,5 @@ #include "result_aggregator.h" -#include "result_receiver.h" +#include "result_receiver.h" #include "proto_builder.h" #include "full_result_writer.h" @@ -204,9 +204,9 @@ private: // finalization has been begun, actor will not kill himself anymore, should ignore responses instead return; } - + auto& response = ev->Get()->Record; - + AddCounters(response); switch (response.GetResponseType()) { @@ -409,7 +409,7 @@ private: } YQL_LOG(DEBUG) << "Waiting for " << BlockingActors.size() << " blocking actors"; - + QueryResponse.Reset(ev->Release().Release()); Become(&TResultAggregator::ShutdownHandler); Send(SelfId(), MakeHolder<TEvents::TEvGone>()); @@ -448,10 +448,10 @@ private: IssuesToMessage(FinishIssues, result.MutableIssues()); } result.SetTruncated(FinishTruncated); - + Send(ExecuterID, new TEvQueryResponse(std::move(result))); } - + const NActors::TActorId ExecuterID; NActors::TActorId SourceID; const NActors::TActorId GraphExecutionEventsId; @@ -478,7 +478,7 @@ private: ui64 FullResultSentBytes = 0; ui64 FullResultReceivedBytes = 0; ui64 FullResultSentDataParts = 0; - + TIssues FinishIssues; bool FinishTruncated = false; bool FinishCalled = false; diff --git a/ydb/library/yql/providers/dq/actors/result_aggregator.h b/ydb/library/yql/providers/dq/actors/result_aggregator.h index 450bc2de88..1183e8eed6 100644 --- a/ydb/library/yql/providers/dq/actors/result_aggregator.h +++ b/ydb/library/yql/providers/dq/actors/result_aggregator.h @@ -9,9 +9,9 @@ namespace NYql::NDqs::NExecutionHelpers { THolder<NActors::IActor> MakeResultAggregator( - const TVector<TString>& columns, - const NActors::TActorId& executerId, - const TString& traceId, + const TVector<TString>& columns, + const NActors::TActorId& executerId, + const TString& traceId, const THashMap<TString, TString>& secureParams, const TDqConfiguration::TPtr& settings, const TString& resultType, diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.cpp b/ydb/library/yql/providers/dq/actors/result_receiver.cpp index 7fdb73f257..8f0e61354b 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.cpp +++ b/ydb/library/yql/providers/dq/actors/result_receiver.cpp @@ -1,6 +1,6 @@ -#include "result_receiver.h" +#include "result_receiver.h" #include "proto_builder.h" - + #include <ydb/library/yql/providers/dq/actors/execution_helpers.h> #include <ydb/library/yql/providers/dq/actors/events.h> @@ -41,7 +41,7 @@ public: explicit TResultReceiver(const TVector<TString>& columns, const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, const THashMap<TString, TString>& secureParams, const TString& resultType, bool discard) : TRichActor<TResultReceiver>(&TResultReceiver::Handler) - , ExecuterId(executerId) + , ExecuterId(executerId) , TraceId(traceId) , Settings(settings) , SecureParams(std::move(secureParams)) @@ -110,60 +110,60 @@ private: YQL_LOG(DEBUG) << "Finished: " << ev->Get()->Record.GetChannelData().GetFinished(); } - void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { - // do nothing + void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { + // do nothing } - void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { - NDqProto::TQueryResponse result(ev->Get()->Record); - YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); - + void OnQueryResult(TEvQueryResponse::TPtr& ev, const TActorContext&) { + NDqProto::TQueryResponse result(ev->Get()->Record); + YQL_ENSURE(!result.HasResultSet() && result.GetYson().empty()); + if (ResultBuilder) { - try { + try { TString yson = Discard ? "" : ResultBuilder->BuildYson( DataParts, Settings && Settings->_AllResultsBytesLimit.Get() ? *Settings->_AllResultsBytesLimit.Get() : 64000000 /* grpc limit*/); *result.MutableYson() = yson; - } catch (...) { + } catch (...) { Issues.AddIssue(TIssue(CurrentExceptionMessage()).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)); result.SetNeedFallback(true); } } else { if (Rows > 0) { - Issues.AddIssue(TIssue("Non empty rows: " + ToString(Rows)).SetCode(0, TSeverityIds::S_WARNING)); + Issues.AddIssue(TIssue("Non empty rows: " + ToString(Rows)).SetCode(0, TSeverityIds::S_WARNING)); } } - - if (!Issues.Empty()) { - IssuesToMessage(Issues, result.MutableIssues()); + + if (!Issues.Empty()) { + IssuesToMessage(Issues, result.MutableIssues()); } - result.SetTruncated(Truncated); - Send(ExecuterId, new TEvQueryResponse(std::move(result))); + result.SetTruncated(Truncated); + Send(ExecuterId, new TEvQueryResponse(std::move(result))); } void OnError(const TString& message, bool retriable, bool needFallback) { YQL_LOG_CTX_SCOPE(TraceId); YQL_LOG(DEBUG) << "OnError " << message; - auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(-1, TSeverityIds::S_ERROR), retriable, needFallback); - Send(ExecuterId, req.Release()); + auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(-1, TSeverityIds::S_ERROR), retriable, needFallback); + Send(ExecuterId, req.Release()); Finished = true; } - void Finish(bool truncated = false) { - Send(ExecuterId, new TEvGraphFinished()); + void Finish(bool truncated = false) { + Send(ExecuterId, new TEvGraphFinished()); Finished = true; - Truncated = truncated; + Truncated = truncated; } - const NActors::TActorId ExecuterId; + const NActors::TActorId ExecuterId; TVector<NDqProto::TData> DataParts; const TString TraceId; TDqConfiguration::TPtr Settings; bool Finished = false; - bool Truncated = false; - TIssues Issues; + bool Truncated = false; + TIssues Issues; ui64 Size = 0; ui64 Rows = 0; // const Yql::DqsProto::TFullResultTable FullResultTable; diff --git a/ydb/library/yql/providers/dq/actors/result_receiver.h b/ydb/library/yql/providers/dq/actors/result_receiver.h index 968bcede10..62fccf8aa1 100644 --- a/ydb/library/yql/providers/dq/actors/result_receiver.h +++ b/ydb/library/yql/providers/dq/actors/result_receiver.h @@ -1,15 +1,15 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> #include <ydb/library/yql/providers/dq/api/protos/service.pb.h> - -#include <library/cpp/actors/core/actor.h> - -namespace NYql { - + +#include <library/cpp/actors/core/actor.h> + +namespace NYql { + THolder<NActors::IActor> MakeResultReceiver( const TVector<TString>& columns, - const NActors::TActorId& executerId, + const NActors::TActorId& executerId, const TString& traceId, const TDqConfiguration::TPtr& settings, // const Yql::DqsProto::TFullResultTable& resultTable, @@ -17,5 +17,5 @@ THolder<NActors::IActor> MakeResultReceiver( const TString& resultBuilder, bool discard ); - -} // namespace NYql + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/actors/task_controller.cpp b/ydb/library/yql/providers/dq/actors/task_controller.cpp index 238828e453..490de7a630 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.cpp +++ b/ydb/library/yql/providers/dq/actors/task_controller.cpp @@ -1,4 +1,4 @@ -#include "task_controller.h" +#include "task_controller.h" #include "execution_helpers.h" #include "events.h" #include "proto_builder.h" @@ -6,214 +6,214 @@ #include "executer_actor.h" #include <ydb/library/yql/providers/dq/counters/counters.h> - + #include <ydb/library/yql/providers/dq/common/yql_dq_common.h> - + #include <ydb/library/yql/public/issue/yql_issue_message.h> - + #include <ydb/library/yql/utils/actor_log/log.h> #include <ydb/library/yql/utils/log/log.h> - + #include <ydb/public/lib/yson_value/ydb_yson_value.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/kqp/kqp.h> - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/actors/core/event_pb.h> -#include <library/cpp/actors/core/executor_pool_basic.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/core/scheduler_basic.h> -#include <library/cpp/threading/future/future.h> -#include <library/cpp/protobuf/util/pb_io.h> - -#include <util/generic/size_literals.h> -#include <util/generic/ptr.h> -#include <util/string/split.h> -#include <util/system/types.h> - -namespace NYql { - -using namespace NActors; -using namespace NDqs; - -namespace { - -class TTaskController: public TRichActor<TTaskController> { -public: - static constexpr ui64 PING_TIMER_TAG = 1; - static constexpr ui64 AGGR_TIMER_TAG = 2; - + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/threading/future/future.h> +#include <library/cpp/protobuf/util/pb_io.h> + +#include <util/generic/size_literals.h> +#include <util/generic/ptr.h> +#include <util/string/split.h> +#include <util/system/types.h> + +namespace NYql { + +using namespace NActors; +using namespace NDqs; + +namespace { + +class TTaskController: public TRichActor<TTaskController> { +public: + static constexpr ui64 PING_TIMER_TAG = 1; + static constexpr ui64 AGGR_TIMER_TAG = 2; + static constexpr char ActorName[] = "YQL_DQ_TASK_CONTROLLER"; - - explicit TTaskController( - const TString& traceId, - const NActors::TActorId& executerId, - const NActors::TActorId& resultId, - const TDqConfiguration::TPtr& settings, - const NYq::NCommon::TServiceCounters& serviceCounters, - const TDuration& pingPeriod, - const TDuration& aggrPeriod - ) - : TRichActor<TTaskController>(&TTaskController::Handler) - , ExecuterId(executerId) - , ResultId(resultId) - , TraceId(traceId) - , Settings(settings) - , ServiceCounters(serviceCounters, "task_controller") - , PingPeriod(pingPeriod) - , AggrPeriod(aggrPeriod) - { - if (Settings) { - if (Settings->_AllResultsBytesLimit.Get()) { - YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); - } - if (Settings->_RowsLimitPerWrite.Get()) { - YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); - } - } - } - - ~TTaskController() override { + + explicit TTaskController( + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const TDqConfiguration::TPtr& settings, + const NYq::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod + ) + : TRichActor<TTaskController>(&TTaskController::Handler) + , ExecuterId(executerId) + , ResultId(resultId) + , TraceId(traceId) + , Settings(settings) + , ServiceCounters(serviceCounters, "task_controller") + , PingPeriod(pingPeriod) + , AggrPeriod(aggrPeriod) + { + if (Settings) { + if (Settings->_AllResultsBytesLimit.Get()) { + YQL_LOG(DEBUG) << "_AllResultsBytesLimit = " << *Settings->_AllResultsBytesLimit.Get(); + } + if (Settings->_RowsLimitPerWrite.Get()) { + YQL_LOG(DEBUG) << "_RowsLimitPerWrite = " << *Settings->_RowsLimitPerWrite.Get(); + } + } + } + + ~TTaskController() override { SetTaskCountMetric(0); - } - -private: - STRICT_STFUNC(Handler, { - hFunc(TEvReadyState, OnReadyState); - hFunc(TEvQueryResponse, OnQueryResult); - hFunc(TEvDqFailure, OnResultFailure); - hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState); - hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution); - cFunc(TEvents::TEvPoison::EventType, PassAway); - hFunc(TEvents::TEvUndelivered, OnUndelivered); - hFunc(TEvents::TEvWakeup, OnWakeup); - }) - - void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) { - auto it = TaskIds.find(ev->Sender); - if (it != TaskIds.end() && FinishedTasks.contains(it->second)) { - // ignore undelivered from finished CAs - return; - } - - TString message = "Undelivered Event " + ToString(ev->Get()->SourceType) - + " from " + ToString(SelfId()) + " (Self) to " + ToString(ev->Sender) + - + " Reason: " + ToString(ev->Get()->Reason) + " Cookie: " + ToString(ev->Cookie); - OnError(message, true, true); - } - - void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - auto ydbStatusId = ev->Get()->Record.GetStatusCode(); - auto message = ev->Get()->Record.GetMessage(); - YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << message; - OnError(message, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback - } - - void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { - TActorId computeActor = ev->Sender; - auto& state = ev->Get()->Record; - ui64 taskId = state.GetTaskId(); - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) - << SelfId() - << " TaskId: " << taskId - << " State: " << ev->Get()->Record.GetState() - << " PingCookie: " << ev->Cookie; - - if (state.HasStats() && state.GetStats().GetTasks().size()) { - YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId; + } + +private: + STRICT_STFUNC(Handler, { + hFunc(TEvReadyState, OnReadyState); + hFunc(TEvQueryResponse, OnQueryResult); + hFunc(TEvDqFailure, OnResultFailure); + hFunc(NDq::TEvDqCompute::TEvState, OnComputeActorState); + hFunc(NDq::TEvDq::TEvAbortExecution, OnAbortExecution); + cFunc(TEvents::TEvPoison::EventType, PassAway); + hFunc(TEvents::TEvUndelivered, OnUndelivered); + hFunc(TEvents::TEvWakeup, OnWakeup); + }) + + void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) { + auto it = TaskIds.find(ev->Sender); + if (it != TaskIds.end() && FinishedTasks.contains(it->second)) { + // ignore undelivered from finished CAs + return; + } + + TString message = "Undelivered Event " + ToString(ev->Get()->SourceType) + + " from " + ToString(SelfId()) + " (Self) to " + ToString(ev->Sender) + + + " Reason: " + ToString(ev->Get()->Reason) + " Cookie: " + ToString(ev->Cookie); + OnError(message, true, true); + } + + void OnAbortExecution(NDq::TEvDq::TEvAbortExecution::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + auto ydbStatusId = ev->Get()->Record.GetStatusCode(); + auto message = ev->Get()->Record.GetMessage(); + YQL_LOG(DEBUG) << "AbortExecution from " << ev->Sender << ":" << ydbStatusId << " " << message; + OnError(message, ydbStatusId == Ydb::StatusIds::UNAVAILABLE, false); // TODO: check fallback + } + + void OnComputeActorState(NDq::TEvDqCompute::TEvState::TPtr& ev) { + TActorId computeActor = ev->Sender; + auto& state = ev->Get()->Record; + ui64 taskId = state.GetTaskId(); + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) + << SelfId() + << " TaskId: " << taskId + << " State: " << ev->Get()->Record.GetState() + << " PingCookie: " << ev->Cookie; + + if (state.HasStats() && state.GetStats().GetTasks().size()) { + YQL_LOG(DEBUG) << " " << SelfId() << " AddStats " << taskId; AddStats(state.GetStats()); - if (ServiceCounters.Counters && !AggrPeriod) { - ExportStats(TaskStat, taskId); - } - } - - switch (ev->Get()->Record.GetState()) { + if (ServiceCounters.Counters && !AggrPeriod) { + ExportStats(TaskStat, taskId); + } + } + + switch (ev->Get()->Record.GetState()) { case NDqProto::COMPUTE_STATE_UNKNOWN: { - // TODO: use issues - TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); - OnError(message, false, false); - break; - } + // TODO: use issues + TString message = "unexpected state from " + ToString(computeActor) + ", task: " + ToString(taskId); + OnError(message, false, false); + break; + } case NDqProto::COMPUTE_STATE_FAILURE: { - // TODO: don't convert issues to string + // TODO: don't convert issues to string NYql::IssuesFromMessage(state.GetIssues(), Issues); OnError(Issues.ToString(), false, false); - break; - } + break; + } case NDqProto::COMPUTE_STATE_EXECUTING: { - YQL_LOG(DEBUG) << " " << SelfId() << " Executing TaskId: " << taskId; - if (!FinishedTasks.contains(taskId)) { - // may get late/reordered? message - Executing[taskId] = Now(); - } - break; - } + YQL_LOG(DEBUG) << " " << SelfId() << " Executing TaskId: " << taskId; + if (!FinishedTasks.contains(taskId)) { + // may get late/reordered? message + Executing[taskId] = Now(); + } + break; + } case NDqProto::COMPUTE_STATE_FINISHED: { - YQL_LOG(DEBUG) << " " << SelfId() << " Finish TaskId: " << taskId; - Executing.erase(taskId); - FinishedTasks.insert(taskId); - break; - } - } - - MaybeUpdateChannels(); - MaybeFinish(); - } - - void OnWakeup(TEvents::TEvWakeup::TPtr& ev) { - switch (ev->Get()->Tag) { - case PING_TIMER_TAG: - if (PingPeriod) { - auto now = Now(); - for (auto& taskActors: Executing) { - if (now > taskActors.second + PingPeriod) { - PingCookie++; - YQL_LOG(DEBUG) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; - Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie); - taskActors.second = now; - } - } - Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); - } - break; - case AGGR_TIMER_TAG: - if (AggrPeriod) { - if (ServiceCounters.Counters) { + YQL_LOG(DEBUG) << " " << SelfId() << " Finish TaskId: " << taskId; + Executing.erase(taskId); + FinishedTasks.insert(taskId); + break; + } + } + + MaybeUpdateChannels(); + MaybeFinish(); + } + + void OnWakeup(TEvents::TEvWakeup::TPtr& ev) { + switch (ev->Get()->Tag) { + case PING_TIMER_TAG: + if (PingPeriod) { + auto now = Now(); + for (auto& taskActors: Executing) { + if (now > taskActors.second + PingPeriod) { + PingCookie++; + YQL_LOG(DEBUG) << " Ping TaskId: " << taskActors.first << ", Compute ActorId: " << ActorIds[taskActors.first] << ", PingCookie: " << PingCookie; + Send(ActorIds[taskActors.first], new NDq::TEvDqCompute::TEvStateRequest(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagGenerateUnsureUndelivered, PingCookie); + taskActors.second = now; + } + } + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); + } + break; + case AGGR_TIMER_TAG: + if (AggrPeriod) { + if (ServiceCounters.Counters) { ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); - } - Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); - } - break; - } - }; - - NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) { + } + Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); + } + break; + } + }; + + NMonitoring::TDynamicCounterPtr GroupForExport(const TCounters& stat, const TString& counterName, ui64 taskId, TString& name, std::map<TString, TString>& labels) { Y_UNUSED(stat); - TString prefix; + TString prefix; if (NCommon::ParseCounterName(&prefix, &labels, &name, counterName)) { - if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) { - auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId])); - for (const auto& [k, v] : labels) { - group = group->GetSubgroup(k, v); - } - return group; - } - } - return nullptr; - } - - void ExportStats(const TCounters& stat, ui64 taskId) { - YQL_LOG(DEBUG) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); - TString name; - std::map<TString, TString> labels; - for (const auto& [k, v] : stat.Get()) { - labels.clear(); - if (auto group = GroupForExport(stat, k, taskId, name, labels)) { - *group->GetCounter(name) = v.Count; - if (ServiceCounters.PublicCounters && taskId == 0) { - TString publicCounterName; + if (prefix == "TaskRunner" && (taskId == 0 || labels["Task"] == ToString(taskId))) { + auto group = (taskId == 0) ? ServiceCounters.Counters : ServiceCounters.Counters->GetSubgroup("Stage", ToString(Stages[taskId])); + for (const auto& [k, v] : labels) { + group = group->GetSubgroup(k, v); + } + return group; + } + } + return nullptr; + } + + void ExportStats(const TCounters& stat, ui64 taskId) { + YQL_LOG(DEBUG) << " " << SelfId() << " ExportStats " << (taskId ? ToString(taskId) : "Summary"); + TString name; + std::map<TString, TString> labels; + for (const auto& [k, v] : stat.Get()) { + labels.clear(); + if (auto group = GroupForExport(stat, k, taskId, name, labels)) { + *group->GetCounter(name) = v.Count; + if (ServiceCounters.PublicCounters && taskId == 0) { + TString publicCounterName; bool isDeriv = false; if (name == "MkqlMaxMemoryUsage") { publicCounterName = "query.memory_usage_bytes"; @@ -224,51 +224,51 @@ private: if (labels.find("Source") != labels.end()) publicCounterName = "query.input_bytes"; else if (labels.find("Sink") != labels.end()) publicCounterName = "query.output_bytes"; isDeriv = true; - } - if (publicCounterName) { + } + if (publicCounterName) { *ServiceCounters.PublicCounters->GetNamedCounter("name", publicCounterName, isDeriv) = v.Count; - } - } - } - } - for (const auto& [k, v] : stat.GetHistograms()) { - labels.clear(); - if (auto group = GroupForExport(stat, k, taskId, name, labels)) { - auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10)); - hist->Reset(); - for (const auto& [bound, value] : v) { - hist->Collect(bound, value); - } - } - } - } - + } + } + } + } + for (const auto& [k, v] : stat.GetHistograms()) { + labels.clear(); + if (auto group = GroupForExport(stat, k, taskId, name, labels)) { + auto hist = group->GetHistogram(name, NMonitoring::ExponentialHistogram(6, 10, 10)); + hist->Reset(); + for (const auto& [bound, value] : v) { + hist->Collect(bound, value); + } + } + } + } + void AddStats(const NDqProto::TDqComputeActorStats& x) { YQL_ENSURE(x.GetTasks().size() == 1); auto& s = x.GetTasks(0); ui64 taskId = s.GetTaskId(); -#define ADD_COUNTER(name) \ - if (stats.Get ## name()) { \ - TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \ - } - - std::map<TString, TString> labels = { - {"Task", ToString(taskId)} - }; - - auto& stats = s; - // basic stats +#define ADD_COUNTER(name) \ + if (stats.Get ## name()) { \ + TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, #name), stats.Get ## name ()); \ + } + + std::map<TString, TString> labels = { + {"Task", ToString(taskId)} + }; + + auto& stats = s; + // basic stats ADD_COUNTER(ComputeCpuTimeUs) ADD_COUNTER(PendingInputTimeUs) ADD_COUNTER(PendingOutputTimeUs) ADD_COUNTER(FinishTimeUs) - - // profile stats + + // profile stats ADD_COUNTER(BuildCpuTimeUs) ADD_COUNTER(WaitTimeUs) ADD_COUNTER(WaitOutputTimeUs) - + if (auto v = x.GetMkqlMaxMemoryUsage()) { TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "MkqlMaxMemoryUsage"), v); } @@ -278,13 +278,13 @@ private: } if (stats.ComputeCpuTimeByRunSize()) { - auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs")); + auto& hist = TaskStat.GetHistogram(TaskStat.GetCounterName("TaskRunner", labels, "ComputeTimeByRunMs")); for (const auto& bucket : s.GetComputeCpuTimeByRun()) { - hist[bucket.GetBound()] = bucket.GetValue(); - } - } - - // compilation stats + hist[bucket.GetBound()] = bucket.GetValue(); + } + } + + // compilation stats // ADD_COUNTER(MkqlTotalNodes) // ADD_COUNTER(MkqlCodegenFunctions) // ADD_COUNTER(CodeGenTotalInstructions) @@ -293,53 +293,53 @@ private: // ADD_COUNTER(CodeGenFullTime) // ADD_COUNTER(CodeGenFinalizeTime) // ADD_COUNTER(CodeGenModulePassTime) - + // if (stats.GetFinishTs() >= stats.GetStartTs()) { // TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); // } - - for (const auto& stats : s.GetInputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"InputChannel", ToString(stats.GetChannelId())} - }; - - ADD_COUNTER(Chunks); - ADD_COUNTER(Bytes); - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); + + for (const auto& stats : s.GetInputChannels()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"InputChannel", ToString(stats.GetChannelId())} + }; + + ADD_COUNTER(Chunks); + ADD_COUNTER(Bytes); + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); ADD_COUNTER(DeserializationTimeUs); - + // if (stats.GetFinishTs() >= stats.GetStartTs()) { // TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); // } - } - - for (const auto& stats : s.GetOutputChannels()) { - std::map<TString, TString> labels = { - {"Task", ToString(taskId)}, - {"OutputChannel", ToString(stats.GetChannelId())} - }; - - ADD_COUNTER(Chunks) - ADD_COUNTER(Bytes); - ADD_COUNTER(RowsIn); - ADD_COUNTER(RowsOut); - ADD_COUNTER(MaxMemoryUsage); - + } + + for (const auto& stats : s.GetOutputChannels()) { + std::map<TString, TString> labels = { + {"Task", ToString(taskId)}, + {"OutputChannel", ToString(stats.GetChannelId())} + }; + + ADD_COUNTER(Chunks) + ADD_COUNTER(Bytes); + ADD_COUNTER(RowsIn); + ADD_COUNTER(RowsOut); + ADD_COUNTER(MaxMemoryUsage); + ADD_COUNTER(SerializationTimeUs); ADD_COUNTER(BlockedByCapacity); - - ADD_COUNTER(SpilledBytes); - ADD_COUNTER(SpilledRows); - ADD_COUNTER(SpilledBlobs); - + + ADD_COUNTER(SpilledBytes); + ADD_COUNTER(SpilledRows); + ADD_COUNTER(SpilledBlobs); + // if (stats.GetFinishTs() >= stats.GetStartTs()) { // TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Total"), stats.GetFinishTs() - stats.GetStartTs()); // } - } - + } + for (const auto& stats : s.GetSources()) { std::map<TString, TString> labels = { {"Task", ToString(taskId)}, @@ -378,15 +378,15 @@ private: // } } -#undef ADD_COUNTER - } - - void MaybeFinish() { - if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) { - Finish(); - } - } - +#undef ADD_COUNTER + } + + void MaybeFinish() { + if (!Finished && !Tasks.empty() && FinishedTasks.size() == Tasks.size()) { + Finish(); + } + } + void SetTaskCountMetric(ui64 count) { if (!ServiceCounters.Counters) { return; @@ -399,47 +399,47 @@ private: *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count; } - void OnReadyState(TEvReadyState::TPtr& ev) { - YQL_LOG_CTX_SCOPE(TraceId); - - TaskStat.AddCounters(ev->Get()->Record); - - const auto& tasks = ev->Get()->Record.GetTask(); - const auto& actorIds = ev->Get()->Record.GetActorId(); - Y_VERIFY(tasks.size() == actorIds.size()); - + void OnReadyState(TEvReadyState::TPtr& ev) { + YQL_LOG_CTX_SCOPE(TraceId); + + TaskStat.AddCounters(ev->Get()->Record); + + const auto& tasks = ev->Get()->Record.GetTask(); + const auto& actorIds = ev->Get()->Record.GetActorId(); + Y_VERIFY(tasks.size() == actorIds.size()); + SetTaskCountMetric(tasks.size()); - - for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { - auto actorId = ActorIdFromProto(actorIds[i]); - auto& task = tasks[i]; - Tasks.emplace_back(task, actorId); - ActorIds.emplace(task.GetId(), actorId); - TaskIds.emplace(actorId, task.GetId()); - Yql::DqsProto::TTaskMeta taskMeta; - task.GetMeta().UnpackTo(&taskMeta); - Stages.emplace(task.GetId(), taskMeta.GetStageId()); - } - - YQL_LOG(DEBUG) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2(); - - MaybeUpdateChannels(); - - if (PingPeriod) { - Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); - } - if (AggrPeriod) { - Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); - } - } - - void MaybeUpdateChannels() { - if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) { - return; - } - - YQL_LOG(DEBUG) << "Update channels"; - for (const auto& [task, actorId] : Tasks) { + + for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { + auto actorId = ActorIdFromProto(actorIds[i]); + auto& task = tasks[i]; + Tasks.emplace_back(task, actorId); + ActorIds.emplace(task.GetId(), actorId); + TaskIds.emplace(actorId, task.GetId()); + Yql::DqsProto::TTaskMeta taskMeta; + task.GetMeta().UnpackTo(&taskMeta); + Stages.emplace(task.GetId(), taskMeta.GetStageId()); + } + + YQL_LOG(DEBUG) << "Ready State: X1=" << SelfId().RawX1() << ", X2=" << SelfId().RawX2(); + + MaybeUpdateChannels(); + + if (PingPeriod) { + Schedule(TDuration::MilliSeconds(100), new TEvents::TEvWakeup(PING_TIMER_TAG)); + } + if (AggrPeriod) { + Schedule(AggrPeriod, new TEvents::TEvWakeup(AGGR_TIMER_TAG)); + } + } + + void MaybeUpdateChannels() { + if (Tasks.empty() || ChannelsUpdated || Tasks.size() != Executing.size()) { + return; + } + + YQL_LOG(DEBUG) << "Update channels"; + for (const auto& [task, actorId] : Tasks) { auto ev = MakeHolder<NDq::TEvDqCompute::TEvChannelsInfo>(); for (const auto& input : task.GetInputs()) { @@ -448,101 +448,101 @@ private: } } - for (const auto& output : task.GetOutputs()) { - for (const auto& channel : output.GetChannels()) { - *ev->Record.AddUpdate() = channel; - } - } - - YQL_LOG(DEBUG) << task.GetId() << " " << ev->Record.ShortDebugString(); - - Send(actorId, ev.Release()); - } - ChannelsUpdated = true; - } - - void OnResultFailure(TEvDqFailure::TPtr& ev) { - if (Finished) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(WARN) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; - } else { - FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported - Send(ExecuterId, ev->Release().Release()); - Finished = true; - } - } - - void OnError(const TString& message, bool retriable, bool needFallback) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(DEBUG) << "OnError " << message; - if (Finished) { - YQL_LOG_CTX_SCOPE(TraceId); - YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback; - } else { - auto issueCode = needFallback - ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR - : TIssuesIds::DQ_GATEWAY_ERROR; - auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback); - FinalStat().FlushCounters(req->Record); - Send(ExecuterId, req.Release()); - Finished = true; - } - } - - void Finish() { - if (ServiceCounters.Counters && AggrPeriod) { + for (const auto& output : task.GetOutputs()) { + for (const auto& channel : output.GetChannels()) { + *ev->Record.AddUpdate() = channel; + } + } + + YQL_LOG(DEBUG) << task.GetId() << " " << ev->Record.ShortDebugString(); + + Send(actorId, ev.Release()); + } + ChannelsUpdated = true; + } + + void OnResultFailure(TEvDqFailure::TPtr& ev) { + if (Finished) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(WARN) << "TEvDqFailure IGNORED when Finished from " << ev->Sender; + } else { + FinalStat().FlushCounters(ev->Get()->Record); // histograms will NOT be reported + Send(ExecuterId, ev->Release().Release()); + Finished = true; + } + } + + void OnError(const TString& message, bool retriable, bool needFallback) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(DEBUG) << "OnError " << message; + if (Finished) { + YQL_LOG_CTX_SCOPE(TraceId); + YQL_LOG(WARN) << "OnError IGNORED when Finished, Retriable=" << retriable << ", NeedFallback=" << needFallback; + } else { + auto issueCode = needFallback + ? TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR + : TIssuesIds::DQ_GATEWAY_ERROR; + auto req = MakeHolder<TEvDqFailure>(TIssue(message).SetCode(issueCode, TSeverityIds::S_ERROR), retriable, needFallback); + FinalStat().FlushCounters(req->Record); + Send(ExecuterId, req.Release()); + Finished = true; + } + } + + void Finish() { + if (ServiceCounters.Counters && AggrPeriod) { ExportStats(AggregateQueryStatsByStage(TaskStat, Stages), 0); // force metrics upload on Finish when Aggregated - } - Send(ExecuterId, new TEvGraphFinished()); - Finished = true; - } - - void OnQueryResult(TEvQueryResponse::TPtr& ev) { - YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); - FinalStat().FlushCounters(ev->Get()->Record); - if (!Issues.Empty()) { - IssuesToMessage(Issues, ev->Get()->Record.MutableIssues()); - } - Send(ResultId, ev->Release().Release()); - } - - TCounters FinalStat() { + } + Send(ExecuterId, new TEvGraphFinished()); + Finished = true; + } + + void OnQueryResult(TEvQueryResponse::TPtr& ev) { + YQL_ENSURE(!ev->Get()->Record.HasResultSet() && ev->Get()->Record.GetYson().empty()); + FinalStat().FlushCounters(ev->Get()->Record); + if (!Issues.Empty()) { + IssuesToMessage(Issues, ev->Get()->Record.MutableIssues()); + } + Send(ResultId, ev->Release().Release()); + } + + TCounters FinalStat() { return AggrPeriod ? AggregateQueryStatsByStage(TaskStat, Stages) : TaskStat; - } - - bool ChannelsUpdated = false; - TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; - THashSet<ui64> FinishedTasks; - THashMap<ui64, TInstant> Executing; - THashMap<ui64, TActorId> ActorIds; - THashMap<TActorId, ui64> TaskIds; - THashMap<ui64, ui64> Stages; - const NActors::TActorId ExecuterId; - const NActors::TActorId ResultId; - const TString TraceId; - TDqConfiguration::TPtr Settings; - bool Finished = false; - TCounters TaskStat; - NYq::NCommon::TServiceCounters ServiceCounters; - TDuration PingPeriod = TDuration::Zero(); - TDuration AggrPeriod = TDuration::Zero(); + } + + bool ChannelsUpdated = false; + TVector<std::pair<NDqProto::TDqTask, TActorId>> Tasks; + THashSet<ui64> FinishedTasks; + THashMap<ui64, TInstant> Executing; + THashMap<ui64, TActorId> ActorIds; + THashMap<TActorId, ui64> TaskIds; + THashMap<ui64, ui64> Stages; + const NActors::TActorId ExecuterId; + const NActors::TActorId ResultId; + const TString TraceId; + TDqConfiguration::TPtr Settings; + bool Finished = false; + TCounters TaskStat; + NYq::NCommon::TServiceCounters ServiceCounters; + TDuration PingPeriod = TDuration::Zero(); + TDuration AggrPeriod = TDuration::Zero(); TIssues Issues; - ui64 PingCookie = 0; -}; - -} /* namespace */ - -THolder<NActors::IActor> MakeTaskController( - const TString& traceId, - const NActors::TActorId& executerId, - const NActors::TActorId& resultId, - const TDqConfiguration::TPtr& settings, - const NYq::NCommon::TServiceCounters& serviceCounters, - const TDuration& pingPeriod, - const TDuration& aggrPeriod -) { - return MakeHolder<NDq::TLogWrapReceive>(new TTaskController(traceId, executerId, resultId, settings, serviceCounters, pingPeriod, aggrPeriod), traceId); -} - - -} /* namespace NYql */ + ui64 PingCookie = 0; +}; + +} /* namespace */ + +THolder<NActors::IActor> MakeTaskController( + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const TDqConfiguration::TPtr& settings, + const NYq::NCommon::TServiceCounters& serviceCounters, + const TDuration& pingPeriod, + const TDuration& aggrPeriod +) { + return MakeHolder<NDq::TLogWrapReceive>(new TTaskController(traceId, executerId, resultId, settings, serviceCounters, pingPeriod, aggrPeriod), traceId); +} + + +} /* namespace NYql */ diff --git a/ydb/library/yql/providers/dq/actors/task_controller.h b/ydb/library/yql/providers/dq/actors/task_controller.h index d065ac17e7..f58f451eee 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller.h +++ b/ydb/library/yql/providers/dq/actors/task_controller.h @@ -1,20 +1,20 @@ -#pragma once - +#pragma once + #include <ydb/library/yql/providers/dq/common/yql_dq_settings.h> - -#include <library/cpp/actors/core/actor.h> - + +#include <library/cpp/actors/core/actor.h> + #include <ydb/core/yq/libs/common/service_counters.h> - -namespace NYql { - -THolder<NActors::IActor> MakeTaskController( - const TString& traceId, - const NActors::TActorId& executerId, - const NActors::TActorId& resultId, - const TDqConfiguration::TPtr& settings, + +namespace NYql { + +THolder<NActors::IActor> MakeTaskController( + const TString& traceId, + const NActors::TActorId& executerId, + const NActors::TActorId& resultId, + const TDqConfiguration::TPtr& settings, const ::NYq::NCommon::TServiceCounters& serviceCounters, - const TDuration& pingPeriod = TDuration::Zero(), - const TDuration& aggrPeriod = TDuration::Seconds(1)); - -} // namespace NYql + const TDuration& pingPeriod = TDuration::Zero(), + const TDuration& aggrPeriod = TDuration::Seconds(1)); + +} // namespace NYql diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 09aecfcf92..67b281776c 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -34,7 +34,7 @@ message TAllocateWorkersRequest { string ComputeActorType = 15; repeated NYql.NDqProto.TDqTask Task = 12; // used for compute actor NActorsProto.TActorId ResultActorId = 13; // used for compute actor - + uint64 FreeWorkerAfterMs = 14; } @@ -180,7 +180,7 @@ message TQueryResponse { } message TDqFailure { - repeated Ydb.Issue.IssueMessage Issues = 4; + repeated Ydb.Issue.IssueMessage Issues = 4; repeated TMetric Metric = 5; bool Retriable = 6; bool NeedFallback = 7; @@ -188,9 +188,9 @@ message TDqFailure { message TGraphRequest { Yql.DqsProto.ExecuteGraphRequest Request = 1; - NActorsProto.TActorId ControlId = 2; - NActorsProto.TActorId ResultId = 3; - NActorsProto.TActorId CheckPointCoordinatorId = 4; // may be empty + NActorsProto.TActorId ControlId = 2; + NActorsProto.TActorId ResultId = 3; + NActorsProto.TActorId CheckPointCoordinatorId = 4; // may be empty } message TDqTaskRequest { diff --git a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp index bd33963442..95d03a4611 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_common.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_common.cpp @@ -70,9 +70,9 @@ bool ParseCounterName(TString* prefix, std::map<TString, TString>* labels, TStri auto pos = counterName.find(":"); if (pos == TString::npos) { return false; - } + } *prefix = counterName.substr(0, pos); - + auto labelsString = counterName.substr(pos+1); *name = ""; diff --git a/ydb/library/yql/providers/dq/counters/counters.h b/ydb/library/yql/providers/dq/counters/counters.h index 4a457d24fc..614a66a296 100644 --- a/ydb/library/yql/providers/dq/counters/counters.h +++ b/ydb/library/yql/providers/dq/counters/counters.h @@ -60,16 +60,16 @@ struct TCounters { counter.Count += value; } - template<typename T> - void SetCounter(const TString& name, T value) const { - auto& counter = Counters[name]; - counter.Count = value; - } - - THashMap<i64, ui64>& GetHistogram(const TString& name) { - return Histograms[name]; - } - + template<typename T> + void SetCounter(const TString& name, T value) const { + auto& counter = Counters[name]; + counter.Count = value; + } + + THashMap<i64, ui64>& GetHistogram(const TString& name) { + return Histograms[name]; + } + void AddTimeCounter(const TString& name, i64 value) const { AddCounter(name, TDuration::MilliSeconds(value)); } @@ -132,10 +132,10 @@ struct TCounters { return Counters; } - const auto& GetHistograms() const { - return Histograms; - } - + const auto& GetHistograms() const { + return Histograms; + } + struct TEntry { i64 Sum = 0; i64 Max = 0; @@ -285,7 +285,7 @@ struct TCounters { protected: mutable THashMap<TString, TEntry> Counters; - mutable THashMap<TString, THashMap<i64, ui64>> Histograms; + mutable THashMap<TString, THashMap<i64, ui64>> Histograms; mutable THashMap<TString, TInstant> Start; }; diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 65a9e500d9..f20764e145 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -363,7 +363,7 @@ namespace NYql::NDqs { YQL_LOG(DEBUG) << __FUNCTION__; MergeTaskMetas(params); - auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); + auto executerId = RegisterChild(NDq::MakeDqExecuter(MakeWorkerManagerActorID(SelfId().NodeId()), SelfId(), TraceId, Username, Settings, Counters, RequestStartTime)); TVector<TString> columns; columns.reserve(Request->GetColumns().size()); @@ -388,11 +388,11 @@ namespace NYql::NDqs { Request->GetResultType(), Request->GetDiscard(), GraphExecutionEventsActorId).Release()); - auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? resultId - : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYq::NCommon::TServiceCounters(Counters, nullptr, "")).Release()); - Send(executerId, MakeHolder<TEvGraphRequest>( + auto controlId = Settings->EnableComputeActor.Get().GetOrElse(false) == false ? resultId + : RegisterChild(NYql::MakeTaskController(TraceId, executerId, resultId, Settings, NYq::NCommon::TServiceCounters(Counters, nullptr, "")).Release()); + Send(executerId, MakeHolder<TEvGraphRequest>( *Request, - controlId, + controlId, resultId)); } diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp index 6b298a5641..6b981ee389 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.cpp @@ -4,8 +4,8 @@ namespace NYql::NDqs { TWorkerManagerCounters::TWorkerManagerCounters(NMonitoring::TDynamicCounterPtr root) { ActiveWorkers = root->GetCounter("ActiveWorkers"); - MkqlMemoryLimit = root->GetCounter("MkqlMemoryLimit"); - MkqlMemoryAllocated = root->GetCounter("MkqlMemoryAllocated"); + MkqlMemoryLimit = root->GetCounter("MkqlMemoryLimit"); + MkqlMemoryAllocated = root->GetCounter("MkqlMemoryAllocated"); } TWorkerManagerCounters::TWorkerManagerCounters() diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h index 51290cba62..e84f8a6c2a 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/counters.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/counters.h @@ -5,8 +5,8 @@ namespace NYql::NDqs { struct TWorkerManagerCounters { NMonitoring::TDynamicCounters::TCounterPtr ActiveWorkers; - NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; - NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated; + NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryLimit; + NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryAllocated; explicit TWorkerManagerCounters(NMonitoring::TDynamicCounterPtr root); TWorkerManagerCounters(); diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp index 5ecf0a4a85..12caab7fcc 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.cpp @@ -18,7 +18,7 @@ #include "worker_manager_common.h" #include <util/generic/vector.h> -#include <util/system/mutex.h> +#include <util/system/mutex.h> #include <util/random/random.h> #include <util/system/rusage.h> @@ -46,39 +46,39 @@ public: : TWorkerManagerCommon<TLocalWorkerManager>(&TLocalWorkerManager::Handler) , Options(options) , MemoryQuoter(std::make_shared<NDq::TResourceQuoter>(Options.MkqlTotalMemoryLimit)) - { + { Options.Counters.MkqlMemoryLimit->Set(Options.MkqlTotalMemoryLimit); Options.Counters.MkqlMemoryAllocated->Set(0); - + MemoryQuoter->SetNotifier([limitCounter = Options.Counters.MkqlMemoryLimit, allocatedCounter = Options.Counters.MkqlMemoryAllocated](const ui64 limit, ui64 allocated) { - limitCounter->Set(limit); - allocatedCounter->Set(allocated); - }); - - AllocateMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) { - // mem per task is not tracked yet - return quoter->Allocate(txId, 0, size); - }; - - FreeMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) { - // mem per task is not tracked yet - quoter->Free(txId, 0, size); - }; - } - + limitCounter->Set(limit); + allocatedCounter->Set(allocated); + }); + + AllocateMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) { + // mem per task is not tracked yet + return quoter->Allocate(txId, 0, size); + }; + + FreeMemoryFn = [quoter = MemoryQuoter](const auto& txId, ui64, ui64 size) { + // mem per task is not tracked yet + quoter->Free(txId, 0, size); + }; + } + private: STRICT_STFUNC(Handler, { - hFunc(TEvAllocateWorkersRequest, OnAllocateWorkersRequest) - hFunc(TEvFreeWorkersNotify, OnFreeWorkers) + hFunc(TEvAllocateWorkersRequest, OnAllocateWorkersRequest) + hFunc(TEvFreeWorkersNotify, OnFreeWorkers) cFunc(TEvents::TEvPoison::EventType, PassAway) cFunc(TEvents::TEvBootstrap::EventType, Bootstrap) cFunc(TEvents::TEvWakeup::EventType, WakeUp) IgnoreFunc(TEvInterconnect::TEvNodeConnected) - hFunc(TEvInterconnect::TEvNodeDisconnected, OnDisconnected) - hFunc(TEvents::TEvUndelivered, OnUndelivered) - hFunc(TEvConfigureFailureInjectorRequest, OnConfigureFailureInjector) + hFunc(TEvInterconnect::TEvNodeDisconnected, OnDisconnected) + hFunc(TEvents::TEvUndelivered, OnUndelivered) + hFunc(TEvConfigureFailureInjectorRequest, OnConfigureFailureInjector) HFunc(TEvRoutesRequest, OnRoutesRequest) - hFunc(TEvQueryStatus, OnQueryStatus) + hFunc(TEvQueryStatus, OnQueryStatus) }) TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parentId) override { @@ -122,7 +122,7 @@ private: YQL_LOG(DEBUG) << "Deallocate " << nodeId; for (const auto& [k, v] : AllocatedWorkers) { - if (v.Sender.NodeId() == nodeId) { + if (v.Sender.NodeId() == nodeId) { toDeallocate.push_back(k); } } @@ -132,48 +132,48 @@ private: } } - void Deallocate(const NActors::TActorId& senderId) { - TVector<ui64> toDeallocate; - - YQL_LOG(DEBUG) << "Deallocate " << senderId; - for (const auto& [k, v] : AllocatedWorkers) { - if (v.Sender == senderId) { - toDeallocate.push_back(k); - } - } - - for (const auto& k : toDeallocate) { - FreeGroup(k); - } - } - - void OnDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) + void Deallocate(const NActors::TActorId& senderId) { + TVector<ui64> toDeallocate; + + YQL_LOG(DEBUG) << "Deallocate " << senderId; + for (const auto& [k, v] : AllocatedWorkers) { + if (v.Sender == senderId) { + toDeallocate.push_back(k); + } + } + + for (const auto& k : toDeallocate) { + FreeGroup(k); + } + } + + void OnDisconnected(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { YQL_LOG(DEBUG) << "Disconnected " << ev->Get()->NodeId; Unsubscribe(ev->Get()->NodeId); Deallocate(ev->Get()->NodeId); } - void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) + void OnUndelivered(TEvents::TEvUndelivered::TPtr& ev) { Y_VERIFY(ev->Get()->Reason == TEvents::TEvUndelivered::Disconnected || ev->Get()->Reason == TEvents::TEvUndelivered::ReasonActorUnknown); - YQL_LOG(DEBUG) << "Undelivered " << ev->Sender; - - switch (ev->Get()->Reason) { - case TEvents::TEvUndelivered::Disconnected: - Deallocate(ev->Sender.NodeId()); - break; - case TEvents::TEvUndelivered::ReasonActorUnknown: - Deallocate(ev->Sender); - break; - default: - break; - } + YQL_LOG(DEBUG) << "Undelivered " << ev->Sender; + + switch (ev->Get()->Reason) { + case TEvents::TEvUndelivered::Disconnected: + Deallocate(ev->Sender.NodeId()); + break; + case TEvents::TEvUndelivered::ReasonActorUnknown: + Deallocate(ev->Sender); + break; + default: + break; + } } - void OnConfigureFailureInjector(TEvConfigureFailureInjectorRequest::TPtr& ev) { + void OnConfigureFailureInjector(TEvConfigureFailureInjectorRequest::TPtr& ev) { YQL_LOG(DEBUG) << "TEvConfigureFailureInjectorRequest "; auto& request = ev->Get()->Record.GetRequest(); @@ -189,7 +189,7 @@ private: Send(ev->Sender, response.Release()); } - void OnAllocateWorkersRequest(TEvAllocateWorkersRequest::TPtr& ev) { + void OnAllocateWorkersRequest(TEvAllocateWorkersRequest::TPtr& ev) { ui64 resourceId; if (ev->Get()->Record.GetResourceId()) { resourceId = ev->Get()->Record.GetResourceId(); @@ -212,23 +212,23 @@ private: TFailureInjector::Reach("allocate_workers_failure", [] { ::_exit(1); }); auto& allocationInfo = AllocatedWorkers[resourceId]; - auto traceId = ev->Get()->Record.GetTraceId(); - allocationInfo.TxId = traceId; - + auto traceId = ev->Get()->Record.GetTraceId(); + allocationInfo.TxId = traceId; + auto count = ev->Get()->Record.GetCount(); Y_VERIFY(count > 0); bool canAllocate = MemoryQuoter->Allocate(traceId, 0, count * Options.MkqlInitialMemoryLimit); - - if (!canAllocate) { - Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks"), 0, ev->Cookie); - return; - } - + + if (!canAllocate) { + Send(ev->Sender, MakeHolder<TEvAllocateWorkersResponse>("Not enough memory to allocate tasks"), 0, ev->Cookie); + return; + } + if (allocationInfo.WorkerActors.empty()) { allocationInfo.WorkerActors.reserve(count); - allocationInfo.Sender = ev->Sender; + allocationInfo.Sender = ev->Sender; if (ev->Get()->Record.GetFreeWorkerAfterMs()) { allocationInfo.Deadline = TInstant::Now() + TDuration::MilliSeconds(ev->Get()->Record.GetFreeWorkerAfterMs()); @@ -259,13 +259,13 @@ private: } else { actor.Reset(CreateWorkerActor( Options.RuntimeData, - traceId, + traceId, Options.TaskRunnerActorFactory, Options.SourceActorFactory, Options.SinkActorFactory)); } allocationInfo.WorkerActors.emplace_back(RegisterChild( - actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr + actor.Release(), createComputeActor ? NYql::NDq::TEvDq::TEvAbortExecution::Unavailable("Aborted by LWM").Release() : nullptr )); } @@ -279,33 +279,33 @@ private: Subscribe(ev->Sender.NodeId()); } - void OnFreeWorkers(TEvFreeWorkersNotify::TPtr& ev) { + void OnFreeWorkers(TEvFreeWorkersNotify::TPtr& ev) { ui64 resourceId = ev->Get()->Record.GetResourceId(); YQL_LOG(DEBUG) << "TEvFreeWorkersNotify " << resourceId; FreeGroup(resourceId, ev->Sender); } - void OnQueryStatus(TEvQueryStatus::TPtr& ev) { + void OnQueryStatus(TEvQueryStatus::TPtr& ev) { auto response = MakeHolder<TEvQueryStatusResponse>(); Send(ev->Sender, response.Release()); } - void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) { + void FreeGroup(ui64 id, NActors::TActorId sender = NActors::TActorId()) { YQL_LOG(DEBUG) << "Free Group " << id; auto it = AllocatedWorkers.find(id); if (it != AllocatedWorkers.end()) { - for (const auto& actorId : it->second.WorkerActors) { + for (const auto& actorId : it->second.WorkerActors) { UnregisterChild(actorId); - } + } - if (sender) { - Y_VERIFY(it->second.Sender == sender); - } + if (sender) { + Y_VERIFY(it->second.Sender == sender); + } - MemoryQuoter->Free(it->second.TxId, 0); + MemoryQuoter->Free(it->second.TxId, 0); Options.Counters.ActiveWorkers->Sub(it->second.WorkerActors.size()); - AllocatedWorkers.erase(it); - } + AllocatedWorkers.erase(it); + } } void FreeOnDeadline() { @@ -326,18 +326,18 @@ private: struct TAllocationInfo { TVector<NActors::TActorId> WorkerActors; - NActors::TActorId Sender; + NActors::TActorId Sender; TInstant Deadline; - NDq::TTxId TxId; + NDq::TTxId TxId; }; THashMap<ui64, TAllocationInfo> AllocatedWorkers; TDqLocalResourceId ResourceId; TRusage Rusage; - NDq::TAllocateMemoryCallback AllocateMemoryFn; - NDq::TFreeMemoryCallback FreeMemoryFn; - std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter; + NDq::TAllocateMemoryCallback AllocateMemoryFn; + NDq::TFreeMemoryCallback FreeMemoryFn; + std::shared_ptr<NDq::TResourceQuoter> MemoryQuoter; }; diff --git a/ydb/public/api/protos/draft/yq_private.proto b/ydb/public/api/protos/draft/yq_private.proto index 2af1c2f3ed..8297abf647 100644 --- a/ydb/public/api/protos/draft/yq_private.proto +++ b/ydb/public/api/protos/draft/yq_private.proto @@ -83,13 +83,13 @@ message PingTaskRequest { SignedIdentity result_id = 4; YandexQuery.QueryMeta.ComputeStatus status = 5; repeated Ydb.Issue.IssueMessage issues = 6; - repeated Ydb.Issue.IssueMessage transient_issues = 16; + repeated Ydb.Issue.IssueMessage transient_issues = 16; uint32 result_set_count = 7; string statistics = 8; repeated YandexQuery.ResultSetMeta result_set_meta = 9; string executer_info = 10; - repeated bytes dq_graph = 11; - int32 dq_graph_index = 20; + repeated bytes dq_graph = 11; + int32 dq_graph_index = 20; string ast = 12; string plan = 13; bool resign_query = 14; @@ -130,20 +130,20 @@ message WriteTaskResultResponse { Ydb.Operations.Operation operation = 1; // WriteRowsResultResult } -message NodeInfo { - uint32 node_id = 1; - string instance_id = 2; - string hostname = 3; - uint64 active_workers = 4; - uint64 memory_limit = 5; - uint64 memory_allocated = 6; +message NodeInfo { + uint32 node_id = 1; + string instance_id = 2; + string hostname = 3; + uint64 active_workers = 4; + uint64 memory_limit = 5; + uint64 memory_allocated = 6; uint32 interconnect_port = 7; string node_address = 8; -} - +} + message NodesHealthCheckRequest { string tenant = 1; - NodeInfo node = 2; + NodeInfo node = 2; Ydb.Operations.OperationParams operation_params = 6; } diff --git a/ydb/public/api/protos/yq.proto b/ydb/public/api/protos/yq.proto index 6aa7169a89..4875bc549d 100644 --- a/ydb/public/api/protos/yq.proto +++ b/ydb/public/api/protos/yq.proto @@ -60,7 +60,7 @@ enum QueryAction { PAUSE = 1; // Pause the query, with the possibility of its quick resumption PAUSE_GRACEFULLY = 2; // Similar to PAUSE, only suspends the query allowing it to pause in checkpoint. Can work for a long time ABORT = 3; // Stop the query - ABORT_GRACEFULLY = 4; // Similar to ABORT, only stops the query in checkpoint + ABORT_GRACEFULLY = 4; // Similar to ABORT, only stops the query in checkpoint RESUME = 5; // Resumes the execution of the query. Works only for PAUSE queries } diff --git a/ydb/services/yq/grpc_service.cpp b/ydb/services/yq/grpc_service.cpp index bb71dde092..0e37d5c39f 100644 --- a/ydb/services/yq/grpc_service.cpp +++ b/ydb/services/yq/grpc_service.cpp @@ -3,7 +3,7 @@ #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/grpc_services/rpc_calls.h> -#include <ydb/core/grpc_services/service_yq.h> +#include <ydb/core/grpc_services/service_yq.h> #include <ydb/library/protobuf_printer/security_printer.h> namespace NKikimr::NGRpcService { @@ -35,153 +35,153 @@ void TGRpcYandexQueryService::DecRequest() { void TGRpcYandexQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); - static const TVector<TString> CreateQueryPermissions = { - "yq.queries.create", - "yq.queries.invoke", - "yq.connections.use", - "yq.bindings.use", - "yq.resources.managePublic" - }; - static const TVector<TString> ListQueriesPermissions = { - "yq.queries.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> DescribeQueryPermissions = { - "yq.queries.get", - "yq.queries.viewAst", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> GetQueryStatusPermissions = { - "yq.queries.getStatus", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> ModifyQueryPermissions = { - "yq.queries.update", - "yq.queries.invoke", - "yq.connections.use", - "yq.bindings.use", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - static const TVector<TString> DeleteQueryPermissions = { - "yq.queries.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - static const TVector<TString> ControlQueryPermissions = { - "yq.queries.control", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - static const TVector<TString> GetResultDataPermissions = { - "yq.queries.getData", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> ListJobsPermissions = { - "yq.jobs.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> DescribeJobPermissions = { - "yq.jobs.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> CreateConnectionPermissions = { - "yq.connections.create", - "yq.resources.managePublic", - }; - static const TVector<TString> ListConnectionsPermissions = { - "yq.connections.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> DescribeConnectionPermissions = { - "yq.connections.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> ModifyConnectionPermissions = { - "yq.connections.update", - "yq.resources.managePublic", - "yq.resources.managePrivate", - }; - static const TVector<TString> DeleteConnectionPermissions = { - "yq.connections.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - static const TVector<TString> TestConnectionPermissions = { - "yq.connections.create", - }; - static const TVector<TString> CreateBindingPermissions = { - "yq.bindings.create", - "yq.resources.managePublic" - }; - static const TVector<TString> ListBindingsPermissions = { - "yq.bindings.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> DescribeBindingPermissions = { - "yq.bindings.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - static const TVector<TString> ModifyBindingPermissions = { - "yq.bindings.update", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - static const TVector<TString> DeleteBindingPermissions = { - "yq.bindings.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - + static const TVector<TString> CreateQueryPermissions = { + "yq.queries.create", + "yq.queries.invoke", + "yq.connections.use", + "yq.bindings.use", + "yq.resources.managePublic" + }; + static const TVector<TString> ListQueriesPermissions = { + "yq.queries.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeQueryPermissions = { + "yq.queries.get", + "yq.queries.viewAst", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> GetQueryStatusPermissions = { + "yq.queries.getStatus", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyQueryPermissions = { + "yq.queries.update", + "yq.queries.invoke", + "yq.connections.use", + "yq.bindings.use", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> DeleteQueryPermissions = { + "yq.queries.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> ControlQueryPermissions = { + "yq.queries.control", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> GetResultDataPermissions = { + "yq.queries.getData", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ListJobsPermissions = { + "yq.jobs.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeJobPermissions = { + "yq.jobs.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> CreateConnectionPermissions = { + "yq.connections.create", + "yq.resources.managePublic", + }; + static const TVector<TString> ListConnectionsPermissions = { + "yq.connections.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeConnectionPermissions = { + "yq.connections.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyConnectionPermissions = { + "yq.connections.update", + "yq.resources.managePublic", + "yq.resources.managePrivate", + }; + static const TVector<TString> DeleteConnectionPermissions = { + "yq.connections.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> TestConnectionPermissions = { + "yq.connections.create", + }; + static const TVector<TString> CreateBindingPermissions = { + "yq.bindings.create", + "yq.resources.managePublic" + }; + static const TVector<TString> ListBindingsPermissions = { + "yq.bindings.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeBindingPermissions = { + "yq.bindings.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyBindingPermissions = { + "yq.bindings.update", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> DeleteBindingPermissions = { + "yq.bindings.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, CB, PERMISSIONS) \ -MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Response, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::NAME##Request>, TSecurityTextFormatPrinter<YandexQuery::NAME##Response>>>( \ - this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcYqRequestOperationCall<YandexQuery::NAME##Request, YandexQuery::NAME##Response> \ - (ctx, &CB, PERMISSIONS)); \ - }, \ - &YandexQuery::V1::YandexQueryService::AsyncService::Request##NAME, \ - #NAME, logger, getCounterBlock("yq", #NAME)) \ - ->Run(); \ +#define ADD_REQUEST(NAME, CB, PERMISSIONS) \ +MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Response, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::NAME##Request>, TSecurityTextFormatPrinter<YandexQuery::NAME##Response>>>( \ + this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcYqRequestOperationCall<YandexQuery::NAME##Request, YandexQuery::NAME##Response> \ + (ctx, &CB, PERMISSIONS)); \ + }, \ + &YandexQuery::V1::YandexQueryService::AsyncService::Request##NAME, \ + #NAME, logger, getCounterBlock("yq", #NAME)) \ + ->Run(); \ - ADD_REQUEST(CreateQuery, DoYandexQueryCreateQueryRequest, CreateQueryPermissions) - ADD_REQUEST(ListQueries, DoYandexQueryListQueriesRequest, ListQueriesPermissions) - ADD_REQUEST(DescribeQuery, DoYandexQueryDescribeQueryRequest, DescribeQueryPermissions) - ADD_REQUEST(GetQueryStatus, DoYandexQueryGetQueryStatusRequest, GetQueryStatusPermissions) - ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions) - ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions) - ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions) - ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions) - ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions) - ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions) - ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions) - ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions) - ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions) - ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions) - ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions) - ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions) - ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions) - ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions) - ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions) - ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions) - ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions) + ADD_REQUEST(CreateQuery, DoYandexQueryCreateQueryRequest, CreateQueryPermissions) + ADD_REQUEST(ListQueries, DoYandexQueryListQueriesRequest, ListQueriesPermissions) + ADD_REQUEST(DescribeQuery, DoYandexQueryDescribeQueryRequest, DescribeQueryPermissions) + ADD_REQUEST(GetQueryStatus, DoYandexQueryGetQueryStatusRequest, GetQueryStatusPermissions) + ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions) + ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions) + ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions) + ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions) + ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions) + ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions) + ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions) + ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions) + ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions) + ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions) + ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions) + ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions) + ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions) + ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions) + ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions) + ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions) + ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions) -#undef ADD_REQUEST +#undef ADD_REQUEST } diff --git a/ydb/services/yq/ut_integration/yq_ut.cpp b/ydb/services/yq/ut_integration/yq_ut.cpp index d4a35f195e..332473115f 100644 --- a/ydb/services/yq/ut_integration/yq_ut.cpp +++ b/ydb/services/yq/ut_integration/yq_ut.cpp @@ -975,10 +975,10 @@ Y_UNIT_TEST_SUITE(PrivateApi) { { Yq::Private::NodesHealthCheckRequest req; req.set_tenant("Tenant"); - auto& node = *req.mutable_node(); - node.set_hostname("hostname"); - node.set_node_id(100500); - node.set_instance_id(instanceId); + auto& node = *req.mutable_node(); + node.set_hostname("hostname"); + node.set_node_id(100500); + node.set_instance_id(instanceId); const auto result = DoWithRetryOnRetCode([&]() { auto r = req; auto result = client.NodesHealthCheck(std::move(r)).ExtractValueSync(); diff --git a/ydb/tests/library/harness/resources/default_yaml.yml b/ydb/tests/library/harness/resources/default_yaml.yml index eed25cd17d..bd9fde467d 100644 --- a/ydb/tests/library/harness/resources/default_yaml.yml +++ b/ydb/tests/library/harness/resources/default_yaml.yml @@ -180,61 +180,61 @@ net_classifier_config: retry_interval_seconds: 30 net_data_update_interval_seconds: 60 cms_config_timeout_seconds: 30 -yandex_query_config: - enabled: false - enable_dynamic_nameservice: false - common: - use_bearer_for_ydb: true - ids_prefix: "pt" - control_plane_storage: - enabled: true - available_binding: - - "DATA_STREAMS" - - "OBJECT_STORAGE" - available_connection: - - "YDB_DATABASE" - - "CLICKHOUSE_CLUSTER" - - "DATA_STREAMS" - - "OBJECT_STORAGE" - - "MONITORING" - storage: - endpoint: "" - control_plane_proxy: - enabled: true - request_timeout: "30s" - private_api: - enabled: true - token_accessor: - enabled: true - gateways: - enabled: true - pq: - cluster_mapping: [] - solomon: - cluster_mapping: [] - dq: - default_settings: [] - db_pool: - enabled: true - storage: - endpoint: "" - checkpoint_coordinator: - enabled: true - checkpointing_period_millis: 1000 - max_inflight: 1 - storage: - endpoint: "" - resource_manager: - enabled: true - private_proxy: - enabled: true - nodes_manager: - enabled: true - pending_fetcher: - enabled: true - audit: - enabled: false - uaconfig: - uri: "" - pinger: - ping_period: "30s" +yandex_query_config: + enabled: false + enable_dynamic_nameservice: false + common: + use_bearer_for_ydb: true + ids_prefix: "pt" + control_plane_storage: + enabled: true + available_binding: + - "DATA_STREAMS" + - "OBJECT_STORAGE" + available_connection: + - "YDB_DATABASE" + - "CLICKHOUSE_CLUSTER" + - "DATA_STREAMS" + - "OBJECT_STORAGE" + - "MONITORING" + storage: + endpoint: "" + control_plane_proxy: + enabled: true + request_timeout: "30s" + private_api: + enabled: true + token_accessor: + enabled: true + gateways: + enabled: true + pq: + cluster_mapping: [] + solomon: + cluster_mapping: [] + dq: + default_settings: [] + db_pool: + enabled: true + storage: + endpoint: "" + checkpoint_coordinator: + enabled: true + checkpointing_period_millis: 1000 + max_inflight: 1 + storage: + endpoint: "" + resource_manager: + enabled: true + private_proxy: + enabled: true + nodes_manager: + enabled: true + pending_fetcher: + enabled: true + audit: + enabled: false + uaconfig: + uri: "" + pinger: + ping_period: "30s" |