diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-07-05 21:00:42 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2022-07-05 21:00:42 +0300 |
commit | 4e3d96f9fb84fe7703a0c2107a3c8f44f9ccf748 (patch) | |
tree | 8a133c6132fd24b703b123c63a2188b810f63c0e | |
parent | 014d4040c4b1be95e8260f823efcdae39cbbd439 (diff) | |
download | ydb-4e3d96f9fb84fe7703a0c2107a3c8f44f9ccf748.tar.gz |
KIKIMR-15038: pipe delivery problems (c-p r9662729)
REVIEW: 2694709
REVIEW: 2695840
x-ydb-stable-ref: 65c6f45946e8d9ffc9e34e93aa3a596d732de513
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.cpp | 34 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__init.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__plan_step.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__progress_tx.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__scan.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write_index.cpp | 5 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 6 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 1 |
13 files changed, 57 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp index 32219f0604..96853e60b2 100644 --- a/ydb/core/tx/columnshard/columnshard.cpp +++ b/ydb/core/tx/columnshard/columnshard.cpp @@ -54,28 +54,31 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc auto clientId = ev->Get()->ClientId; if (clientId == StatsReportPipe) { - if (ev->Get()->Status != NKikimrProto::OK) { - StatsReportPipe = TActorId(); + if (ev->Get()->Status == NKikimrProto::OK) { + LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID()); + } else { + LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID()); + StatsReportPipe = {}; } return; } if (PipeClientCache->OnConnect(ev)) { - LOG_S_DEBUG("Connected to tablet at " << TabletID() << ", remote " << tabletId); + LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID()); return; } - LOG_S_INFO("Failed to connect at " << TabletID() << ", remote " << tabletId); + LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID()); } void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) { auto tabletId = ev->Get()->TabletId; auto clientId = ev->Get()->ClientId; - LOG_S_DEBUG("Client pipe reset at " << TabletID() << ", remote " << tabletId); + LOG_S_DEBUG("Client pipe reset to " << tabletId << " at tablet " << TabletID()); if (clientId == StatsReportPipe) { - StatsReportPipe = TActorId(); + StatsReportPipe = {}; return; } @@ -83,13 +86,13 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TAc } void TColumnShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&) { - auto tabletId = ev->Get()->TabletId; - LOG_S_DEBUG("Server pipe connected at tablet " << TabletID() << ", remote " << tabletId); + Y_UNUSED(ev); + LOG_S_DEBUG("Server pipe connected at tablet " << TabletID()); } void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext&) { - auto tabletId = ev->Get()->TabletId; - LOG_S_DEBUG("Server pipe reset at tablet " << TabletID() << ", remote " << tabletId); + Y_UNUSED(ev); + LOG_S_DEBUG("Server pipe reset at tablet " << TabletID()); } void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) { @@ -213,6 +216,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC SendWaitPlanStep(GetOutdatedStep()); } + SendPeriodicStats(); ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup()); } @@ -327,7 +331,7 @@ ui64 TColumnShard::MemoryUsage() const { return memory; } -void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { +void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage) { auto * metrics = Executor()->GetResourceMetrics(); if (!metrics) { return; @@ -343,8 +347,7 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { ui64 memory = MemoryUsage(); - const TActorContext& ctx = TlsActivationContext->AsActorContext(); - TInstant now = AppData(ctx)->TimeProvider->Now(); + TInstant now = TAppData::TimeProvider->Now(); metrics->CPU.Increment(usage.CPUExecTime, now); metrics->Network.Increment(usage.Network, now); //metrics->StorageSystem @@ -356,13 +359,14 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) { metrics->TryUpdate(ctx); } -void TColumnShard::SendPeriodicStats(const TActorContext& ctx) { +void TColumnShard::SendPeriodicStats() { if (!CurrentSchemeShardId || !StorePathId) { LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID()); return; } - TInstant now = AppData(ctx)->TimeProvider->Now(); + const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); + TInstant now = TAppData::TimeProvider->Now(); if (LastStatsReport + StatsReportInterval > now) { return; } diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 809a0c6e19..8c52759ad1 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -236,7 +236,6 @@ struct TEvColumnShard { }; using TEvScan = TEvDataShard::TEvKqpScan; - using TEvPeriodicTableStats = TEvDataShard::TEvPeriodicTableStats; }; inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) { diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index e96b215c8e..cf5cd6bb03 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -36,7 +36,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) // Load InsertTable TBlobGroupSelector dsGroupSelector(Self->Info()); NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector); - if (!Self->InsertTable->Load(dbTable, AppData(ctx)->TimeProvider->Now())) { + if (!Self->InsertTable->Load(dbTable, TAppData::TimeProvider->Now())) { return false; } @@ -280,7 +280,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx) Self->UpdateInsertTableCounters(); Self->UpdateIndexCounters(); - Self->UpdateResourceMetrics({}); + Self->UpdateResourceMetrics(ctx, {}); return true; } @@ -309,7 +309,7 @@ void TTxInit::Complete(const TActorContext& ctx) { Self->TryRegisterMediatorTimeCast(); // Trigger progress: planned or outdated tx - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); Self->EnqueueBackgroundActivities(); // Start periodic wakeups diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp index 59fb09dbd9..7c1ae9a054 100644 --- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp +++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp @@ -8,7 +8,7 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) { +bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); LOG_S_DEBUG("TTxPlanStep.Execute at tablet " << Self->TabletID()); @@ -75,7 +75,7 @@ bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_PLAN_STEP_ACCEPTED); if (plannedCount > 0 || Self->HaveOutdatedTxs()) { - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); } return true; } diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index ba2498a023..0d05a33b66 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -118,7 +118,7 @@ public: Self->ProgressTxInFlight = false; if (Self->PlanQueue) { - Self->EnqueueProgressTx(); + Self->EnqueueProgressTx(ctx); } return true; } @@ -145,10 +145,10 @@ private: bool StartBackgroundActivities{false}; }; -void TColumnShard::EnqueueProgressTx() { +void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { if (!ProgressTxInFlight) { ProgressTxInFlight = true; - Execute(new TTxProgressTx(this), TlsActivationContext->AsActorContext()); + Execute(new TTxProgressTx(this), ctx); } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index e2f7ba5a51..e011b37926 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -52,7 +52,7 @@ public: void Bootstrap(const TActorContext& ctx) { ScanActorId = ctx.SelfID; - TimeoutActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), Deadline - TInstant::Now(), + TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(), new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup)); Y_VERIFY(!ScanIterator); diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp index 250b6e57e3..4edbfd626d 100644 --- a/ydb/core/tx/columnshard/columnshard__write_index.cpp +++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp @@ -8,7 +8,8 @@ namespace NKikimr::NColumnShard { using namespace NTabletFlatExecutor; -bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { + +bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) { Y_VERIFY(Ev); Y_VERIFY(Self->InsertTable); Y_VERIFY(Self->PrimaryIndex); @@ -169,7 +170,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) { Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten); } - Self->UpdateResourceMetrics(Ev->Get()->ResourceUsage); + Self->UpdateResourceMetrics(ctx, Ev->Get()->ResourceUsage); return true; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 8b0a6479d6..d905e742de 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -508,8 +508,8 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) { return; } - const TActorContext& ctx = TlsActivationContext->AsActorContext(); - SendPeriodicStats(ctx); + const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId()); + SendPeriodicStats(); if (auto event = SetupIndexation()) { ctx.Send(IndexingActor, event.release()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 9c42e0b57d..eecf21f510 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -377,7 +377,7 @@ private: bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0); bool RemoveTx(NTable::TDatabase& database, ui64 txId); - void EnqueueProgressTx(); + void EnqueueProgressTx(const TActorContext& ctx); void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false); void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc); @@ -407,9 +407,9 @@ private: void UpdateBlobMangerCounters(); void UpdateInsertTableCounters(); void UpdateIndexCounters(); - void UpdateResourceMetrics(const TUsage& usage); + void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage); ui64 MemoryUsage() const; - void SendPeriodicStats(const TActorContext& ctx); + void SendPeriodicStats(); public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 11527944f0..ed4a73b588 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4334,6 +4334,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Self->OlapStores[pathId] = new TOlapStoreInfo(alterVersion, std::move(description), std::move(sharding)); Self->IncrementPathDbRefCount(pathId); + Self->SetPartitioning(pathId, Self->OlapStores[pathId]); if (!rowset.Next()) { return false; diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp index 4cb1329e18..cfb26e14fc 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp @@ -227,6 +227,8 @@ void ApplySharding(TTxId txId, TPathId pathId, TOlapStoreInfo::TPtr storeInfo, shardInfoProto->SetOwnerId(idx.GetOwnerId()); shardInfoProto->SetLocalId(idx.GetLocalId().GetValue()); } + + ss->SetPartitioning(pathId, storeInfo); } class TConfigureParts: public TSubOperationState { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 4b3532944f..9c6350e2a8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -5824,6 +5824,23 @@ bool TSchemeShard::FillUniformPartitioning(TVector<TString>& rangeEnds, ui32 key return true; } +void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) { + const TVector<TShardIdx>& partitioning = storeInfo->ColumnShards; + + if (AppData()->FeatureFlags.GetEnableSystemViews() && SysPartitionStatsCollector) { + TVector<std::pair<ui64, ui64>> shardIndices; + shardIndices.reserve(partitioning.size()); + for (auto& shardIdx : partitioning) { + shardIndices.emplace_back(ui64(shardIdx.GetOwnerId()), ui64(shardIdx.GetLocalId())); + } + + auto path = TPath::Init(pathId, this); + auto ev = MakeHolder<NSysView::TEvSysView::TEvSetPartitioning>(GetDomainKey(pathId), pathId, path.PathString()); + ev->ShardIndices.swap(shardIndices); + Send(SysPartitionStatsCollector, ev.Release()); + } +} + void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning) { if (AppData()->FeatureFlags.GetEnableSystemViews()) { TVector<std::pair<ui64, ui64>> shardIndices; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 2f6041ff42..6c54754ac9 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -515,6 +515,7 @@ public: void DoShardsDeletion(const THashSet<TShardIdx>& shardIdx, const TActorContext& ctx); + void SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo); void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning); auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId, TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats); |