about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author azevaykin <145343289+azevaykin@users.noreply.github.com> 2024-08-23 13:12:28 +0300
committer GitHub <noreply@github.com> 2024-08-23 13:12:28 +0300
commit 1869eedd3e49984e0b363ceda467bc1fc7b16214 (patch)
tree a34630387857bdb3dc5f217b39098d79b31b6bf6
parent 3ca30a5cff6dbf72e9b7575c0ab9b8f9c37a7b3f (diff)
download ydb-1869eedd3e49984e0b363ceda467bc1fc7b16214.tar.gz
Statistics: retry AnalyzeTable to ColumnShard (#8190)
-rw-r--r-- ydb/core/protos/counters_statistics_aggregator.proto 1
-rw-r--r-- ydb/core/statistics/aggregator/aggregator_impl.cpp 13
-rw-r--r-- ydb/core/statistics/aggregator/aggregator_impl.h 7
-rw-r--r-- ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp 48
-rw-r--r-- ydb/core/statistics/aggregator/tx_init.cpp 1
-rw-r--r-- ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp 19
-rw-r--r-- ydb/core/statistics/aggregator/ya.make 1
7 files changed, 89 insertions, 1 deletions
diff --git a/ydb/core/protos/counters_statistics_aggregator.proto b/ydb/core/protos/counters_statistics_aggregator.proto
index eb263bab21..0947ca7951 100644
--- a/ydb/core/protos/counters_statistics_aggregator.proto
+++ b/ydb/core/protos/counters_statistics_aggregator.proto
@@ -22,4 +22,5 @@ enum ETxTypes {
TXTYPE_ACK_TIMEOUT = 12 [(TxTypeOpts) = {Name: "TxAckTimeout"}];
TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}];
TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}];
+ TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TTxAnalyzeTableDeliveryProblem"}];
}
diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp
index 98dde3778c..baedc73b0c 100644
--- a/ydb/core/statistics/aggregator/aggregator_impl.cpp
+++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp
@@ -404,11 +404,24 @@ void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
auto tabletId = ev->Get()->TabletId;
if (TraversalIsColumnTable) {
if (tabletId == HiveId) {
+ SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with HiveId=" << tabletId);
Schedule(HiveRetryInterval, new TEvPrivate::TEvRequestDistribution);
} else {
+ for (TForceTraversalOperation& operation : ForceTraversals) {
+ for (TForceTraversalTable& operationTable : operation.Tables) {
+ for (TAnalyzedShard& shard : operationTable.AnalyzedShards) {
+ if (shard.ShardTabletId == tabletId) {
+ SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with ColumnShard=" << tabletId);
+ shard.Status = TAnalyzedShard::EStatus::DeliveryProblem;
+ return;
+ }
+ }
+ }
+ }
SA_LOG_CRIT("[" << TabletID() << "] TEvDeliveryProblem with unexpected tablet " << tabletId);
}
} else {
+ SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with DataShard=" << tabletId);
if (DatashardRanges.empty()) {
return;
}
diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h
index e6b63b79e3..f0419f7f2e 100644
--- a/ydb/core/statistics/aggregator/aggregator_impl.h
+++ b/ydb/core/statistics/aggregator/aggregator_impl.h
@@ -48,6 +48,7 @@ private:
struct TTxAnalyze;
struct TTxAnalyzeTableRequest;
struct TTxAnalyzeTableResponse;
+ struct TTxAnalyzeTableDeliveryProblem;
struct TTxNavigate;
struct TTxResolve;
struct TTxDatashardScanResponse;
@@ -68,6 +69,7 @@ private:
EvResolve,
EvAckTimeout,
EvSendAnalyze,
+ EvAnalyzeDeliveryProblem,
EvEnd
};
@@ -80,6 +82,7 @@ private:
struct TEvRequestDistribution : public TEventLocal<TEvRequestDistribution, EvRequestDistribution> {};
struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {};
struct TEvSendAnalyze : public TEventLocal<TEvSendAnalyze, EvSendAnalyze> {};
+ struct TEvAnalyzeDeliveryProblem : public TEventLocal<TEvAnalyzeDeliveryProblem, EvAnalyzeDeliveryProblem> {};
struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> {
size_t SeqNo = 0;
@@ -142,6 +145,7 @@ private:
void Handle(TEvStatistics::TEvAggregateKeepAlive::TPtr& ev);
void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev);
void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev);
+ void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev);
void InitializeStatisticsTable();
void Navigate();
@@ -204,6 +208,7 @@ private:
hFunc(TEvStatistics::TEvAggregateKeepAlive, Handle);
hFunc(TEvPrivate::TEvAckTimeout, Handle);
hFunc(TEvPrivate::TEvSendAnalyze, Handle);
+ hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -311,6 +316,7 @@ private:
static constexpr size_t SendAnalyzeCount = 100;
static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1);
+ static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1);
enum ENavigateType {
Analyze,
@@ -348,6 +354,7 @@ private: // stored in local db
enum class EStatus : ui8 {
None,
+ DeliveryProblem,
AnalyzeStarted,
AnalyzeFinished,
};
diff --git a/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp
new file mode 100644
index 0000000000..b975069797
--- /dev/null
+++ b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp
@@ -0,0 +1,48 @@
+#include "aggregator_impl.h"
+
+#include <ydb/core/protos/hive.pb.h>
+#include <ydb/core/statistics/service/service.h>
+
+#include <util/string/vector.h>
+
+namespace NKikimr::NStat {
+
+// Periodic transaction that clears the DeliveryProblem mark from analyzed shards.
+//
+// Execute() visits every shard of every table of every entry in Self->ForceTraversals
+// and moves shards whose status is EStatus::DeliveryProblem back to EStatus::None.
+// Complete() schedules the next TEvAnalyzeDeliveryProblem, so the transaction runs
+// again after AnalyzeDeliveryProblemPeriod.
+// NOTE(review): presumably a shard in the None state is picked up again by the periodic
+// TEvSendAnalyze pass, which is what turns this reset into a retry -- confirm in that handler.
+struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase {
+    TTxAnalyzeTableDeliveryProblem(TSelf* self)
+        : TTxBase(self)
+    {}
+
+    TTxType GetTxType() const override { return TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM; }
+
+    // Resets the status of shards marked with DeliveryProblem.
+    // Nothing is read or written through the transaction context (the parameter is
+    // unnamed and unused); only the ForceTraversals entries reachable via Self are modified.
+    // Always returns true.
+    bool Execute(TTransactionContext&, const TActorContext&) override {
+        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Execute");
+
+        for (TForceTraversalOperation& operation : Self->ForceTraversals) {
+            for (TForceTraversalTable& operationTable : operation.Tables) {
+                for (TAnalyzedShard& analyzedShard : operationTable.AnalyzedShards) {
+                    if (analyzedShard.Status == TAnalyzedShard::EStatus::DeliveryProblem) {
+                        SA_LOG_D("[" << Self->TabletID() << "] Reset DeliveryProblem to ColumnShard=" << analyzedShard.ShardTabletId);
+                        analyzedShard.Status = TAnalyzedShard::EStatus::None;
+                    }
+                }
+            }
+        }
+
+        return true;
+    }
+
+    // Re-arms the timer: the same event that started this transaction is scheduled again.
+    void Complete(const TActorContext& ctx) override {
+        SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Complete");
+
+        ctx.Schedule(AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
+    }
+};
+
+// Handler for the periodic TEvAnalyzeDeliveryProblem event: hands a new
+// TTxAnalyzeTableDeliveryProblem to Execute() together with the current actor context.
+// The event itself carries no data that is used here.
+void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr&) {
+    const auto& actorContext = TActivationContext::AsActorContext();
+    Execute(new TTxAnalyzeTableDeliveryProblem(this), actorContext);
+}
+
+} // NKikimr::NStat
diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp
index d97521eda5..2a774d4132 100644
--- a/ydb/core/statistics/aggregator/tx_init.cpp
+++ b/ydb/core/statistics/aggregator/tx_init.cpp
@@ -274,6 +274,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
if (Self->EnableColumnStatistics) {
Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal());
Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze());
+ Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem());
} else {
SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false");
}
diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
index 6b5beea5ac..6edffe5e31 100644
--- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
+++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp
@@ -4,7 +4,7 @@
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
-#include <thread>
+#include <ydb/core/testlib/actors/block_events.h>
namespace NKikimr {
namespace NStat {
@@ -226,6 +226,23 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) {
runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender);
}
+    // Scenario: send an analyze request for one column table, reboot the table's first
+    // shard once a TEvAnalyzeTableResponse has been observed, and expect a
+    // TEvAnalyzeResponse to still arrive at the sender.
+    Y_UNIT_TEST(AnalyzeRebootColumnShard) {
+        TTestEnv testEnv(1, 1);
+        auto& runtime = *testEnv.GetServer().GetRuntime();
+        auto table = CreateDatabaseColumnTables(testEnv, 1, 1)[0];
+        auto edge = runtime.AllocateEdgeActor();
+
+        // Installed before the request is sent.
+        // NOTE(review): presumably TBlockEvents holds back matching events until Stop() -- confirm in block_events.h.
+        TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> heldResponses(runtime);
+
+        runtime.SendToPipe(table.SaTabletId, edge, MakeAnalyzeRequest({table.PathId}).release());
+
+        // Continue only after at least one response has been captured.
+        runtime.WaitFor("TEvAnalyzeTableResponse", [&heldResponses] { return heldResponses.size(); });
+        heldResponses.Stop();
+        RebootTablet(runtime, table.ShardIds[0], edge);
+
+        runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(edge);
+    }
}
} // NStat
diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make
index 8898d92bfc..e63230eebc 100644
--- a/ydb/core/statistics/aggregator/ya.make
+++ b/ydb/core/statistics/aggregator/ya.make
@@ -10,6 +10,7 @@ SRCS(
tx_ack_timeout.cpp
tx_aggr_stat_response.cpp
tx_analyze.cpp
+ tx_analyze_table_delivery_problem.cpp
tx_analyze_table_request.cpp
tx_analyze_table_response.cpp
tx_configure.cpp