diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-08-23 13:12:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-23 13:12:28 +0300 |
commit | 1869eedd3e49984e0b363ceda467bc1fc7b16214 (patch) | |
tree | a34630387857bdb3dc5f217b39098d79b31b6bf6 | |
parent | 3ca30a5cff6dbf72e9b7575c0ab9b8f9c37a7b3f (diff) | |
download | ydb-1869eedd3e49984e0b363ceda467bc1fc7b16214.tar.gz |
Statistics: retry AnalyzeTable to ColumnShard (#8190)
7 files changed, 89 insertions, 1 deletions
diff --git a/ydb/core/protos/counters_statistics_aggregator.proto b/ydb/core/protos/counters_statistics_aggregator.proto index eb263bab21..0947ca7951 100644 --- a/ydb/core/protos/counters_statistics_aggregator.proto +++ b/ydb/core/protos/counters_statistics_aggregator.proto @@ -22,4 +22,5 @@ enum ETxTypes { TXTYPE_ACK_TIMEOUT = 12 [(TxTypeOpts) = {Name: "TxAckTimeout"}]; TXTYPE_ANALYZE_TABLE_REQUEST = 13 [(TxTypeOpts) = {Name: "TxAnalyzeTableRequest"}]; TXTYPE_ANALYZE_TABLE_RESPONSE = 14 [(TxTypeOpts) = {Name: "TxAnalyzeTableResponse"}]; + TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM = 15 [(TxTypeOpts) = {Name: "TTxAnalyzeTableDeliveryProblem"}]; } diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 98dde3778c..baedc73b0c 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -404,11 +404,24 @@ void TStatisticsAggregator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { auto tabletId = ev->Get()->TabletId; if (TraversalIsColumnTable) { if (tabletId == HiveId) { + SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with HiveId=" << tabletId); Schedule(HiveRetryInterval, new TEvPrivate::TEvRequestDistribution); } else { + for (TForceTraversalOperation& operation : ForceTraversals) { + for (TForceTraversalTable& operationTable : operation.Tables) { + for (TAnalyzedShard& shard : operationTable.AnalyzedShards) { + if (shard.ShardTabletId == tabletId) { + SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with ColumnShard=" << tabletId); + shard.Status = TAnalyzedShard::EStatus::DeliveryProblem; + return; + } + } + } + } SA_LOG_CRIT("[" << TabletID() << "] TEvDeliveryProblem with unexpected tablet " << tabletId); } } else { + SA_LOG_E("[" << TabletID() << "] TEvDeliveryProblem with DataShard=" << tabletId); if (DatashardRanges.empty()) { return; } diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index e6b63b79e3..f0419f7f2e 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -48,6 +48,7 @@ private: struct TTxAnalyze; struct TTxAnalyzeTableRequest; struct TTxAnalyzeTableResponse; + struct TTxAnalyzeTableDeliveryProblem; struct TTxNavigate; struct TTxResolve; struct TTxDatashardScanResponse; @@ -68,6 +69,7 @@ private: EvResolve, EvAckTimeout, EvSendAnalyze, + EvAnalyzeDeliveryProblem, EvEnd }; @@ -80,6 +82,7 @@ private: struct TEvRequestDistribution : public TEventLocal<TEvRequestDistribution, EvRequestDistribution> {}; struct TEvResolve : public TEventLocal<TEvResolve, EvResolve> {}; struct TEvSendAnalyze : public TEventLocal<TEvSendAnalyze, EvSendAnalyze> {}; + struct TEvAnalyzeDeliveryProblem : public TEventLocal<TEvAnalyzeDeliveryProblem, EvAnalyzeDeliveryProblem> {}; struct TEvAckTimeout : public TEventLocal<TEvAckTimeout, EvAckTimeout> { size_t SeqNo = 0; @@ -142,6 +145,7 @@ private: void Handle(TEvStatistics::TEvAggregateKeepAlive::TPtr& ev); void Handle(TEvPrivate::TEvAckTimeout::TPtr& ev); void Handle(TEvPrivate::TEvSendAnalyze::TPtr& ev); + void Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr& ev); void InitializeStatisticsTable(); void Navigate(); @@ -204,6 +208,7 @@ private: hFunc(TEvStatistics::TEvAggregateKeepAlive, Handle); hFunc(TEvPrivate::TEvAckTimeout, Handle); hFunc(TEvPrivate::TEvSendAnalyze, Handle); + hFunc(TEvPrivate::TEvAnalyzeDeliveryProblem, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -311,6 +316,7 @@ private: static constexpr size_t SendAnalyzeCount = 100; static constexpr TDuration SendAnalyzePeriod = TDuration::Seconds(1); + static constexpr TDuration AnalyzeDeliveryProblemPeriod = TDuration::Seconds(1); enum ENavigateType { Analyze, @@ -348,6 +354,7 @@ private: // stored in local db enum class EStatus : ui8 { None, + DeliveryProblem, AnalyzeStarted, AnalyzeFinished, }; diff --git a/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp new file mode 100644 index 0000000000..b975069797 --- /dev/null +++ b/ydb/core/statistics/aggregator/tx_analyze_table_delivery_problem.cpp @@ -0,0 +1,48 @@ +#include "aggregator_impl.h" + +#include <ydb/core/protos/hive.pb.h> +#include <ydb/core/statistics/service/service.h> + +#include <util/string/vector.h> + +namespace NKikimr::NStat { + +struct TStatisticsAggregator::TTxAnalyzeTableDeliveryProblem : public TTxBase { + std::vector<std::unique_ptr<IEventBase>> Events; + + TTxAnalyzeTableDeliveryProblem(TSelf* self) + : TTxBase(self) + {} + + TTxType GetTxType() const override { return TXTYPE_ANALYZE_TABLE_DELIVERY_PROBLEM; } + + bool Execute(TTransactionContext&, const TActorContext&) override { + SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Execute"); + + for (TForceTraversalOperation& operation : Self->ForceTraversals) { + for (TForceTraversalTable& operationTable : operation.Tables) { + for(TAnalyzedShard& analyzedShard : operationTable.AnalyzedShards) { + if (analyzedShard.Status == TAnalyzedShard::EStatus::DeliveryProblem) { + SA_LOG_D("[" << Self->TabletID() << "] Reset DeliveryProblem to ColumnShard=" << analyzedShard.ShardTabletId); + analyzedShard.Status = TAnalyzedShard::EStatus::None; + } + } + } + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + SA_LOG_T("[" << Self->TabletID() << "] TTxAnalyzeTableDeliveryProblem::Complete"); + + ctx.Schedule(AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem()); + } +}; + +void TStatisticsAggregator::Handle(TEvPrivate::TEvAnalyzeDeliveryProblem::TPtr&) { + Execute(new TTxAnalyzeTableDeliveryProblem(this), + TActivationContext::AsActorContext()); +} + +} // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index d97521eda5..2a774d4132 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -274,6 +274,7 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { if (Self->EnableColumnStatistics) { Self->Schedule(Self->TraversalPeriod, new TEvPrivate::TEvScheduleTraversal()); Self->Schedule(Self->SendAnalyzePeriod, new TEvPrivate::TEvSendAnalyze()); + Self->Schedule(Self->AnalyzeDeliveryProblemPeriod, new TEvPrivate::TEvAnalyzeDeliveryProblem()); } else { SA_LOG_W("[" << Self->TabletID() << "] TTxInit::Complete. EnableColumnStatistics=false"); } diff --git a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp index 6b5beea5ac..6edffe5e31 100644 --- a/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp +++ b/ydb/core/statistics/aggregator/ut/ut_analyze_columnshard.cpp @@ -4,7 +4,7 @@ #include <ydb/core/tx/scheme_cache/scheme_cache.h> -#include <thread> +#include <ydb/core/testlib/actors/block_events.h> namespace NKikimr { namespace NStat { @@ -226,6 +226,23 @@ Y_UNIT_TEST_SUITE(AnalyzeColumnshard) { runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender); } + Y_UNIT_TEST(AnalyzeRebootColumnShard) { + TTestEnv env(1, 1); + auto& runtime = *env.GetServer().GetRuntime(); + auto tableInfo = CreateDatabaseColumnTables(env, 1, 1)[0]; + auto sender = runtime.AllocateEdgeActor(); + + TBlockEvents<TEvStatistics::TEvAnalyzeTableResponse> block(runtime); + + auto analyzeRequest = MakeAnalyzeRequest({tableInfo.PathId}); + runtime.SendToPipe(tableInfo.SaTabletId, sender, analyzeRequest.release()); + + runtime.WaitFor("TEvAnalyzeTableResponse", [&]{ return block.size(); }); + block.Stop(); + RebootTablet(runtime, tableInfo.ShardIds[0], sender); + + runtime.GrabEdgeEventRethrow<TEvStatistics::TEvAnalyzeResponse>(sender); + } } } // NStat diff --git a/ydb/core/statistics/aggregator/ya.make b/ydb/core/statistics/aggregator/ya.make index 8898d92bfc..e63230eebc 100644 --- a/ydb/core/statistics/aggregator/ya.make +++ b/ydb/core/statistics/aggregator/ya.make @@ -10,6 +10,7 @@ SRCS( tx_ack_timeout.cpp tx_aggr_stat_response.cpp tx_analyze.cpp + tx_analyze_table_delivery_problem.cpp tx_analyze_table_request.cpp tx_analyze_table_response.cpp tx_configure.cpp |