aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-07-05 21:00:34 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2022-07-05 21:00:34 +0300
commit8ce18c6635e7cee5d50bc9151b3b37b84da887c4 (patch)
treed8b560af713a43fbf433a468be1d3d518c335bed
parent5bf469f233cf0b3c464e5edb654c62e4b6617b96 (diff)
downloadydb-8ce18c6635e7cee5d50bc9151b3b37b84da887c4.tar.gz
KIKIMR-14971 KIKIMR-15038: Backport overload and stats for ColumnShards into 22.2
REVIEW: 2661554 REVIEW: 2685632 x-ydb-stable-ref: 6581b1d410c8efb66c68d82e6d64c8f1728014c6
-rw-r--r--ydb/core/protos/counters_columnshard.proto6
-rw-r--r--ydb/core/protos/tx_columnshard.proto6
-rw-r--r--ydb/core/tx/columnshard/columnshard.cpp179
-rw-r--r--ydb/core/tx/columnshard/columnshard.h3
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp46
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp75
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp135
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h48
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h32
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h17
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h1
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp22
-rw-r--r--ydb/core/tx/datashard/datashard.h5
-rw-r--r--ydb/core/tx/schemeshard/operation_queue_timer.h2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__compaction.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp31
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__table_stats.cpp75
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h10
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.cpp75
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_info_types.h243
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_path_describer.cpp23
-rw-r--r--ydb/core/tx/schemeshard/ut_compaction.cpp40
-rw-r--r--ydb/core/tx/schemeshard/ut_olap.cpp291
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ya.make1
28 files changed, 808 insertions, 574 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto
index 57c2f4ecf1..22d0c8e41b 100644
--- a/ydb/core/protos/counters_columnshard.proto
+++ b/ydb/core/protos/counters_columnshard.proto
@@ -48,6 +48,7 @@ enum ESimpleCounters {
COUNTER_EVICTED_ROWS = 38 [(CounterOpts) = {Name: "Index/EvictedRows"}];
COUNTER_EVICTED_BYTES = 39 [(CounterOpts) = {Name: "Index/EvictedBytes"}];
COUNTER_EVICTED_RAW_BYTES = 40 [(CounterOpts) = {Name: "Index/EvictedBytesRaw"}];
+ COUNTER_WRITES_IN_FLY = 41 [(CounterOpts) = {Name: "WritesInFly"}];
}
enum ECumulativeCounters {
@@ -118,6 +119,11 @@ enum ECumulativeCounters {
COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}];
COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}];
COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}];
+ COUNTER_EXPORT_SUCCESS = 67 [(CounterOpts) = {Name: "ExportSuccess"}];
+ COUNTER_EXPORT_FAIL = 68 [(CounterOpts) = {Name: "ExportFail"}];
+ COUNTER_FORGET_SUCCESS = 69 [(CounterOpts) = {Name: "ForgetSuccess"}];
+ COUNTER_FORGET_FAIL = 70 [(CounterOpts) = {Name: "ForgetFail"}];
+ COUNTER_WRITE_OVERLOAD = 71 [(CounterOpts) = {Name: "WriteOverload"}];
}
enum EPercentileCounters {
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index f79c256a5b..09f0b82dcd 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -156,6 +156,7 @@ message TCommitTxBody {
message TInitShard {
optional uint32 DataChannelCount = 1;
+ optional uint64 StorePathId = 2;
}
message TSchemaPresetVersionInfo {
@@ -217,8 +218,9 @@ message TAlterStore {
optional NKikimrSchemeOp.TAlterColumnStore AlterBody = 1;
repeated uint32 DroppedSchemaPresets = 2;
repeated NKikimrSchemeOp.TColumnTableSchemaPreset SchemaPresets = 3;
- repeated uint32 DroppedTtlSettingsPresets = 4;
- repeated NKikimrSchemeOp.TColumnTableTtlSettingsPreset TtlSettingsPresets = 5;
+ repeated uint32 Reserved_4 = 4; // DroppedTtlSettingsPresets, deprecated
+ repeated NKikimrSchemeOp.TColumnTableTtlSettingsPreset Reserved_5 = 5; // TtlSettingsPresets, deprecated
+ optional uint64 StorePathId = 6;
}
message TSchemaSeqNo {
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 0db4b3cc76..32219f0604 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -11,17 +11,6 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) {
namespace NKikimr::NColumnShard {
-IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent);
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
- const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
- TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
-IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
- const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
- TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max());
-IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
-
void TColumnShard::BecomeBroken(const TActorContext& ctx)
{
Become(&TThis::StateBroken);
@@ -62,6 +51,14 @@ void TColumnShard::Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext&
void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
auto tabletId = ev->Get()->TabletId;
+ auto clientId = ev->Get()->ClientId;
+
+ if (clientId == StatsReportPipe) {
+ if (ev->Get()->Status != NKikimrProto::OK) {
+ StatsReportPipe = TActorId();
+ }
+ return;
+ }
if (PipeClientCache->OnConnect(ev)) {
LOG_S_DEBUG("Connected to tablet at " << TabletID() << ", remote " << tabletId);
@@ -73,8 +70,15 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc
void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) {
auto tabletId = ev->Get()->TabletId;
+ auto clientId = ev->Get()->ClientId;
+
LOG_S_DEBUG("Client pipe reset at " << TabletID() << ", remote " << tabletId);
+ if (clientId == StatsReportPipe) {
+ StatsReportPipe = TActorId();
+ return;
+ }
+
PipeClientCache->OnDisconnect(ev);
}
@@ -105,47 +109,6 @@ void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCo
Execute(new TTxPlanStep(this, ev), ctx);
}
-// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
-void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
- OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
-
- auto& data = Proto(ev->Get()).GetData();
- const ui64 tableId = ev->Get()->Record.GetTableId();
- bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId)
- || ev->Get()->PutStatus == NKikimrProto::ERROR;
-
- if (error) {
- LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID());
-
- ev->Get()->PutStatus = NKikimrProto::ERROR;
- Execute(new TTxWrite(this, ev), ctx);
- } else if (InsertTable->IsOverloaded(tableId)) {
- LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID());
-
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWrite(this, ev), ctx);
- } else if (ev->Get()->BlobId.IsValid()) {
- LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID());
-
- Execute(new TTxWrite(this, ev), ctx);
- } else {
- if (IsAnyChannelYellowStop()) {
- LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID());
-
- IncCounter(COUNTER_OUT_OF_SPACE);
- ev->Get()->PutStatus = NKikimrProto::TRYLATER;
- Execute(new TTxWrite(this, ev), ctx);
- } else {
- LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID());
-
- ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
-
- ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
- BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
- }
- }
-}
-
void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) {
const auto* msg = ev->Get();
TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId());
@@ -253,6 +216,35 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
+void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) {
+ const auto* msg = ev->Get();
+ Y_VERIFY(msg->TabletId == TabletID());
+ MediatorTimeCastEntry = msg->Entry;
+ Y_VERIFY(MediatorTimeCastEntry);
+ LOG_S_DEBUG("Registered with mediator time cast at tablet " << TabletID());
+
+ RescheduleWaitingReads();
+}
+
+void TColumnShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext&) {
+ const auto* msg = ev->Get();
+ Y_VERIFY(msg->TabletId == TabletID());
+
+ Y_VERIFY(MediatorTimeCastEntry);
+ ui64 step = MediatorTimeCastEntry->Get(TabletID());
+ LOG_S_DEBUG("Notified by mediator time cast with PlanStep# " << step << " at tablet " << TabletID());
+
+ for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end();) {
+ if (step < *it) {
+ break;
+ }
+ it = MediatorTimeCastWaitingSteps.erase(it);
+ }
+
+ RescheduleWaitingReads();
+ EnqueueBackgroundActivities(true);
+}
+
void TColumnShard::UpdateBlobMangerCounters() {
const auto counters = BlobManager->GetCountersUpdate();
IncCounter(COUNTER_BLOB_MANAGER_GC_REQUESTS, counters.GcRequestsSent);
@@ -319,6 +311,22 @@ void TColumnShard::UpdateIndexCounters() {
SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes);
}
+ui64 TColumnShard::MemoryUsage() const {
+ ui64 memory =
+ Tables.size() * sizeof(TTableInfo) +
+ PathsToDrop.size() * sizeof(ui64) +
+ Ttl.PathsCount() * sizeof(TTtl::TDescription) +
+ SchemaPresets.size() * sizeof(TSchemaPreset) +
+ AltersInFlight.size() * sizeof(TAlterMeta) +
+ CommitsInFlight.size() * sizeof(TCommitMeta) +
+ TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
+ TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
+ if (PrimaryIndex) {
+ memory += PrimaryIndex->MemoryUsage();
+ }
+ return memory;
+}
+
void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
auto * metrics = Executor()->GetResourceMetrics();
if (!metrics) {
@@ -333,19 +341,7 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
TabletCounters->Simple()[COUNTER_SPLIT_COMPACTED_BYTES].Get() +
TabletCounters->Simple()[COUNTER_INACTIVE_BYTES].Get();
- ui64 memory =
- Tables.size() * sizeof(TTableInfo) +
- PathsToDrop.size() * sizeof(ui64) +
- Ttl.PathsCount() * sizeof(TTtl::TDescription) +
- SchemaPresets.size() * sizeof(TSchemaPreset) +
- //TtlSettingsPresets.size() * sizeof(TTtlSettingsPreset) +
- AltersInFlight.size() * sizeof(TAlterMeta) +
- CommitsInFlight.size() * sizeof(TCommitMeta) +
- TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) +
- TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData);
- if (PrimaryIndex) {
- memory += PrimaryIndex->MemoryUsage();
- }
+ ui64 memory = MemoryUsage();
const TActorContext& ctx = TlsActivationContext->AsActorContext();
TInstant now = AppData(ctx)->TimeProvider->Now();
@@ -360,4 +356,57 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
metrics->TryUpdate(ctx);
}
+void TColumnShard::SendPeriodicStats(const TActorContext& ctx) {
+ if (!CurrentSchemeShardId || !StorePathId) {
+ LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID());
+ return;
+ }
+
+ TInstant now = AppData(ctx)->TimeProvider->Now();
+ if (LastStatsReport + StatsReportInterval > now) {
+ return;
+ }
+ LastStatsReport = now;
+
+ if (!StatsReportPipe) {
+ LOG_S_DEBUG("Create periodic stats pipe to " << CurrentSchemeShardId << " at tablet " << TabletID());
+ NTabletPipe::TClientConfig clientConfig;
+ StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
+ }
+
+ auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), StorePathId);
+ {
+ ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready
+ ev->Record.SetGeneration(Executor()->Generation());
+ ev->Record.SetRound(StatsReportRound++);
+ ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId);
+ ev->Record.SetStartTime(StartTime().MilliSeconds());
+
+ if (auto* resourceMetrics = Executor()->GetResourceMetrics()) {
+ resourceMetrics->Fill(*ev->Record.MutableTabletMetrics());
+ }
+
+ auto* tabletStats = ev->Record.MutableTableStats();
+ tabletStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get());
+ tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get());
+ tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly);
+
+ if (PrimaryIndex) {
+ const auto& indexStats = PrimaryIndex->GetTotalStats();
+ NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate();
+ auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted
+
+ tabletStats->SetRowCount(activeIndexStats.Rows);
+ tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get());
+ // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside)
+ //tabletStats->SetIndexSize(); // TODO: calc size of internal tables
+ //tabletStats->SetLastAccessTime(); // TODO: last read/write time
+ tabletStats->SetLastUpdateTime(lastIndexUpdate.PlanStep);
+ }
+ }
+
+ LOG_S_DEBUG("Sending periodic stats at tablet " << TabletID());
+ NTabletPipe::SendData(ctx, StatsReportPipe, ev.release());
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 62486e3c33..809a0c6e19 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -8,7 +8,7 @@
#include <ydb/core/tx/long_tx_service/public/types.h>
-// TODO: temporarily reuse datashard TEvScan (KIKIMR-11069)
+// TODO: temporarily reuse datashard TEvScan (KIKIMR-11069) and TEvPeriodicTableStats
#include <ydb/core/tx/datashard/datashard.h>
namespace NKikimr {
@@ -236,6 +236,7 @@ struct TEvColumnShard {
};
using TEvScan = TEvDataShard::TEvKqpScan;
+ using TEvPeriodicTableStats = TEvDataShard::TEvPeriodicTableStats;
};
inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index b5493685a0..e96b215c8e 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -19,13 +19,13 @@ void TTxInit::SetDefaults() {
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
+ Self->StorePathId = 0;
Self->BasicTxInfo.clear();
Self->DeadlineQueue.clear();
Self->PlanQueue.clear();
Self->AltersInFlight.clear();
Self->CommitsInFlight.clear();
Self->SchemaPresets.clear();
- //Self->TtlSettingsPresets.clear();
Self->Tables.clear();
Self->LongTxWrites.clear();
Self->LongTxWritesByUniqueId.clear();
@@ -62,6 +62,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastWriteId, Self->LastWriteId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId);
+ ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::StorePathId, Self->StorePathId);
if (!ready)
return false;
@@ -170,49 +171,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
return false;
}
}
-#if 0 // TTL presets haven't been used
- { // Load ttl settings presets
- auto rowset = db.Table<Schema::TtlSettingsPresetInfo>().Select();
- if (!rowset.IsReady())
- return false;
-
- while (!rowset.EndOfSet()) {
- const ui32 id = rowset.GetValue<Schema::TtlSettingsPresetInfo::Id>();
- auto& preset = Self->TtlSettingsPresets[id];
- preset.Id = id;
- preset.Name = rowset.GetValue<Schema::TtlSettingsPresetInfo::Name>();
- if (rowset.HaveValue<Schema::TtlSettingsPresetInfo::DropStep>() &&
- rowset.HaveValue<Schema::TtlSettingsPresetInfo::DropTxId>())
- {
- preset.DropVersion.Step = rowset.GetValue<Schema::TtlSettingsPresetInfo::DropStep>();
- preset.DropVersion.TxId = rowset.GetValue<Schema::TtlSettingsPresetInfo::DropTxId>();
- }
-
- if (!rowset.Next())
- return false;
- }
- }
-
- { // Load ttl settings preset versions
- auto rowset = db.Table<Schema::TtlSettingsPresetVersionInfo>().Select();
- if (!rowset.IsReady())
- return false;
- while (!rowset.EndOfSet()) {
- const ui32 id = rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::Id>();
- Y_VERIFY(Self->TtlSettingsPresets.contains(id));
- auto& preset = Self->TtlSettingsPresets.at(id);
- const TRowVersion version(
- rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::SinceStep>(),
- rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::SinceTxId>());
- auto& info = preset.Versions[version];
- Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::InfoProto>()));
-
- if (!rowset.Next())
- return false;
- }
- }
-#endif
{ // Load tables
auto rowset = db.Table<Schema::TableInfo>().Select();
if (!rowset.IsReady())
@@ -266,7 +225,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
Self->SetCounter(COUNTER_TABLES, Self->Tables.size());
Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size());
- //Self->SetCounter(COUNTER_TTL_PRESETS, Self->TtlSettingsPresets.size());
Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size());
if (!schemaPreset.empty()) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index 481e8a48aa..028b236624 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -114,4 +114,79 @@ void TTxWrite::Complete(const TActorContext& ctx) {
ctx.Send(Ev->Get()->GetSource(), Result.release());
}
+// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite
+void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) {
+ OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels));
+
+ auto& record = Proto(ev->Get());
+ auto& data = record.GetData();
+ ui64 tableId = record.GetTableId();
+ ui64 metaShard = record.GetTxInitiator();
+ ui64 writeId = record.GetWriteId();
+ TString dedupId = record.GetDedupId();
+
+ bool isWritable = IsTableWritable(tableId);
+ bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !isWritable;
+ bool errorReturned = (ev->Get()->PutStatus != NKikimrProto::OK) && (ev->Get()->PutStatus != NKikimrProto::UNKNOWN);
+ bool isOutOfSpace = IsAnyChannelYellowStop();
+
+ if (error || errorReturned) {
+ LOG_S_WARN("Write (fail) " << data.size() << " bytes into pathId " << tableId
+ << ", status " << ev->Get()->PutStatus
+ << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro")
+ << " at tablet " << TabletID());
+
+ IncCounter(COUNTER_WRITE_FAIL);
+
+ auto errCode = NKikimrTxColumnShard::EResultStatus::ERROR;
+ if (errorReturned) {
+ if (ev->Get()->PutStatus == NKikimrProto::TIMEOUT) {
+ errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT;
+ }
+ --WritesInFly; // write failed
+ }
+
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
+ TabletID(), metaShard, writeId, tableId, dedupId, errCode);
+ ctx.Send(ev->Get()->GetSource(), result.release());
+
+ } else if (ev->Get()->BlobId.IsValid()) {
+ LOG_S_DEBUG("Write (record) " << data.size() << " bytes into pathId " << tableId
+ << (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID());
+
+ --WritesInFly; // write successed
+ Execute(new TTxWrite(this, ev), ctx);
+ } else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) {
+ IncCounter(COUNTER_WRITE_FAIL);
+
+ if (isOutOfSpace) {
+ IncCounter(COUNTER_OUT_OF_SPACE);
+ LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId
+ << " at tablet " << TabletID());
+ } else {
+ IncCounter(COUNTER_WRITE_OVERLOAD);
+ bool tableOverload = InsertTable->IsOverloaded(tableId);
+ LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId
+ << (ShardOverloaded()? "[shard]" : "") << (tableOverload? "[table]" : "")
+ << " at tablet " << TabletID());
+ }
+
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
+ TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ ctx.Send(ev->Get()->GetSource(), result.release());
+ } else {
+ LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId
+ << (writeId? (" writeId " + ToString(writeId)).c_str() : "")
+ << " at tablet " << TabletID());
+
+ ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize;
+
+ ++WritesInFly; // write started
+ ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID,
+ BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release()));
+ }
+
+ SetCounter(COUNTER_WRITES_IN_FLY, WritesInFly);
+}
+
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index cadd224819..8b0a6479d6 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -68,35 +68,6 @@ void TColumnShard::UnregisterMediatorTimeCast() {
}
}
-void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) {
- const auto* msg = ev->Get();
- Y_VERIFY(msg->TabletId == TabletID());
- MediatorTimeCastEntry = msg->Entry;
- Y_VERIFY(MediatorTimeCastEntry);
- LOG_S_DEBUG("Registered with mediator time cast at tablet " << TabletID());
-
- RescheduleWaitingReads();
-}
-
-void TColumnShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext&) {
- const auto* msg = ev->Get();
- Y_VERIFY(msg->TabletId == TabletID());
-
- Y_VERIFY(MediatorTimeCastEntry);
- ui64 step = MediatorTimeCastEntry->Get(TabletID());
- LOG_S_DEBUG("Notified by mediator time cast with PlanStep# " << step << " at tablet " << TabletID());
-
- for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end();) {
- if (step < *it) {
- break;
- }
- it = MediatorTimeCastWaitingSteps.erase(it);
- }
-
- RescheduleWaitingReads();
- EnqueueBackgroundActivities(true);
-}
-
bool TColumnShard::WaitPlanStep(ui64 step) {
if (step <= LastPlannedStep) {
return false;
@@ -293,7 +264,8 @@ void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExec
}
}
-void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto,
+ NTabletFlatExecutor::TTransactionContext& txc) {
auto seqNo = SeqNoFromProto(seqNoProto);
if (LastSchemaSeqNo <= seqNo) {
UpdateSchemaSeqNo(++seqNo, txc);
@@ -308,7 +280,8 @@ bool TColumnShard::IsTableWritable(ui64 tableId) const {
return !it->second.IsDropped();
}
-ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version) {
+ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto,
+ const TRowVersion& version) {
if (!SchemaPresets.contains(presetProto.GetId())) {
auto& preset = SchemaPresets[presetProto.GetId()];
preset.Id = presetProto.GetId();
@@ -328,30 +301,12 @@ ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp
return presetProto.GetId();
}
-#if 0
-ui32 TColumnShard::EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version) {
- if (!TtlSettingsPresets.contains(presetProto.GetId())) {
- auto& preset = TtlSettingsPresets[presetProto.GetId()];
- preset.Id = presetProto.GetId();
- preset.Name = presetProto.GetName();
- auto& info = preset.Versions[version];
- info.SetId(preset.Id);
- info.SetSinceStep(version.Step);
- info.SetSinceTxId(version.TxId);
- *info.MutableTtlSettings() = presetProto.GetTtlSettings();
-
- Schema::SaveTtlSettingsPresetInfo(db, preset.Id, preset.Name);
- Schema::SaveTtlSettingsPresetVersionInfo(db, preset.Id, version, info);
- SetCounter(COUNTER_TTL_PRESETS, TtlSettingsPresets.size());
- }
- return presetProto.GetId();
-}
-#endif
-void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
switch (body.TxBody_case()) {
case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: {
- // nothing yet, but maybe remember initial PlanStep:TxId for some reason?
+ RunInit(body.GetInitShard(), version, txc);
return;
}
case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: {
@@ -380,7 +335,20 @@ void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body,
Y_FAIL("Unsupported schema tx type");
}
-void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
+ Y_UNUSED(version);
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ if (proto.HasStorePathId()) {
+ StorePathId = proto.GetStorePathId();
+ Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId);
+ }
+}
+
+void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = tableProto.GetPathId();
@@ -403,12 +371,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
Ttl.SetPathTtl(pathId, TTtl::TDescription(tableProto.GetTtlSettings()));
SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount());
}
-#if 0
- if (tableProto.HasTtlSettingsPreset()) {
- ui32 ttlPresetId = EnsureTtlSettingsPreset(db, tableProto.GetTtlSettingsPreset(), version);
- tableVerProto.SetTtlSettingsPresetId(ttlPresetId);
- }
-#endif
+
if (!PrimaryIndex && schemaPresetId) {
auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version];
TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
@@ -427,7 +390,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl
}
}
-void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = alterProto.GetPathId();
@@ -449,21 +413,14 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP
} else {
Ttl.DropPathTtl(pathId);
}
-#if 0
- if (alterProto.HasTtlSettingsPreset()) {
- ui32 ttlPresetId = EnsureTtlSettingsPreset(db, alterProto.GetTtlSettingsPreset(), version);
- info.SetTtlSettingsPresetId(ttlPresetId);
- }
-#endif
+
info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj());
-#if 0
- info.SetTtlSettingsPresetVersionAdj(alterProto.GetTtlSettingsPresetVersionAdj());
-#endif
Schema::SaveTableVersionInfo(db, table.PathId, version, info);
}
-void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
const ui64 pathId = dropProto.GetPathId();
@@ -486,9 +443,15 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt
}
}
-void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) {
+void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version,
+ NTabletFlatExecutor::TTransactionContext& txc) {
NIceDb::TNiceDb db(txc.DB);
+ if (proto.HasStorePathId()) {
+ StorePathId = proto.GetStorePathId();
+ Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId);
+ }
+
TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset;
for (ui32 id : proto.GetDroppedSchemaPresets()) {
@@ -500,16 +463,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
preset.DropVersion = version;
Schema::SaveSchemaPresetDropVersion(db, id, version);
}
-#if 0
- for (ui32 id : proto.GetDroppedTtlSettingsPresets()) {
- if (!TtlSettingsPresets.contains(id)) {
- continue;
- }
- auto& preset = TtlSettingsPresets.at(id);
- preset.DropVersion = version;
- Schema::SaveTtlSettingsPresetDropVersion(db, id, version);
- }
-#endif
+
for (const auto& presetProto : proto.GetSchemaPresets()) {
if (!SchemaPresets.contains(presetProto.GetId())) {
continue; // we don't update presets that we don't use
@@ -528,23 +482,6 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto,
Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info);
}
-#if 0
- for (const auto& presetProto : proto.GetTtlSettingsPresets()) {
- ui32 presetId = presetProto.GetId();
- if (!TtlSettingsPresets.contains(presetId)) {
- continue; // we don't update presets that we don't use
- }
-
- auto& preset = TtlSettingsPresets[presetId];
- auto& info = preset.Versions[version];
- info.SetId(presetId);
- info.SetSinceStep(version.Step);
- info.SetSinceTxId(version.TxId);
- *info.MutableTtlSettings() = presetProto.GetTtlSettings();
-
- Schema::SaveTtlSettingsPresetVersionInfo(db, preset.Id, version, info);
- }
-#endif
if (!schemaPreset.empty()) {
SetPrimaryIndex(std::move(schemaPreset));
@@ -572,6 +509,8 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
}
const TActorContext& ctx = TlsActivationContext->AsActorContext();
+ SendPeriodicStats(ctx);
+
if (auto event = SetupIndexation()) {
ctx.Send(IndexingActor, event.release());
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 951827b623..9c42e0b57d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -17,17 +17,32 @@ namespace NKikimr::NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
+IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent);
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+ const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
+ TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max());
+IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable,
+ const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled,
+ TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max());
+IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
+
struct TSettings {
TControlWrapper BlobWriteGrouppingEnabled;
TControlWrapper CacheDataAfterIndexing;
TControlWrapper CacheDataAfterCompaction;
TControlWrapper MaxSmallBlobSize;
+ TControlWrapper OverloadTxInFly;
+ TControlWrapper OverloadWritesInFly;
TSettings()
: BlobWriteGrouppingEnabled(1, 0, 1)
, CacheDataAfterIndexing(1, 0, 1)
, CacheDataAfterCompaction(1, 0, 1)
, MaxSmallBlobSize(0, 0, 8000000)
+ , OverloadTxInFly(1000, 0, 10000)
+ , OverloadWritesInFly(1000, 0, 10000)
{}
void RegisterControls(TControlBoard& icb) {
@@ -35,6 +50,8 @@ struct TSettings {
icb.RegisterSharedControl(CacheDataAfterIndexing, "ColumnShardControls.CacheDataAfterIndexing");
icb.RegisterSharedControl(CacheDataAfterCompaction, "ColumnShardControls.CacheDataAfterCompaction");
icb.RegisterSharedControl(MaxSmallBlobSize, "ColumnShardControls.MaxSmallBlobSize");
+ icb.RegisterSharedControl(OverloadTxInFly, "ColumnShardControls.OverloadTxInFly");
+ icb.RegisterSharedControl(OverloadWritesInFly, "ColumnShardControls.OverloadWritesInFly");
}
};
@@ -102,6 +119,7 @@ class TColumnShard
void Die(const TActorContext& ctx) override {
// TODO
+ NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
UnregisterMediatorTimeCast();
return IActor::Die(ctx);
}
@@ -257,20 +275,7 @@ private:
return DropVersion != TRowVersion::Max();
}
};
-#if 0
- struct TTtlSettingsPreset {
- using TVerProto = NKikimrTxColumnShard::TTtlSettingsPresetVersionInfo;
-
- ui32 Id;
- TString Name;
- TMap<TRowVersion, TVerProto> Versions;
- TRowVersion DropVersion = TRowVersion::Max();
- bool IsDropped() const {
- return DropVersion != TRowVersion::Max();
- }
- };
-#endif
struct TTableInfo {
using TVerProto = NKikimrTxColumnShard::TTableVersionInfo;
@@ -296,6 +301,9 @@ private:
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
ui64 LastCompactedGranule = 0;
+ ui64 WritesInFly = 0;
+ ui64 StorePathId = 0;
+ ui64 StatsReportRound = 0;
TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry;
bool MediatorTimeCastRegistered = false;
@@ -304,11 +312,14 @@ private:
TDuration MaxCommitTxDelay = TDuration::Seconds(30); // TODO: Make configurable?
TDuration ActivationPeriod = TDuration::Seconds(60);
TDuration FailActivationDelay = TDuration::Seconds(1);
+ TDuration StatsReportInterval = TDuration::Seconds(10);
TInstant LastBackActivation;
+ TInstant LastStatsReport;
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation.
TActorId EvictionActor;
+ TActorId StatsReportPipe;
std::unique_ptr<TTabletCountersBase> TabletCountersPtr;
TTabletCountersBase* TabletCounters;
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
@@ -325,7 +336,6 @@ private:
THashMap<ui64, TAlterMeta> AltersInFlight;
THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose
THashMap<ui32, TSchemaPreset> SchemaPresets;
- //THashMap<ui32, TTtlSettingsPreset> TtlSettingsPresets;
THashMap<ui64, TTableInfo> Tables;
THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites;
THashMap<TULID, TLongTxWriteInfo*> LongTxWritesByUniqueId;
@@ -354,6 +364,13 @@ private:
ui64 GetAllowedStep() const;
bool HaveOutdatedTxs() const;
+ bool ShardOverloaded() const {
+ ui64 txLimit = Settings.OverloadTxInFly;
+ ui64 writesLimit = Settings.OverloadWritesInFly;
+ return (txLimit && Executor()->GetStats().TxInFly > txLimit) ||
+ (writesLimit && WritesInFly > writesLimit);
+ }
+
TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);
void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId);
@@ -372,6 +389,7 @@ private:
//ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version);
void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
+ void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc);
@@ -390,6 +408,8 @@ private:
void UpdateInsertTableCounters();
void UpdateIndexCounters();
void UpdateResourceMetrics(const TUsage& usage);
+ ui64 MemoryUsage() const;
+ void SendPeriodicStats(const TActorContext& ctx);
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index d88300f374..a6d62d9582 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -47,6 +47,8 @@ struct Schema : NIceDb::Schema {
LastGcBarrierGen = 8,
LastGcBarrierStep = 9,
+ //LastExportNumber = 10,
+ StorePathId = 11,
};
enum class EInsertTableIds : ui8 {
@@ -345,37 +347,7 @@ struct Schema : NIceDb::Schema {
static void EraseSchemaPresetInfo(NIceDb::TNiceDb& db, ui64 id) {
db.Table<SchemaPresetInfo>().Key(id).Delete();
}
-#if 0
- static void SaveTtlSettingsPresetInfo(NIceDb::TNiceDb& db, ui64 id, const TString& name) {
- db.Table<TtlSettingsPresetInfo>().Key(id).Update(
- NIceDb::TUpdate<TtlSettingsPresetInfo::Name>(name));
- }
- static void SaveTtlSettingsPresetVersionInfo(
- NIceDb::TNiceDb& db,
- ui64 id, const TRowVersion& version,
- const NKikimrTxColumnShard::TTtlSettingsPresetVersionInfo& info)
- {
- TString serialized;
- Y_VERIFY(info.SerializeToString(&serialized));
- db.Table<TtlSettingsPresetVersionInfo>().Key(id, version.Step, version.TxId).Update(
- NIceDb::TUpdate<TtlSettingsPresetVersionInfo::InfoProto>(serialized));
- }
-
- static void SaveTtlSettingsPresetDropVersion(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& dropVersion) {
- db.Table<TtlSettingsPresetInfo>().Key(id).Update(
- NIceDb::TUpdate<TtlSettingsPresetInfo::DropStep>(dropVersion.Step),
- NIceDb::TUpdate<TtlSettingsPresetInfo::DropTxId>(dropVersion.TxId));
- }
-
- static void EraseTtlSettingsPresetVersionInfo(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& version) {
- db.Table<TtlSettingsPresetVersionInfo>().Key(id, version.Step, version.TxId).Delete();
- }
-
- static void EraseTtlSettingsPresetInfo(NIceDb::TNiceDb& db, ui64 id) {
- db.Table<TtlSettingsPresetInfo>().Key(id).Delete();
- }
-#endif
static void SaveTableInfo(NIceDb::TNiceDb& db, ui64 pathId) {
db.Table<TableInfo>().Key(pathId).Update();
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 8fb6ac424a..8c14d41815 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -306,6 +306,22 @@ struct TColumnEngineStats {
TPortionsStats Inactive{};
TPortionsStats Evicted{};
+ TPortionsStats Active() const {
+ return TPortionsStats {
+ .Portions = ActivePortions(),
+ .Blobs = ActiveBlobs(),
+ .Rows = ActiveRows(),
+ .Bytes = ActiveBytes(),
+ .RawBytes = ActiveRawBytes()
+ };
+ }
+
+ ui64 ActivePortions() const { return Inserted.Portions + Compacted.Portions + SplitCompacted.Portions; }
+ ui64 ActiveBlobs() const { return Inserted.Blobs + Compacted.Blobs + SplitCompacted.Blobs; }
+ ui64 ActiveRows() const { return Inserted.Rows + Compacted.Rows + SplitCompacted.Rows; }
+ ui64 ActiveBytes() const { return Inserted.Bytes + Compacted.Bytes + SplitCompacted.Bytes; }
+ ui64 ActiveRawBytes() const { return Inserted.RawBytes + Compacted.RawBytes + SplitCompacted.RawBytes; }
+
void Clear() {
*this = {};
}
@@ -341,6 +357,7 @@ public:
virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
virtual ui64 MemoryUsage() const { return 0; }
+ virtual TSnapshot LastUpdate() const { return {}; }
};
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 7a4f443ed3..a8b071f2ba 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -149,6 +149,7 @@ public:
const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override;
const TColumnEngineStats& GetTotalStats() override;
ui64 MemoryUsage() const override;
+ TSnapshot LastUpdate() const override { return LastSnapshot; }
std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot,
const THashSet<ui32>& columnIds,
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index e32d1f1f9a..ec78578324 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -104,15 +104,15 @@ public:
void SendWriteRequest(const TActorContext& ctx) {
Y_VERIFY(WriteEv->PutStatus == NKikimrProto::UNKNOWN);
- LOG_S_DEBUG("Writing inserted blob at tablet " << TabletId);
-
auto& record = Proto(WriteEv.Get());
+ ui64 pathId = record.GetTableId();
+ ui64 writeId = record.GetWriteId();
auto& srcData = record.GetData();
TString meta;
if (record.HasMeta()) {
meta = record.GetMeta().GetSchema();
if (meta.empty() || record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
- LOG_S_DEBUG("Bad metadata for writing data at tablet " << TabletId);
+ LOG_S_INFO("Bad metadata for writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId);
SendResultAndDie(ctx, NKikimrProto::ERROR);
}
}
@@ -125,7 +125,8 @@ public:
batch = IndexInfo.PrepareForInsert(srcData, meta, strError);
}
if (!batch) {
- LOG_S_DEBUG("Bad data to write (" << strError << ") at tablet " << TabletId);
+ LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId
+ << " (" << strError << ") at tablet " << TabletId);
SendResultAndDie(ctx, NKikimrProto::ERROR);
return;
}
@@ -136,8 +137,9 @@ public:
data = NArrow::SerializeBatchNoCompression(batch);
}
if (data.size() > TLimits::MAX_BLOB_SIZE) {
- LOG_S_DEBUG("Extracted data (" << data.size() << " bytes) is bigger than source ("
- << srcData.size() << " bytes) and limit at tablet " << TabletId);
+ LOG_S_INFO("Extracted data (" << data.size() << " bytes) is bigger than source ("
+ << srcData.size() << " bytes) and limit, writeId " << writeId << " pathId " << pathId
+ << " at tablet " << TabletId);
SendResultAndDie(ctx, NKikimrProto::ERROR);
return;
@@ -156,7 +158,8 @@ public:
meta.clear();
if (!outMeta.SerializeToString(&meta)) {
- LOG_S_ERROR("Canot set metadata for writing blob at tablet " << TabletId);
+ LOG_S_ERROR("Canot set metadata for blob, writeId " << writeId << " pathId " << pathId
+ << " at tablet " << TabletId);
SendResultAndDie(ctx, NKikimrProto::ERROR);
return;
}
@@ -173,7 +176,8 @@ public:
Y_VERIFY(WriteEv->BlobId.BlobSize() == data.size());
- LOG_S_DEBUG("Write Blob " << WriteEv->BlobId.ToStringNew());
+ LOG_S_DEBUG("Writing " << WriteEv->BlobId.ToStringNew() << " writeId " << writeId << " pathId " << pathId
+ << " at tablet " << TabletId);
if (BlobBatch.AllBlobWritesCompleted()) {
SendResultAndDie(ctx, NKikimrProto::OK);
@@ -298,7 +302,7 @@ private:
void SendResult(const TActorContext& ctx, NKikimrProto::EReplyStatus status) {
SaveResourceUsage();
if (WriteEv) {
- LOG_S_DEBUG("Write Blob " << WriteEv->BlobId.ToStringNew() << " Status: " << status);
+ LOG_S_DEBUG("Written " << WriteEv->BlobId.ToStringNew() << " Status: " << status);
WriteEv->PutStatus = status;
WriteEv->BlobBatch = std::move(BlobBatch);
WriteEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end());
diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h
index 1de4c368ce..6e0f11fa18 100644
--- a/ydb/core/tx/datashard/datashard.h
+++ b/ydb/core/tx/datashard/datashard.h
@@ -823,6 +823,11 @@ struct TEvDataShard {
Record.SetTableOwnerId(tableOwnerId);
Record.SetTableLocalId(tableLocalId);
}
+
+ TEvPeriodicTableStats(ui64 datashardId, ui64 tableLocalId) {
+ Record.SetDatashardId(datashardId);
+ Record.SetTableLocalId(tableLocalId);
+ }
};
struct TEvS3ListingRequest : public TEventPB<TEvS3ListingRequest,
diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h
index 14d5530153..8f045de44b 100644
--- a/ydb/core/tx/schemeshard/operation_queue_timer.h
+++ b/ydb/core/tx/schemeshard/operation_queue_timer.h
@@ -115,7 +115,7 @@ struct TShardCompactionInfo {
: ShardIdx(id)
{}
- TShardCompactionInfo(const TShardIdx& id, const TTableInfo::TPartitionStats& stats)
+ TShardCompactionInfo(const TShardIdx& id, const TPartitionStats& stats)
: ShardIdx(id)
, SearchHeight(stats.SearchHeight)
, LastFullCompactionTs(stats.FullCompactionTs)
diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
index 810be23a9d..2f9d015ec5 100644
--- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp
@@ -85,7 +85,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
// it's OK to OnDone InvalidShardIdx
// move shard to the end of all queues
TInstant now = AppData(ctx)->TimeProvider->Now();
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.FullCompactionTs = now.Seconds();
auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats));
@@ -139,7 +139,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T
void TSchemeShard::EnqueueBackgroundCompaction(
const TShardIdx& shardIdx,
- const TTableInfo::TPartitionStats& stats)
+ const TPartitionStats& stats)
{
if (!CompactionQueue)
return;
@@ -174,7 +174,7 @@ void TSchemeShard::EnqueueBackgroundCompaction(
void TSchemeShard::UpdateBackgroundCompaction(
const TShardIdx& shardIdx,
- const TTableInfo::TPartitionStats& newStats)
+ const TPartitionStats& newStats)
{
if (!CompactionQueue)
return;
@@ -241,7 +241,7 @@ void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() {
void TSchemeShard::UpdateShardMetrics(
const TShardIdx& shardIdx,
- const TTableInfo::TPartitionStats& newStats)
+ const TPartitionStats& newStats)
{
if (newStats.HasBorrowedData)
ShardsWithBorrowed.insert(shardIdx);
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 7211043579..11527944f0 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -2131,7 +2131,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
}
}
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.SeqNo = TMessageSeqNo(
rowSet.GetValue<Schema::TablePartitionStats::SeqNoGeneration>(),
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
index fd53304a5e..c94721a26c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp
@@ -261,6 +261,7 @@ public:
context.SS->FillSeqNo(tx, seqNo);
auto* alter = tx.MutableAlterStore();
+ alter->SetStorePathId(txState->TargetPathId.LocalPathId);
for (ui32 id : droppedSchemaPresets) {
alter->AddDroppedSchemaPresets(id);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index e8eebf867c..4cb1329e18 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -276,6 +276,7 @@ public:
// TODO: we may need to specify a more complex data channel mapping
auto* init = tx.MutableInitShard();
init->SetDataChannelCount(storeInfo->Description.GetStorageConfig().GetDataChannelCount());
+ init->SetStorePathId(txState->TargetPathId.LocalPathId);
Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&columnShardTxBody);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
index 02886af4b6..07006a0203 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp
@@ -184,40 +184,13 @@ TColumnTableInfo::TPtr CreateColumnTable(
} else {
storageTiers = pSchema->Tiers;
}
-#if 1
+
if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) {
status = NKikimrScheme::StatusSchemeError;
errStr = "TTL presets are not supported";
return nullptr;
}
-#else
- if (op.HasTtlSettingsPresetName()) {
- const TString presetName = op.GetTtlSettingsPresetName();
- if (!storeInfo->TtlSettingsPresetByName.contains(presetName)) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified ttl settings preset '%s' does not exist in olap store", presetName.c_str());
- return nullptr;
- }
- const ui32 presetId = storeInfo->TtlSettingsPresetByName.at(presetName);
- if (!op.HasTtlSettingsPresetId()) {
- op.SetTtlSettingsPresetId(presetId);
- }
- if (op.GetTtlSettingsPresetId() != presetId) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified ttl settings preset '%s' and id %" PRIu32 " do not match in olap store", presetName.c_str(), presetId);
- return nullptr;
- }
- } else if (op.HasTtlSettingsPresetId()) {
- const ui32 presetId = op.GetTtlSettingsPresetId();
- if (!storeInfo->TtlSettingsPresets.contains(presetId)) {
- status = NKikimrScheme::StatusSchemeError;
- errStr = Sprintf("Specified ttl preset %" PRIu32 " does not exist in olap store", presetId);
- return nullptr;
- }
- const TString& presetName = storeInfo->TtlSettingsPresets.at(presetId).Name;
- op.SetTtlSettingsPresetName(presetName);
- } else
-#endif
+
if (op.HasTtlSettings()) {
op.MutableTtlSettings()->SetVersion(1);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
index bd64a85234..1a5cc6fa3f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp
@@ -27,7 +27,7 @@ void TSchemeShard::Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev,
}
auto TSchemeShard::BuildStatsForCollector(TPathId pathId, TShardIdx shardIdx, TTabletId datashardId,
- TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TTableInfo::TPartitionStats& stats)
+ TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats)
{
auto ev = MakeHolder<NSysView::TEvSysView::TEvSendPartitionStats>(
GetDomainKey(pathId), pathId, std::make_pair(ui64(shardIdx.GetOwnerId()), ui64(shardIdx.GetLocalId())));
@@ -175,20 +175,21 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
ui64 dataSize = tableStats.GetDataSize();
ui64 rowCount = tableStats.GetRowCount();
- if (!Self->Tables.contains(pathId)) {
+ bool isDataShard = Self->Tables.contains(pathId);
+ bool isOlapStore = Self->OlapStores.contains(pathId);
+ if (!isDataShard && !isOlapStore) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId);
return true;
}
- TTableInfo::TPtr table = Self->Tables[pathId];
-
if (!Self->TabletIdToShardIdx.contains(datashardId)) {
+ LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId);
return true;
}
- auto shardIdx = Self->TabletIdToShardIdx[datashardId];
- const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
+ TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId];
- TTableInfo::TPartitionStats newStats;
+ TPartitionStats newStats;
newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound());
newStats.RowCount = tableStats.GetRowCount();
@@ -236,22 +237,46 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
}
newStats.ShardState = rec.GetShardState();
- auto oldAggrStats = table->GetStats().Aggregated;
- table->UpdateShardStats(shardIdx, newStats);
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Add stats from shard " << datashardId << ", pathId " << pathId.LocalPathId
+ << ": RowCount " << newStats.RowCount << ", DataSize " << newStats.DataSize);
- if (!table->IsBackup) {
- Self->UpdateBackgroundCompaction(shardIdx, newStats);
- Self->UpdateShardMetrics(shardIdx, newStats);
- }
+ NIceDb::TNiceDb db(txc.DB);
- if (!newStats.HasBorrowedData) {
- Self->RemoveBorrowedCompaction(shardIdx);
- }
+ TTableInfo::TPtr table;
+ TPartitionStats oldAggrStats;
+ TPartitionStats newAggrStats;
+ bool updateSubdomainInfo = false;
- NIceDb::TNiceDb db(txc.DB);
+ if (isDataShard) {
+ table = Self->Tables[pathId];
+ oldAggrStats = table->GetStats().Aggregated;
+ table->UpdateShardStats(shardIdx, newStats);
+
+ if (!table->IsBackup) {
+ Self->UpdateBackgroundCompaction(shardIdx, newStats);
+ Self->UpdateShardMetrics(shardIdx, newStats);
+ }
+
+ if (!newStats.HasBorrowedData) {
+ Self->RemoveBorrowedCompaction(shardIdx);
+ }
- if (!table->IsBackup && !table->IsShardsStatsDetached()) {
- auto newAggrStats = table->GetStats().Aggregated;
+ if (!table->IsBackup && !table->IsShardsStatsDetached()) {
+ newAggrStats = table->GetStats().Aggregated;
+ updateSubdomainInfo = true;
+ }
+
+ Self->PersistTablePartitionStats(db, pathId, shardIdx, table);
+ } else if (isOlapStore) {
+ TOlapStoreInfo::TPtr olapStore = Self->OlapStores[pathId];
+ oldAggrStats = olapStore->GetStats().Aggregated;
+ olapStore->UpdateShardStats(shardIdx, newStats);
+ newAggrStats = olapStore->GetStats().Aggregated;
+ updateSubdomainInfo = true;
+ }
+
+ if (updateSubdomainInfo) {
auto subDomainId = Self->ResolveDomainId(pathId);
auto subDomainInfo = Self->ResolveDomainInfo(pathId);
subDomainInfo->AggrDiskSpaceUsage(Self, newAggrStats, oldAggrStats);
@@ -264,8 +289,6 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
}
}
- Self->PersistTablePartitionStats(db, pathId, shardIdx, table);
-
if (AppData(ctx)->FeatureFlags.GetEnableSystemViews()) {
TMaybe<ui32> nodeId;
if (rec.HasNodeId()) {
@@ -281,6 +304,13 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release());
}
+ if (isOlapStore) {
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Aggregated stats for pathId " << pathId.LocalPathId
+ << ": RowCount " << newAggrStats.RowCount << ", DataSize " << newAggrStats.DataSize);
+ return true;
+ }
+
const auto& shardToPartition = table->GetShard2PartitionIdx();
if (table->IsTTLEnabled() && shardToPartition.contains(shardIdx)) {
const ui64 partitionIdx = shardToPartition.at(shardIdx);
@@ -306,6 +336,7 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const
Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds());
}
+ const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings();
TVector<TShardIdx> shardsToMerge;
if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge)) {
TTxId txId = Self->GetCachedTxId(ctx);
@@ -430,7 +461,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const T
LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
"Got periodic table stats at tablet " << TabletID()
- << " from datashard " << datashardId
+ << " from shard " << datashardId
<< " pathId " << pathId
<< " state '" << DatashardStateName(rec.GetShardState()) << "'"
<< " dataSize " << dataSize
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index e3bcf381d9..9592b57f97 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -2111,7 +2111,7 @@ void TSchemeShard::PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPa
}
}
-void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats) {
+void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TPartitionStats& stats) {
if (!AppData()->FeatureFlags.GetEnablePersistentPartitionStats()) {
return;
}
@@ -3267,7 +3267,7 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const
if (!tableInfo->IsBackup && !tableInfo->IsShardsStatsDetached()) {
auto subDomainId = ResolveDomainId(pathId);
auto subDomainInfo = ResolveDomainInfo(pathId);
- subDomainInfo->AggrDiskSpaceUsage(this, TTableInfo::TPartitionStats(), tableInfo->GetStats().Aggregated);
+ subDomainInfo->AggrDiskSpaceUsage(this, TPartitionStats(), tableInfo->GetStats().Aggregated);
if (subDomainInfo->CheckDiskSpaceQuotas(this)) {
PersistSubDomainState(db, subDomainId, *subDomainInfo);
// Publish is done in a separate transaction, so we may call this directly
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 6e4ecc0256..2f6041ff42 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -517,7 +517,7 @@ public:
void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning);
auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId,
- TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TTableInfo::TPartitionStats& stats);
+ TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats);
bool ReadSysValue(NIceDb::TNiceDb& db, ui64 sysTag, TString& value, TString defValue = TString());
bool ReadSysValue(NIceDb::TNiceDb& db, ui64 sysTag, ui64& value, ui64 defVal = 0);
@@ -557,7 +557,7 @@ public:
void PersistTablePartitioning(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo);
void PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo);
void PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo);
- void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats);
+ void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TPartitionStats& stats);
void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo);
void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TTableInfo::TPtr tableInfo);
void PersistTableCreated(NIceDb::TNiceDb& db, const TPathId tableId);
@@ -731,14 +731,14 @@ public:
void ScheduleCleanDroppedPaths();
void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx);
- void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
- void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats);
+ void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);
+ void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats);
void RemoveBackgroundCompaction(const TShardIdx& shardIdx);
void EnqueueBorrowedCompaction(const TShardIdx& shardIdx);
void RemoveBorrowedCompaction(const TShardIdx& shardIdx);
- void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats);
+ void UpdateShardMetrics(const TShardIdx& shardIdx, const TPartitionStats& newStats);
void RemoveShardMetrics(const TShardIdx& shardIdx);
void ShardRemoved(const TShardIdx& shardIdx);
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
index 75da6dea98..ee068bbfb2 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp
@@ -1288,12 +1288,16 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) {
}
}
-void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newStats) {
+void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) {
+ Stats.UpdateShardStats(datashardIdx, newStats);
+}
+
+void TAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) {
// Ignore stats from unknown datashard (it could have been split)
- if (!Stats.PartitionStats.contains(datashardIdx))
+ if (!PartitionStats.contains(datashardIdx))
return;
- TPartitionStats& oldStats = Stats.PartitionStats[datashardIdx];
+ TPartitionStats& oldStats = PartitionStats[datashardIdx];
if (newStats.SeqNo <= oldStats.SeqNo) {
// Ignore outdated message
@@ -1313,46 +1317,47 @@ void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newSt
oldStats.RangeReadRows = 0;
}
- Stats.Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount);
- Stats.Aggregated.DataSize += (newStats.DataSize - oldStats.DataSize);
- Stats.Aggregated.IndexSize += (newStats.IndexSize - oldStats.IndexSize);
- Stats.Aggregated.LastAccessTime = Max(Stats.Aggregated.LastAccessTime, newStats.LastAccessTime);
- Stats.Aggregated.LastUpdateTime = Max(Stats.Aggregated.LastUpdateTime, newStats.LastUpdateTime);
- Stats.Aggregated.ImmediateTxCompleted += (newStats.ImmediateTxCompleted - oldStats.ImmediateTxCompleted);
- Stats.Aggregated.PlannedTxCompleted += (newStats.PlannedTxCompleted - oldStats.PlannedTxCompleted);
- Stats.Aggregated.TxRejectedByOverload += (newStats.TxRejectedByOverload - oldStats.TxRejectedByOverload);
- Stats.Aggregated.TxRejectedBySpace += (newStats.TxRejectedBySpace - oldStats.TxRejectedBySpace);
- Stats.Aggregated.InFlightTxCount += (newStats.InFlightTxCount - oldStats.InFlightTxCount);
+ Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount);
+ Aggregated.DataSize += (newStats.DataSize - oldStats.DataSize);
+ Aggregated.IndexSize += (newStats.IndexSize - oldStats.IndexSize);
+ Aggregated.LastAccessTime = Max(Aggregated.LastAccessTime, newStats.LastAccessTime);
+ Aggregated.LastUpdateTime = Max(Aggregated.LastUpdateTime, newStats.LastUpdateTime);
+ Aggregated.ImmediateTxCompleted += (newStats.ImmediateTxCompleted - oldStats.ImmediateTxCompleted);
+ Aggregated.PlannedTxCompleted += (newStats.PlannedTxCompleted - oldStats.PlannedTxCompleted);
+ Aggregated.TxRejectedByOverload += (newStats.TxRejectedByOverload - oldStats.TxRejectedByOverload);
+ Aggregated.TxRejectedBySpace += (newStats.TxRejectedBySpace - oldStats.TxRejectedBySpace);
+ Aggregated.InFlightTxCount += (newStats.InFlightTxCount - oldStats.InFlightTxCount);
- Stats.Aggregated.RowUpdates += (newStats.RowUpdates - oldStats.RowUpdates);
- Stats.Aggregated.RowDeletes += (newStats.RowDeletes - oldStats.RowDeletes);
- Stats.Aggregated.RowReads += (newStats.RowReads - oldStats.RowReads);
- Stats.Aggregated.RangeReads += (newStats.RangeReads - oldStats.RangeReads);
- Stats.Aggregated.RangeReadRows += (newStats.RangeReadRows - oldStats.RangeReadRows);
+ Aggregated.RowUpdates += (newStats.RowUpdates - oldStats.RowUpdates);
+ Aggregated.RowDeletes += (newStats.RowDeletes - oldStats.RowDeletes);
+ Aggregated.RowReads += (newStats.RowReads - oldStats.RowReads);
+ Aggregated.RangeReads += (newStats.RangeReads - oldStats.RangeReads);
+ Aggregated.RangeReadRows += (newStats.RangeReadRows - oldStats.RangeReadRows);
i64 cpuUsageDelta = newStats.GetCurrentRawCpuUsage() - oldStats.GetCurrentRawCpuUsage();
- i64 prevCpuUsage = Stats.Aggregated.GetCurrentRawCpuUsage();
+ i64 prevCpuUsage = Aggregated.GetCurrentRawCpuUsage();
ui64 newAggregatedCpuUsage = std::max<i64>(0, prevCpuUsage + cpuUsageDelta);
TInstant now = AppData()->TimeProvider->Now();
- Stats.Aggregated.SetCurrentRawCpuUsage(newAggregatedCpuUsage, now);
- Stats.Aggregated.Memory += (newStats.Memory - oldStats.Memory);
- Stats.Aggregated.Network += (newStats.Network - oldStats.Network);
- Stats.Aggregated.Storage += (newStats.Storage - oldStats.Storage);
- Stats.Aggregated.ReadThroughput += (newStats.ReadThroughput - oldStats.ReadThroughput);
- Stats.Aggregated.WriteThroughput += (newStats.WriteThroughput - oldStats.WriteThroughput);
- Stats.Aggregated.ReadIops += (newStats.ReadIops - oldStats.ReadIops);
- Stats.Aggregated.WriteIops += (newStats.WriteIops - oldStats.WriteIops);
-
- newStats.SaveCpuUsageHistory(oldStats);
+ Aggregated.SetCurrentRawCpuUsage(newAggregatedCpuUsage, now);
+ Aggregated.Memory += (newStats.Memory - oldStats.Memory);
+ Aggregated.Network += (newStats.Network - oldStats.Network);
+ Aggregated.Storage += (newStats.Storage - oldStats.Storage);
+ Aggregated.ReadThroughput += (newStats.ReadThroughput - oldStats.ReadThroughput);
+ Aggregated.WriteThroughput += (newStats.WriteThroughput - oldStats.WriteThroughput);
+ Aggregated.ReadIops += (newStats.ReadIops - oldStats.ReadIops);
+ Aggregated.WriteIops += (newStats.WriteIops - oldStats.WriteIops);
+
+ auto topUsage = oldStats.TopUsage.Update(newStats.TopUsage);
oldStats = newStats;
- Stats.PartitionStatsUpdated++;
+ oldStats.TopUsage = std::move(topUsage);
+ PartitionStatsUpdated++;
// Rescan stats for aggregations only once in a while
- if (Stats.PartitionStatsUpdated >= Stats.PartitionStats.size()) {
- Stats.PartitionStatsUpdated = 0;
- Stats.Aggregated.TxCompleteLag = TDuration();
- for (const auto& ps : Stats.PartitionStats) {
- Stats.Aggregated.TxCompleteLag = Max(Stats.Aggregated.TxCompleteLag, ps.second.TxCompleteLag);
+ if (PartitionStatsUpdated >= PartitionStats.size()) {
+ PartitionStatsUpdated = 0;
+ Aggregated.TxCompleteLag = TDuration();
+ for (const auto& ps : PartitionStats) {
+ Aggregated.TxCompleteLag = Max(Aggregated.TxCompleteLag, ps.second.TxCompleteLag);
}
}
}
diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h
index 1d649daa7a..09a7a9d096 100644
--- a/ydb/core/tx/schemeshard/schemeshard_info_types.h
+++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h
@@ -185,6 +185,121 @@ struct TPartitionConfigMerger {
};
+struct TPartitionStats {
+ // Latest timestamps when CPU usage exceeded 2%, 5%, 10%, 20%, 30%
+ struct TTopUsage {
+ TInstant Last2PercentLoad;
+ TInstant Last5PercentLoad;
+ TInstant Last10PercentLoad;
+ TInstant Last20PercentLoad;
+ TInstant Last30PercentLoad;
+
+ const TTopUsage& Update(const TTopUsage& usage) {
+ Last2PercentLoad = std::max(Last2PercentLoad, usage.Last2PercentLoad);
+ Last5PercentLoad = std::max(Last5PercentLoad, usage.Last5PercentLoad);
+ Last10PercentLoad = std::max(Last10PercentLoad, usage.Last10PercentLoad);
+ Last20PercentLoad = std::max(Last20PercentLoad, usage.Last20PercentLoad);
+ Last30PercentLoad = std::max(Last30PercentLoad, usage.Last30PercentLoad);
+ return *this;
+ }
+ };
+
+ TMessageSeqNo SeqNo;
+
+ ui64 RowCount = 0;
+ ui64 DataSize = 0;
+ ui64 IndexSize = 0;
+
+ TInstant LastAccessTime;
+ TInstant LastUpdateTime;
+ TDuration TxCompleteLag;
+
+ ui64 ImmediateTxCompleted = 0;
+ ui64 PlannedTxCompleted = 0;
+ ui64 TxRejectedByOverload = 0;
+ ui64 TxRejectedBySpace = 0;
+ ui64 InFlightTxCount = 0;
+
+ ui64 RowUpdates = 0;
+ ui64 RowDeletes = 0;
+ ui64 RowReads = 0;
+ ui64 RangeReads = 0;
+ ui64 RangeReadRows = 0;
+
+ ui64 Memory = 0;
+ ui64 Network = 0;
+ ui64 Storage = 0;
+ ui64 ReadThroughput = 0;
+ ui64 WriteThroughput = 0;
+ ui64 ReadIops = 0;
+ ui64 WriteIops = 0;
+
+ THashSet<TTabletId> PartOwners;
+ ui64 PartCount = 0;
+ ui64 SearchHeight = 0;
+ ui64 FullCompactionTs = 0;
+ ui64 MemDataSize = 0;
+ ui32 ShardState = NKikimrTxDataShard::Unknown;
+
+ // True when PartOwners has parts from other tablets
+ bool HasBorrowedData = false;
+
+ // True when lent parts to other tablets
+ bool HasLoanedData = false;
+
+ // Tablet actor started at
+ TInstant StartTime;
+
+ TTopUsage TopUsage;
+
+ void SetCurrentRawCpuUsage(ui64 rawCpuUsage, TInstant now) {
+ CPU = rawCpuUsage;
+ float percent = rawCpuUsage * 0.000001 * 100;
+ if (percent >= 2)
+ TopUsage.Last2PercentLoad = now;
+ if (percent >= 5)
+ TopUsage.Last5PercentLoad = now;
+ if (percent >= 10)
+ TopUsage.Last10PercentLoad = now;
+ if (percent >= 20)
+ TopUsage.Last20PercentLoad = now;
+ if (percent >= 30)
+ TopUsage.Last30PercentLoad = now;
+ }
+
+ ui64 GetCurrentRawCpuUsage() const {
+ return CPU;
+ }
+
+ float GetLatestMaxCpuUsagePercent(TInstant since) const {
+ // TODO: fix the case when stats were not collected yet
+
+ if (TopUsage.Last30PercentLoad > since)
+ return 40;
+ if (TopUsage.Last20PercentLoad > since)
+ return 30;
+ if (TopUsage.Last10PercentLoad > since)
+ return 20;
+ if (TopUsage.Last5PercentLoad > since)
+ return 10;
+ if (TopUsage.Last2PercentLoad > since)
+ return 5;
+
+ return 2;
+ }
+
+private:
+ ui64 CPU = 0;
+};
+
+struct TAggregatedStats {
+ TPartitionStats Aggregated;
+ THashMap<TShardIdx, TPartitionStats> PartitionStats;
+ size_t PartitionStatsUpdated = 0;
+
+ void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats);
+};
+
struct TSubDomainInfo;
struct TTableInfo : public TSimpleRefCount<TTableInfo> {
@@ -228,114 +343,6 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> {
ui64 DataTotalSize;
};
- struct TPartitionStats {
- TMessageSeqNo SeqNo;
-
- ui64 RowCount = 0;
- ui64 DataSize = 0;
- ui64 IndexSize = 0;
-
- TInstant LastAccessTime;
- TInstant LastUpdateTime;
- TDuration TxCompleteLag;
-
- ui64 ImmediateTxCompleted = 0;
- ui64 PlannedTxCompleted = 0;
- ui64 TxRejectedByOverload = 0;
- ui64 TxRejectedBySpace = 0;
- ui64 InFlightTxCount = 0;
-
- ui64 RowUpdates = 0;
- ui64 RowDeletes = 0;
- ui64 RowReads = 0;
- ui64 RangeReads = 0;
- ui64 RangeReadRows = 0;
-
- ui64 Memory = 0;
- ui64 Network = 0;
- ui64 Storage = 0;
- ui64 ReadThroughput = 0;
- ui64 WriteThroughput = 0;
- ui64 ReadIops = 0;
- ui64 WriteIops = 0;
-
- THashSet<TTabletId> PartOwners;
- ui64 PartCount = 0;
- ui64 SearchHeight = 0;
- ui64 FullCompactionTs = 0;
- ui64 MemDataSize = 0;
- ui32 ShardState = NKikimrTxDataShard::Unknown;
-
- // True when PartOwners has parts from other tablets
- bool HasBorrowedData = false;
-
- // True when lent parts to other tablets
- bool HasLoanedData = false;
-
- // Tablet actor started at
- TInstant StartTime;
-
- void SetCurrentRawCpuUsage(ui64 rawCpuUsage, TInstant now) {
- CPU = rawCpuUsage;
- float percent = rawCpuUsage * 0.000001 * 100;
- if (percent >= 2)
- Last2PercentLoad = now;
- if (percent >= 5)
- Last5PercentLoad = now;
- if (percent >= 10)
- Last10PercentLoad = now;
- if (percent >= 20)
- Last20PercentLoad = now;
- if (percent >= 30)
- Last30PercentLoad = now;
- }
-
- void SaveCpuUsageHistory(const TPartitionStats& oldStats) {
- Last2PercentLoad = std::max(Last2PercentLoad, oldStats.Last2PercentLoad);
- Last5PercentLoad = std::max(Last5PercentLoad, oldStats.Last5PercentLoad);
- Last10PercentLoad = std::max(Last10PercentLoad, oldStats.Last10PercentLoad);
- Last20PercentLoad = std::max(Last20PercentLoad, oldStats.Last20PercentLoad);
- Last30PercentLoad = std::max(Last30PercentLoad, oldStats.Last30PercentLoad);
- }
-
- ui64 GetCurrentRawCpuUsage() const {
- return CPU;
- }
-
- float GetLatestMaxCpuUsagePercent(TInstant since) const {
- // TODO: fix the case when stats were not collected yet
-
- if (Last30PercentLoad > since)
- return 40;
- if (Last20PercentLoad > since)
- return 30;
- if (Last10PercentLoad > since)
- return 20;
- if (Last5PercentLoad > since)
- return 10;
- if (Last2PercentLoad > since)
- return 5;
-
- return 2;
- }
-
- private:
- ui64 CPU = 0;
-
- // Latest timestamps when CPU usage exceeded 2%, 5%, 10%, 20%, 30%
- TInstant Last2PercentLoad;
- TInstant Last5PercentLoad;
- TInstant Last10PercentLoad;
- TInstant Last20PercentLoad;
- TInstant Last30PercentLoad;
- };
-
- struct TStats {
- TPartitionStats Aggregated;
- THashMap<TShardIdx, TPartitionStats> PartitionStats;
- size_t PartitionStatsUpdated = 0;
- };
-
struct TAlterTableInfo : TSimpleRefCount<TAlterTableInfo> {
using TPtr = TIntrusivePtr<TAlterTableInfo>;
@@ -454,7 +461,7 @@ private:
THashMap<TOperationId, TVector<TShardIdx>> ShardsInSplitMergeByOpId;
THashMap<TShardIdx, TOperationId> ShardsInSplitMergeByShards;
ui64 ExpectedPartitionCount = 0; // number of partitions after all in-flight splits/merges are finished
- TStats Stats;
+ TAggregatedStats Stats;
bool ShardsStatsDetached = false;
TPartitionsVec::iterator FindPartition(const TShardIdx& shardIdx) {
@@ -545,7 +552,7 @@ public:
return Partitions;
}
- const TStats& GetStats() const {
+ const TAggregatedStats& GetStats() const {
return Stats;
}
@@ -556,7 +563,7 @@ public:
ShardsStatsDetached = true;
}
- void UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newStats);
+ void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats);
void RegisterSplitMegreOp(TOperationId txId, const TTxState& txState);
@@ -840,17 +847,25 @@ struct TOlapStoreInfo : TSimpleRefCount<TOlapStoreInfo> {
TVector<TShardIdx> ColumnShards;
THashMap<ui32, TOlapStoreSchemaPreset> SchemaPresets;
- //THashMap<ui32, TOlapStoreTtlSettingsPreset> TtlSettingsPresets;
THashMap<TString, ui32> SchemaPresetByName;
- THashMap<TString, ui32> TtlSettingsPresetByName;
THashSet<TPathId> ColumnTables;
THashSet<TPathId> ColumnTablesUnderOperation;
+ TAggregatedStats Stats;
TOlapStoreInfo() = default;
TOlapStoreInfo(ui64 alterVersion, NKikimrSchemeOp::TColumnStoreDescription&& description,
NKikimrSchemeOp::TColumnStoreSharding&& sharding,
TMaybe<NKikimrSchemeOp::TAlterColumnStore>&& alterBody = Nothing());
+
+ const TAggregatedStats& GetStats() const {
+ return Stats;
+ }
+
+ void UpdateShardStats(TShardIdx shardIdx, const TPartitionStats& newStats) {
+ Stats.PartitionStats[shardIdx]; // insert if none
+ Stats.UpdateShardStats(shardIdx, newStats);
+ }
};
struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> {
@@ -1695,7 +1710,7 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> {
CoordinatorSelector = new TCoordinators(ProcessingParams);
}
- void AggrDiskSpaceUsage(IQuotaCounters* counters, const TTableInfo::TPartitionStats& newAggr, const TTableInfo::TPartitionStats& oldAggr = TTableInfo::TPartitionStats()) {
+ void AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartitionStats& newAggr, const TPartitionStats& oldAggr = {}) {
DiskSpaceUsage.Tables.DataSize += (newAggr.DataSize - oldAggr.DataSize);
counters->ChangeDiskSpaceTablesDataBytes(newAggr.DataSize - oldAggr.DataSize);
diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
index cb6d72a5ed..84e3260de9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp
@@ -9,7 +9,7 @@
namespace NKikimr {
namespace NSchemeShard {
-static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TTableInfo::TPartitionStats& tableStats) {
+static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TPartitionStats& tableStats) {
stats->SetRowCount(tableStats.RowCount);
stats->SetDataSize(tableStats.DataSize);
stats->SetIndexSize(tableStats.IndexSize);
@@ -31,7 +31,7 @@ static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TTableIn
stats->SetPartCount(tableStats.PartCount);
}
-static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TTableInfo::TPartitionStats& tableStats) {
+static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TPartitionStats& tableStats) {
metrics->SetCPU(tableStats.GetCurrentRawCpuUsage());
metrics->SetMemory(tableStats.Memory);
metrics->SetNetwork(tableStats.Network);
@@ -42,6 +42,11 @@ static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TTableI
metrics->SetWriteIops(tableStats.WriteIops);
}
+static void FillAggregatedStats(NKikimrSchemeOp::TPathDescription& pathDescription, const TAggregatedStats& stats) {
+ FillTableStats(pathDescription.MutableTableStats(), stats.Aggregated);
+ FillTableMetrics(pathDescription.MutableTabletMetrics(), stats.Aggregated);
+}
+
void TPathDescriber::FillPathDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElement::TPtr pathEl, TPathElement::EPathSubType subType) {
FillChildDescr(descr, pathEl);
@@ -233,17 +238,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa
}
}
- const auto& tableStats(tableInfo->GetStats().Aggregated);
-
- {
- auto* stats = Result->Record.MutablePathDescription()->MutableTableStats();
- FillTableStats(stats, tableStats);
- }
-
- {
- auto* metrics = Result->Record.MutablePathDescription()->MutableTabletMetrics();
- FillTableMetrics(metrics, tableStats);
- }
+ FillAggregatedStats(*Result->Record.MutablePathDescription(), tableInfo->GetStats());
if (returnPartitionStats) {
NKikimrSchemeOp::TPathDescription& pathDescription = *Result->Record.MutablePathDescription();
@@ -362,6 +357,8 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl
Y_VERIFY(shardInfo, "ColumnShard not found");
description->AddColumnShards(shardInfo->TabletID.GetValue());
}
+
+ FillAggregatedStats(*Result->Record.MutablePathDescription(), storeInfo->GetStats());
}
void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) {
diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp
index 4da2c1e22b..6cec84ff8c 100644
--- a/ydb/core/tx/schemeshard/ut_compaction.cpp
+++ b/ydb/core/tx/schemeshard/ut_compaction.cpp
@@ -19,7 +19,7 @@ using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse::
TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) {
TShardIdx shardId = TShardIdx(1, idx);
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.FullCompactionTs = ts;
stats.SearchHeight = sh;
stats.RowDeletes = d;
@@ -1090,7 +1090,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 0;
config.RowDeletesThreshold = 0;
- TTableInfo::TPartitionStats stats; // all zeros
+ TPartitionStats stats; // all zeros
TCompactionQueueImpl queue(config);
UNIT_ASSERT(!queue.Enqueue({ShardIdx, stats}));
@@ -1102,7 +1102,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 0;
config.RowDeletesThreshold = 0;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 100;
stats.SearchHeight = 1; // below threshold
@@ -1119,7 +1119,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.RowDeletesThreshold = 0;
config.CompactSinglePartedShards = true; // turn on
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 100;
stats.SearchHeight = 1; // below threshold
@@ -1135,7 +1135,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 0;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 100;
stats.SearchHeight = 1; // below threshold
@@ -1152,7 +1152,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 100;
stats.SearchHeight = 3;
@@ -1170,7 +1170,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 1;
stats.SearchHeight = 20;
@@ -1189,7 +1189,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.RowDeletesThreshold = 10;
config.RowCountThreshold = 1;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 0;
stats.RowDeletes = 1;
stats.SearchHeight = 20;
@@ -1233,7 +1233,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 8);
// remove from LastCompaction, active queue should not change
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 0), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 0), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 7UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 2UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
@@ -1249,7 +1249,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
// remove1 from BySearchHeight (active queue should not change)
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
@@ -1257,7 +1257,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
// remove2 from BySearchHeight, ByRowDeletes is active now
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 5), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 5), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL);
@@ -1265,7 +1265,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
// remove1 from ByRowDeletes
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 6), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 6), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL);
@@ -1273,7 +1273,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
// remove2 from ByRowDeletes
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 7), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 7), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
@@ -1281,7 +1281,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2);
// remove1 from LastCompaction
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 2), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 2), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
@@ -1289,7 +1289,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
// remove2 from LastCompaction
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 3), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 3), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
@@ -1319,7 +1319,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
// remove from BySearchHeight
- UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()}));
+ UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TPartitionStats()}));
UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL);
UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL);
@@ -1387,7 +1387,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight);
UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1);
- TTableInfo::TPartitionStats statsBelow;
+ TPartitionStats statsBelow;
statsBelow.RowDeletes = 100;
statsBelow.FullCompactionTs = 4;
statsBelow.SearchHeight = 1; // below threshold
@@ -1415,7 +1415,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
// Now check transition from BySearchHeight to LastCompaction, i.e. empty RowDeletes
// step1: populate w with item
- TTableInfo::TPartitionStats statsSh;
+ TPartitionStats statsSh;
statsSh.FullCompactionTs = 4;
statsSh.SearchHeight = 100; // above threshold
statsSh.RowDeletes = 1; // below threshold
@@ -1447,7 +1447,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
// check ByLastCompaction -> ByRowDeletes, i.e. empty BySearchHeight
// step1: populate ByRowDeletes with item
- TTableInfo::TPartitionStats statsDel;
+ TPartitionStats statsDel;
statsDel.FullCompactionTs = 5;
statsDel.SearchHeight = 1; // below threshold
statsDel.RowDeletes = 100; // above threshold
@@ -1475,7 +1475,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) {
config.SearchHeightThreshold = 10;
config.RowDeletesThreshold = 10;
- TTableInfo::TPartitionStats stats;
+ TPartitionStats stats;
stats.RowCount = 10;
stats.RowDeletes = 1000;
stats.SearchHeight = 20;
diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp
index 53e0c22249..e5899f5272 100644
--- a/ydb/core/tx/schemeshard/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap.cpp
@@ -1,10 +1,131 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
+#include <ydb/core/tx/columnshard/columnshard.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/formats/arrow_batch_builder.h>
using namespace NKikimr::NSchemeShard;
using namespace NKikimr;
using namespace NKikimrSchemeOp;
using namespace NSchemeShardUT_Private;
+namespace NKikimr {
+namespace {
+
+namespace NTypeIds = NScheme::NTypeIds;
+using TTypeId = NScheme::TTypeId;
+
+static const TString defaultStoreSchema = R"(
+ Name: "OlapStore"
+ ColumnShardCount: 1
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ Columns { Name: "data" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ }
+ }
+)";
+
+static const TVector<std::pair<TString, TTypeId>> defaultYdbSchema = {
+ {"timestamp", NTypeIds::Timestamp },
+ {"data", NTypeIds::Utf8 }
+};
+
+TString MakeTestBlob(std::pair<ui64, ui64> range) {
+ TString err;
+ NArrow::TArrowBatchBuilder batchBuilder(arrow::Compression::LZ4_FRAME);
+ batchBuilder.Start(defaultYdbSchema, 0, 0, err);
+
+ TString str;
+ TVector<TTypeId> types = {NTypeIds::Timestamp, NTypeIds::Utf8};
+
+ for (size_t i = range.first; i < range.second; ++i) {
+ str = ToString(i);
+
+ TVector<TCell> cells;
+ cells.push_back(TCell::Make<ui64>(i));
+ cells.push_back(TCell(str.data(), str.size()));
+
+ NKikimr::TDbTupleRef unused;
+ batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), 2));
+ }
+
+ auto batch = batchBuilder.FlushBatch(true);
+ UNIT_ASSERT(batch);
+ auto status = batch->ValidateFull();
+ UNIT_ASSERT(status.ok());
+
+ return batchBuilder.Finish();
+}
+
+static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value here
+
+void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) {
+ const TString dedupId = ToString(writeId);
+
+ auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data);
+
+ ForwardToTablet(runtime, tabletId, sender, evWrite.release());
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resWrite = Proto(event);
+ UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), tabletId);
+ UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), txInitiator);
+ UNIT_ASSERT_EQUAL(resWrite.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+}
+
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 txId, const TVector<ui64>& writeIds) {
+ NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT;
+ TString txBody;
+ {
+ NKikimrTxColumnShard::TCommitTxBody proto;
+ proto.SetTxInitiator(txInitiator);
+ for (ui64 id : writeIds) {
+ proto.AddWriteIds(id);
+ }
+
+ Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&txBody);
+ }
+
+ ForwardToTablet(runtime, tabletId, sender,
+ new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody));
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& res = Proto(event);
+ UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind);
+ UNIT_ASSERT_EQUAL(res.GetTxId(), txId);
+ UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED);
+}
+
+void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 planStep, const TSet<ui64>& txIds) {
+ auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, txInitiator, tabletId);
+ for (ui64 txId : txIds) {
+ auto tx = plan->Record.AddTransactions();
+ tx->SetTxId(txId);
+ ActorIdToProto(sender, tx->MutableAckTo());
+ }
+
+ ForwardToTablet(runtime, tabletId, sender, plan.release());
+ TAutoPtr<IEventHandle> handle;
+
+ for (ui32 i = 0; i < txIds.size(); ++i) {
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& res = Proto(event);
+ UNIT_ASSERT(txIds.count(res.GetTxId()));
+ UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ }
+}
+
+}}
+
Y_UNIT_TEST_SUITE(TOlap) {
Y_UNIT_TEST(CreateStore) {
@@ -12,19 +133,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
ui64 txId = 100;
- TString olapSchema = R"(
- Name: "OlapStore"
- ColumnShardCount: 1
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- Columns { Name: "data" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )";
+ const TString& olapSchema = defaultStoreSchema;
TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
env.TestWaitNotification(runtime, txId);
@@ -78,19 +187,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
ui64 txId = 100;
- TString olapSchema = R"(
- Name: "OlapStore"
- ColumnShardCount: 1
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- Columns { Name: "data" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )";
+ const TString& olapSchema = defaultStoreSchema;
TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
env.TestWaitNotification(runtime, txId);
@@ -220,19 +317,9 @@ Y_UNIT_TEST_SUITE(TOlap) {
TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
ui64 txId = 100;
- TestCreateOlapStore(runtime, ++txId, "/MyRoot", R"(
- Name: "OlapStore"
- ColumnShardCount: 1
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- Columns { Name: "data" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )");
+ const TString& olapSchema = defaultStoreSchema;
+
+ TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
env.TestWaitNotification(runtime, txId);
TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", R"(
@@ -251,19 +338,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
ui64 txId = 100;
- TString olapSchema = R"(
- Name: "OlapStore"
- ColumnShardCount: 1
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- Columns { Name: "data" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )";
+ const TString& olapSchema = defaultStoreSchema;
TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
env.TestWaitNotification(runtime, txId);
@@ -396,19 +471,7 @@ Y_UNIT_TEST_SUITE(TOlap) {
TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
ui64 txId = 100;
- TString olapSchema = R"(
- Name: "OlapStore"
- ColumnShardCount: 1
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- Columns { Name: "data" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- }
- }
- )";
+ const TString& olapSchema = defaultStoreSchema;
TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
env.TestWaitNotification(runtime, txId);
@@ -543,4 +606,102 @@ Y_UNIT_TEST_SUITE(TOlap) {
// negatives for store: disallow alters
// negatives for table: wrong tiers count, wrong tiers, wrong eviction column, wrong eviction values,
// different TTL columns in tiers
+
+ Y_UNIT_TEST(StoreStats) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true));
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+ runtime.UpdateCurrentTime(TInstant::Now() - TDuration::Seconds(600));
+
+ // disable stats batching
+ auto& appData = runtime.GetAppData();
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ ui64 txId = 100;
+
+ const TString& olapSchema = defaultStoreSchema;
+
+ TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema);
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathExist);
+ TestLsPathId(runtime, 2, NLs::PathStringEqual("/MyRoot/OlapStore"));
+
+ TString tableSchema = R"(
+ Name: "ColumnTable"
+ )";
+
+ TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", tableSchema);
+ env.TestWaitNotification(runtime, txId);
+
+ ui64 pathId = 0;
+ ui64 shardId = 0;
+ ui64 planStep = 0;
+ auto checkFn = [&](const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ auto& self = record.GetPathDescription().GetSelf();
+ pathId = self.GetPathId();
+ txId = self.GetCreateTxId() + 1;
+ planStep = self.GetCreateStep();
+ auto& sharding = record.GetPathDescription().GetColumnTableDescription().GetSharding();
+ UNIT_ASSERT_VALUES_EQUAL(sharding.ColumnShardsSize(), 1);
+ shardId = sharding.GetColumnShards()[0];
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPath(), "/MyRoot/OlapStore/ColumnTable");
+ };
+
+ TestLsPathId(runtime, 3, checkFn);
+ UNIT_ASSERT(shardId);
+ UNIT_ASSERT(pathId);
+ UNIT_ASSERT(planStep);
+
+ ui32 rowsInBatch = 100000;
+
+ { // Write data directly into shard
+ TActorId sender = runtime.AllocateEdgeActor();
+ TString data = MakeTestBlob({0, rowsInBatch});
+
+ ui64 writeId = 0;
+
+ TSet<ui64> txIds;
+ for (ui32 i = 0; i < 10; ++i) {
+ WriteData(runtime, sender, shardId, pathId, ++writeId, data);
+ ProposeCommit(runtime, sender, shardId, ++txId, {writeId});
+ txIds.insert(txId);
+ }
+
+ PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+
+ // emulate timeout
+ runtime.UpdateCurrentTime(TInstant::Now());
+
+ // trigger periodic stats at shard (after timeout)
+ WriteData(runtime, sender, shardId, pathId, ++writeId, data);
+ ProposeCommit(runtime, sender, shardId, ++txId, {writeId});
+ txIds = {txId};
+ PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+ }
+
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true);
+ auto& tabletStats = description.GetPathDescription().GetTableStats();
+
+ UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
+#if 0
+ TestDropColumnTable(runtime, ++txId, "/MyRoot/OlapStore", "ColumnTable");
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/OlapStore/ColumnTable", false, NLs::PathNotExist);
+ TestLsPathId(runtime, 3, NLs::PathStringEqual(""));
+
+ TestDropOlapStore(runtime, ++txId, "/MyRoot", "OlapStore");
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathNotExist);
+ TestLsPathId(runtime, 2, NLs::PathStringEqual(""));
+#endif
+ }
}
diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make
index e093cd5b21..d4dbdc98d0 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ya.make
+++ b/ydb/core/tx/schemeshard/ut_olap/ya.make
@@ -23,6 +23,7 @@ PEERDIR(
library/cpp/regex/pcre
library/cpp/svnversion
ydb/core/testlib
+ ydb/core/formats
ydb/core/tx
ydb/core/tx/columnshard
ydb/core/tx/schemeshard/ut_helpers