diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-07-01 20:47:08 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-07-01 20:47:08 +0300 |
commit | 1ec57b72c9b73737b49ededffee0f25ad5ebe395 (patch) | |
tree | 6c2ded94baeb60b184a23718d9c1f571229e3631 | |
parent | 736432ded7ac5ddb4c45990816b463912041acf3 (diff) | |
download | ydb-1ec57b72c9b73737b49ededffee0f25ad5ebe395.tar.gz |
KIKIMR-15038: pipe delivery problems
ref:af766a412891ac0bdcdb885abaf43807d9e2eea9
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 34 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__init.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__plan_step.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 |
13 files changed, 56 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 0d1ab5dc496..dcd41a0c3e0 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -58,28 +58,31 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc auto clientId = ev->Get()->ClientId; if (clientId == StatsReportPipe) { - if (ev->Get()->Status != NKikimrProto::OK) { - StatsReportPipe = TActorId(); + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID()); + } else { + LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID()); + StatsReportPipe = {}; } return; } if (PipeClientCache->OnConnect(ev)) { - LOG_S_DEBUG("Connected to tablet at " << TabletID() << ", remote " << tabletId); + LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID()); return; } - LOG_S_INFO("Failed to connect at " << TabletID() << ", remote " << tabletId); + LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID()); } void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { auto tabletId = ev->Get()->TabletId; auto clientId = ev->Get()->ClientId; - LOG_S_DEBUG("Client pipe reset at " << TabletID() << ", remote " << tabletId); + LOG_S_DEBUG("Client pipe reset to " << tabletId << " at tablet " << TabletID()); if (clientId == StatsReportPipe) { - StatsReportPipe = TActorId(); + StatsReportPipe = {}; return; } @@ -87,13 +90,13 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TAc } void TColumnShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&) { - auto tabletId = ev->Get()->TabletId; - LOG_S_DEBUG("Server pipe connected at tablet " << TabletID() << ", remote " << tabletId); + Y_UNUSED(ev); + LOG_S_DEBUG("Server pipe connected at tablet " << TabletID()); } void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext&) { - auto tabletId = ev->Get()->TabletId; - LOG_S_DEBUG("Server pipe reset at tablet " << TabletID() << ", remote " << tabletId); + Y_UNUSED(ev); + LOG_S_DEBUG("Server pipe reset at tablet " << TabletID()); } void TColumnShard::Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext &ctx) { @@ -128,6 +131,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC SendWaitPlanStep(GetOutdatedStep()); } + SendPeriodicStats(); ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } @@ -242,7 +246,7 @@ ui64 TColumnShard::MemoryUsage() const { return memory; } -void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { +void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage) { auto * metrics = Executor()->GetResourceMetrics(); if (!metrics) { return; @@ -258,8 +262,7 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { ui64 memory = MemoryUsage(); - const TActorContext& ctx = TlsActivationContext->AsActorContext(); - TInstant now = AppData(ctx)->TimeProvider->Now(); + TInstant now = TAppData::TimeProvider->Now(); metrics->CPU.Increment(usage.CPUExecTime, now); metrics->Network.Increment(usage.Network, now); //metrics->StorageSystem @@ -271,13 +274,14 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { metrics->TryUpdate(ctx); } -void TColumnShard::SendPeriodicStats(const TActorContext& ctx) { +void TColumnShard::SendPeriodicStats() { if (!CurrentSchemeShardId || !StorePathId) { LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID()); return; } - TInstant now = AppData(ctx)->TimeProvider->Now(); + const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); + TInstant now = TAppData::TimeProvider->Now(); if (LastStatsReport + StatsReportInterval > now) { return; } diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index a5f61dc2f04..cb6ca9a92b6 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -268,7 +268,6 @@ struct TEvColumnShard { }; using TEvScan = TEvDataShard::TEvKqpScan; - using TEvPeriodicTableStats = TEvDataShard::TEvPeriodicTableStats; }; inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index cff96a16dba..89ac045f2b5 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -36,7 +36,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) // Load InsertTable TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - if (!Self->InsertTable->Load(dbTable, AppData(ctx)->TimeProvider->Now())) { + if (!Self->InsertTable->Load(dbTable, TAppData::TimeProvider->Now())) { return false; } @@ -284,7 +284,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Self->UpdateInsertTableCounters(); Self->UpdateIndexCounters(); - Self->UpdateResourceMetrics({}); + Self->UpdateResourceMetrics(ctx, {}); return true; } @@ -313,7 +313,7 @@ void TTxInit::Complete(const TActorContext& ctx) { Self->TryRegisterMediatorTimeCast(); // Trigger progress: planned or outdated tx - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); Self->EnqueueBackgroundActivities(); // Start periodic wakeups diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp index b34582ba7e7..ed8f1ea1be7 100644 --- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp +++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -26,7 +26,7 @@ private: }; -bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) { +bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxPlanStep.Execute at tablet " << Self->TabletID()); @@ -93,7 +93,7 @@ bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_PLAN_STEP_ACCEPTED); if (plannedCount > 0 || Self->HaveOutdatedTxs()) { - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); } return true; } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index edb9a8e90df..baae719ab53 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -119,7 +119,7 @@ public: Self->ProgressTxInFlight = false; if (Self->PlanQueue) { - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); } return true; } @@ -146,10 +146,10 @@ private: bool StartBackgroundActivities{false}; }; -void TColumnShard::EnqueueProgressTx() { +void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { if (!ProgressTxInFlight) { ProgressTxInFlight = true; - Execute(new TTxProgressTx(this), TlsActivationContext->AsActorContext()); + Execute(new TTxProgressTx(this), ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index ae87df537d8..a918219f262 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -75,7 +75,7 @@ public: void Bootstrap(const TActorContext& ctx) { ScanActorId = ctx.SelfID; - TimeoutActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), Deadline - TInstant::Now(), + TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup)); Y_VERIFY(!ScanIterator); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 4ebfc9440da..47c5d59f0dc 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -31,7 +31,7 @@ private: }; -bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { +bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); Y_VERIFY(Self->InsertTable); Y_VERIFY(Self->PrimaryIndex); @@ -264,7 +264,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten); } - Self->UpdateResourceMetrics(Ev->Get()->ResourceUsage); + Self->UpdateResourceMetrics(ctx, Ev->Get()->ResourceUsage); return true; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 9c9244ee22d..9303fea4792 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -508,8 +508,8 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { return; } - const TActorContext& ctx = TlsActivationContext->AsActorContext(); - SendPeriodicStats(ctx); + const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); + SendPeriodicStats(); if (auto event = SetupIndexation()) { ctx.Send(IndexingActor, event.release()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 3422874694b..5e9e8d63cfc 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -413,7 +413,7 @@ private: bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); bool RemoveTx(NTable::TDatabase& database, ui64 txId); - void EnqueueProgressTx(); + void EnqueueProgressTx(const TActorContext& ctx); void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false); void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); @@ -452,9 +452,9 @@ private: void UpdateBlobMangerCounters(); void UpdateInsertTableCounters(); void UpdateIndexCounters(); - void UpdateResourceMetrics(const TUsage& usage); + void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage); ui64 MemoryUsage() const; - void SendPeriodicStats(const TActorContext& ctx); + void SendPeriodicStats(); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 1789b3b69b2..b095639e36f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4341,6 +4341,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Self->OlapStores[pathId] = new TOlapStoreInfo(alterVersion, std::move(description), std::move(sharding)); Self->IncrementPathDbRefCount(pathId); + Self->SetPartitioning(pathId, Self->OlapStores[pathId]); if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index 4cb1329e18a..cfb26e14fce 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -227,6 +227,8 @@ void ApplySharding(TTxId txId, TPathId pathId, TOlapStoreInfo::TPtr storeInfo, shardInfoProto->SetOwnerId(idx.GetOwnerId()); shardInfoProto->SetLocalId(idx.GetLocalId().GetValue()); } + + ss->SetPartitioning(pathId, storeInfo); } class TConfigureParts: public TSubOperationState { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 1c824e9abf5..ea1bad65c46 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5873,6 +5873,23 @@ bool TSchemeShard::FillUniformPartitioning(TVector<TString>& rangeEnds, ui32 key return true; } +void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) { + const TVector<TShardIdx>& partitioning = storeInfo->ColumnShards; + + if (AppData()->FeatureFlags.GetEnableSystemViews() && SysPartitionStatsCollector) { + TVector<std::pair<ui64, ui64>> shardIndices; + shardIndices.reserve(partitioning.size()); + for (auto& shardIdx : partitioning) { + shardIndices.emplace_back(ui64(shardIdx.GetOwnerId()), ui64(shardIdx.GetLocalId())); + } + + auto path = TPath::Init(pathId, this); + auto ev = MakeHolder<NSysView::TEvSysView::TEvSetPartitioning>(GetDomainKey(pathId), pathId, path.PathString()); + ev->ShardIndices.swap(shardIndices); + Send(SysPartitionStatsCollector, ev.Release()); + } +} + void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning) { if (AppData()->FeatureFlags.GetEnableSystemViews()) { TVector<std::pair<ui64, ui64>> shardIndices; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 30c512d1c75..71885b07ad5 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -516,6 +516,7 @@ public: void DoShardsDeletion(const THashSet<TShardIdx>& shardIdx, const TActorContext& ctx); + void SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo); void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning); auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId, TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats); |