aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrew stalin <andrew.stalin@gmail.com>2024-10-16 09:44:27 +0700
committerGitHub <noreply@github.com>2024-10-16 09:44:27 +0700
commitda0e694c1900697aaa367fa6a4e9b9ba68151e0d (patch)
tree44b1cda8ef45c036bf02f5f99d086d52855e186f
parentb390c9dba1929b46726243ab260df05f48f1fd06 (diff)
downloadydb-da0e694c1900697aaa367fa6a4e9b9ba68151e0d.tar.gz
fixed the count-min request deletion error (#10427)
-rw-r--r--ydb/core/statistics/service/service_impl.cpp128
1 files changed, 53 insertions, 75 deletions
diff --git a/ydb/core/statistics/service/service_impl.cpp b/ydb/core/statistics/service/service_impl.cpp
index 1b19f77002..d6e3d1e86a 100644
--- a/ydb/core/statistics/service/service_impl.cpp
+++ b/ydb/core/statistics/service/service_impl.cpp
@@ -1,6 +1,7 @@
#include "service.h"
#include "http_request.h"
+#include <ydb/core/statistics/common.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/statistics/database/database.h>
@@ -104,8 +105,7 @@ struct TAggregationStatistics {
? &Nodes[i] : nullptr;
}
}
- LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Child node with the specified id was not found");
+ SA_LOG_E("Child node with the specified id was not found");
return nullptr;
}
};
@@ -212,15 +212,13 @@ public:
hFunc(NMon::TEvHttpInfoRes, Handle);
cFunc(TEvents::TEvPoison::EventType, PassAway);
default:
- LOG_CRIT_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString());
+ SA_LOG_CRIT("NStat::TStatService: unexpected event# " << ev->GetTypeRewrite() << " " << ev->ToString());
}
}
private:
void HandleConfig(NConsole::TEvConfigsDispatcher::TEvSetConfigSubscriptionResponse::TPtr&) {
- LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Subscribed for config changes on node " << SelfId().NodeId());
+ SA_LOG_I("Subscribed for config changes on node " << SelfId().NodeId());
}
void HandleConfig(NConsole::TEvConsole::TEvConfigNotificationRequest::TPtr& ev) {
@@ -240,8 +238,7 @@ private:
bool IsNotCurrentRound(ui64 round) {
if (round != AggregationStatistics.Round) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Event round " << round << " is different from the current " << AggregationStatistics.Round);
+ SA_LOG_D("Event round " << round << " is different from the current " << AggregationStatistics.Round);
return true;
}
return false;
@@ -306,8 +303,7 @@ private:
const auto& record = ev->Get()->Record;
const auto tabletId = record.GetShardTabletId();
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Received TEvStatisticsResponse TabletId: " << tabletId);
+ SA_LOG_D("Received TEvStatisticsResponse TabletId: " << tabletId);
const auto round = ev->Cookie;
if (IsNotCurrentRound(round)) {
@@ -335,8 +331,7 @@ private:
const auto round = record.GetRound();
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvAggregateKeepAliveAck");
+ SA_LOG_D("Skip TEvAggregateKeepAliveAck");
return;
}
@@ -346,8 +341,7 @@ private:
void Handle(TEvPrivate::TEvKeepAliveAckTimeout::TPtr& ev) {
const auto round = ev->Get()->Round;
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvKeepAliveAckTimeout");
+ SA_LOG_D("Skip TEvKeepAliveAckTimeout");
return;
}
@@ -362,8 +356,7 @@ private:
// the parent node is unavailable
// invalidate the subtree with the root in the current node
- LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable");
+ SA_LOG_I("Parent node " << AggregationStatistics.ParentNode.NodeId() << " is unavailable");
ResetAggregationStatistics();
@@ -372,8 +365,7 @@ private:
void Handle(TEvPrivate::TEvDispatchKeepAlive::TPtr& ev) {
const auto round = ev->Get()->Round;
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvDispatchKeepAlive");
+ SA_LOG_D("Skip TEvDispatchKeepAlive");
return;
}
@@ -387,8 +379,7 @@ private:
const auto round = ev->Get()->Round;
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvKeepAliveTimeout");
+ SA_LOG_D("Skip TEvKeepAliveTimeout");
return;
}
@@ -396,8 +387,7 @@ private:
auto node = AggregationStatistics.GetProcessingChildNode(nodeId);
if (node == nullptr) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvKeepAliveTimeout");
+ SA_LOG_D("Skip TEvKeepAliveTimeout");
return;
}
@@ -412,8 +402,7 @@ private:
node->Status = TAggregationStatistics::TNode::EStatus::Unavailable;
++AggregationStatistics.PprocessedNodes;
- LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Node " << nodeId << " is unavailable");
+ SA_LOG_I("Node " << nodeId << " is unavailable");
if (AggregationStatistics.IsCompleted()) {
OnAggregateStatisticsFinished();
@@ -425,8 +414,7 @@ private:
const auto round = record.GetRound();
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvAggregateKeepAlive");
+ SA_LOG_D("Skip TEvAggregateKeepAlive");
return;
}
@@ -434,8 +422,7 @@ private:
auto node = AggregationStatistics.GetProcessingChildNode(nodeId);
if (node == nullptr) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvAggregateKeepAlive");
+ SA_LOG_D( "Skip TEvAggregateKeepAlive");
return;
}
@@ -447,15 +434,13 @@ private:
}
void Handle(TEvStatistics::TEvAggregateStatisticsResponse::TPtr& ev) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId());
+ SA_LOG_D("Received TEvAggregateStatisticsResponse SenderNodeId: " << ev->Sender.NodeId());
const auto& record = ev->Get()->Record;
const auto round = record.GetRound();
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvAggregateStatisticsResponse");
+ SA_LOG_D("Skip TEvAggregateStatisticsResponse");
return;
}
@@ -463,8 +448,7 @@ private:
auto node = AggregationStatistics.GetProcessingChildNode(nodeId);
if (node == nullptr) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvAggregateStatisticsResponse");
+ SA_LOG_D("Skip TEvAggregateStatisticsResponse");
return;
}
@@ -504,8 +488,7 @@ private:
}
void SendAggregateStatisticsResponse() {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId());
+ SA_LOG_D("Send aggregate statistics response to node: " << AggregationStatistics.ParentNode.NodeId());
auto response = std::make_unique<TEvStatistics::TEvAggregateStatisticsResponse>();
auto& record = response->Record;
@@ -590,8 +573,7 @@ private:
const auto& record = ev->Get()->Record;
const auto round = record.GetRound();
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Received TEvAggregateStatistics from node: " << ev->Sender.NodeId()
+ SA_LOG_D("Received TEvAggregateStatistics from node: " << ev->Sender.NodeId()
<< ", Round: " << round << ", current Round: " << AggregationStatistics.Round);
// reset previous state
@@ -670,8 +652,7 @@ private:
return;
}
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Handle TEvStatistics::TEvGetStatistics, request id = " << requestId
+ SA_LOG_D("Handle TEvStatistics::TEvGetStatistics, request id = " << requestId
<< ", ReplyToActorId = " << request.ReplyToActorId
<< ", StatRequests.size() = " << request.StatRequests.size());
@@ -715,8 +696,7 @@ private:
auto cookie = navigate->Cookie;
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie);
+ SA_LOG_D("Handle TEvTxProxySchemeCache::TEvNavigateKeySetResult, request id = " << cookie);
if (cookie == ResolveSACookie) {
Y_ABORT_UNLESS(navigate->ResultSet.size() == 1);
@@ -732,7 +712,14 @@ private:
ConnectToSA();
SyncNode();
} else {
- ReplyAllFailed();
+ for (auto it = InFlight.begin(); it != InFlight.end();) {
+ if (EStatType::COUNT_MIN_SKETCH == it->second.StatType) {
+ ++it;
+ continue;
+ }
+ ReplyFailed(it->first, false);
+ it = InFlight.erase(it);
+ }
}
return;
}
@@ -838,8 +825,7 @@ private:
}
void Handle(TEvStatistics::TEvPropagateStatistics::TPtr& ev) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "EvPropagateStatistics, node id = " << SelfId().NodeId());
+ SA_LOG_D("EvPropagateStatistics, node id = " << SelfId().NodeId());
Send(ev->Sender, new TEvStatistics::TEvPropagateStatisticsResponse);
@@ -923,21 +909,18 @@ private:
void Handle(TEvPrivate::TEvStatisticsRequestTimeout::TPtr& ev) {
const auto round = ev->Get()->Round;
if (IsNotCurrentRound(round)) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip TEvStatisticsRequestTimeout");
+ SA_LOG_D("Skip TEvStatisticsRequestTimeout");
return;
}
const auto tabletId = ev->Get()->TabletId;
auto tabletPipe = AggregationStatistics.LocalTablets.TabletsPipes.find(tabletId);
if (tabletPipe == AggregationStatistics.LocalTablets.TabletsPipes.end()) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Tablet " << tabletId << " has already been processed");
+ SA_LOG_D("Tablet " << tabletId << " has already been processed");
return;
}
- LOG_ERROR_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "No result was received from the tablet " << tabletId);
+ SA_LOG_E("No result was received from the tablet " << tabletId);
auto clientId = tabletPipe->second;
OnTabletError(tabletId);
@@ -962,15 +945,13 @@ private:
NTabletPipe::SendData(SelfId(), clientId, request.release(), round);
Schedule(Settings.StatisticsRequestTimeout, new TEvPrivate::TEvStatisticsRequestTimeout(round, tabletId));
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "TEvStatisticsRequest send"
+ SA_LOG_D("TEvStatisticsRequest send"
<< ", client id = " << clientId
<< ", path = " << *path);
}
void OnTabletError(ui64 tabletId) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Tablet " << tabletId << " is not local.");
+ SA_LOG_D("Tablet " << tabletId << " is not local.");
const auto error = NKikimrStat::TEvAggregateStatisticsResponse::TYPE_NON_LOCAL_TABLET;
AggregationStatistics.FailedTablets.emplace_back(tabletId, 0, error);
@@ -988,8 +969,7 @@ private:
const auto& clientId = ev->Get()->ClientId;
const auto& tabletId = ev->Get()->TabletId;
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "EvClientConnected"
+ SA_LOG_D("EvClientConnected"
<< ", node id = " << ev->Get()->ClientId.NodeId()
<< ", client id = " << clientId
<< ", server id = " << ev->Get()->ServerId
@@ -1018,16 +998,14 @@ private:
return;
}
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip EvClientConnected");
+ SA_LOG_D("Skip EvClientConnected");
}
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev) {
const auto& clientId = ev->Get()->ClientId;
const auto& tabletId = ev->Get()->TabletId;
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "EvClientDestroyed"
+ SA_LOG_D("EvClientDestroyed"
<< ", node id = " << ev->Get()->ClientId.NodeId()
<< ", client id = " << clientId
<< ", server id = " << ev->Get()->ServerId
@@ -1049,8 +1027,7 @@ private:
return;
}
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "Skip EvClientDestroyed");
+ SA_LOG_D("Skip EvClientDestroyed");
}
void Handle(TEvStatistics::TEvStatisticsIsDisabled::TPtr&) {
@@ -1060,13 +1037,19 @@ private:
void Handle(TEvStatistics::TEvLoadStatisticsQueryResponse::TPtr& ev) {
ui64 cookie = ev->Get()->Cookie;
-
auto itLoadQuery = LoadQueriesInFlight.find(cookie);
Y_ABORT_UNLESS(itLoadQuery != LoadQueriesInFlight.end());
auto [requestId, requestIndex] = itLoadQuery->second;
+ SA_LOG_D("TEvLoadStatisticsQueryResponse, request id = " << requestId);
+
auto itRequest = InFlight.find(requestId);
- Y_ABORT_UNLESS(itRequest != InFlight.end());
+ if (InFlight.end() == itRequest) {
+ SA_LOG_E("TEvLoadStatisticsQueryResponse, request id = " << requestId
+ << ". Request not found in InFlight");
+ return;
+ }
+
auto& request = itRequest->second;
auto& response = request.StatResponses[requestIndex];
@@ -1093,8 +1076,7 @@ private:
}
void Handle(TEvPrivate::TEvRequestTimeout::TPtr& ev) {
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "EvRequestTimeout"
+ SA_LOG_D("EvRequestTimeout"
<< ", pipe client id = " << ev->Get()->PipeClientId
<< ", schemeshard count = " << ev->Get()->NeedSchemeShards.size());
@@ -1126,8 +1108,7 @@ private:
NTabletPipe::TClientConfig pipeConfig{.RetryPolicy = policy};
SAPipeClientId = Register(NTabletPipe::CreateClient(SelfId(), StatisticsAggregatorId, pipeConfig));
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "ConnectToSA(), pipe client id = " << SAPipeClientId);
+ SA_LOG_D("ConnectToSA(), pipe client id = " << SAPipeClientId);
}
void SyncNode() {
@@ -1156,8 +1137,7 @@ private:
Schedule(RequestTimeout, timeout.release());
}
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "SyncNode(), pipe client id = " << SAPipeClientId);
+ SA_LOG_D("SyncNode(), pipe client id = " << SAPipeClientId);
}
void ReplySuccess(ui64 requestId, bool eraseRequest) {
@@ -1167,8 +1147,7 @@ private:
}
auto& request = itRequest->second;
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "ReplySuccess(), request id = " << requestId
+ SA_LOG_D("ReplySuccess(), request id = " << requestId
<< ", ReplyToActorId = " << request.ReplyToActorId
<< ", StatRequests.size() = " << request.StatRequests.size());
@@ -1214,8 +1193,7 @@ private:
}
auto& request = itRequest->second;
- LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
- "ReplyFailed(), request id = " << requestId);
+ SA_LOG_D("ReplyFailed(), request id = " << requestId);
auto result = std::make_unique<TEvStatistics::TEvGetStatisticsResult>();
result->Success = false;