diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-08-15 17:23:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-15 17:23:37 +0300 |
commit | d24978e629ff59a74c0271e0e42c05853ea78642 (patch) | |
tree | 8c4b058fe8fc8c26a7577095bdd97ea9ee2c2d97 | |
parent | 929e8f5b39dc726b5a1db376198d1370391a3bc8 (diff) | |
download | ydb-d24978e629ff59a74c0271e0e42c05853ea78642.tar.gz |
Statistics: Two force traversal collections in local DB (#7765)
-rw-r--r-- | ydb/core/statistics/aggregator/aggregator_impl.cpp | 109 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/aggregator_impl.h | 30 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/schema.h | 43 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/tx_analyze_table.cpp | 76 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/tx_finish_trasersal.cpp | 13 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/tx_init.cpp | 88 | ||||
-rw-r--r-- | ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp | 9 | ||||
-rw-r--r-- | ydb/core/statistics/ut_common/ut_common.cpp | 3 |
8 files changed, 246 insertions, 125 deletions
diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp index 12ba2dc9c1c..679034e2fa9 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.cpp +++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp @@ -443,8 +443,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) { if (ForceTraversalOperationId == operationId) { outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS); } else { - if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(), - [&operationId](const TForceTraversal& elem) { return elem.OperationId == operationId;})) { + auto forceTraversalOperation = ForceTraversalOperation(operationId); + if (forceTraversalOperation) { outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED); } else { outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION); @@ -580,22 +580,35 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) { TPathId pathId; - if (!ForceTraversals.empty() && !LastTraversalWasForce) { + if (!LastTraversalWasForce) { LastTraversalWasForce = true; - TForceTraversal& operation = ForceTraversals.front(); - pathId = operation.PathId; + for (TForceTraversalOperation& operation : ForceTraversals) { + for (TForceTraversalTable& operationTable : operation.Tables) { + if (operationTable.Status == TForceTraversalTable::EStatus::None) { + operationTable.Status = TForceTraversalTable::EStatus::RequestSent; + db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId) + .Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status)); - ForceTraversalOperationId = operation.OperationId; - ForceTraversalColumnTags = operation.ColumnTags; - ForceTraversalTypes = operation.Types; - ForceTraversalReplyToActorId = operation.ReplyToActorId; + pathId = operationTable.PathId; + break; + } + } + + if (!pathId) { + SA_LOG_D("[" << TabletID() << "] All the force traversal tables sent the requests. OperationId=" << operation.OperationId); + continue; + } - PersistForceTraversal(db); + ForceTraversalOperationId = operation.OperationId; + } -// db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete(); - ForceTraversals.pop_front(); - } else if (!ScheduleTraversalsByTime.Empty()){ + if (!pathId) { + SA_LOG_D("[" << TabletID() << "] All the force traversal operations sent the requests."); + } + } + + if (!pathId && !ScheduleTraversalsByTime.Empty()){ LastTraversalWasForce = false; auto* oldestTable = ScheduleTraversalsByTime.Top(); @@ -606,8 +619,10 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) { } pathId = oldestTable->PathId; - } else { - SA_LOG_E("[" << TabletID() << "] No schedule traversal from schemeshard."); + } + + if (!pathId) { + SA_LOG_E("[" << TabletID() << "] No traversal from schemeshard."); return; } @@ -653,6 +668,15 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) { } } + auto forceTraversalOperation = CurrentForceTraversalOperation(); + if (forceTraversalOperation) { + bool tablesRemained = std::any_of(forceTraversalOperation->Tables.begin(), forceTraversalOperation->Tables.end(), + [](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;}); + if (!tablesRemained) { + DeleteForceTraversalOperation(ForceTraversalOperationId, db); + } + } + ResetTraversalState(db); } @@ -660,6 +684,50 @@ TString TStatisticsAggregator::LastTraversalWasForceString() const { return LastTraversalWasForce ? "force" : "schedule"; } +TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::CurrentForceTraversalOperation() { + return ForceTraversalOperation(ForceTraversalOperationId); +} + +TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTraversalOperation(const TString& operationId) { + auto forceTraversalOperation = std::find_if(ForceTraversals.begin(), ForceTraversals.end(), + [operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;}); + + if (forceTraversalOperation == ForceTraversals.end()) { + return nullptr; + } else { + return &*forceTraversalOperation; + } +} + +void TStatisticsAggregator::DeleteForceTraversalOperation(const TString& operationId, NIceDb::TNiceDb& db) { + db.Table<Schema::ForceTraversalOperations>().Key(ForceTraversalOperationId).Delete(); + + auto operation = ForceTraversalOperation(operationId); + for(const TForceTraversalTable& table : operation->Tables) { + db.Table<Schema::ForceTraversalTables>().Key(operationId, table.PathId.OwnerId, table.PathId.LocalPathId).Delete(); + } + + ForceTraversals.remove_if([operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;}); +} + +TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTraversalTable(const TString& operationId, const TPathId& pathId) { + for (TForceTraversalOperation& operation : ForceTraversals) { + if (operation.OperationId == operationId) { + for (TForceTraversalTable& operationTable : operation.Tables) { + if (operationTable.PathId == pathId) { + return &operationTable; + } + } + } + } + + return nullptr; +} + +TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable() { + return ForceTraversalTable(ForceTraversalOperationId, TraversalTableId.PathId); +} + void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) { db.Table<Schema::SysParams>().Key(id).Update( NIceDb::TUpdate<Schema::SysParams::Value>(value)); @@ -676,13 +744,6 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) { PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer()); } -void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) { - PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId)); - PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ForceTraversalOperationId); - PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags)); - PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes)); -} - void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) { PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound)); } @@ -690,16 +751,12 @@ void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) { void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) { ForceTraversalOperationId.clear(); TraversalTableId.PathId = TPathId(); - ForceTraversalColumnTags.clear(); - ForceTraversalTypes.clear(); TraversalStartTime = TInstant::MicroSeconds(0); PersistTraversal(db); TraversalStartKey = TSerializedCellVec(); PersistStartKey(db); - ForceTraversalReplyToActorId = {}; - for (auto& [tag, _] : CountMinSketches) { db.Table<Schema::ColumnStatistics>().Key(tag).Delete(); } diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h index 60dc0998a85..a572a57a1b7 100644 --- a/ydb/core/statistics/aggregator/aggregator_impl.h +++ b/ydb/core/statistics/aggregator/aggregator_impl.h @@ -145,7 +145,6 @@ private: void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value); void PersistTraversal(NIceDb::TNiceDb& db); - void PersistForceTraversal(NIceDb::TNiceDb& db); void PersistStartKey(NIceDb::TNiceDb& db); void PersistGlobalTraversalRound(NIceDb::TNiceDb& db); @@ -240,8 +239,6 @@ private: std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests; bool ProcessUrgentInFlight = false; - TActorId ForceTraversalReplyToActorId = {}; - bool IsSchemeshardSeen = false; bool IsStatisticsTableCreated = false; bool PendingSaveStatistics = false; @@ -306,8 +303,7 @@ private: private: // stored in local db TString ForceTraversalOperationId; - TString ForceTraversalColumnTags; - TString ForceTraversalTypes; + TTableId TraversalTableId; bool TraversalIsColumnTable = false; TSerializedCellVec TraversalStartKey; @@ -323,14 +319,32 @@ private: // stored in local db TTraversalsByTime; TTraversalsByTime ScheduleTraversalsByTime; - struct TForceTraversal { - TString OperationId; + struct TForceTraversalTable { TPathId PathId; TString ColumnTags; + + enum class EStatus : ui8 { + None, + RequestSent, + ResponseReceived, + }; + EStatus Status = EStatus::None; + }; + struct TForceTraversalOperation { + TString OperationId; + std::list<TForceTraversalTable> Tables; TString Types; TActorId ReplyToActorId; }; - std::list<TForceTraversal> ForceTraversals; + std::list<TForceTraversalOperation> ForceTraversals; + +private: + TForceTraversalOperation* CurrentForceTraversalOperation(); + TForceTraversalOperation* ForceTraversalOperation(const TString& operationId); + void DeleteForceTraversalOperation(const TString& operationId, NIceDb::TNiceDb& db); + + TForceTraversalTable* ForceTraversalTable(const TString& operationId, const TPathId& pathId); + TForceTraversalTable* CurrentForceTraversalTable(); }; } // NKikimr::NStat diff --git a/ydb/core/statistics/aggregator/schema.h b/ydb/core/statistics/aggregator/schema.h index 036902199f1..d897dc8cd8b 100644 --- a/ydb/core/statistics/aggregator/schema.h +++ b/ydb/core/statistics/aggregator/schema.h @@ -45,32 +45,45 @@ struct TAggregatorSchema : NIceDb::Schema { IsColumnTable >; }; -/* - struct ForceTraversals : Table<5> { - struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {}; + + // struct ForceTraversals : Table<5> + + struct ForceTraversalOperations : Table<6> { + struct OperationId : Column<1, NScheme::NTypeIds::String> {}; + struct Types : Column<2, NScheme::NTypeIds::String> {}; + + using TKey = TableKey<OperationId>; + using TColumns = TableColumns< + OperationId, + Types + >; + }; + + struct ForceTraversalTables : Table<7> { + struct OperationId : Column<1, NScheme::NTypeIds::String> {}; struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {}; struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {}; - struct Cookie : Column<4, NScheme::NTypeIds::String> {}; - struct ColumnTags : Column<5, NScheme::NTypeIds::String> {}; - struct Types : Column<6, NScheme::NTypeIds::String> {}; + struct ColumnTags : Column<4, NScheme::NTypeIds::String> {}; + struct Status : Column<5, NScheme::NTypeIds::Uint64> {}; using TKey = TableKey<OperationId, OwnerId, LocalPathId>; using TColumns = TableColumns< OperationId, OwnerId, LocalPathId, - Cookie, ColumnTags, - Types + Status >; }; -*/ + using TTables = SchemaTables< SysParams, BaseStatistics, ColumnStatistics, - ScheduleTraversals -// ForceTraversals + ScheduleTraversals, +// ForceTraversals, + ForceTraversalOperations, + ForceTraversalTables >; using TSettings = SchemaSettings< @@ -80,12 +93,12 @@ struct TAggregatorSchema : NIceDb::Schema { static constexpr ui64 SysParam_Database = 1; static constexpr ui64 SysParam_TraversalStartKey = 2; - static constexpr ui64 SysParam_ForceTraversalOperationId = 3; + // deprecated 3 static constexpr ui64 SysParam_TraversalTableOwnerId = 4; static constexpr ui64 SysParam_TraversalTableLocalPathId = 5; - static constexpr ui64 SysParam_ForceTraversalCookie = 6; - static constexpr ui64 SysParam_ForceTraversalColumnTags = 7; - static constexpr ui64 SysParam_ForceTraversalTypes = 8; + // deprecated 6 + // deprecated 7 + // deprecated 8 static constexpr ui64 SysParam_TraversalStartTime = 9; // deprecated 10 static constexpr ui64 SysParam_TraversalIsColumnTable = 11; diff --git a/ydb/core/statistics/aggregator/tx_analyze_table.cpp b/ydb/core/statistics/aggregator/tx_analyze_table.cpp index 6dde96c8e54..0352c72ed2e 100644 --- a/ydb/core/statistics/aggregator/tx_analyze_table.cpp +++ b/ydb/core/statistics/aggregator/tx_analyze_table.cpp @@ -28,48 +28,64 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase { NIceDb::TNiceDb db(txc.DB); const TString operationId = Record.GetOperationId(); - const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ","); - - for (const auto& table : Record.GetTables()) { - const TPathId pathId = PathIdFromPathId(table.GetPathId()); - const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},","); - // check existing force traversal with the same cookie and path - auto forceTraversal = std::find_if(Self->ForceTraversals.begin(), Self->ForceTraversals.end(), - [&pathId, &operationId](const TForceTraversal& elem) { - return elem.PathId == pathId - && elem.OperationId == operationId;}); + // check existing force traversal with the same OperationId + const auto existingOperation = Self->ForceTraversalOperation(operationId); - // update existing force traversal - if (forceTraversal != Self->ForceTraversals.end()) { - SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. PathId " << pathId << " , ReplyToActorId " << ReplyToActorId); - forceTraversal->ReplyToActorId = ReplyToActorId; + // update existing force traversal + if (existingOperation) { + if (existingOperation->Tables.size() == Record.TablesSize()) { + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId); + existingOperation->ReplyToActorId = ReplyToActorId; return true; + } else { + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Delete broken force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId); + Self->DeleteForceTraversalOperation(operationId, db); } + } + + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId); + const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ","); - SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation for pathId " << pathId); + // create new force trasersal + TForceTraversalOperation operation { + .OperationId = operationId, + .Tables = {}, + .Types = types, + .ReplyToActorId = ReplyToActorId + }; - // create new force trasersal - TForceTraversal operation { - .OperationId = operationId, + for (const auto& table : Record.GetTables()) { + const TPathId pathId = PathIdFromPathId(table.GetPathId()); + const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},","); + const TForceTraversalTable::EStatus status = TForceTraversalTable::EStatus::None; + + SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId << " , PathId " << pathId); + + // create new force traversal + TForceTraversalTable operationTable { .PathId = pathId, .ColumnTags = columnTags, - .Types = types, - .ReplyToActorId = ReplyToActorId + .Status = status }; - Self->ForceTraversals.emplace_back(operation); -/* - db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update( - NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId), - NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId), - NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId), - NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie), - NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags), - NIceDb::TUpdate<Schema::ForceTraversals::Types>(types) + operation.Tables.emplace_back(operationTable); + + db.Table<Schema::ForceTraversalTables>().Key(operationId, pathId.OwnerId, pathId.LocalPathId).Update( + NIceDb::TUpdate<Schema::ForceTraversalTables::OperationId>(operationId), + NIceDb::TUpdate<Schema::ForceTraversalTables::OwnerId>(pathId.OwnerId), + NIceDb::TUpdate<Schema::ForceTraversalTables::LocalPathId>(pathId.LocalPathId), + NIceDb::TUpdate<Schema::ForceTraversalTables::ColumnTags>(columnTags), + NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)status) ); -*/ } + Self->ForceTraversals.emplace_back(operation); + + db.Table<Schema::ForceTraversalOperations>().Key(operationId).Update( + NIceDb::TUpdate<Schema::ForceTraversalOperations::OperationId>(operationId), + NIceDb::TUpdate<Schema::ForceTraversalOperations::Types>(types) + ); + return true; } diff --git a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp index b9f7fc597fa..7feffc80137 100644 --- a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp +++ b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp @@ -13,8 +13,12 @@ struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase { : TTxBase(self) , OperationId(self->ForceTraversalOperationId) , PathId(self->TraversalTableId.PathId) - , ReplyToActorId(self->ForceTraversalReplyToActorId) - {} + { + auto forceTraversal = Self->CurrentForceTraversalOperation(); + if (forceTraversal) { + ReplyToActorId = forceTraversal->ReplyToActorId; + } + } TTxType GetTxType() const override { return TXTYPE_FINISH_TRAVERSAL; } @@ -36,10 +40,9 @@ struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase { return; } - bool operationsRemain = std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(), - [this](const TForceTraversal& elem) { return elem.OperationId == OperationId;}); + auto forceTraversalRemained = Self->ForceTraversalOperation(OperationId); - if (operationsRemain) { + if (forceTraversalRemained) { SA_LOG_D("[" << Self->TabletID() << "] TTxFinishTraversal::Complete. Don't send TEvAnalyzeResponse. " << "There are pending operations, OperationId " << OperationId << " , ActorId=" << ReplyToActorId); } else { diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp index e4012d7bbe3..58ec7b8040d 100644 --- a/ydb/core/statistics/aggregator/tx_init.cpp +++ b/ydb/core/statistics/aggregator/tx_init.cpp @@ -22,13 +22,15 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { auto baseStatisticsRowset = db.Table<Schema::BaseStatistics>().Range().Select(); auto statisticsRowset = db.Table<Schema::ColumnStatistics>().Range().Select(); auto scheduleTraversalRowset = db.Table<Schema::ScheduleTraversals>().Range().Select(); -// auto forceTraversalRowset = db.Table<Schema::ForceTraversals>().Range().Select(); + auto forceTraversalOperationsRowset = db.Table<Schema::ForceTraversalOperations>().Range().Select(); + auto forceTraversalTablesRowset = db.Table<Schema::ForceTraversalTables>().Range().Select(); if (!sysParamsRowset.IsReady() || !baseStatisticsRowset.IsReady() || !statisticsRowset.IsReady() || - !scheduleTraversalRowset.IsReady()) -// !forceTraversalRowset.IsReady()) + !scheduleTraversalRowset.IsReady() || + !forceTraversalOperationsRowset.IsReady() || + !forceTraversalTablesRowset.IsReady()) { return false; } @@ -54,11 +56,6 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { Self->TraversalStartKey = TSerializedCellVec(value); SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal start key"); break; - case Schema::SysParam_ForceTraversalOperationId: { - Self->ForceTraversalOperationId = value; - SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal operation id: " << value); - break; - } case Schema::SysParam_TraversalTableOwnerId: Self->TraversalTableId.PathId.OwnerId = FromString<ui64>(value); SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table owner id: " @@ -69,16 +66,6 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table local path id: " << Self->TraversalTableId.PathId.LocalPathId); break; - case Schema::SysParam_ForceTraversalColumnTags: { - Self->ForceTraversalColumnTags = value; - SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal columns tags: " << value); - break; - } - case Schema::SysParam_ForceTraversalTypes: { - Self->ForceTraversalTypes = value; - SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal types: " << value); - break; - } case Schema::SysParam_TraversalStartTime: { auto us = FromString<ui64>(value); Self->TraversalStartTime = TInstant::MicroSeconds(us); @@ -193,31 +180,22 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { << "table count# " << Self->ScheduleTraversals.size()); } - // ForceTraversals -/* + // ForceTraversalOperations { Self->ForceTraversals.clear(); - auto rowset = db.Table<Schema::ForceTraversals>().Range().Select(); + auto rowset = db.Table<Schema::ForceTraversalOperations>().Range().Select(); if (!rowset.IsReady()) { return false; } while (!rowset.EndOfSet()) { - ui64 operationId = rowset.GetValue<Schema::ForceTraversals::OperationId>(); - ui64 ownerId = rowset.GetValue<Schema::ForceTraversals::OwnerId>(); - ui64 localPathId = rowset.GetValue<Schema::ForceTraversals::LocalPathId>(); - TString cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>(); - TString columnTags = rowset.GetValue<Schema::ForceTraversals::ColumnTags>(); - TString types = rowset.GetValue<Schema::ForceTraversals::Types>(); + TString operationId = rowset.GetValue<Schema::ForceTraversalOperations::OperationId>(); + TString types = rowset.GetValue<Schema::ForceTraversalOperations::Types>(); - auto pathId = TPathId(ownerId, localPathId); - - TForceTraversal operation { + TForceTraversalOperation operation { .OperationId = operationId, - .Cookie = cookie, - .PathId = pathId, - .ColumnTags = columnTags, + .Tables = {}, .Types = types, .ReplyToActorId = {} }; @@ -228,10 +206,50 @@ struct TStatisticsAggregator::TTxInit : public TTxBase { } } - SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversals: " + SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversalOperations: " << "table count# " << Self->ForceTraversals.size()); } -*/ + + // ForceTraversalTables + { + auto rowset = db.Table<Schema::ForceTraversalTables>().Range().Select(); + if (!rowset.IsReady()) { + return false; + } + + size_t size = 0; + while (!rowset.EndOfSet()) { + ++size; + + TString operationId = rowset.GetValue<Schema::ForceTraversalTables::OperationId>(); + ui64 ownerId = rowset.GetValue<Schema::ForceTraversalTables::OwnerId>(); + ui64 localPathId = rowset.GetValue<Schema::ForceTraversalTables::LocalPathId>(); + TString columnTags = rowset.GetValue<Schema::ForceTraversalTables::ColumnTags>(); + ui64 status = rowset.GetValue<Schema::ForceTraversalTables::Status>(); + + auto pathId = TPathId(ownerId, localPathId); + + TForceTraversalTable operationTable { + .PathId = pathId, + .ColumnTags = columnTags, + .Status = (TForceTraversalTable::EStatus)status + }; + auto forceTraversalOperation = Self->ForceTraversalOperation(operationId); + if (!forceTraversalOperation) { + SA_LOG_E("[" << Self->TabletID() << "] ForceTraversalTables contains unknown operationId: " << operationId); + continue; + } + forceTraversalOperation->Tables.emplace_back(operationTable); + + if (!rowset.Next()) { + return false; + } + } + + SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversalTables: " + << "table count# " << size); + } + return true; } diff --git a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp index 341b3196490..f9739cc1bde 100644 --- a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp +++ b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp @@ -36,9 +36,12 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase { auto& outRecord = Request->Record; PathIdFromPathId(Self->TraversalTableId.PathId, outRecord.MutablePathId()); - - TVector<ui32> columnTags = Scan<ui32>(SplitString(Self->ForceTraversalColumnTags, ",")); - outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end()); + + const auto forceTraversalTable = Self->CurrentForceTraversalTable(); + if (forceTraversalTable) { + TVector<ui32> columnTags = Scan<ui32>(SplitString(forceTraversalTable->ColumnTags, ",")); + outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end()); + } auto distribution = Self->TabletsForReqDistribution; for (auto& inNode : Record.GetNodes()) { diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp index bdd07b99d9f..241a2330127 100644 --- a/ydb/core/statistics/ut_common/ut_common.cpp +++ b/ydb/core/statistics/ut_common/ut_common.cpp @@ -75,9 +75,6 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool Driver = MakeHolder<NYdb::TDriver>(DriverConfig); Server->GetRuntime()->SetLogPriority(NKikimrServices::STATISTICS, NActors::NLog::PRI_DEBUG); - Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG); - // Server->GetRuntime()->SetLogPriority(NKikimrServices::K, NActors::NLog::PRI_DEBUG); - } TTestEnv::~TTestEnv() { |