diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-07-05 21:00:34 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-07-05 21:00:34 +0300 |
commit | 8ce18c6635e7cee5d50bc9151b3b37b84da887c4 (patch) | |
tree | d8b560af713a43fbf433a468be1d3d518c335bed | |
parent | 5bf469f233cf0b3c464e5edb654c62e4b6617b96 (diff) | |
download | ydb-8ce18c6635e7cee5d50bc9151b3b37b84da887c4.tar.gz |
KIKIMR-14971 KIKIMR-15038: Backport overload and stats for ColumnShards into 22.2
REVIEW: 2661554
REVIEW: 2685632
x-ydb-stable-ref: 6581b1d410c8efb66c68d82e6d64c8f1728014c6
28 files changed, 808 insertions, 574 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 57c2f4ecf1..22d0c8e41b 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -48,6 +48,7 @@ enum ESimpleCounters { COUNTER_EVICTED_ROWS = 38 [(CounterOpts) = {Name: "Index/EvictedRows"}]; COUNTER_EVICTED_BYTES = 39 [(CounterOpts) = {Name: "Index/EvictedBytes"}]; COUNTER_EVICTED_RAW_BYTES = 40 [(CounterOpts) = {Name: "Index/EvictedBytesRaw"}]; + COUNTER_WRITES_IN_FLY = 41 [(CounterOpts) = {Name: "WritesInFly"}]; } enum ECumulativeCounters { @@ -118,6 +119,11 @@ enum ECumulativeCounters { COUNTER_EVICTION_PORTIONS_WRITTEN = 64 [(CounterOpts) = {Name: "EvictionPortionsWritten"}]; COUNTER_EVICTION_BLOBS_WRITTEN = 65 [(CounterOpts) = {Name: "EvictionBlobsWritten"}]; COUNTER_EVICTION_BYTES_WRITTEN = 66 [(CounterOpts) = {Name: "EvictionBytesWritten"}]; + COUNTER_EXPORT_SUCCESS = 67 [(CounterOpts) = {Name: "ExportSuccess"}]; + COUNTER_EXPORT_FAIL = 68 [(CounterOpts) = {Name: "ExportFail"}]; + COUNTER_FORGET_SUCCESS = 69 [(CounterOpts) = {Name: "ForgetSuccess"}]; + COUNTER_FORGET_FAIL = 70 [(CounterOpts) = {Name: "ForgetFail"}]; + COUNTER_WRITE_OVERLOAD = 71 [(CounterOpts) = {Name: "WriteOverload"}]; } enum EPercentileCounters { diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index f79c256a5b..09f0b82dcd 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -156,6 +156,7 @@ message TCommitTxBody { message TInitShard { optional uint32 DataChannelCount = 1; + optional uint64 StorePathId = 2; } message TSchemaPresetVersionInfo { @@ -217,8 +218,9 @@ message TAlterStore { optional NKikimrSchemeOp.TAlterColumnStore AlterBody = 1; repeated uint32 DroppedSchemaPresets = 2; repeated NKikimrSchemeOp.TColumnTableSchemaPreset SchemaPresets = 3; - repeated uint32 DroppedTtlSettingsPresets = 4; - repeated NKikimrSchemeOp.TColumnTableTtlSettingsPreset TtlSettingsPresets = 5; + repeated uint32 Reserved_4 = 4; // DroppedTtlSettingsPresets, deprecated + repeated NKikimrSchemeOp.TColumnTableTtlSettingsPreset Reserved_5 = 5; // TtlSettingsPresets, deprecated + optional uint64 StorePathId = 6; } message TSchemaSeqNo { diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 0db4b3cc76..32219f0604 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -11,17 +11,6 @@ IActor* CreateColumnShard(const TActorId& tablet, TTabletStorageInfo* info) { namespace NKikimr::NColumnShard { -IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent); -IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent); -IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, - const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, - TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); -IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, - const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, - TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max()); -IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); - void TColumnShard::BecomeBroken(const TActorContext& ctx) { Become(&TThis::StateBroken); @@ -62,6 +51,14 @@ void TColumnShard::Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) { auto tabletId = ev->Get()->TabletId; + auto clientId = ev->Get()->ClientId; + + if (clientId == StatsReportPipe) { + if (ev->Get()->Status != NKikimrProto::OK) { + StatsReportPipe = TActorId(); + } + return; + } if (PipeClientCache->OnConnect(ev)) { LOG_S_DEBUG("Connected to tablet at " << TabletID() << ", remote " << tabletId); @@ -73,8 +70,15 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { auto tabletId = ev->Get()->TabletId; + auto clientId = ev->Get()->ClientId; + LOG_S_DEBUG("Client pipe reset at " << TabletID() << ", remote " << tabletId); + if (clientId == StatsReportPipe) { + StatsReportPipe = TActorId(); + return; + } + PipeClientCache->OnDisconnect(ev); } @@ -105,47 +109,6 @@ void TColumnShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCo Execute(new TTxPlanStep(this, ev), ctx); } -// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite -void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { - OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); - - auto& data = Proto(ev->Get()).GetData(); - const ui64 tableId = ev->Get()->Record.GetTableId(); - bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !IsTableWritable(tableId) - || ev->Get()->PutStatus == NKikimrProto::ERROR; - - if (error) { - LOG_S_WARN("Write (fail) " << data.size() << " bytes at tablet " << TabletID()); - - ev->Get()->PutStatus = NKikimrProto::ERROR; - Execute(new TTxWrite(this, ev), ctx); - } else if (InsertTable->IsOverloaded(tableId)) { - LOG_S_INFO("Write (overload) " << data.size() << " bytes for table " << tableId << " at tablet " << TabletID()); - - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWrite(this, ev), ctx); - } else if (ev->Get()->BlobId.IsValid()) { - LOG_S_DEBUG("Write (record) " << data.size() << " bytes at tablet " << TabletID()); - - Execute(new TTxWrite(this, ev), ctx); - } else { - if (IsAnyChannelYellowStop()) { - LOG_S_ERROR("Write (out of disk space) at tablet " << TabletID()); - - IncCounter(COUNTER_OUT_OF_SPACE); - ev->Get()->PutStatus = NKikimrProto::TRYLATER; - Execute(new TTxWrite(this, ev), ctx); - } else { - LOG_S_DEBUG("Write (blob) " << data.size() << " bytes at tablet " << TabletID()); - - ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; - - ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, - BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); - } - } -} - void TColumnShard::Handle(TEvColumnShard::TEvRead::TPtr& ev, const TActorContext& ctx) { const auto* msg = ev->Get(); TRowVersion readVersion(msg->Record.GetPlanStep(), msg->Record.GetTxId()); @@ -253,6 +216,35 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } +void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) { + const auto* msg = ev->Get(); + Y_VERIFY(msg->TabletId == TabletID()); + MediatorTimeCastEntry = msg->Entry; + Y_VERIFY(MediatorTimeCastEntry); + LOG_S_DEBUG("Registered with mediator time cast at tablet " << TabletID()); + + RescheduleWaitingReads(); +} + +void TColumnShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext&) { + const auto* msg = ev->Get(); + Y_VERIFY(msg->TabletId == TabletID()); + + Y_VERIFY(MediatorTimeCastEntry); + ui64 step = MediatorTimeCastEntry->Get(TabletID()); + LOG_S_DEBUG("Notified by mediator time cast with PlanStep# " << step << " at tablet " << TabletID()); + + for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end();) { + if (step < *it) { + break; + } + it = MediatorTimeCastWaitingSteps.erase(it); + } + + RescheduleWaitingReads(); + EnqueueBackgroundActivities(true); +} + void TColumnShard::UpdateBlobMangerCounters() { const auto counters = BlobManager->GetCountersUpdate(); IncCounter(COUNTER_BLOB_MANAGER_GC_REQUESTS, counters.GcRequestsSent); @@ -319,6 +311,22 @@ void TColumnShard::UpdateIndexCounters() { SetCounter(COUNTER_EVICTED_RAW_BYTES, stats.Evicted.RawBytes); } +ui64 TColumnShard::MemoryUsage() const { + ui64 memory = + Tables.size() * sizeof(TTableInfo) + + PathsToDrop.size() * sizeof(ui64) + + Ttl.PathsCount() * sizeof(TTtl::TDescription) + + SchemaPresets.size() * sizeof(TSchemaPreset) + + AltersInFlight.size() * sizeof(TAlterMeta) + + CommitsInFlight.size() * sizeof(TCommitMeta) + + TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + + TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); + if (PrimaryIndex) { + memory += PrimaryIndex->MemoryUsage(); + } + return memory; +} + void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { auto * metrics = Executor()->GetResourceMetrics(); if (!metrics) { @@ -333,19 +341,7 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { TabletCounters->Simple()[COUNTER_SPLIT_COMPACTED_BYTES].Get() + TabletCounters->Simple()[COUNTER_INACTIVE_BYTES].Get(); - ui64 memory = - Tables.size() * sizeof(TTableInfo) + - PathsToDrop.size() * sizeof(ui64) + - Ttl.PathsCount() * sizeof(TTtl::TDescription) + - SchemaPresets.size() * sizeof(TSchemaPreset) + - //TtlSettingsPresets.size() * sizeof(TTtlSettingsPreset) + - AltersInFlight.size() * sizeof(TAlterMeta) + - CommitsInFlight.size() * sizeof(TCommitMeta) + - TabletCounters->Simple()[COUNTER_PREPARED_RECORDS].Get() * sizeof(NOlap::TInsertedData) + - TabletCounters->Simple()[COUNTER_COMMITTED_RECORDS].Get() * sizeof(NOlap::TInsertedData); - if (PrimaryIndex) { - memory += PrimaryIndex->MemoryUsage(); - } + ui64 memory = MemoryUsage(); const TActorContext& ctx = TlsActivationContext->AsActorContext(); TInstant now = AppData(ctx)->TimeProvider->Now(); @@ -360,4 +356,57 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { metrics->TryUpdate(ctx); } +void TColumnShard::SendPeriodicStats(const TActorContext& ctx) { + if (!CurrentSchemeShardId || !StorePathId) { + LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID()); + return; + } + + TInstant now = AppData(ctx)->TimeProvider->Now(); + if (LastStatsReport + StatsReportInterval > now) { + return; + } + LastStatsReport = now; + + if (!StatsReportPipe) { + LOG_S_DEBUG("Create periodic stats pipe to " << CurrentSchemeShardId << " at tablet " << TabletID()); + NTabletPipe::TClientConfig clientConfig; + StatsReportPipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig)); + } + + auto ev = std::make_unique<TEvDataShard::TEvPeriodicTableStats>(TabletID(), StorePathId); + { + ev->Record.SetShardState(2); // NKikimrTxDataShard.EDatashardState.Ready + ev->Record.SetGeneration(Executor()->Generation()); + ev->Record.SetRound(StatsReportRound++); + ev->Record.SetNodeId(ctx.ExecutorThread.ActorSystem->NodeId); + ev->Record.SetStartTime(StartTime().MilliSeconds()); + + if (auto* resourceMetrics = Executor()->GetResourceMetrics()) { + resourceMetrics->Fill(*ev->Record.MutableTabletMetrics()); + } + + auto* tabletStats = ev->Record.MutableTableStats(); + tabletStats->SetTxRejectedByOverload(TabletCounters->Cumulative()[COUNTER_WRITE_OVERLOAD].Get()); + tabletStats->SetTxRejectedBySpace(TabletCounters->Cumulative()[COUNTER_OUT_OF_SPACE].Get()); + tabletStats->SetInFlightTxCount(Executor()->GetStats().TxInFly); + + if (PrimaryIndex) { + const auto& indexStats = PrimaryIndex->GetTotalStats(); + NOlap::TSnapshot lastIndexUpdate = PrimaryIndex->LastUpdate(); + auto activeIndexStats = indexStats.Active(); // data stats excluding inactive and evicted + + tabletStats->SetRowCount(activeIndexStats.Rows); + tabletStats->SetDataSize(activeIndexStats.Bytes + TabletCounters->Simple()[COUNTER_COMMITTED_BYTES].Get()); + // TODO: we need row/dataSize counters for evicted data (managed by tablet but stored outside) + //tabletStats->SetIndexSize(); // TODO: calc size of internal tables + //tabletStats->SetLastAccessTime(); // TODO: last read/write time + tabletStats->SetLastUpdateTime(lastIndexUpdate.PlanStep); + } + } + + LOG_S_DEBUG("Sending periodic stats at tablet " << TabletID()); + NTabletPipe::SendData(ctx, StatsReportPipe, ev.release()); +} + } diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 62486e3c33..809a0c6e19 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -8,7 +8,7 @@ #include <ydb/core/tx/long_tx_service/public/types.h> -// TODO: temporarily reuse datashard TEvScan (KIKIMR-11069) +// TODO: temporarily reuse datashard TEvScan (KIKIMR-11069) and TEvPeriodicTableStats #include <ydb/core/tx/datashard/datashard.h> namespace NKikimr { @@ -236,6 +236,7 @@ struct TEvColumnShard { }; using TEvScan = TEvDataShard::TEvKqpScan; + using TEvPeriodicTableStats = TEvDataShard::TEvPeriodicTableStats; }; inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index b5493685a0..e96b215c8e 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -19,13 +19,13 @@ void TTxInit::SetDefaults() { Self->LastWriteId = TWriteId{0}; Self->LastPlannedStep = 0; Self->LastPlannedTxId = 0; + Self->StorePathId = 0; Self->BasicTxInfo.clear(); Self->DeadlineQueue.clear(); Self->PlanQueue.clear(); Self->AltersInFlight.clear(); Self->CommitsInFlight.clear(); Self->SchemaPresets.clear(); - //Self->TtlSettingsPresets.clear(); Self->Tables.clear(); Self->LongTxWrites.clear(); Self->LongTxWritesByUniqueId.clear(); @@ -62,6 +62,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastWriteId, Self->LastWriteId); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedStep, Self->LastPlannedStep); ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastPlannedTxId, Self->LastPlannedTxId); + ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::StorePathId, Self->StorePathId); if (!ready) return false; @@ -170,49 +171,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) return false; } } -#if 0 // TTL presets haven't been used - { // Load ttl settings presets - auto rowset = db.Table<Schema::TtlSettingsPresetInfo>().Select(); - if (!rowset.IsReady()) - return false; - - while (!rowset.EndOfSet()) { - const ui32 id = rowset.GetValue<Schema::TtlSettingsPresetInfo::Id>(); - auto& preset = Self->TtlSettingsPresets[id]; - preset.Id = id; - preset.Name = rowset.GetValue<Schema::TtlSettingsPresetInfo::Name>(); - if (rowset.HaveValue<Schema::TtlSettingsPresetInfo::DropStep>() && - rowset.HaveValue<Schema::TtlSettingsPresetInfo::DropTxId>()) - { - preset.DropVersion.Step = rowset.GetValue<Schema::TtlSettingsPresetInfo::DropStep>(); - preset.DropVersion.TxId = rowset.GetValue<Schema::TtlSettingsPresetInfo::DropTxId>(); - } - - if (!rowset.Next()) - return false; - } - } - - { // Load ttl settings preset versions - auto rowset = db.Table<Schema::TtlSettingsPresetVersionInfo>().Select(); - if (!rowset.IsReady()) - return false; - while (!rowset.EndOfSet()) { - const ui32 id = rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::Id>(); - Y_VERIFY(Self->TtlSettingsPresets.contains(id)); - auto& preset = Self->TtlSettingsPresets.at(id); - const TRowVersion version( - rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::SinceStep>(), - rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::SinceTxId>()); - auto& info = preset.Versions[version]; - Y_VERIFY(info.ParseFromString(rowset.GetValue<Schema::TtlSettingsPresetVersionInfo::InfoProto>())); - - if (!rowset.Next()) - return false; - } - } -#endif { // Load tables auto rowset = db.Table<Schema::TableInfo>().Select(); if (!rowset.IsReady()) @@ -266,7 +225,6 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Self->SetCounter(COUNTER_TABLES, Self->Tables.size()); Self->SetCounter(COUNTER_TABLE_PRESETS, Self->SchemaPresets.size()); - //Self->SetCounter(COUNTER_TTL_PRESETS, Self->TtlSettingsPresets.size()); Self->SetCounter(COUNTER_TABLE_TTLS, ttls.size()); if (!schemaPreset.empty()) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 481e8a48aa..028b236624 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -114,4 +114,79 @@ void TTxWrite::Complete(const TActorContext& ctx) { ctx.Send(Ev->Get()->GetSource(), Result.release()); } +// EvWrite -> WriteActor (attach BlobId without proto changes) -> EvWrite +void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContext& ctx) { + OnYellowChannels(std::move(ev->Get()->YellowMoveChannels), std::move(ev->Get()->YellowStopChannels)); + + auto& record = Proto(ev->Get()); + auto& data = record.GetData(); + ui64 tableId = record.GetTableId(); + ui64 metaShard = record.GetTxInitiator(); + ui64 writeId = record.GetWriteId(); + TString dedupId = record.GetDedupId(); + + bool isWritable = IsTableWritable(tableId); + bool error = data.empty() || data.size() > TLimits::MAX_BLOB_SIZE || !PrimaryIndex || !isWritable; + bool errorReturned = (ev->Get()->PutStatus != NKikimrProto::OK) && (ev->Get()->PutStatus != NKikimrProto::UNKNOWN); + bool isOutOfSpace = IsAnyChannelYellowStop(); + + if (error || errorReturned) { + LOG_S_WARN("Write (fail) " << data.size() << " bytes into pathId " << tableId + << ", status " << ev->Get()->PutStatus + << (PrimaryIndex? "": ", no index") << (isWritable? "": ", ro") + << " at tablet " << TabletID()); + + IncCounter(COUNTER_WRITE_FAIL); + + auto errCode = NKikimrTxColumnShard::EResultStatus::ERROR; + if (errorReturned) { + if (ev->Get()->PutStatus == NKikimrProto::TIMEOUT) { + errCode = NKikimrTxColumnShard::EResultStatus::TIMEOUT; + } + --WritesInFly; // write failed + } + + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( + TabletID(), metaShard, writeId, tableId, dedupId, errCode); + ctx.Send(ev->Get()->GetSource(), result.release()); + + } else if (ev->Get()->BlobId.IsValid()) { + LOG_S_DEBUG("Write (record) " << data.size() << " bytes into pathId " << tableId + << (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID()); + + --WritesInFly; // write successed + Execute(new TTxWrite(this, ev), ctx); + } else if (isOutOfSpace || InsertTable->IsOverloaded(tableId) || ShardOverloaded()) { + IncCounter(COUNTER_WRITE_FAIL); + + if (isOutOfSpace) { + IncCounter(COUNTER_OUT_OF_SPACE); + LOG_S_ERROR("Write (out of disk space) " << data.size() << " bytes into pathId " << tableId + << " at tablet " << TabletID()); + } else { + IncCounter(COUNTER_WRITE_OVERLOAD); + bool tableOverload = InsertTable->IsOverloaded(tableId); + LOG_S_INFO("Write (overload) " << data.size() << " bytes into pathId " << tableId + << (ShardOverloaded()? "[shard]" : "") << (tableOverload? "[table]" : "") + << " at tablet " << TabletID()); + } + + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( + TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED); + ctx.Send(ev->Get()->GetSource(), result.release()); + } else { + LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId + << (writeId? (" writeId " + ToString(writeId)).c_str() : "") + << " at tablet " << TabletID()); + + ev->Get()->MaxSmallBlobSize = Settings.MaxSmallBlobSize; + + ++WritesInFly; // write started + ctx.Register(CreateWriteActor(TabletID(), PrimaryIndex->GetIndexInfo(), ctx.SelfID, + BlobManager->StartBlobBatch(), Settings.BlobWriteGrouppingEnabled, ev->Release())); + } + + SetCounter(COUNTER_WRITES_IN_FLY, WritesInFly); +} + } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index cadd224819..8b0a6479d6 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -68,35 +68,6 @@ void TColumnShard::UnregisterMediatorTimeCast() { } } -void TColumnShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext&) { - const auto* msg = ev->Get(); - Y_VERIFY(msg->TabletId == TabletID()); - MediatorTimeCastEntry = msg->Entry; - Y_VERIFY(MediatorTimeCastEntry); - LOG_S_DEBUG("Registered with mediator time cast at tablet " << TabletID()); - - RescheduleWaitingReads(); -} - -void TColumnShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext&) { - const auto* msg = ev->Get(); - Y_VERIFY(msg->TabletId == TabletID()); - - Y_VERIFY(MediatorTimeCastEntry); - ui64 step = MediatorTimeCastEntry->Get(TabletID()); - LOG_S_DEBUG("Notified by mediator time cast with PlanStep# " << step << " at tablet " << TabletID()); - - for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end();) { - if (step < *it) { - break; - } - it = MediatorTimeCastWaitingSteps.erase(it); - } - - RescheduleWaitingReads(); - EnqueueBackgroundActivities(true); -} - bool TColumnShard::WaitPlanStep(ui64 step) { if (step <= LastPlannedStep) { return false; @@ -293,7 +264,8 @@ void TColumnShard::UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExec } } -void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::ProtectSchemaSeqNo(const NKikimrTxColumnShard::TSchemaSeqNo& seqNoProto, + NTabletFlatExecutor::TTransactionContext& txc) { auto seqNo = SeqNoFromProto(seqNoProto); if (LastSchemaSeqNo <= seqNo) { UpdateSchemaSeqNo(++seqNo, txc); @@ -308,7 +280,8 @@ bool TColumnShard::IsTableWritable(ui64 tableId) const { return !it->second.IsDropped(); } -ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, const TRowVersion& version) { +ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableSchemaPreset& presetProto, + const TRowVersion& version) { if (!SchemaPresets.contains(presetProto.GetId())) { auto& preset = SchemaPresets[presetProto.GetId()]; preset.Id = presetProto.GetId(); @@ -328,30 +301,12 @@ ui32 TColumnShard::EnsureSchemaPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp return presetProto.GetId(); } -#if 0 -ui32 TColumnShard::EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version) { - if (!TtlSettingsPresets.contains(presetProto.GetId())) { - auto& preset = TtlSettingsPresets[presetProto.GetId()]; - preset.Id = presetProto.GetId(); - preset.Name = presetProto.GetName(); - auto& info = preset.Versions[version]; - info.SetId(preset.Id); - info.SetSinceStep(version.Step); - info.SetSinceTxId(version.TxId); - *info.MutableTtlSettings() = presetProto.GetTtlSettings(); - - Schema::SaveTtlSettingsPresetInfo(db, preset.Id, preset.Name); - Schema::SaveTtlSettingsPresetVersionInfo(db, preset.Id, version, info); - SetCounter(COUNTER_TTL_PRESETS, TtlSettingsPresets.size()); - } - return presetProto.GetId(); -} -#endif -void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { switch (body.TxBody_case()) { case NKikimrTxColumnShard::TSchemaTxBody::kInitShard: { - // nothing yet, but maybe remember initial PlanStep:TxId for some reason? + RunInit(body.GetInitShard(), version, txc); return; } case NKikimrTxColumnShard::TSchemaTxBody::kEnsureTables: { @@ -380,7 +335,20 @@ void TColumnShard::RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, Y_FAIL("Unsupported schema tx type"); } -void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { + Y_UNUSED(version); + + NIceDb::TNiceDb db(txc.DB); + + if (proto.HasStorePathId()) { + StorePathId = proto.GetStorePathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId); + } +} + +void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tableProto, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = tableProto.GetPathId(); @@ -403,12 +371,7 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl Ttl.SetPathTtl(pathId, TTtl::TDescription(tableProto.GetTtlSettings())); SetCounter(COUNTER_TABLE_TTLS, Ttl.PathsCount()); } -#if 0 - if (tableProto.HasTtlSettingsPreset()) { - ui32 ttlPresetId = EnsureTtlSettingsPreset(db, tableProto.GetTtlSettingsPreset(), version); - tableVerProto.SetTtlSettingsPresetId(ttlPresetId); - } -#endif + if (!PrimaryIndex && schemaPresetId) { auto& schemaPresetVerProto = SchemaPresets[schemaPresetId].Versions[version]; TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; @@ -427,7 +390,8 @@ void TColumnShard::RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& tabl } } -void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterProto, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = alterProto.GetPathId(); @@ -449,21 +413,14 @@ void TColumnShard::RunAlterTable(const NKikimrTxColumnShard::TAlterTable& alterP } else { Ttl.DropPathTtl(pathId); } -#if 0 - if (alterProto.HasTtlSettingsPreset()) { - ui32 ttlPresetId = EnsureTtlSettingsPreset(db, alterProto.GetTtlSettingsPreset(), version); - info.SetTtlSettingsPresetId(ttlPresetId); - } -#endif + info.SetSchemaPresetVersionAdj(alterProto.GetSchemaPresetVersionAdj()); -#if 0 - info.SetTtlSettingsPresetVersionAdj(alterProto.GetTtlSettingsPresetVersionAdj()); -#endif Schema::SaveTableVersionInfo(db, table.PathId, version, info); } -void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProto, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); const ui64 pathId = dropProto.GetPathId(); @@ -486,9 +443,15 @@ void TColumnShard::RunDropTable(const NKikimrTxColumnShard::TDropTable& dropProt } } -void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc) { +void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, const TRowVersion& version, + NTabletFlatExecutor::TTransactionContext& txc) { NIceDb::TNiceDb db(txc.DB); + if (proto.HasStorePathId()) { + StorePathId = proto.GetStorePathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::StorePathId, StorePathId); + } + TMap<NOlap::TSnapshot, NOlap::TIndexInfo> schemaPreset; for (ui32 id : proto.GetDroppedSchemaPresets()) { @@ -500,16 +463,7 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, preset.DropVersion = version; Schema::SaveSchemaPresetDropVersion(db, id, version); } -#if 0 - for (ui32 id : proto.GetDroppedTtlSettingsPresets()) { - if (!TtlSettingsPresets.contains(id)) { - continue; - } - auto& preset = TtlSettingsPresets.at(id); - preset.DropVersion = version; - Schema::SaveTtlSettingsPresetDropVersion(db, id, version); - } -#endif + for (const auto& presetProto : proto.GetSchemaPresets()) { if (!SchemaPresets.contains(presetProto.GetId())) { continue; // we don't update presets that we don't use @@ -528,23 +482,6 @@ void TColumnShard::RunAlterStore(const NKikimrTxColumnShard::TAlterStore& proto, Schema::SaveSchemaPresetVersionInfo(db, preset.Id, version, info); } -#if 0 - for (const auto& presetProto : proto.GetTtlSettingsPresets()) { - ui32 presetId = presetProto.GetId(); - if (!TtlSettingsPresets.contains(presetId)) { - continue; // we don't update presets that we don't use - } - - auto& preset = TtlSettingsPresets[presetId]; - auto& info = preset.Versions[version]; - info.SetId(presetId); - info.SetSinceStep(version.Step); - info.SetSinceTxId(version.TxId); - *info.MutableTtlSettings() = presetProto.GetTtlSettings(); - - Schema::SaveTtlSettingsPresetVersionInfo(db, preset.Id, version, info); - } -#endif if (!schemaPreset.empty()) { SetPrimaryIndex(std::move(schemaPreset)); @@ -572,6 +509,8 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { } const TActorContext& ctx = TlsActivationContext->AsActorContext(); + SendPeriodicStats(ctx); + if (auto event = SetupIndexation()) { ctx.Send(IndexingActor, event.release()); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 951827b623..9c42e0b57d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -17,17 +17,32 @@ namespace NKikimr::NColumnShard { extern bool gAllowLogBatchingDefaultValue; +IActor* CreateIndexingActor(ui64 tabletId, const TActorId& parent); +IActor* CreateCompactionActor(ui64 tabletId, const TActorId& parent); +IActor* CreateEvictionActor(ui64 tabletId, const TActorId& parent); +IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, + const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, + TAutoPtr<TEvColumnShard::TEvWrite> ev, const TInstant& deadline = TInstant::Max()); +IActor* CreateWriteActor(ui64 tabletId, const NOlap::TIndexInfo& indexTable, + const TActorId& dstActor, TBlobBatch&& blobBatch, bool blobGrouppingEnabled, + TAutoPtr<TEvPrivate::TEvWriteIndex> ev, const TInstant& deadline = TInstant::Max()); +IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); + struct TSettings { TControlWrapper BlobWriteGrouppingEnabled; TControlWrapper CacheDataAfterIndexing; TControlWrapper CacheDataAfterCompaction; TControlWrapper MaxSmallBlobSize; + TControlWrapper OverloadTxInFly; + TControlWrapper OverloadWritesInFly; TSettings() : BlobWriteGrouppingEnabled(1, 0, 1) , CacheDataAfterIndexing(1, 0, 1) , CacheDataAfterCompaction(1, 0, 1) , MaxSmallBlobSize(0, 0, 8000000) + , OverloadTxInFly(1000, 0, 10000) + , OverloadWritesInFly(1000, 0, 10000) {} void RegisterControls(TControlBoard& icb) { @@ -35,6 +50,8 @@ struct TSettings { icb.RegisterSharedControl(CacheDataAfterIndexing, "ColumnShardControls.CacheDataAfterIndexing"); icb.RegisterSharedControl(CacheDataAfterCompaction, "ColumnShardControls.CacheDataAfterCompaction"); icb.RegisterSharedControl(MaxSmallBlobSize, "ColumnShardControls.MaxSmallBlobSize"); + icb.RegisterSharedControl(OverloadTxInFly, "ColumnShardControls.OverloadTxInFly"); + icb.RegisterSharedControl(OverloadWritesInFly, "ColumnShardControls.OverloadWritesInFly"); } }; @@ -102,6 +119,7 @@ class TColumnShard void Die(const TActorContext& ctx) override { // TODO + NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe); UnregisterMediatorTimeCast(); return IActor::Die(ctx); } @@ -257,20 +275,7 @@ private: return DropVersion != TRowVersion::Max(); } }; -#if 0 - struct TTtlSettingsPreset { - using TVerProto = NKikimrTxColumnShard::TTtlSettingsPresetVersionInfo; - - ui32 Id; - TString Name; - TMap<TRowVersion, TVerProto> Versions; - TRowVersion DropVersion = TRowVersion::Max(); - bool IsDropped() const { - return DropVersion != TRowVersion::Max(); - } - }; -#endif struct TTableInfo { using TVerProto = NKikimrTxColumnShard::TTableVersionInfo; @@ -296,6 +301,9 @@ private: ui64 LastPlannedStep = 0; ui64 LastPlannedTxId = 0; ui64 LastCompactedGranule = 0; + ui64 WritesInFly = 0; + ui64 StorePathId = 0; + ui64 StatsReportRound = 0; TIntrusivePtr<TMediatorTimecastEntry> MediatorTimeCastEntry; bool MediatorTimeCastRegistered = false; @@ -304,11 +312,14 @@ private: TDuration MaxCommitTxDelay = TDuration::Seconds(30); // TODO: Make configurable? TDuration ActivationPeriod = TDuration::Seconds(60); TDuration FailActivationDelay = TDuration::Seconds(1); + TDuration StatsReportInterval = TDuration::Seconds(10); TInstant LastBackActivation; + TInstant LastStatsReport; TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation. TActorId EvictionActor; + TActorId StatsReportPipe; std::unique_ptr<TTabletCountersBase> TabletCountersPtr; TTabletCountersBase* TabletCounters; std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; @@ -325,7 +336,6 @@ private: THashMap<ui64, TAlterMeta> AltersInFlight; THashMap<ui64, TCommitMeta> CommitsInFlight; // key is TxId from propose THashMap<ui32, TSchemaPreset> SchemaPresets; - //THashMap<ui32, TTtlSettingsPreset> TtlSettingsPresets; THashMap<ui64, TTableInfo> Tables; THashMap<TWriteId, TLongTxWriteInfo> LongTxWrites; THashMap<TULID, TLongTxWriteInfo*> LongTxWritesByUniqueId; @@ -354,6 +364,13 @@ private: ui64 GetAllowedStep() const; bool HaveOutdatedTxs() const; + bool ShardOverloaded() const { + ui64 txLimit = Settings.OverloadTxInFly; + ui64 writesLimit = Settings.OverloadWritesInFly; + return (txLimit && Executor()->GetStats().TxInFly > txLimit) || + (writesLimit && WritesInFly > writesLimit); + } + TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId); void AddLongTxWrite(TWriteId writeId, ui64 txId); void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId); @@ -372,6 +389,7 @@ private: //ui32 EnsureTtlSettingsPreset(NIceDb::TNiceDb& db, const NKikimrSchemeOp::TColumnTableTtlSettingsPreset& presetProto, const TRowVersion& version); void RunSchemaTx(const NKikimrTxColumnShard::TSchemaTxBody& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); + void RunInit(const NKikimrTxColumnShard::TInitShard& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunEnsureTable(const NKikimrTxColumnShard::TCreateTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunAlterTable(const NKikimrTxColumnShard::TAlterTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); void RunDropTable(const NKikimrTxColumnShard::TDropTable& body, const TRowVersion& version, NTabletFlatExecutor::TTransactionContext& txc); @@ -390,6 +408,8 @@ private: void UpdateInsertTableCounters(); void UpdateIndexCounters(); void UpdateResourceMetrics(const TUsage& usage); + ui64 MemoryUsage() const; + void SendPeriodicStats(const TActorContext& ctx); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index d88300f374..a6d62d9582 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -47,6 +47,8 @@ struct Schema : NIceDb::Schema { LastGcBarrierGen = 8, LastGcBarrierStep = 9, + //LastExportNumber = 10, + StorePathId = 11, }; enum class EInsertTableIds : ui8 { @@ -345,37 +347,7 @@ struct Schema : NIceDb::Schema { static void EraseSchemaPresetInfo(NIceDb::TNiceDb& db, ui64 id) { db.Table<SchemaPresetInfo>().Key(id).Delete(); } -#if 0 - static void SaveTtlSettingsPresetInfo(NIceDb::TNiceDb& db, ui64 id, const TString& name) { - db.Table<TtlSettingsPresetInfo>().Key(id).Update( - NIceDb::TUpdate<TtlSettingsPresetInfo::Name>(name)); - } - static void SaveTtlSettingsPresetVersionInfo( - NIceDb::TNiceDb& db, - ui64 id, const TRowVersion& version, - const NKikimrTxColumnShard::TTtlSettingsPresetVersionInfo& info) - { - TString serialized; - Y_VERIFY(info.SerializeToString(&serialized)); - db.Table<TtlSettingsPresetVersionInfo>().Key(id, version.Step, version.TxId).Update( - NIceDb::TUpdate<TtlSettingsPresetVersionInfo::InfoProto>(serialized)); - } - - static void SaveTtlSettingsPresetDropVersion(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& dropVersion) { - db.Table<TtlSettingsPresetInfo>().Key(id).Update( - NIceDb::TUpdate<TtlSettingsPresetInfo::DropStep>(dropVersion.Step), - NIceDb::TUpdate<TtlSettingsPresetInfo::DropTxId>(dropVersion.TxId)); - } - - static void EraseTtlSettingsPresetVersionInfo(NIceDb::TNiceDb& db, ui64 id, const TRowVersion& version) { - db.Table<TtlSettingsPresetVersionInfo>().Key(id, version.Step, version.TxId).Delete(); - } - - static void EraseTtlSettingsPresetInfo(NIceDb::TNiceDb& db, ui64 id) { - db.Table<TtlSettingsPresetInfo>().Key(id).Delete(); - } -#endif static void SaveTableInfo(NIceDb::TNiceDb& db, ui64 pathId) { db.Table<TableInfo>().Key(pathId).Update(); } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 8fb6ac424a..8c14d41815 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -306,6 +306,22 @@ struct TColumnEngineStats { TPortionsStats Inactive{}; TPortionsStats Evicted{}; + TPortionsStats Active() const { + return TPortionsStats { + .Portions = ActivePortions(), + .Blobs = ActiveBlobs(), + .Rows = ActiveRows(), + .Bytes = ActiveBytes(), + .RawBytes = ActiveRawBytes() + }; + } + + ui64 ActivePortions() const { return Inserted.Portions + Compacted.Portions + SplitCompacted.Portions; } + ui64 ActiveBlobs() const { return Inserted.Blobs + Compacted.Blobs + SplitCompacted.Blobs; } + ui64 ActiveRows() const { return Inserted.Rows + Compacted.Rows + SplitCompacted.Rows; } + ui64 ActiveBytes() const { return Inserted.Bytes + Compacted.Bytes + SplitCompacted.Bytes; } + ui64 ActiveRawBytes() const { return Inserted.RawBytes + Compacted.RawBytes + SplitCompacted.RawBytes; } + void Clear() { *this = {}; } @@ -341,6 +357,7 @@ public: virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; virtual ui64 MemoryUsage() const { return 0; } + virtual TSnapshot LastUpdate() const { return {}; } }; } diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 7a4f443ed3..a8b071f2ba 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -149,6 +149,7 @@ public: const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const override; const TColumnEngineStats& GetTotalStats() override; ui64 MemoryUsage() const override; + TSnapshot LastUpdate() const override { return LastSnapshot; } std::shared_ptr<TSelectInfo> Select(ui64 pathId, TSnapshot snapshot, const THashSet<ui32>& columnIds, diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index e32d1f1f9a..ec78578324 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -104,15 +104,15 @@ public: void SendWriteRequest(const TActorContext& ctx) { Y_VERIFY(WriteEv->PutStatus == NKikimrProto::UNKNOWN); - LOG_S_DEBUG("Writing inserted blob at tablet " << TabletId); - auto& record = Proto(WriteEv.Get()); + ui64 pathId = record.GetTableId(); + ui64 writeId = record.GetWriteId(); auto& srcData = record.GetData(); TString meta; if (record.HasMeta()) { meta = record.GetMeta().GetSchema(); if (meta.empty() || record.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) { - LOG_S_DEBUG("Bad metadata for writing data at tablet " << TabletId); + LOG_S_INFO("Bad metadata for writeId " << writeId << " pathId " << pathId << " at tablet " << TabletId); SendResultAndDie(ctx, NKikimrProto::ERROR); } } @@ -125,7 +125,8 @@ public: batch = IndexInfo.PrepareForInsert(srcData, meta, strError); } if (!batch) { - LOG_S_DEBUG("Bad data to write (" << strError << ") at tablet " << TabletId); + LOG_S_INFO("Bad data for writeId " << writeId << ", pathId " << pathId + << " (" << strError << ") at tablet " << TabletId); SendResultAndDie(ctx, NKikimrProto::ERROR); return; } @@ -136,8 +137,9 @@ public: data = NArrow::SerializeBatchNoCompression(batch); } if (data.size() > TLimits::MAX_BLOB_SIZE) { - LOG_S_DEBUG("Extracted data (" << data.size() << " bytes) is bigger than source (" - << srcData.size() << " bytes) and limit at tablet " << TabletId); + LOG_S_INFO("Extracted data (" << data.size() << " bytes) is bigger than source (" + << srcData.size() << " bytes) and limit, writeId " << writeId << " pathId " << pathId + << " at tablet " << TabletId); SendResultAndDie(ctx, NKikimrProto::ERROR); return; @@ -156,7 +158,8 @@ public: meta.clear(); if (!outMeta.SerializeToString(&meta)) { - LOG_S_ERROR("Canot set metadata for writing blob at tablet " << TabletId); + LOG_S_ERROR("Canot set metadata for blob, writeId " << writeId << " pathId " << pathId + << " at tablet " << TabletId); SendResultAndDie(ctx, NKikimrProto::ERROR); return; } @@ -173,7 +176,8 @@ public: Y_VERIFY(WriteEv->BlobId.BlobSize() == data.size()); - LOG_S_DEBUG("Write Blob " << WriteEv->BlobId.ToStringNew()); + LOG_S_DEBUG("Writing " << WriteEv->BlobId.ToStringNew() << " writeId " << writeId << " pathId " << pathId + << " at tablet " << TabletId); if (BlobBatch.AllBlobWritesCompleted()) { SendResultAndDie(ctx, NKikimrProto::OK); @@ -298,7 +302,7 @@ private: void SendResult(const TActorContext& ctx, NKikimrProto::EReplyStatus status) { SaveResourceUsage(); if (WriteEv) { - LOG_S_DEBUG("Write Blob " << WriteEv->BlobId.ToStringNew() << " Status: " << status); + LOG_S_DEBUG("Written " << WriteEv->BlobId.ToStringNew() << " Status: " << status); WriteEv->PutStatus = status; WriteEv->BlobBatch = std::move(BlobBatch); WriteEv->YellowMoveChannels = TVector<ui32>(YellowMoveChannels.begin(), YellowMoveChannels.end()); diff --git a/ydb/core/tx/datashard/datashard.h b/ydb/core/tx/datashard/datashard.h index 1de4c368ce..6e0f11fa18 100644 --- a/ydb/core/tx/datashard/datashard.h +++ b/ydb/core/tx/datashard/datashard.h @@ -823,6 +823,11 @@ struct TEvDataShard { Record.SetTableOwnerId(tableOwnerId); Record.SetTableLocalId(tableLocalId); } + + TEvPeriodicTableStats(ui64 datashardId, ui64 tableLocalId) { + Record.SetDatashardId(datashardId); + Record.SetTableLocalId(tableLocalId); + } }; struct TEvS3ListingRequest : public TEventPB<TEvS3ListingRequest, diff --git a/ydb/core/tx/schemeshard/operation_queue_timer.h b/ydb/core/tx/schemeshard/operation_queue_timer.h index 14d5530153..8f045de44b 100644 --- a/ydb/core/tx/schemeshard/operation_queue_timer.h +++ b/ydb/core/tx/schemeshard/operation_queue_timer.h @@ -115,7 +115,7 @@ struct TShardCompactionInfo { : ShardIdx(id) {} - TShardCompactionInfo(const TShardIdx& id, const TTableInfo::TPartitionStats& stats) + TShardCompactionInfo(const TShardIdx& id, const TPartitionStats& stats) : ShardIdx(id) , SearchHeight(stats.SearchHeight) , LastFullCompactionTs(stats.FullCompactionTs) diff --git a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp index 810be23a9d..2f9d015ec5 100644 --- a/ydb/core/tx/schemeshard/schemeshard__compaction.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__compaction.cpp @@ -85,7 +85,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T // it's OK to OnDone InvalidShardIdx // move shard to the end of all queues TInstant now = AppData(ctx)->TimeProvider->Now(); - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.FullCompactionTs = now.Seconds(); auto duration = CompactionQueue->OnDone(TShardCompactionInfo(shardIdx, stats)); @@ -139,7 +139,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const T void TSchemeShard::EnqueueBackgroundCompaction( const TShardIdx& shardIdx, - const TTableInfo::TPartitionStats& stats) + const TPartitionStats& stats) { if (!CompactionQueue) return; @@ -174,7 +174,7 @@ void TSchemeShard::EnqueueBackgroundCompaction( void TSchemeShard::UpdateBackgroundCompaction( const TShardIdx& shardIdx, - const TTableInfo::TPartitionStats& newStats) + const TPartitionStats& newStats) { if (!CompactionQueue) return; @@ -241,7 +241,7 @@ void TSchemeShard::UpdateBackgroundCompactionQueueMetrics() { void TSchemeShard::UpdateShardMetrics( const TShardIdx& shardIdx, - const TTableInfo::TPartitionStats& newStats) + const TPartitionStats& newStats) { if (newStats.HasBorrowedData) ShardsWithBorrowed.insert(shardIdx); diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 7211043579..11527944f0 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -2131,7 +2131,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { } } - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.SeqNo = TMessageSeqNo( rowSet.GetValue<Schema::TablePartitionStats::SeqNoGeneration>(), diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp index fd53304a5e..c94721a26c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_alter_olap_store.cpp @@ -261,6 +261,7 @@ public: context.SS->FillSeqNo(tx, seqNo); auto* alter = tx.MutableAlterStore(); + alter->SetStorePathId(txState->TargetPathId.LocalPathId); for (ui32 id : droppedSchemaPresets) { alter->AddDroppedSchemaPresets(id); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index e8eebf867c..4cb1329e18 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -276,6 +276,7 @@ public: // TODO: we may need to specify a more complex data channel mapping auto* init = tx.MutableInitShard(); init->SetDataChannelCount(storeInfo->Description.GetStorageConfig().GetDataChannelCount()); + init->SetStorePathId(txState->TargetPathId.LocalPathId); Y_PROTOBUF_SUPPRESS_NODISCARD tx.SerializeToString(&columnShardTxBody); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp index 02886af4b6..07006a0203 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_table.cpp @@ -184,40 +184,13 @@ TColumnTableInfo::TPtr CreateColumnTable( } else { storageTiers = pSchema->Tiers; } -#if 1 + if (op.HasRESERVED_TtlSettingsPresetName() || op.HasRESERVED_TtlSettingsPresetId()) { status = NKikimrScheme::StatusSchemeError; errStr = "TTL presets are not supported"; return nullptr; } -#else - if (op.HasTtlSettingsPresetName()) { - const TString presetName = op.GetTtlSettingsPresetName(); - if (!storeInfo->TtlSettingsPresetByName.contains(presetName)) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified ttl settings preset '%s' does not exist in olap store", presetName.c_str()); - return nullptr; - } - const ui32 presetId = storeInfo->TtlSettingsPresetByName.at(presetName); - if (!op.HasTtlSettingsPresetId()) { - op.SetTtlSettingsPresetId(presetId); - } - if (op.GetTtlSettingsPresetId() != presetId) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified ttl settings preset '%s' and id %" PRIu32 " do not match in olap store", presetName.c_str(), presetId); - return nullptr; - } - } else if (op.HasTtlSettingsPresetId()) { - const ui32 presetId = op.GetTtlSettingsPresetId(); - if (!storeInfo->TtlSettingsPresets.contains(presetId)) { - status = NKikimrScheme::StatusSchemeError; - errStr = Sprintf("Specified ttl preset %" PRIu32 " does not exist in olap store", presetId); - return nullptr; - } - const TString& presetName = storeInfo->TtlSettingsPresets.at(presetId).Name; - op.SetTtlSettingsPresetName(presetName); - } else -#endif + if (op.HasTtlSettings()) { op.MutableTtlSettings()->SetVersion(1); } diff --git a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp index bd64a85234..1a5cc6fa3f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__table_stats.cpp @@ -27,7 +27,7 @@ void TSchemeShard::Handle(NSysView::TEvSysView::TEvGetPartitionStats::TPtr& ev, } auto TSchemeShard::BuildStatsForCollector(TPathId pathId, TShardIdx shardIdx, TTabletId datashardId, - TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TTableInfo::TPartitionStats& stats) + TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats) { auto ev = MakeHolder<NSysView::TEvSysView::TEvSendPartitionStats>( GetDomainKey(pathId), pathId, std::make_pair(ui64(shardIdx.GetOwnerId()), ui64(shardIdx.GetLocalId()))); @@ -175,20 +175,21 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const ui64 dataSize = tableStats.GetDataSize(); ui64 rowCount = tableStats.GetRowCount(); - if (!Self->Tables.contains(pathId)) { + bool isDataShard = Self->Tables.contains(pathId); + bool isOlapStore = Self->OlapStores.contains(pathId); + if (!isDataShard && !isOlapStore) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Unexpected stats from shard " << datashardId); return true; } - TTableInfo::TPtr table = Self->Tables[pathId]; - if (!Self->TabletIdToShardIdx.contains(datashardId)) { + LOG_ERROR_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "No shardIdx for shard " << datashardId); return true; } - auto shardIdx = Self->TabletIdToShardIdx[datashardId]; - const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings(); + TShardIdx shardIdx = Self->TabletIdToShardIdx[datashardId]; - TTableInfo::TPartitionStats newStats; + TPartitionStats newStats; newStats.SeqNo = TMessageSeqNo(rec.GetGeneration(), rec.GetRound()); newStats.RowCount = tableStats.GetRowCount(); @@ -236,22 +237,46 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const } newStats.ShardState = rec.GetShardState(); - auto oldAggrStats = table->GetStats().Aggregated; - table->UpdateShardStats(shardIdx, newStats); + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Add stats from shard " << datashardId << ", pathId " << pathId.LocalPathId + << ": RowCount " << newStats.RowCount << ", DataSize " << newStats.DataSize); - if (!table->IsBackup) { - Self->UpdateBackgroundCompaction(shardIdx, newStats); - Self->UpdateShardMetrics(shardIdx, newStats); - } + NIceDb::TNiceDb db(txc.DB); - if (!newStats.HasBorrowedData) { - Self->RemoveBorrowedCompaction(shardIdx); - } + TTableInfo::TPtr table; + TPartitionStats oldAggrStats; + TPartitionStats newAggrStats; + bool updateSubdomainInfo = false; - NIceDb::TNiceDb db(txc.DB); + if (isDataShard) { + table = Self->Tables[pathId]; + oldAggrStats = table->GetStats().Aggregated; + table->UpdateShardStats(shardIdx, newStats); + + if (!table->IsBackup) { + Self->UpdateBackgroundCompaction(shardIdx, newStats); + Self->UpdateShardMetrics(shardIdx, newStats); + } + + if (!newStats.HasBorrowedData) { + Self->RemoveBorrowedCompaction(shardIdx); + } - if (!table->IsBackup && !table->IsShardsStatsDetached()) { - auto newAggrStats = table->GetStats().Aggregated; + if (!table->IsBackup && !table->IsShardsStatsDetached()) { + newAggrStats = table->GetStats().Aggregated; + updateSubdomainInfo = true; + } + + Self->PersistTablePartitionStats(db, pathId, shardIdx, table); + } else if (isOlapStore) { + TOlapStoreInfo::TPtr olapStore = Self->OlapStores[pathId]; + oldAggrStats = olapStore->GetStats().Aggregated; + olapStore->UpdateShardStats(shardIdx, newStats); + newAggrStats = olapStore->GetStats().Aggregated; + updateSubdomainInfo = true; + } + + if (updateSubdomainInfo) { auto subDomainId = Self->ResolveDomainId(pathId); auto subDomainInfo = Self->ResolveDomainInfo(pathId); subDomainInfo->AggrDiskSpaceUsage(Self, newAggrStats, oldAggrStats); @@ -264,8 +289,6 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const } } - Self->PersistTablePartitionStats(db, pathId, shardIdx, table); - if (AppData(ctx)->FeatureFlags.GetEnableSystemViews()) { TMaybe<ui32> nodeId; if (rec.HasNodeId()) { @@ -281,6 +304,13 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const Self->BuildStatsForCollector(pathId, shardIdx, datashardId, nodeId, startTime, newStats).Release()); } + if (isOlapStore) { + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Aggregated stats for pathId " << pathId.LocalPathId + << ": RowCount " << newAggrStats.RowCount << ", DataSize " << newAggrStats.DataSize); + return true; + } + const auto& shardToPartition = table->GetShard2PartitionIdx(); if (table->IsTTLEnabled() && shardToPartition.contains(shardIdx)) { const ui64 partitionIdx = shardToPartition.at(shardIdx); @@ -306,6 +336,7 @@ bool TTxStorePartitionStats::PersistSingleStats(TTransactionContext& txc, const Self->TabletCounters->Percentile()[COUNTER_NUM_SHARDS_BY_TTL_LAG].IncrementFor(lag->Seconds()); } + const auto forceShardSplitSettings = Self->SplitSettings.GetForceShardSplitSettings(); TVector<TShardIdx> shardsToMerge; if (table->CheckCanMergePartitions(Self->SplitSettings, forceShardSplitSettings, shardIdx, shardsToMerge)) { TTxId txId = Self->GetCachedTxId(ctx); @@ -430,7 +461,7 @@ void TSchemeShard::Handle(TEvDataShard::TEvPeriodicTableStats::TPtr& ev, const T LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, "Got periodic table stats at tablet " << TabletID() - << " from datashard " << datashardId + << " from shard " << datashardId << " pathId " << pathId << " state '" << DatashardStateName(rec.GetShardState()) << "'" << " dataSize " << dataSize diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index e3bcf381d9..9592b57f97 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -2111,7 +2111,7 @@ void TSchemeShard::PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPa } } -void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats) { +void TSchemeShard::PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TPartitionStats& stats) { if (!AppData()->FeatureFlags.GetEnablePersistentPartitionStats()) { return; } @@ -3267,7 +3267,7 @@ void TSchemeShard::PersistRemoveTable(NIceDb::TNiceDb& db, TPathId pathId, const if (!tableInfo->IsBackup && !tableInfo->IsShardsStatsDetached()) { auto subDomainId = ResolveDomainId(pathId); auto subDomainInfo = ResolveDomainInfo(pathId); - subDomainInfo->AggrDiskSpaceUsage(this, TTableInfo::TPartitionStats(), tableInfo->GetStats().Aggregated); + subDomainInfo->AggrDiskSpaceUsage(this, TPartitionStats(), tableInfo->GetStats().Aggregated); if (subDomainInfo->CheckDiskSpaceQuotas(this)) { PersistSubDomainState(db, subDomainId, *subDomainInfo); // Publish is done in a separate transaction, so we may call this directly diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 6e4ecc0256..2f6041ff42 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -517,7 +517,7 @@ public: void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning); auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId, - TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TTableInfo::TPartitionStats& stats); + TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats); bool ReadSysValue(NIceDb::TNiceDb& db, ui64 sysTag, TString& value, TString defValue = TString()); bool ReadSysValue(NIceDb::TNiceDb& db, ui64 sysTag, ui64& value, ui64 defVal = 0); @@ -557,7 +557,7 @@ public: void PersistTablePartitioning(NIceDb::TNiceDb &db, const TPathId pathId, const TTableInfo::TPtr tableInfo); void PersistTablePartitioningDeletion(NIceDb::TNiceDb& db, const TPathId tableId, const TTableInfo::TPtr tableInfo); void PersistTablePartitionCondErase(NIceDb::TNiceDb& db, const TPathId& pathId, ui64 id, const TTableInfo::TPtr tableInfo); - void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TTableInfo::TPartitionStats& stats); + void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, ui64 partitionId, const TPartitionStats& stats); void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TShardIdx& shardIdx, const TTableInfo::TPtr tableInfo); void PersistTablePartitionStats(NIceDb::TNiceDb& db, const TPathId& tableId, const TTableInfo::TPtr tableInfo); void PersistTableCreated(NIceDb::TNiceDb& db, const TPathId tableId); @@ -731,14 +731,14 @@ public: void ScheduleCleanDroppedPaths(); void Handle(TEvPrivate::TEvCleanDroppedPaths::TPtr& ev, const TActorContext& ctx); - void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); - void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& stats); + void EnqueueBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats); + void UpdateBackgroundCompaction(const TShardIdx& shardIdx, const TPartitionStats& stats); void RemoveBackgroundCompaction(const TShardIdx& shardIdx); void EnqueueBorrowedCompaction(const TShardIdx& shardIdx); void RemoveBorrowedCompaction(const TShardIdx& shardIdx); - void UpdateShardMetrics(const TShardIdx& shardIdx, const TTableInfo::TPartitionStats& newStats); + void UpdateShardMetrics(const TShardIdx& shardIdx, const TPartitionStats& newStats); void RemoveShardMetrics(const TShardIdx& shardIdx); void ShardRemoved(const TShardIdx& shardIdx); diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp index 75da6dea98..ee068bbfb2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.cpp @@ -1288,12 +1288,16 @@ void TTableInfo::SetPartitioning(TVector<TTableShardInfo>&& newPartitioning) { } } -void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newStats) { +void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) { + Stats.UpdateShardStats(datashardIdx, newStats); +} + +void TAggregatedStats::UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats) { // Ignore stats from unknown datashard (it could have been split) - if (!Stats.PartitionStats.contains(datashardIdx)) + if (!PartitionStats.contains(datashardIdx)) return; - TPartitionStats& oldStats = Stats.PartitionStats[datashardIdx]; + TPartitionStats& oldStats = PartitionStats[datashardIdx]; if (newStats.SeqNo <= oldStats.SeqNo) { // Ignore outdated message @@ -1313,46 +1317,47 @@ void TTableInfo::UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newSt oldStats.RangeReadRows = 0; } - Stats.Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount); - Stats.Aggregated.DataSize += (newStats.DataSize - oldStats.DataSize); - Stats.Aggregated.IndexSize += (newStats.IndexSize - oldStats.IndexSize); - Stats.Aggregated.LastAccessTime = Max(Stats.Aggregated.LastAccessTime, newStats.LastAccessTime); - Stats.Aggregated.LastUpdateTime = Max(Stats.Aggregated.LastUpdateTime, newStats.LastUpdateTime); - Stats.Aggregated.ImmediateTxCompleted += (newStats.ImmediateTxCompleted - oldStats.ImmediateTxCompleted); - Stats.Aggregated.PlannedTxCompleted += (newStats.PlannedTxCompleted - oldStats.PlannedTxCompleted); - Stats.Aggregated.TxRejectedByOverload += (newStats.TxRejectedByOverload - oldStats.TxRejectedByOverload); - Stats.Aggregated.TxRejectedBySpace += (newStats.TxRejectedBySpace - oldStats.TxRejectedBySpace); - Stats.Aggregated.InFlightTxCount += (newStats.InFlightTxCount - oldStats.InFlightTxCount); + Aggregated.RowCount += (newStats.RowCount - oldStats.RowCount); + Aggregated.DataSize += (newStats.DataSize - oldStats.DataSize); + Aggregated.IndexSize += (newStats.IndexSize - oldStats.IndexSize); + Aggregated.LastAccessTime = Max(Aggregated.LastAccessTime, newStats.LastAccessTime); + Aggregated.LastUpdateTime = Max(Aggregated.LastUpdateTime, newStats.LastUpdateTime); + Aggregated.ImmediateTxCompleted += (newStats.ImmediateTxCompleted - oldStats.ImmediateTxCompleted); + Aggregated.PlannedTxCompleted += (newStats.PlannedTxCompleted - oldStats.PlannedTxCompleted); + Aggregated.TxRejectedByOverload += (newStats.TxRejectedByOverload - oldStats.TxRejectedByOverload); + Aggregated.TxRejectedBySpace += (newStats.TxRejectedBySpace - oldStats.TxRejectedBySpace); + Aggregated.InFlightTxCount += (newStats.InFlightTxCount - oldStats.InFlightTxCount); - Stats.Aggregated.RowUpdates += (newStats.RowUpdates - oldStats.RowUpdates); - Stats.Aggregated.RowDeletes += (newStats.RowDeletes - oldStats.RowDeletes); - Stats.Aggregated.RowReads += (newStats.RowReads - oldStats.RowReads); - Stats.Aggregated.RangeReads += (newStats.RangeReads - oldStats.RangeReads); - Stats.Aggregated.RangeReadRows += (newStats.RangeReadRows - oldStats.RangeReadRows); + Aggregated.RowUpdates += (newStats.RowUpdates - oldStats.RowUpdates); + Aggregated.RowDeletes += (newStats.RowDeletes - oldStats.RowDeletes); + Aggregated.RowReads += (newStats.RowReads - oldStats.RowReads); + Aggregated.RangeReads += (newStats.RangeReads - oldStats.RangeReads); + Aggregated.RangeReadRows += (newStats.RangeReadRows - oldStats.RangeReadRows); i64 cpuUsageDelta = newStats.GetCurrentRawCpuUsage() - oldStats.GetCurrentRawCpuUsage(); - i64 prevCpuUsage = Stats.Aggregated.GetCurrentRawCpuUsage(); + i64 prevCpuUsage = Aggregated.GetCurrentRawCpuUsage(); ui64 newAggregatedCpuUsage = std::max<i64>(0, prevCpuUsage + cpuUsageDelta); TInstant now = AppData()->TimeProvider->Now(); - Stats.Aggregated.SetCurrentRawCpuUsage(newAggregatedCpuUsage, now); - Stats.Aggregated.Memory += (newStats.Memory - oldStats.Memory); - Stats.Aggregated.Network += (newStats.Network - oldStats.Network); - Stats.Aggregated.Storage += (newStats.Storage - oldStats.Storage); - Stats.Aggregated.ReadThroughput += (newStats.ReadThroughput - oldStats.ReadThroughput); - Stats.Aggregated.WriteThroughput += (newStats.WriteThroughput - oldStats.WriteThroughput); - Stats.Aggregated.ReadIops += (newStats.ReadIops - oldStats.ReadIops); - Stats.Aggregated.WriteIops += (newStats.WriteIops - oldStats.WriteIops); - - newStats.SaveCpuUsageHistory(oldStats); + Aggregated.SetCurrentRawCpuUsage(newAggregatedCpuUsage, now); + Aggregated.Memory += (newStats.Memory - oldStats.Memory); + Aggregated.Network += (newStats.Network - oldStats.Network); + Aggregated.Storage += (newStats.Storage - oldStats.Storage); + Aggregated.ReadThroughput += (newStats.ReadThroughput - oldStats.ReadThroughput); + Aggregated.WriteThroughput += (newStats.WriteThroughput - oldStats.WriteThroughput); + Aggregated.ReadIops += (newStats.ReadIops - oldStats.ReadIops); + Aggregated.WriteIops += (newStats.WriteIops - oldStats.WriteIops); + + auto topUsage = oldStats.TopUsage.Update(newStats.TopUsage); oldStats = newStats; - Stats.PartitionStatsUpdated++; + oldStats.TopUsage = std::move(topUsage); + PartitionStatsUpdated++; // Rescan stats for aggregations only once in a while - if (Stats.PartitionStatsUpdated >= Stats.PartitionStats.size()) { - Stats.PartitionStatsUpdated = 0; - Stats.Aggregated.TxCompleteLag = TDuration(); - for (const auto& ps : Stats.PartitionStats) { - Stats.Aggregated.TxCompleteLag = Max(Stats.Aggregated.TxCompleteLag, ps.second.TxCompleteLag); + if (PartitionStatsUpdated >= PartitionStats.size()) { + PartitionStatsUpdated = 0; + Aggregated.TxCompleteLag = TDuration(); + for (const auto& ps : PartitionStats) { + Aggregated.TxCompleteLag = Max(Aggregated.TxCompleteLag, ps.second.TxCompleteLag); } } } diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 1d649daa7a..09a7a9d096 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -185,6 +185,121 @@ struct TPartitionConfigMerger { }; +struct TPartitionStats { + // Latest timestamps when CPU usage exceeded 2%, 5%, 10%, 20%, 30% + struct TTopUsage { + TInstant Last2PercentLoad; + TInstant Last5PercentLoad; + TInstant Last10PercentLoad; + TInstant Last20PercentLoad; + TInstant Last30PercentLoad; + + const TTopUsage& Update(const TTopUsage& usage) { + Last2PercentLoad = std::max(Last2PercentLoad, usage.Last2PercentLoad); + Last5PercentLoad = std::max(Last5PercentLoad, usage.Last5PercentLoad); + Last10PercentLoad = std::max(Last10PercentLoad, usage.Last10PercentLoad); + Last20PercentLoad = std::max(Last20PercentLoad, usage.Last20PercentLoad); + Last30PercentLoad = std::max(Last30PercentLoad, usage.Last30PercentLoad); + return *this; + } + }; + + TMessageSeqNo SeqNo; + + ui64 RowCount = 0; + ui64 DataSize = 0; + ui64 IndexSize = 0; + + TInstant LastAccessTime; + TInstant LastUpdateTime; + TDuration TxCompleteLag; + + ui64 ImmediateTxCompleted = 0; + ui64 PlannedTxCompleted = 0; + ui64 TxRejectedByOverload = 0; + ui64 TxRejectedBySpace = 0; + ui64 InFlightTxCount = 0; + + ui64 RowUpdates = 0; + ui64 RowDeletes = 0; + ui64 RowReads = 0; + ui64 RangeReads = 0; + ui64 RangeReadRows = 0; + + ui64 Memory = 0; + ui64 Network = 0; + ui64 Storage = 0; + ui64 ReadThroughput = 0; + ui64 WriteThroughput = 0; + ui64 ReadIops = 0; + ui64 WriteIops = 0; + + THashSet<TTabletId> PartOwners; + ui64 PartCount = 0; + ui64 SearchHeight = 0; + ui64 FullCompactionTs = 0; + ui64 MemDataSize = 0; + ui32 ShardState = NKikimrTxDataShard::Unknown; + + // True when PartOwners has parts from other tablets + bool HasBorrowedData = false; + + // True when lent parts to other tablets + bool HasLoanedData = false; + + // Tablet actor started at + TInstant StartTime; + + TTopUsage TopUsage; + + void SetCurrentRawCpuUsage(ui64 rawCpuUsage, TInstant now) { + CPU = rawCpuUsage; + float percent = rawCpuUsage * 0.000001 * 100; + if (percent >= 2) + TopUsage.Last2PercentLoad = now; + if (percent >= 5) + TopUsage.Last5PercentLoad = now; + if (percent >= 10) + TopUsage.Last10PercentLoad = now; + if (percent >= 20) + TopUsage.Last20PercentLoad = now; + if (percent >= 30) + TopUsage.Last30PercentLoad = now; + } + + ui64 GetCurrentRawCpuUsage() const { + return CPU; + } + + float GetLatestMaxCpuUsagePercent(TInstant since) const { + // TODO: fix the case when stats were not collected yet + + if (TopUsage.Last30PercentLoad > since) + return 40; + if (TopUsage.Last20PercentLoad > since) + return 30; + if (TopUsage.Last10PercentLoad > since) + return 20; + if (TopUsage.Last5PercentLoad > since) + return 10; + if (TopUsage.Last2PercentLoad > since) + return 5; + + return 2; + } + +private: + ui64 CPU = 0; +}; + +struct TAggregatedStats { + TPartitionStats Aggregated; + THashMap<TShardIdx, TPartitionStats> PartitionStats; + size_t PartitionStatsUpdated = 0; + + void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); +}; + struct TSubDomainInfo; struct TTableInfo : public TSimpleRefCount<TTableInfo> { @@ -228,114 +343,6 @@ struct TTableInfo : public TSimpleRefCount<TTableInfo> { ui64 DataTotalSize; }; - struct TPartitionStats { - TMessageSeqNo SeqNo; - - ui64 RowCount = 0; - ui64 DataSize = 0; - ui64 IndexSize = 0; - - TInstant LastAccessTime; - TInstant LastUpdateTime; - TDuration TxCompleteLag; - - ui64 ImmediateTxCompleted = 0; - ui64 PlannedTxCompleted = 0; - ui64 TxRejectedByOverload = 0; - ui64 TxRejectedBySpace = 0; - ui64 InFlightTxCount = 0; - - ui64 RowUpdates = 0; - ui64 RowDeletes = 0; - ui64 RowReads = 0; - ui64 RangeReads = 0; - ui64 RangeReadRows = 0; - - ui64 Memory = 0; - ui64 Network = 0; - ui64 Storage = 0; - ui64 ReadThroughput = 0; - ui64 WriteThroughput = 0; - ui64 ReadIops = 0; - ui64 WriteIops = 0; - - THashSet<TTabletId> PartOwners; - ui64 PartCount = 0; - ui64 SearchHeight = 0; - ui64 FullCompactionTs = 0; - ui64 MemDataSize = 0; - ui32 ShardState = NKikimrTxDataShard::Unknown; - - // True when PartOwners has parts from other tablets - bool HasBorrowedData = false; - - // True when lent parts to other tablets - bool HasLoanedData = false; - - // Tablet actor started at - TInstant StartTime; - - void SetCurrentRawCpuUsage(ui64 rawCpuUsage, TInstant now) { - CPU = rawCpuUsage; - float percent = rawCpuUsage * 0.000001 * 100; - if (percent >= 2) - Last2PercentLoad = now; - if (percent >= 5) - Last5PercentLoad = now; - if (percent >= 10) - Last10PercentLoad = now; - if (percent >= 20) - Last20PercentLoad = now; - if (percent >= 30) - Last30PercentLoad = now; - } - - void SaveCpuUsageHistory(const TPartitionStats& oldStats) { - Last2PercentLoad = std::max(Last2PercentLoad, oldStats.Last2PercentLoad); - Last5PercentLoad = std::max(Last5PercentLoad, oldStats.Last5PercentLoad); - Last10PercentLoad = std::max(Last10PercentLoad, oldStats.Last10PercentLoad); - Last20PercentLoad = std::max(Last20PercentLoad, oldStats.Last20PercentLoad); - Last30PercentLoad = std::max(Last30PercentLoad, oldStats.Last30PercentLoad); - } - - ui64 GetCurrentRawCpuUsage() const { - return CPU; - } - - float GetLatestMaxCpuUsagePercent(TInstant since) const { - // TODO: fix the case when stats were not collected yet - - if (Last30PercentLoad > since) - return 40; - if (Last20PercentLoad > since) - return 30; - if (Last10PercentLoad > since) - return 20; - if (Last5PercentLoad > since) - return 10; - if (Last2PercentLoad > since) - return 5; - - return 2; - } - - private: - ui64 CPU = 0; - - // Latest timestamps when CPU usage exceeded 2%, 5%, 10%, 20%, 30% - TInstant Last2PercentLoad; - TInstant Last5PercentLoad; - TInstant Last10PercentLoad; - TInstant Last20PercentLoad; - TInstant Last30PercentLoad; - }; - - struct TStats { - TPartitionStats Aggregated; - THashMap<TShardIdx, TPartitionStats> PartitionStats; - size_t PartitionStatsUpdated = 0; - }; - struct TAlterTableInfo : TSimpleRefCount<TAlterTableInfo> { using TPtr = TIntrusivePtr<TAlterTableInfo>; @@ -454,7 +461,7 @@ private: THashMap<TOperationId, TVector<TShardIdx>> ShardsInSplitMergeByOpId; THashMap<TShardIdx, TOperationId> ShardsInSplitMergeByShards; ui64 ExpectedPartitionCount = 0; // number of partitions after all in-flight splits/merges are finished - TStats Stats; + TAggregatedStats Stats; bool ShardsStatsDetached = false; TPartitionsVec::iterator FindPartition(const TShardIdx& shardIdx) { @@ -545,7 +552,7 @@ public: return Partitions; } - const TStats& GetStats() const { + const TAggregatedStats& GetStats() const { return Stats; } @@ -556,7 +563,7 @@ public: ShardsStatsDetached = true; } - void UpdateShardStats(TShardIdx datashardIdx, TPartitionStats& newStats); + void UpdateShardStats(TShardIdx datashardIdx, const TPartitionStats& newStats); void RegisterSplitMegreOp(TOperationId txId, const TTxState& txState); @@ -840,17 +847,25 @@ struct TOlapStoreInfo : TSimpleRefCount<TOlapStoreInfo> { TVector<TShardIdx> ColumnShards; THashMap<ui32, TOlapStoreSchemaPreset> SchemaPresets; - //THashMap<ui32, TOlapStoreTtlSettingsPreset> TtlSettingsPresets; THashMap<TString, ui32> SchemaPresetByName; - THashMap<TString, ui32> TtlSettingsPresetByName; THashSet<TPathId> ColumnTables; THashSet<TPathId> ColumnTablesUnderOperation; + TAggregatedStats Stats; TOlapStoreInfo() = default; TOlapStoreInfo(ui64 alterVersion, NKikimrSchemeOp::TColumnStoreDescription&& description, NKikimrSchemeOp::TColumnStoreSharding&& sharding, TMaybe<NKikimrSchemeOp::TAlterColumnStore>&& alterBody = Nothing()); + + const TAggregatedStats& GetStats() const { + return Stats; + } + + void UpdateShardStats(TShardIdx shardIdx, const TPartitionStats& newStats) { + Stats.PartitionStats[shardIdx]; // insert if none + Stats.UpdateShardStats(shardIdx, newStats); + } }; struct TColumnTableInfo : TSimpleRefCount<TColumnTableInfo> { @@ -1695,7 +1710,7 @@ struct TSubDomainInfo: TSimpleRefCount<TSubDomainInfo> { CoordinatorSelector = new TCoordinators(ProcessingParams); } - void AggrDiskSpaceUsage(IQuotaCounters* counters, const TTableInfo::TPartitionStats& newAggr, const TTableInfo::TPartitionStats& oldAggr = TTableInfo::TPartitionStats()) { + void AggrDiskSpaceUsage(IQuotaCounters* counters, const TPartitionStats& newAggr, const TPartitionStats& oldAggr = {}) { DiskSpaceUsage.Tables.DataSize += (newAggr.DataSize - oldAggr.DataSize); counters->ChangeDiskSpaceTablesDataBytes(newAggr.DataSize - oldAggr.DataSize); diff --git a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp index cb6d72a5ed..84e3260de9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path_describer.cpp @@ -9,7 +9,7 @@ namespace NKikimr { namespace NSchemeShard { -static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TTableInfo::TPartitionStats& tableStats) { +static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TPartitionStats& tableStats) { stats->SetRowCount(tableStats.RowCount); stats->SetDataSize(tableStats.DataSize); stats->SetIndexSize(tableStats.IndexSize); @@ -31,7 +31,7 @@ static void FillTableStats(NKikimrTableStats::TTableStats* stats, const TTableIn stats->SetPartCount(tableStats.PartCount); } -static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TTableInfo::TPartitionStats& tableStats) { +static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TPartitionStats& tableStats) { metrics->SetCPU(tableStats.GetCurrentRawCpuUsage()); metrics->SetMemory(tableStats.Memory); metrics->SetNetwork(tableStats.Network); @@ -42,6 +42,11 @@ static void FillTableMetrics(NKikimrTabletBase::TMetrics* metrics, const TTableI metrics->SetWriteIops(tableStats.WriteIops); } +static void FillAggregatedStats(NKikimrSchemeOp::TPathDescription& pathDescription, const TAggregatedStats& stats) { + FillTableStats(pathDescription.MutableTableStats(), stats.Aggregated); + FillTableMetrics(pathDescription.MutableTabletMetrics(), stats.Aggregated); +} + void TPathDescriber::FillPathDescr(NKikimrSchemeOp::TDirEntry* descr, TPathElement::TPtr pathEl, TPathElement::EPathSubType subType) { FillChildDescr(descr, pathEl); @@ -233,17 +238,7 @@ void TPathDescriber::DescribeTable(const TActorContext& ctx, TPathId pathId, TPa } } - const auto& tableStats(tableInfo->GetStats().Aggregated); - - { - auto* stats = Result->Record.MutablePathDescription()->MutableTableStats(); - FillTableStats(stats, tableStats); - } - - { - auto* metrics = Result->Record.MutablePathDescription()->MutableTabletMetrics(); - FillTableMetrics(metrics, tableStats); - } + FillAggregatedStats(*Result->Record.MutablePathDescription(), tableInfo->GetStats()); if (returnPartitionStats) { NKikimrSchemeOp::TPathDescription& pathDescription = *Result->Record.MutablePathDescription(); @@ -362,6 +357,8 @@ void TPathDescriber::DescribeOlapStore(TPathId pathId, TPathElement::TPtr pathEl Y_VERIFY(shardInfo, "ColumnShard not found"); description->AddColumnShards(shardInfo->TabletID.GetValue()); } + + FillAggregatedStats(*Result->Record.MutablePathDescription(), storeInfo->GetStats()); } void TPathDescriber::DescribeColumnTable(TPathId pathId, TPathElement::TPtr pathEl) { diff --git a/ydb/core/tx/schemeshard/ut_compaction.cpp b/ydb/core/tx/schemeshard/ut_compaction.cpp index 4da2c1e22b..6cec84ff8c 100644 --- a/ydb/core/tx/schemeshard/ut_compaction.cpp +++ b/ydb/core/tx/schemeshard/ut_compaction.cpp @@ -19,7 +19,7 @@ using TTableInfoMap = THashMap<TString, NKikimrTxDataShard::TEvGetInfoResponse:: TShardCompactionInfo MakeCompactionInfo(ui64 idx, ui64 ts, ui64 sh = 0, ui64 d = 0) { TShardIdx shardId = TShardIdx(1, idx); - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.FullCompactionTs = ts; stats.SearchHeight = sh; stats.RowDeletes = d; @@ -1090,7 +1090,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 0; config.RowDeletesThreshold = 0; - TTableInfo::TPartitionStats stats; // all zeros + TPartitionStats stats; // all zeros TCompactionQueueImpl queue(config); UNIT_ASSERT(!queue.Enqueue({ShardIdx, stats})); @@ -1102,7 +1102,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 0; config.RowDeletesThreshold = 0; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 100; stats.SearchHeight = 1; // below threshold @@ -1119,7 +1119,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.RowDeletesThreshold = 0; config.CompactSinglePartedShards = true; // turn on - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 100; stats.SearchHeight = 1; // below threshold @@ -1135,7 +1135,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 10; config.RowDeletesThreshold = 0; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 100; stats.SearchHeight = 1; // below threshold @@ -1152,7 +1152,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 10; config.RowDeletesThreshold = 10; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 100; stats.SearchHeight = 3; @@ -1170,7 +1170,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 10; config.RowDeletesThreshold = 10; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 1; stats.SearchHeight = 20; @@ -1189,7 +1189,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.RowDeletesThreshold = 10; config.RowCountThreshold = 1; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 0; stats.RowDeletes = 1; stats.SearchHeight = 20; @@ -1233,7 +1233,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 8); // remove from LastCompaction, active queue should not change - UNIT_ASSERT(queue.Remove({TShardIdx(1, 0), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 0), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 7UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 2UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL); @@ -1249,7 +1249,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2); // remove1 from BySearchHeight (active queue should not change) - UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 5UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 1UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL); @@ -1257,7 +1257,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1); // remove2 from BySearchHeight, ByRowDeletes is active now - UNIT_ASSERT(queue.Remove({TShardIdx(1, 5), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 5), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 4UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 2UL); @@ -1265,7 +1265,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2); // remove1 from ByRowDeletes - UNIT_ASSERT(queue.Remove({TShardIdx(1, 6), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 6), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 3UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 1UL); @@ -1273,7 +1273,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1); // remove2 from ByRowDeletes - UNIT_ASSERT(queue.Remove({TShardIdx(1, 7), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 7), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); @@ -1281,7 +1281,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 2); // remove1 from LastCompaction - UNIT_ASSERT(queue.Remove({TShardIdx(1, 2), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 2), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 1UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); @@ -1289,7 +1289,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1); // remove2 from LastCompaction - UNIT_ASSERT(queue.Remove({TShardIdx(1, 3), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 3), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); @@ -1319,7 +1319,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1); // remove from BySearchHeight - UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TTableInfo::TPartitionStats()})); + UNIT_ASSERT(queue.Remove({TShardIdx(1, 4), TPartitionStats()})); UNIT_ASSERT_VALUES_EQUAL(queue.Size(), 2UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeBySearchHeight(), 0UL); UNIT_ASSERT_VALUES_EQUAL(queue.SizeByRowDeletes(), 0UL); @@ -1387,7 +1387,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueType(), TCompactionQueueImpl::EActiveQueue::BySearchHeight); UNIT_ASSERT_VALUES_EQUAL(queue.ActiveQueueSize(), 1); - TTableInfo::TPartitionStats statsBelow; + TPartitionStats statsBelow; statsBelow.RowDeletes = 100; statsBelow.FullCompactionTs = 4; statsBelow.SearchHeight = 1; // below threshold @@ -1415,7 +1415,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { // Now check transition from BySearchHeight to LastCompaction, i.e. empty RowDeletes // step1: populate w with item - TTableInfo::TPartitionStats statsSh; + TPartitionStats statsSh; statsSh.FullCompactionTs = 4; statsSh.SearchHeight = 100; // above threshold statsSh.RowDeletes = 1; // below threshold @@ -1447,7 +1447,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { // check ByLastCompaction -> ByRowDeletes, i.e. empty BySearchHeight // step1: populate ByRowDeletes with item - TTableInfo::TPartitionStats statsDel; + TPartitionStats statsDel; statsDel.FullCompactionTs = 5; statsDel.SearchHeight = 1; // below threshold statsDel.RowDeletes = 100; // above threshold @@ -1475,7 +1475,7 @@ Y_UNIT_TEST_SUITE(TSchemeshardCompactionQueueTest) { config.SearchHeightThreshold = 10; config.RowDeletesThreshold = 10; - TTableInfo::TPartitionStats stats; + TPartitionStats stats; stats.RowCount = 10; stats.RowDeletes = 1000; stats.SearchHeight = 20; diff --git a/ydb/core/tx/schemeshard/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap.cpp index 53e0c22249..e5899f5272 100644 --- a/ydb/core/tx/schemeshard/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap.cpp @@ -1,10 +1,131 @@ #include <ydb/core/tx/schemeshard/ut_helpers/helpers.h> +#include <ydb/core/tx/columnshard/columnshard.h> +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/formats/arrow_batch_builder.h> using namespace NKikimr::NSchemeShard; using namespace NKikimr; using namespace NKikimrSchemeOp; using namespace NSchemeShardUT_Private; +namespace NKikimr { +namespace { + +namespace NTypeIds = NScheme::NTypeIds; +using TTypeId = NScheme::TTypeId; + +static const TString defaultStoreSchema = R"( + Name: "OlapStore" + ColumnShardCount: 1 + SchemaPresets { + Name: "default" + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + Columns { Name: "data" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + } + } +)"; + +static const TVector<std::pair<TString, TTypeId>> defaultYdbSchema = { + {"timestamp", NTypeIds::Timestamp }, + {"data", NTypeIds::Utf8 } +}; + +TString MakeTestBlob(std::pair<ui64, ui64> range) { + TString err; + NArrow::TArrowBatchBuilder batchBuilder(arrow::Compression::LZ4_FRAME); + batchBuilder.Start(defaultYdbSchema, 0, 0, err); + + TString str; + TVector<TTypeId> types = {NTypeIds::Timestamp, NTypeIds::Utf8}; + + for (size_t i = range.first; i < range.second; ++i) { + str = ToString(i); + + TVector<TCell> cells; + cells.push_back(TCell::Make<ui64>(i)); + cells.push_back(TCell(str.data(), str.size())); + + NKikimr::TDbTupleRef unused; + batchBuilder.AddRow(unused, NKikimr::TDbTupleRef(types.data(), cells.data(), 2)); + } + + auto batch = batchBuilder.FlushBatch(true); + UNIT_ASSERT(batch); + auto status = batch->ValidateFull(); + UNIT_ASSERT(status.ok()); + + return batchBuilder.Finish(); +} + +static constexpr ui64 txInitiator = 42; // 0 means LongTx, we need another value here + +void WriteData(TTestBasicRuntime& runtime, TActorId sender, ui64 tabletId, ui64 pathId, ui64 writeId, TString data) { + const TString dedupId = ToString(writeId); + + auto evWrite = std::make_unique<TEvColumnShard::TEvWrite>(sender, txInitiator, writeId, pathId, dedupId, data); + + ForwardToTablet(runtime, tabletId, sender, evWrite.release()); + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvWriteResult>(handle); + UNIT_ASSERT(event); + + auto& resWrite = Proto(event); + UNIT_ASSERT_EQUAL(resWrite.GetOrigin(), tabletId); + UNIT_ASSERT_EQUAL(resWrite.GetTxInitiator(), txInitiator); + UNIT_ASSERT_EQUAL(resWrite.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); +} + +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 txId, const TVector<ui64>& writeIds) { + NKikimrTxColumnShard::ETransactionKind txKind = NKikimrTxColumnShard::ETransactionKind::TX_KIND_COMMIT; + TString txBody; + { + NKikimrTxColumnShard::TCommitTxBody proto; + proto.SetTxInitiator(txInitiator); + for (ui64 id : writeIds) { + proto.AddWriteIds(id); + } + + Y_PROTOBUF_SUPPRESS_NODISCARD proto.SerializeToString(&txBody); + } + + ForwardToTablet(runtime, tabletId, sender, + new TEvColumnShard::TEvProposeTransaction(txKind, sender, txId, txBody)); + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle); + UNIT_ASSERT(event); + + auto& res = Proto(event); + UNIT_ASSERT_EQUAL(res.GetTxKind(), txKind); + UNIT_ASSERT_EQUAL(res.GetTxId(), txId); + UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::PREPARED); +} + +void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 tabletId, ui64 planStep, const TSet<ui64>& txIds) { + auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(planStep, txInitiator, tabletId); + for (ui64 txId : txIds) { + auto tx = plan->Record.AddTransactions(); + tx->SetTxId(txId); + ActorIdToProto(sender, tx->MutableAckTo()); + } + + ForwardToTablet(runtime, tabletId, sender, plan.release()); + TAutoPtr<IEventHandle> handle; + + for (ui32 i = 0; i < txIds.size(); ++i) { + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(handle); + UNIT_ASSERT(event); + + auto& res = Proto(event); + UNIT_ASSERT(txIds.count(res.GetTxId())); + UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + } +} + +}} + Y_UNIT_TEST_SUITE(TOlap) { Y_UNIT_TEST(CreateStore) { @@ -12,19 +133,7 @@ Y_UNIT_TEST_SUITE(TOlap) { TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); ui64 txId = 100; - TString olapSchema = R"( - Name: "OlapStore" - ColumnShardCount: 1 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - Columns { Name: "data" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"; + const TString& olapSchema = defaultStoreSchema; TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); env.TestWaitNotification(runtime, txId); @@ -78,19 +187,7 @@ Y_UNIT_TEST_SUITE(TOlap) { TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); ui64 txId = 100; - TString olapSchema = R"( - Name: "OlapStore" - ColumnShardCount: 1 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - Columns { Name: "data" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"; + const TString& olapSchema = defaultStoreSchema; TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); env.TestWaitNotification(runtime, txId); @@ -220,19 +317,9 @@ Y_UNIT_TEST_SUITE(TOlap) { TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); ui64 txId = 100; - TestCreateOlapStore(runtime, ++txId, "/MyRoot", R"( - Name: "OlapStore" - ColumnShardCount: 1 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - Columns { Name: "data" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"); + const TString& olapSchema = defaultStoreSchema; + + TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); env.TestWaitNotification(runtime, txId); TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", R"( @@ -251,19 +338,7 @@ Y_UNIT_TEST_SUITE(TOlap) { TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); ui64 txId = 100; - TString olapSchema = R"( - Name: "OlapStore" - ColumnShardCount: 1 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - Columns { Name: "data" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"; + const TString& olapSchema = defaultStoreSchema; TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); env.TestWaitNotification(runtime, txId); @@ -396,19 +471,7 @@ Y_UNIT_TEST_SUITE(TOlap) { TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); ui64 txId = 100; - TString olapSchema = R"( - Name: "OlapStore" - ColumnShardCount: 1 - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - Columns { Name: "data" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - } - } - )"; + const TString& olapSchema = defaultStoreSchema; TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); env.TestWaitNotification(runtime, txId); @@ -543,4 +606,102 @@ Y_UNIT_TEST_SUITE(TOlap) { // negatives for store: disallow alters // negatives for table: wrong tiers count, wrong tiers, wrong eviction column, wrong eviction values, // different TTL columns in tiers + + Y_UNIT_TEST(StoreStats) { + TTestBasicRuntime runtime; + TTestEnv env(runtime, TTestEnvOptions().EnableOlapSchemaOperations(true)); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + runtime.UpdateCurrentTime(TInstant::Now() - TDuration::Seconds(600)); + + // disable stats batching + auto& appData = runtime.GetAppData(); + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + ui64 txId = 100; + + const TString& olapSchema = defaultStoreSchema; + + TestCreateOlapStore(runtime, ++txId, "/MyRoot", olapSchema); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathExist); + TestLsPathId(runtime, 2, NLs::PathStringEqual("/MyRoot/OlapStore")); + + TString tableSchema = R"( + Name: "ColumnTable" + )"; + + TestCreateColumnTable(runtime, ++txId, "/MyRoot/OlapStore", tableSchema); + env.TestWaitNotification(runtime, txId); + + ui64 pathId = 0; + ui64 shardId = 0; + ui64 planStep = 0; + auto checkFn = [&](const NKikimrScheme::TEvDescribeSchemeResult& record) { + auto& self = record.GetPathDescription().GetSelf(); + pathId = self.GetPathId(); + txId = self.GetCreateTxId() + 1; + planStep = self.GetCreateStep(); + auto& sharding = record.GetPathDescription().GetColumnTableDescription().GetSharding(); + UNIT_ASSERT_VALUES_EQUAL(sharding.ColumnShardsSize(), 1); + shardId = sharding.GetColumnShards()[0]; + UNIT_ASSERT_VALUES_EQUAL(record.GetPath(), "/MyRoot/OlapStore/ColumnTable"); + }; + + TestLsPathId(runtime, 3, checkFn); + UNIT_ASSERT(shardId); + UNIT_ASSERT(pathId); + UNIT_ASSERT(planStep); + + ui32 rowsInBatch = 100000; + + { // Write data directly into shard + TActorId sender = runtime.AllocateEdgeActor(); + TString data = MakeTestBlob({0, rowsInBatch}); + + ui64 writeId = 0; + + TSet<ui64> txIds; + for (ui32 i = 0; i < 10; ++i) { + WriteData(runtime, sender, shardId, pathId, ++writeId, data); + ProposeCommit(runtime, sender, shardId, ++txId, {writeId}); + txIds.insert(txId); + } + + PlanCommit(runtime, sender, shardId, ++planStep, txIds); + + // emulate timeout + runtime.UpdateCurrentTime(TInstant::Now()); + + // trigger periodic stats at shard (after timeout) + WriteData(runtime, sender, shardId, pathId, ++writeId, data); + ProposeCommit(runtime, sender, shardId, ++txId, {writeId}); + txIds = {txId}; + PlanCommit(runtime, sender, shardId, ++planStep, txIds); + } + + auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/OlapStore", true, true); + auto& tabletStats = description.GetPathDescription().GetTableStats(); + + UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0); +#if 0 + TestDropColumnTable(runtime, ++txId, "/MyRoot/OlapStore", "ColumnTable"); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/OlapStore/ColumnTable", false, NLs::PathNotExist); + TestLsPathId(runtime, 3, NLs::PathStringEqual("")); + + TestDropOlapStore(runtime, ++txId, "/MyRoot", "OlapStore"); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathNotExist); + TestLsPathId(runtime, 2, NLs::PathStringEqual("")); +#endif + } } diff --git a/ydb/core/tx/schemeshard/ut_olap/ya.make b/ydb/core/tx/schemeshard/ut_olap/ya.make index e093cd5b21..d4dbdc98d0 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ya.make +++ b/ydb/core/tx/schemeshard/ut_olap/ya.make @@ -23,6 +23,7 @@ PEERDIR( library/cpp/regex/pcre library/cpp/svnversion ydb/core/testlib + ydb/core/formats ydb/core/tx ydb/core/tx/columnshard ydb/core/tx/schemeshard/ut_helpers |