diff options
author | andrew stalin <andrew.stalin@gmail.com> | 2024-10-16 09:44:27 +0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-16 09:44:27 +0700 |
commit | da0e694c1900697aaa367fa6a4e9b9ba68151e0d (patch) | |
tree | 44b1cda8ef45c036bf02f5f99d086d52855e186f | |
parent | b390c9dba1929b46726243ab260df05f48f1fd06 (diff) | |
download | ydb-da0e694c1900697aaa367fa6a4e9b9ba68151e0d.tar.gz |
fixed the count-min request deletion error (#10427)
-rw-r--r-- | ydb/core/statistics/service/service_impl.cpp | 128 |
1 files changed, 53 insertions, 75 deletions
diff --git a/ydb/core/statistics/service/service_impl.cpp b/ydb/core/statistics/service/service_impl.cpp index 1b19f77002..d6e3d1e86a 100644 --- a/ydb/core/statistics/service/service_impl.cpp +++ b/ydb/core/statistics/service/service_impl.cpp @@ -1,6 +1,7 @@ #include "service.h" #include "http_request.h" +#include <ydb/core/statistics/common.h> #include <ydb/core/statistics/events.h> #include <ydb/core/statistics/database/database.h> @@ -104,8 +105,7 @@ struct TAggregationStatistics { ? &Nodes[i] : nullptr; } } - LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Child node with the specified id was not found"); + SA_LOG_E("Child node with the specified id was not found"); return nullptr; } }; @@ -212,15 +212,13 @@ public: hFunc(NMon::TEvHttpInfoRes, Handle); cFunc(TEvents::TEvPoison::EventType, PassAway); default: - LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString()); + SA_LOG_CRIT("NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString()); } } private: void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) { - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Subscribed for config changes on node " << SelfId().NodeId()); + SA_LOG_I("Subscribed for config changes on node " << SelfId().NodeId()); } void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) { @@ -240,8 +238,7 @@ private: bool IsNotCurrentRound(ui64 round) { if (round != AggregationStatistics.Round) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Event round " << round << " is different from the current " << AggregationStatistics.Round); + SA_LOG_D("Event round " << round << " is different from the current " << AggregationStatistics.Round); return true; } return false; @@ -306,8 +303,7 @@ private: const auto& record = ev->Get()->Record; const auto tabletId = record.GetShardTabletId(); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvStatisticsResponse TabletId: " << tabletId); + SA_LOG_D("Received TEvStatisticsResponse TabletId: " << tabletId); const auto round = ev->Cookie; if (IsNotCurrentRound(round)) { @@ -335,8 +331,7 @@ private: const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAliveAck"); + SA_LOG_D("Skip TEvAggregateKeepAliveAck"); return; } @@ -346,8 +341,7 @@ private: void Handle(TEvPrivate::TEvKeepAliveAckTimeout::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveAckTimeout"); + SA_LOG_D("Skip TEvKeepAliveAckTimeout"); return; } @@ -362,8 +356,7 @@ private: // the parent node is unavailable // invalidate the subtree with the root in the current node - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable"); + SA_LOG_I("Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable"); ResetAggregationStatistics(); @@ -372,8 +365,7 @@ private: void Handle(TEvPrivate::TEvDispatchKeepAlive::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvDispatchKeepAlive"); + SA_LOG_D("Skip TEvDispatchKeepAlive"); return; } @@ -387,8 +379,7 @@ private: const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveTimeout"); + SA_LOG_D("Skip TEvKeepAliveTimeout"); return; } @@ -396,8 +387,7 @@ private: auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvKeepAliveTimeout"); + SA_LOG_D("Skip TEvKeepAliveTimeout"); return; } @@ -412,8 +402,7 @@ private: node->Status = TAggregationStatistics::TNode::EStatus::Unavailable; ++AggregationStatistics.PprocessedNodes; - LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Node " << nodeId << " is unavailable"); + SA_LOG_I("Node " << nodeId << " is unavailable"); if (AggregationStatistics.IsCompleted()) { OnAggregateStatisticsFinished(); @@ -425,8 +414,7 @@ private: const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAlive"); + SA_LOG_D("Skip TEvAggregateKeepAlive"); return; } @@ -434,8 +422,7 @@ private: auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateKeepAlive"); + SA_LOG_D( "Skip TEvAggregateKeepAlive"); return; } @@ -447,15 +434,13 @@ private: } void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId()); + SA_LOG_D("Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId()); const auto& record = ev->Get()->Record; const auto round = record.GetRound(); if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateStatisticsResponse"); + SA_LOG_D("Skip TEvAggregateStatisticsResponse"); return; } @@ -463,8 +448,7 @@ private: auto node = AggregationStatistics.GetProcessingChildNode(nodeId); if (node == nullptr) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvAggregateStatisticsResponse"); + SA_LOG_D("Skip TEvAggregateStatisticsResponse"); return; } @@ -504,8 +488,7 @@ private: } void SendAggregateStatisticsResponse() { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId()); + SA_LOG_D("Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId()); auto response = std::make_unique<TEvStatistics::TEvAggregateStatisticsResponse>(); auto& record = response->Record; @@ -590,8 +573,7 @@ private: const auto& record = ev->Get()->Record; const auto round = record.GetRound(); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Received TEvAggregateStatistics from node: " << ev->Sender.NodeId() + SA_LOG_D("Received TEvAggregateStatistics from node: " << ev->Sender.NodeId() << ", Round: " << round << ", current Round: " << AggregationStatistics.Round); // reset previous state @@ -670,8 +652,7 @@ private: return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Handle TEvStatistics::TEvGetStatistics, request id = " << requestId + SA_LOG_D("Handle TEvStatistics::TEvGetStatistics, request id = " << requestId << ", ReplyToActorId = " << request.ReplyToActorId << ", StatRequests.size() = " << request.StatRequests.size()); @@ -715,8 +696,7 @@ private: auto cookie = navigate->Cookie; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie); + SA_LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie); if (cookie == ResolveSACookie) { Y_ABORT_UNLESS(navigate->ResultSet.size() == 1); @@ -732,7 +712,14 @@ private: ConnectToSA(); SyncNode(); } else { - ReplyAllFailed(); + for (auto it = InFlight.begin(); it != InFlight.end();) { + if (EStatType::COUNT_MIN_SKETCH == it->second.StatType) { + ++it; + continue; + } + ReplyFailed(it->first, false); + it = InFlight.erase(it); + } } return; } @@ -838,8 +825,7 @@ private: } void Handle(TEvStatistics::TEvPropagateStatistics::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvPropagateStatistics, node id = " << SelfId().NodeId()); + SA_LOG_D("EvPropagateStatistics, node id = " << SelfId().NodeId()); Send(ev->Sender, new TEvStatistics::TEvPropagateStatisticsResponse); @@ -923,21 +909,18 @@ private: void Handle(TEvPrivate::TEvStatisticsRequestTimeout::TPtr& ev) { const auto round = ev->Get()->Round; if (IsNotCurrentRound(round)) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip TEvStatisticsRequestTimeout"); + SA_LOG_D("Skip TEvStatisticsRequestTimeout"); return; } const auto tabletId = ev->Get()->TabletId; auto tabletPipe = AggregationStatistics.LocalTablets.TabletsPipes.find(tabletId); if (tabletPipe == AggregationStatistics.LocalTablets.TabletsPipes.end()) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Tablet " << tabletId << " has already been processed"); + SA_LOG_D("Tablet " << tabletId << " has already been processed"); return; } - LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "No result was received from the tablet " << tabletId); + SA_LOG_E("No result was received from the tablet " << tabletId); auto clientId = tabletPipe->second; OnTabletError(tabletId); @@ -962,15 +945,13 @@ private: NTabletPipe::SendData(SelfId(), clientId, request.release(), round); Schedule(Settings.StatisticsRequestTimeout, new TEvPrivate::TEvStatisticsRequestTimeout(round, tabletId)); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "TEvStatisticsRequest send" + SA_LOG_D("TEvStatisticsRequest send" << ", client id = " << clientId << ", path = " << *path); } void OnTabletError(ui64 tabletId) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Tablet " << tabletId << " is not local."); + SA_LOG_D("Tablet " << tabletId << " is not local."); const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET; AggregationStatistics.FailedTablets.emplace_back(tabletId, 0, error); @@ -988,8 +969,7 @@ private: const auto& clientId = ev->Get()->ClientId; const auto& tabletId = ev->Get()->TabletId; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvClientConnected" + SA_LOG_D("EvClientConnected" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << clientId << ", server id = " << ev->Get()->ServerId @@ -1018,16 +998,14 @@ private: return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip EvClientConnected"); + SA_LOG_D("Skip EvClientConnected"); } void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) { const auto& clientId = ev->Get()->ClientId; const auto& tabletId = ev->Get()->TabletId; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvClientDestroyed" + SA_LOG_D("EvClientDestroyed" << ", node id = " << ev->Get()->ClientId.NodeId() << ", client id = " << clientId << ", server id = " << ev->Get()->ServerId @@ -1049,8 +1027,7 @@ private: return; } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "Skip EvClientDestroyed"); + SA_LOG_D("Skip EvClientDestroyed"); } void Handle(TEvStatistics::TEvStatisticsIsDisabled::TPtr&) { @@ -1060,13 +1037,19 @@ private: void Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev) { ui64 cookie = ev->Get()->Cookie; - auto itLoadQuery = LoadQueriesInFlight.find(cookie); Y_ABORT_UNLESS(itLoadQuery != LoadQueriesInFlight.end()); auto [requestId, requestIndex] = itLoadQuery->second; + SA_LOG_D("TEvLoadStatisticsQueryResponse, request id = " << requestId); + auto itRequest = InFlight.find(requestId); - Y_ABORT_UNLESS(itRequest != InFlight.end()); + if (InFlight.end() == itRequest) { + SA_LOG_E("TEvLoadStatisticsQueryResponse, request id = " << requestId + << ". Request not found in InFlight"); + return; + } + auto& request = itRequest->second; auto& response = request.StatResponses[requestIndex]; @@ -1093,8 +1076,7 @@ private: } void Handle(TEvPrivate::TEvRequestTimeout::TPtr& ev) { - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "EvRequestTimeout" + SA_LOG_D("EvRequestTimeout" << ", pipe client id = " << ev->Get()->PipeClientId << ", schemeshard count = " << ev->Get()->NeedSchemeShards.size()); @@ -1126,8 +1108,7 @@ private: NTabletPipe::TClientConfig pipeConfig{.RetryPolicy = policy}; SAPipeClientId = Register(NTabletPipe::CreateClient(SelfId(), StatisticsAggregatorId, pipeConfig)); - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ConnectToSA(), pipe client id = " << SAPipeClientId); + SA_LOG_D("ConnectToSA(), pipe client id = " << SAPipeClientId); } void SyncNode() { @@ -1156,8 +1137,7 @@ private: Schedule(RequestTimeout, timeout.release()); } - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "SyncNode(), pipe client id = " << SAPipeClientId); + SA_LOG_D("SyncNode(), pipe client id = " << SAPipeClientId); } void ReplySuccess(ui64 requestId, bool eraseRequest) { @@ -1167,8 +1147,7 @@ private: } auto& request = itRequest->second; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ReplySuccess(), request id = " << requestId + SA_LOG_D("ReplySuccess(), request id = " << requestId << ", ReplyToActorId = " << request.ReplyToActorId << ", StatRequests.size() = " << request.StatRequests.size()); @@ -1214,8 +1193,7 @@ private: } auto& request = itRequest->second; - LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS, - "ReplyFailed(), request id = " << requestId); + SA_LOG_D("ReplyFailed(), request id = " << requestId); auto result = std::make_unique<TEvStatistics::TEvGetStatisticsResult>(); result->Success = false; |