aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-08-15 17:23:37 +0300
committerGitHub <noreply@github.com>2024-08-15 17:23:37 +0300
commitd24978e629ff59a74c0271e0e42c05853ea78642 (patch)
tree8c4b058fe8fc8c26a7577095bdd97ea9ee2c2d97
parent929e8f5b39dc726b5a1db376198d1370391a3bc8 (diff)
downloadydb-d24978e629ff59a74c0271e0e42c05853ea78642.tar.gz
Statistics: Two force traversal collections in local DB (#7765)
-rw-r--r--ydb/core/statistics/aggregator/aggregator_impl.cpp109
-rw-r--r--ydb/core/statistics/aggregator/aggregator_impl.h30
-rw-r--r--ydb/core/statistics/aggregator/schema.h43
-rw-r--r--ydb/core/statistics/aggregator/tx_analyze_table.cpp76
-rw-r--r--ydb/core/statistics/aggregator/tx_finish_trasersal.cpp13
-rw-r--r--ydb/core/statistics/aggregator/tx_init.cpp88
-rw-r--r--ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp9
-rw-r--r--ydb/core/statistics/ut_common/ut_common.cpp3
8 files changed, 246 insertions, 125 deletions
diff --git a/ydb/core/statistics/aggregator/aggregator_impl.cpp b/ydb/core/statistics/aggregator/aggregator_impl.cpp
index 12ba2dc9c1c..679034e2fa9 100644
--- a/ydb/core/statistics/aggregator/aggregator_impl.cpp
+++ b/ydb/core/statistics/aggregator/aggregator_impl.cpp
@@ -443,8 +443,8 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvAnalyzeStatus::TPtr& ev) {
if (ForceTraversalOperationId == operationId) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_IN_PROGRESS);
} else {
- if (std::any_of(ForceTraversals.begin(), ForceTraversals.end(),
- [&operationId](const TForceTraversal& elem) { return elem.OperationId == operationId;})) {
+ auto forceTraversalOperation = ForceTraversalOperation(operationId);
+ if (forceTraversalOperation) {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_ENQUEUED);
} else {
outRecord.SetStatus(NKikimrStat::TEvAnalyzeStatusResponse::STATUS_NO_OPERATION);
@@ -580,22 +580,35 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
TPathId pathId;
- if (!ForceTraversals.empty() && !LastTraversalWasForce) {
+ if (!LastTraversalWasForce) {
LastTraversalWasForce = true;
- TForceTraversal& operation = ForceTraversals.front();
- pathId = operation.PathId;
+ for (TForceTraversalOperation& operation : ForceTraversals) {
+ for (TForceTraversalTable& operationTable : operation.Tables) {
+ if (operationTable.Status == TForceTraversalTable::EStatus::None) {
+ operationTable.Status = TForceTraversalTable::EStatus::RequestSent;
+ db.Table<Schema::ForceTraversalTables>().Key(operation.OperationId, operationTable.PathId.OwnerId, operationTable.PathId.LocalPathId)
+ .Update(NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)operationTable.Status));
- ForceTraversalOperationId = operation.OperationId;
- ForceTraversalColumnTags = operation.ColumnTags;
- ForceTraversalTypes = operation.Types;
- ForceTraversalReplyToActorId = operation.ReplyToActorId;
+ pathId = operationTable.PathId;
+ break;
+ }
+ }
+
+ if (!pathId) {
+ SA_LOG_D("[" << TabletID() << "] All the force traversal tables sent the requests. OperationId=" << operation.OperationId);
+ continue;
+ }
- PersistForceTraversal(db);
+ ForceTraversalOperationId = operation.OperationId;
+ }
-// db.Table<Schema::ForceTraversals>().Key(operation.OperationId, operation.PathId.OwnerId, operation.PathId.LocalPathId).Delete();
- ForceTraversals.pop_front();
- } else if (!ScheduleTraversalsByTime.Empty()){
+ if (!pathId) {
+ SA_LOG_D("[" << TabletID() << "] All the force traversal operations sent the requests.");
+ }
+ }
+
+ if (!pathId && !ScheduleTraversalsByTime.Empty()){
LastTraversalWasForce = false;
auto* oldestTable = ScheduleTraversalsByTime.Top();
@@ -606,8 +619,10 @@ void TStatisticsAggregator::ScheduleNextTraversal(NIceDb::TNiceDb& db) {
}
pathId = oldestTable->PathId;
- } else {
- SA_LOG_E("[" << TabletID() << "] No schedule traversal from schemeshard.");
+ }
+
+ if (!pathId) {
+ SA_LOG_E("[" << TabletID() << "] No traversal from schemeshard.");
return;
}
@@ -653,6 +668,15 @@ void TStatisticsAggregator::FinishTraversal(NIceDb::TNiceDb& db) {
}
}
+ auto forceTraversalOperation = CurrentForceTraversalOperation();
+ if (forceTraversalOperation) {
+ bool tablesRemained = std::any_of(forceTraversalOperation->Tables.begin(), forceTraversalOperation->Tables.end(),
+ [](const TForceTraversalTable& elem) { return elem.Status == TForceTraversalTable::EStatus::None;});
+ if (!tablesRemained) {
+ DeleteForceTraversalOperation(ForceTraversalOperationId, db);
+ }
+ }
+
ResetTraversalState(db);
}
@@ -660,6 +684,50 @@ TString TStatisticsAggregator::LastTraversalWasForceString() const {
return LastTraversalWasForce ? "force" : "schedule";
}
+TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::CurrentForceTraversalOperation() {
+ return ForceTraversalOperation(ForceTraversalOperationId);
+}
+
+TStatisticsAggregator::TForceTraversalOperation* TStatisticsAggregator::ForceTraversalOperation(const TString& operationId) {
+ auto forceTraversalOperation = std::find_if(ForceTraversals.begin(), ForceTraversals.end(),
+ [operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
+
+ if (forceTraversalOperation == ForceTraversals.end()) {
+ return nullptr;
+ } else {
+ return &*forceTraversalOperation;
+ }
+}
+
+void TStatisticsAggregator::DeleteForceTraversalOperation(const TString& operationId, NIceDb::TNiceDb& db) {
+ db.Table<Schema::ForceTraversalOperations>().Key(ForceTraversalOperationId).Delete();
+
+ auto operation = ForceTraversalOperation(operationId);
+ for(const TForceTraversalTable& table : operation->Tables) {
+ db.Table<Schema::ForceTraversalTables>().Key(operationId, table.PathId.OwnerId, table.PathId.LocalPathId).Delete();
+ }
+
+ ForceTraversals.remove_if([operationId](const TForceTraversalOperation& elem) { return elem.OperationId == operationId;});
+}
+
+TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::ForceTraversalTable(const TString& operationId, const TPathId& pathId) {
+ for (TForceTraversalOperation& operation : ForceTraversals) {
+ if (operation.OperationId == operationId) {
+ for (TForceTraversalTable& operationTable : operation.Tables) {
+ if (operationTable.PathId == pathId) {
+ return &operationTable;
+ }
+ }
+ }
+ }
+
+ return nullptr;
+}
+
+TStatisticsAggregator::TForceTraversalTable* TStatisticsAggregator::CurrentForceTraversalTable() {
+ return ForceTraversalTable(ForceTraversalOperationId, TraversalTableId.PathId);
+}
+
void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
db.Table<Schema::SysParams>().Key(id).Update(
NIceDb::TUpdate<Schema::SysParams::Value>(value));
@@ -676,13 +744,6 @@ void TStatisticsAggregator::PersistStartKey(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_TraversalStartKey, TraversalStartKey.GetBuffer());
}
-void TStatisticsAggregator::PersistForceTraversal(NIceDb::TNiceDb& db) {
- PersistSysParam(db, Schema::SysParam_ForceTraversalOperationId, ToString(ForceTraversalOperationId));
- PersistSysParam(db, Schema::SysParam_ForceTraversalCookie, ForceTraversalOperationId);
- PersistSysParam(db, Schema::SysParam_ForceTraversalColumnTags, ToString(ForceTraversalColumnTags));
- PersistSysParam(db, Schema::SysParam_ForceTraversalTypes, ToString(ForceTraversalTypes));
-}
-
void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
PersistSysParam(db, Schema::SysParam_GlobalTraversalRound, ToString(GlobalTraversalRound));
}
@@ -690,16 +751,12 @@ void TStatisticsAggregator::PersistGlobalTraversalRound(NIceDb::TNiceDb& db) {
void TStatisticsAggregator::ResetTraversalState(NIceDb::TNiceDb& db) {
ForceTraversalOperationId.clear();
TraversalTableId.PathId = TPathId();
- ForceTraversalColumnTags.clear();
- ForceTraversalTypes.clear();
TraversalStartTime = TInstant::MicroSeconds(0);
PersistTraversal(db);
TraversalStartKey = TSerializedCellVec();
PersistStartKey(db);
- ForceTraversalReplyToActorId = {};
-
for (auto& [tag, _] : CountMinSketches) {
db.Table<Schema::ColumnStatistics>().Key(tag).Delete();
}
diff --git a/ydb/core/statistics/aggregator/aggregator_impl.h b/ydb/core/statistics/aggregator/aggregator_impl.h
index 60dc0998a85..a572a57a1b7 100644
--- a/ydb/core/statistics/aggregator/aggregator_impl.h
+++ b/ydb/core/statistics/aggregator/aggregator_impl.h
@@ -145,7 +145,6 @@ private:
void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);
void PersistTraversal(NIceDb::TNiceDb& db);
- void PersistForceTraversal(NIceDb::TNiceDb& db);
void PersistStartKey(NIceDb::TNiceDb& db);
void PersistGlobalTraversalRound(NIceDb::TNiceDb& db);
@@ -240,8 +239,6 @@ private:
std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
bool ProcessUrgentInFlight = false;
- TActorId ForceTraversalReplyToActorId = {};
-
bool IsSchemeshardSeen = false;
bool IsStatisticsTableCreated = false;
bool PendingSaveStatistics = false;
@@ -306,8 +303,7 @@ private:
private: // stored in local db
TString ForceTraversalOperationId;
- TString ForceTraversalColumnTags;
- TString ForceTraversalTypes;
+
TTableId TraversalTableId;
bool TraversalIsColumnTable = false;
TSerializedCellVec TraversalStartKey;
@@ -323,14 +319,32 @@ private: // stored in local db
TTraversalsByTime;
TTraversalsByTime ScheduleTraversalsByTime;
- struct TForceTraversal {
- TString OperationId;
+ struct TForceTraversalTable {
TPathId PathId;
TString ColumnTags;
+
+ enum class EStatus : ui8 {
+ None,
+ RequestSent,
+ ResponseReceived,
+ };
+ EStatus Status = EStatus::None;
+ };
+ struct TForceTraversalOperation {
+ TString OperationId;
+ std::list<TForceTraversalTable> Tables;
TString Types;
TActorId ReplyToActorId;
};
- std::list<TForceTraversal> ForceTraversals;
+ std::list<TForceTraversalOperation> ForceTraversals;
+
+private:
+ TForceTraversalOperation* CurrentForceTraversalOperation();
+ TForceTraversalOperation* ForceTraversalOperation(const TString& operationId);
+ void DeleteForceTraversalOperation(const TString& operationId, NIceDb::TNiceDb& db);
+
+ TForceTraversalTable* ForceTraversalTable(const TString& operationId, const TPathId& pathId);
+ TForceTraversalTable* CurrentForceTraversalTable();
};
} // NKikimr::NStat
diff --git a/ydb/core/statistics/aggregator/schema.h b/ydb/core/statistics/aggregator/schema.h
index 036902199f1..d897dc8cd8b 100644
--- a/ydb/core/statistics/aggregator/schema.h
+++ b/ydb/core/statistics/aggregator/schema.h
@@ -45,32 +45,45 @@ struct TAggregatorSchema : NIceDb::Schema {
IsColumnTable
>;
};
-/*
- struct ForceTraversals : Table<5> {
- struct OperationId : Column<1, NScheme::NTypeIds::Uint64> {};
+
+ // struct ForceTraversals : Table<5>
+
+ struct ForceTraversalOperations : Table<6> {
+ struct OperationId : Column<1, NScheme::NTypeIds::String> {};
+ struct Types : Column<2, NScheme::NTypeIds::String> {};
+
+ using TKey = TableKey<OperationId>;
+ using TColumns = TableColumns<
+ OperationId,
+ Types
+ >;
+ };
+
+ struct ForceTraversalTables : Table<7> {
+ struct OperationId : Column<1, NScheme::NTypeIds::String> {};
struct OwnerId : Column<2, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<3, NScheme::NTypeIds::Uint64> {};
- struct Cookie : Column<4, NScheme::NTypeIds::String> {};
- struct ColumnTags : Column<5, NScheme::NTypeIds::String> {};
- struct Types : Column<6, NScheme::NTypeIds::String> {};
+ struct ColumnTags : Column<4, NScheme::NTypeIds::String> {};
+ struct Status : Column<5, NScheme::NTypeIds::Uint64> {};
using TKey = TableKey<OperationId, OwnerId, LocalPathId>;
using TColumns = TableColumns<
OperationId,
OwnerId,
LocalPathId,
- Cookie,
ColumnTags,
- Types
+ Status
>;
};
-*/
+
using TTables = SchemaTables<
SysParams,
BaseStatistics,
ColumnStatistics,
- ScheduleTraversals
-// ForceTraversals
+ ScheduleTraversals,
+// ForceTraversals,
+ ForceTraversalOperations,
+ ForceTraversalTables
>;
using TSettings = SchemaSettings<
@@ -80,12 +93,12 @@ struct TAggregatorSchema : NIceDb::Schema {
static constexpr ui64 SysParam_Database = 1;
static constexpr ui64 SysParam_TraversalStartKey = 2;
- static constexpr ui64 SysParam_ForceTraversalOperationId = 3;
+ // deprecated 3
static constexpr ui64 SysParam_TraversalTableOwnerId = 4;
static constexpr ui64 SysParam_TraversalTableLocalPathId = 5;
- static constexpr ui64 SysParam_ForceTraversalCookie = 6;
- static constexpr ui64 SysParam_ForceTraversalColumnTags = 7;
- static constexpr ui64 SysParam_ForceTraversalTypes = 8;
+ // deprecated 6
+ // deprecated 7
+ // deprecated 8
static constexpr ui64 SysParam_TraversalStartTime = 9;
// deprecated 10
static constexpr ui64 SysParam_TraversalIsColumnTable = 11;
diff --git a/ydb/core/statistics/aggregator/tx_analyze_table.cpp b/ydb/core/statistics/aggregator/tx_analyze_table.cpp
index 6dde96c8e54..0352c72ed2e 100644
--- a/ydb/core/statistics/aggregator/tx_analyze_table.cpp
+++ b/ydb/core/statistics/aggregator/tx_analyze_table.cpp
@@ -28,48 +28,64 @@ struct TStatisticsAggregator::TTxAnalyzeTable : public TTxBase {
NIceDb::TNiceDb db(txc.DB);
const TString operationId = Record.GetOperationId();
- const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");
-
- for (const auto& table : Record.GetTables()) {
- const TPathId pathId = PathIdFromPathId(table.GetPathId());
- const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
- // check existing force traversal with the same cookie and path
- auto forceTraversal = std::find_if(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
- [&pathId, &operationId](const TForceTraversal& elem) {
- return elem.PathId == pathId
- && elem.OperationId == operationId;});
+ // check existing force traversal with the same OperationId
+ const auto existingOperation = Self->ForceTraversalOperation(operationId);
- // update existing force traversal
- if (forceTraversal != Self->ForceTraversals.end()) {
- SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. PathId " << pathId << " , ReplyToActorId " << ReplyToActorId);
- forceTraversal->ReplyToActorId = ReplyToActorId;
+ // update existing force traversal
+ if (existingOperation) {
+ if (existingOperation->Tables.size() == Record.TablesSize()) {
+ SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Update existing force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
+ existingOperation->ReplyToActorId = ReplyToActorId;
return true;
+ } else {
+ SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Delete broken force traversal. OperationId " << operationId << " , ReplyToActorId " << ReplyToActorId);
+ Self->DeleteForceTraversalOperation(operationId, db);
}
+ }
+
+ SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId);
+ const TString types = JoinVectorIntoString(TVector<ui32>(Record.GetTypes().begin(), Record.GetTypes().end()), ",");
- SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation for pathId " << pathId);
+ // create new force trasersal
+ TForceTraversalOperation operation {
+ .OperationId = operationId,
+ .Tables = {},
+ .Types = types,
+ .ReplyToActorId = ReplyToActorId
+ };
- // create new force trasersal
- TForceTraversal operation {
- .OperationId = operationId,
+ for (const auto& table : Record.GetTables()) {
+ const TPathId pathId = PathIdFromPathId(table.GetPathId());
+ const TString columnTags = JoinVectorIntoString(TVector<ui32>{table.GetColumnTags().begin(),table.GetColumnTags().end()},",");
+ const TForceTraversalTable::EStatus status = TForceTraversalTable::EStatus::None;
+
+ SA_LOG_D("[" << Self->TabletID() << "] TTxAnalyzeTable::Execute. Create new force traversal operation, OperationId=" << operationId << " , PathId " << pathId);
+
+ // create new force traversal
+ TForceTraversalTable operationTable {
.PathId = pathId,
.ColumnTags = columnTags,
- .Types = types,
- .ReplyToActorId = ReplyToActorId
+ .Status = status
};
- Self->ForceTraversals.emplace_back(operation);
-/*
- db.Table<Schema::ForceTraversals>().Key(Self->NextForceTraversalOperationId, pathId.OwnerId, pathId.LocalPathId).Update(
- NIceDb::TUpdate<Schema::ForceTraversals::OperationId>(Self->NextForceTraversalOperationId),
- NIceDb::TUpdate<Schema::ForceTraversals::OwnerId>(pathId.OwnerId),
- NIceDb::TUpdate<Schema::ForceTraversals::LocalPathId>(pathId.LocalPathId),
- NIceDb::TUpdate<Schema::ForceTraversals::Cookie>(cookie),
- NIceDb::TUpdate<Schema::ForceTraversals::ColumnTags>(columnTags),
- NIceDb::TUpdate<Schema::ForceTraversals::Types>(types)
+ operation.Tables.emplace_back(operationTable);
+
+ db.Table<Schema::ForceTraversalTables>().Key(operationId, pathId.OwnerId, pathId.LocalPathId).Update(
+ NIceDb::TUpdate<Schema::ForceTraversalTables::OperationId>(operationId),
+ NIceDb::TUpdate<Schema::ForceTraversalTables::OwnerId>(pathId.OwnerId),
+ NIceDb::TUpdate<Schema::ForceTraversalTables::LocalPathId>(pathId.LocalPathId),
+ NIceDb::TUpdate<Schema::ForceTraversalTables::ColumnTags>(columnTags),
+ NIceDb::TUpdate<Schema::ForceTraversalTables::Status>((ui64)status)
);
-*/
}
+ Self->ForceTraversals.emplace_back(operation);
+
+ db.Table<Schema::ForceTraversalOperations>().Key(operationId).Update(
+ NIceDb::TUpdate<Schema::ForceTraversalOperations::OperationId>(operationId),
+ NIceDb::TUpdate<Schema::ForceTraversalOperations::Types>(types)
+ );
+
return true;
}
diff --git a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp
index b9f7fc597fa..7feffc80137 100644
--- a/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp
+++ b/ydb/core/statistics/aggregator/tx_finish_trasersal.cpp
@@ -13,8 +13,12 @@ struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase {
: TTxBase(self)
, OperationId(self->ForceTraversalOperationId)
, PathId(self->TraversalTableId.PathId)
- , ReplyToActorId(self->ForceTraversalReplyToActorId)
- {}
+ {
+ auto forceTraversal = Self->CurrentForceTraversalOperation();
+ if (forceTraversal) {
+ ReplyToActorId = forceTraversal->ReplyToActorId;
+ }
+ }
TTxType GetTxType() const override { return TXTYPE_FINISH_TRAVERSAL; }
@@ -36,10 +40,9 @@ struct TStatisticsAggregator::TTxFinishTraversal : public TTxBase {
return;
}
- bool operationsRemain = std::any_of(Self->ForceTraversals.begin(), Self->ForceTraversals.end(),
- [this](const TForceTraversal& elem) { return elem.OperationId == OperationId;});
+ auto forceTraversalRemained = Self->ForceTraversalOperation(OperationId);
- if (operationsRemain) {
+ if (forceTraversalRemained) {
SA_LOG_D("[" << Self->TabletID() << "] TTxFinishTraversal::Complete. Don't send TEvAnalyzeResponse. " <<
"There are pending operations, OperationId " << OperationId << " , ActorId=" << ReplyToActorId);
} else {
diff --git a/ydb/core/statistics/aggregator/tx_init.cpp b/ydb/core/statistics/aggregator/tx_init.cpp
index e4012d7bbe3..58ec7b8040d 100644
--- a/ydb/core/statistics/aggregator/tx_init.cpp
+++ b/ydb/core/statistics/aggregator/tx_init.cpp
@@ -22,13 +22,15 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
auto baseStatisticsRowset = db.Table<Schema::BaseStatistics>().Range().Select();
auto statisticsRowset = db.Table<Schema::ColumnStatistics>().Range().Select();
auto scheduleTraversalRowset = db.Table<Schema::ScheduleTraversals>().Range().Select();
-// auto forceTraversalRowset = db.Table<Schema::ForceTraversals>().Range().Select();
+ auto forceTraversalOperationsRowset = db.Table<Schema::ForceTraversalOperations>().Range().Select();
+ auto forceTraversalTablesRowset = db.Table<Schema::ForceTraversalTables>().Range().Select();
if (!sysParamsRowset.IsReady() ||
!baseStatisticsRowset.IsReady() ||
!statisticsRowset.IsReady() ||
- !scheduleTraversalRowset.IsReady())
-// !forceTraversalRowset.IsReady())
+ !scheduleTraversalRowset.IsReady() ||
+ !forceTraversalOperationsRowset.IsReady() ||
+ !forceTraversalTablesRowset.IsReady())
{
return false;
}
@@ -54,11 +56,6 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
Self->TraversalStartKey = TSerializedCellVec(value);
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal start key");
break;
- case Schema::SysParam_ForceTraversalOperationId: {
- Self->ForceTraversalOperationId = value;
- SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal operation id: " << value);
- break;
- }
case Schema::SysParam_TraversalTableOwnerId:
Self->TraversalTableId.PathId.OwnerId = FromString<ui64>(value);
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table owner id: "
@@ -69,16 +66,6 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal table local path id: "
<< Self->TraversalTableId.PathId.LocalPathId);
break;
- case Schema::SysParam_ForceTraversalColumnTags: {
- Self->ForceTraversalColumnTags = value;
- SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal columns tags: " << value);
- break;
- }
- case Schema::SysParam_ForceTraversalTypes: {
- Self->ForceTraversalTypes = value;
- SA_LOG_D("[" << Self->TabletID() << "] Loaded traversal types: " << value);
- break;
- }
case Schema::SysParam_TraversalStartTime: {
auto us = FromString<ui64>(value);
Self->TraversalStartTime = TInstant::MicroSeconds(us);
@@ -193,31 +180,22 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
<< "table count# " << Self->ScheduleTraversals.size());
}
- // ForceTraversals
-/*
+ // ForceTraversalOperations
{
Self->ForceTraversals.clear();
- auto rowset = db.Table<Schema::ForceTraversals>().Range().Select();
+ auto rowset = db.Table<Schema::ForceTraversalOperations>().Range().Select();
if (!rowset.IsReady()) {
return false;
}
while (!rowset.EndOfSet()) {
- ui64 operationId = rowset.GetValue<Schema::ForceTraversals::OperationId>();
- ui64 ownerId = rowset.GetValue<Schema::ForceTraversals::OwnerId>();
- ui64 localPathId = rowset.GetValue<Schema::ForceTraversals::LocalPathId>();
- TString cookie = rowset.GetValue<Schema::ForceTraversals::Cookie>();
- TString columnTags = rowset.GetValue<Schema::ForceTraversals::ColumnTags>();
- TString types = rowset.GetValue<Schema::ForceTraversals::Types>();
+ TString operationId = rowset.GetValue<Schema::ForceTraversalOperations::OperationId>();
+ TString types = rowset.GetValue<Schema::ForceTraversalOperations::Types>();
- auto pathId = TPathId(ownerId, localPathId);
-
- TForceTraversal operation {
+ TForceTraversalOperation operation {
.OperationId = operationId,
- .Cookie = cookie,
- .PathId = pathId,
- .ColumnTags = columnTags,
+ .Tables = {},
.Types = types,
.ReplyToActorId = {}
};
@@ -228,10 +206,50 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
}
}
- SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversals: "
+ SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversalOperations: "
<< "table count# " << Self->ForceTraversals.size());
}
-*/
+
+ // ForceTraversalTables
+ {
+ auto rowset = db.Table<Schema::ForceTraversalTables>().Range().Select();
+ if (!rowset.IsReady()) {
+ return false;
+ }
+
+ size_t size = 0;
+ while (!rowset.EndOfSet()) {
+ ++size;
+
+ TString operationId = rowset.GetValue<Schema::ForceTraversalTables::OperationId>();
+ ui64 ownerId = rowset.GetValue<Schema::ForceTraversalTables::OwnerId>();
+ ui64 localPathId = rowset.GetValue<Schema::ForceTraversalTables::LocalPathId>();
+ TString columnTags = rowset.GetValue<Schema::ForceTraversalTables::ColumnTags>();
+ ui64 status = rowset.GetValue<Schema::ForceTraversalTables::Status>();
+
+ auto pathId = TPathId(ownerId, localPathId);
+
+ TForceTraversalTable operationTable {
+ .PathId = pathId,
+ .ColumnTags = columnTags,
+ .Status = (TForceTraversalTable::EStatus)status
+ };
+ auto forceTraversalOperation = Self->ForceTraversalOperation(operationId);
+ if (!forceTraversalOperation) {
+ SA_LOG_E("[" << Self->TabletID() << "] ForceTraversalTables contains unknown operationId: " << operationId);
+ continue;
+ }
+ forceTraversalOperation->Tables.emplace_back(operationTable);
+
+ if (!rowset.Next()) {
+ return false;
+ }
+ }
+
+ SA_LOG_D("[" << Self->TabletID() << "] Loaded ForceTraversalTables: "
+ << "table count# " << size);
+ }
+
return true;
}
diff --git a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp
index 341b3196490..f9739cc1bde 100644
--- a/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp
+++ b/ydb/core/statistics/aggregator/tx_response_tablet_distribution.cpp
@@ -36,9 +36,12 @@ struct TStatisticsAggregator::TTxResponseTabletDistribution : public TTxBase {
auto& outRecord = Request->Record;
PathIdFromPathId(Self->TraversalTableId.PathId, outRecord.MutablePathId());
-
- TVector<ui32> columnTags = Scan<ui32>(SplitString(Self->ForceTraversalColumnTags, ","));
- outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end());
+
+ const auto forceTraversalTable = Self->CurrentForceTraversalTable();
+ if (forceTraversalTable) {
+ TVector<ui32> columnTags = Scan<ui32>(SplitString(forceTraversalTable->ColumnTags, ","));
+ outRecord.MutableColumnTags()->Add(columnTags.begin(), columnTags.end());
+ }
auto distribution = Self->TabletsForReqDistribution;
for (auto& inNode : Record.GetNodes()) {
diff --git a/ydb/core/statistics/ut_common/ut_common.cpp b/ydb/core/statistics/ut_common/ut_common.cpp
index bdd07b99d9f..241a2330127 100644
--- a/ydb/core/statistics/ut_common/ut_common.cpp
+++ b/ydb/core/statistics/ut_common/ut_common.cpp
@@ -75,9 +75,6 @@ TTestEnv::TTestEnv(ui32 staticNodes, ui32 dynamicNodes, ui32 storagePools, bool
Driver = MakeHolder<NYdb::TDriver>(DriverConfig);
Server->GetRuntime()->SetLogPriority(NKikimrServices::STATISTICS, NActors::NLog::PRI_DEBUG);
- Server->GetRuntime()->SetLogPriority(NKikimrServices::KQP_YQL, NActors::NLog::PRI_DEBUG);
- // Server->GetRuntime()->SetLogPriority(NKikimrServices::K, NActors::NLog::PRI_DEBUG);
-
}
TTestEnv::~TTestEnv() {