about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Artem Zuikov <chertus@gmail.com> 2022-07-05 21:00:42 +0300
committer Daniil Cherednik <dan.cherednik@gmail.com> 2022-07-05 21:00:42 +0300
commit 4e3d96f9fb84fe7703a0c2107a3c8f44f9ccf748 (patch)
tree 8a133c6132fd24b703b123c63a2188b810f63c0e
parent 014d4040c4b1be95e8260f823efcdae39cbbd439 (diff)
download ydb-4e3d96f9fb84fe7703a0c2107a3c8f44f9ccf748.tar.gz
KIKIMR-15038: pipe delivery problems (c-p r9662729)
REVIEW: 2694709 REVIEW: 2695840 x-ydb-stable-ref: 65c6f45946e8d9ffc9e34e93aa3a596d732de513
-rw-r--r-- ydb/core/tx/columnshard/columnshard.cpp 34
-rw-r--r-- ydb/core/tx/columnshard/columnshard.h 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__init.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__plan_step.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/columnshard__progress_tx.cpp 6
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write_index.cpp 5
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h 6
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__init.cpp 1
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp 2
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_impl.cpp 17
-rw-r--r-- ydb/core/tx/schemeshard/schemeshard_impl.h 1
13 files changed, 57 insertions, 32 deletions
diff --git a/ydb/core/tx/columnshard/columnshard.cpp b/ydb/core/tx/columnshard/columnshard.cpp
index 32219f0604..96853e60b2 100644
--- a/ydb/core/tx/columnshard/columnshard.cpp
+++ b/ydb/core/tx/columnshard/columnshard.cpp
@@ -54,28 +54,31 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TAc
auto clientId = ev->Get()->ClientId;
if (clientId == StatsReportPipe) {
- if (ev->Get()->Status != NKikimrProto::OK) {
- StatsReportPipe = TActorId();
+ if (ev->Get()->Status == NKikimrProto::OK) {
+ LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID());
+ } else {
+ LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID());
+ StatsReportPipe = {};
}
return;
}
if (PipeClientCache->OnConnect(ev)) {
- LOG_S_DEBUG("Connected to tablet at " << TabletID() << ", remote " << tabletId);
+ LOG_S_DEBUG("Connected to " << tabletId << " at tablet " << TabletID());
return;
}
- LOG_S_INFO("Failed to connect at " << TabletID() << ", remote " << tabletId);
+ LOG_S_INFO("Failed to connect to " << tabletId << " at tablet " << TabletID());
}
void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TActorContext&) {
auto tabletId = ev->Get()->TabletId;
auto clientId = ev->Get()->ClientId;
- LOG_S_DEBUG("Client pipe reset at " << TabletID() << ", remote " << tabletId);
+ LOG_S_DEBUG("Client pipe reset to " << tabletId << " at tablet " << TabletID());
if (clientId == StatsReportPipe) {
- StatsReportPipe = TActorId();
+ StatsReportPipe = {};
return;
}
@@ -83,13 +86,13 @@ void TColumnShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr& ev, const TAc
}
void TColumnShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev, const TActorContext&) {
- auto tabletId = ev->Get()->TabletId;
- LOG_S_DEBUG("Server pipe connected at tablet " << TabletID() << ", remote " << tabletId);
+ Y_UNUSED(ev);
+ LOG_S_DEBUG("Server pipe connected at tablet " << TabletID());
}
void TColumnShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev, const TActorContext&) {
- auto tabletId = ev->Get()->TabletId;
- LOG_S_DEBUG("Server pipe reset at tablet " << TabletID() << ", remote " << tabletId);
+ Y_UNUSED(ev);
+ LOG_S_DEBUG("Server pipe reset at tablet " << TabletID());
}
void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
@@ -213,6 +216,7 @@ void TColumnShard::Handle(TEvPrivate::TEvPeriodicWakeup::TPtr& ev, const TActorC
SendWaitPlanStep(GetOutdatedStep());
}
+ SendPeriodicStats();
ctx.Schedule(ActivationPeriod, new TEvPrivate::TEvPeriodicWakeup());
}
@@ -327,7 +331,7 @@ ui64 TColumnShard::MemoryUsage() const {
return memory;
}
-void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
+void TColumnShard::UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage) {
auto * metrics = Executor()->GetResourceMetrics();
if (!metrics) {
return;
@@ -343,8 +347,7 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
ui64 memory = MemoryUsage();
- const TActorContext& ctx = TlsActivationContext->AsActorContext();
- TInstant now = AppData(ctx)->TimeProvider->Now();
+ TInstant now = TAppData::TimeProvider->Now();
metrics->CPU.Increment(usage.CPUExecTime, now);
metrics->Network.Increment(usage.Network, now);
//metrics->StorageSystem
@@ -356,13 +359,14 @@ void TColumnShard::UpdateResourceMetrics(const TUsage& usage) {
metrics->TryUpdate(ctx);
}
-void TColumnShard::SendPeriodicStats(const TActorContext& ctx) {
+void TColumnShard::SendPeriodicStats() {
if (!CurrentSchemeShardId || !StorePathId) {
LOG_S_DEBUG("Disabled periodic stats at tablet " << TabletID());
return;
}
- TInstant now = AppData(ctx)->TimeProvider->Now();
+ const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId());
+ TInstant now = TAppData::TimeProvider->Now();
if (LastStatsReport + StatsReportInterval > now) {
return;
}
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 809a0c6e19..8c52759ad1 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -236,7 +236,6 @@ struct TEvColumnShard {
};
using TEvScan = TEvDataShard::TEvKqpScan;
- using TEvPeriodicTableStats = TEvDataShard::TEvPeriodicTableStats;
};
inline auto& Proto(TEvColumnShard::TEvProposeTransaction* ev) {
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index e96b215c8e..cf5cd6bb03 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -36,7 +36,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
// Load InsertTable
TBlobGroupSelector dsGroupSelector(Self->Info());
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
- if (!Self->InsertTable->Load(dbTable, AppData(ctx)->TimeProvider->Now())) {
+ if (!Self->InsertTable->Load(dbTable, TAppData::TimeProvider->Now())) {
return false;
}
@@ -280,7 +280,7 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
- Self->UpdateResourceMetrics({});
+ Self->UpdateResourceMetrics(ctx, {});
return true;
}
@@ -309,7 +309,7 @@ void TTxInit::Complete(const TActorContext& ctx) {
Self->TryRegisterMediatorTimeCast();
// Trigger progress: planned or outdated tx
- Self->EnqueueProgressTx();
+ Self->EnqueueProgressTx(ctx);
Self->EnqueueBackgroundActivities();
// Start periodic wakeups
diff --git a/ydb/core/tx/columnshard/columnshard__plan_step.cpp b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
index 59fb09dbd9..7c1ae9a054 100644
--- a/ydb/core/tx/columnshard/columnshard__plan_step.cpp
+++ b/ydb/core/tx/columnshard/columnshard__plan_step.cpp
@@ -8,7 +8,7 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
-bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) {
+bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
LOG_S_DEBUG("TTxPlanStep.Execute at tablet " << Self->TabletID());
@@ -75,7 +75,7 @@ bool TTxPlanStep::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_PLAN_STEP_ACCEPTED);
if (plannedCount > 0 || Self->HaveOutdatedTxs()) {
- Self->EnqueueProgressTx();
+ Self->EnqueueProgressTx(ctx);
}
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index ba2498a023..0d05a33b66 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -118,7 +118,7 @@ public:
Self->ProgressTxInFlight = false;
if (Self->PlanQueue) {
- Self->EnqueueProgressTx();
+ Self->EnqueueProgressTx(ctx);
}
return true;
}
@@ -145,10 +145,10 @@ private:
bool StartBackgroundActivities{false};
};
-void TColumnShard::EnqueueProgressTx() {
+void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
if (!ProgressTxInFlight) {
ProgressTxInFlight = true;
- Execute(new TTxProgressTx(this), TlsActivationContext->AsActorContext());
+ Execute(new TTxProgressTx(this), ctx);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index e2f7ba5a51..e011b37926 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -52,7 +52,7 @@ public:
void Bootstrap(const TActorContext& ctx) {
ScanActorId = ctx.SelfID;
- TimeoutActorId = CreateLongTimer(TlsActivationContext->AsActorContext(), Deadline - TInstant::Now(),
+ TimeoutActorId = CreateLongTimer(ctx, Deadline - TInstant::Now(),
new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup));
Y_VERIFY(!ScanIterator);
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 250b6e57e3..4edbfd626d 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -8,7 +8,8 @@ namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
-bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
+
+bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx) {
Y_VERIFY(Ev);
Y_VERIFY(Self->InsertTable);
Y_VERIFY(Self->PrimaryIndex);
@@ -169,7 +170,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext&) {
Self->IncCounter(COUNTER_EVICTION_BYTES_WRITTEN, bytesWritten);
}
- Self->UpdateResourceMetrics(Ev->Get()->ResourceUsage);
+ Self->UpdateResourceMetrics(ctx, Ev->Get()->ResourceUsage);
return true;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 8b0a6479d6..d905e742de 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -508,8 +508,8 @@ void TColumnShard::EnqueueBackgroundActivities(bool periodic, bool insertOnly) {
return;
}
- const TActorContext& ctx = TlsActivationContext->AsActorContext();
- SendPeriodicStats(ctx);
+ const TActorContext& ctx = TActivationContext::ActorContextFor(SelfId());
+ SendPeriodicStats();
if (auto event = SetupIndexation()) {
ctx.Send(IndexingActor, event.release());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 9c42e0b57d..eecf21f510 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -377,7 +377,7 @@ private:
bool RemoveLongTxWrite(NIceDb::TNiceDb& db, TWriteId writeId, ui64 txId = 0);
bool RemoveTx(NTable::TDatabase& database, ui64 txId);
- void EnqueueProgressTx();
+ void EnqueueProgressTx(const TActorContext& ctx);
void EnqueueBackgroundActivities(bool periodic = false, bool insertOnly = false);
void UpdateSchemaSeqNo(const TMessageSeqNo& seqNo, NTabletFlatExecutor::TTransactionContext& txc);
@@ -407,9 +407,9 @@ private:
void UpdateBlobMangerCounters();
void UpdateInsertTableCounters();
void UpdateIndexCounters();
- void UpdateResourceMetrics(const TUsage& usage);
+ void UpdateResourceMetrics(const TActorContext& ctx, const TUsage& usage);
ui64 MemoryUsage() const;
- void SendPeriodicStats(const TActorContext& ctx);
+ void SendPeriodicStats();
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index 11527944f0..ed4a73b588 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -4334,6 +4334,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Self->OlapStores[pathId] = new TOlapStoreInfo(alterVersion, std::move(description), std::move(sharding));
Self->IncrementPathDbRefCount(pathId);
+ Self->SetPartitioning(pathId, Self->OlapStores[pathId]);
if (!rowset.Next()) {
return false;
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
index 4cb1329e18..cfb26e14fc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_create_olap_store.cpp
@@ -227,6 +227,8 @@ void ApplySharding(TTxId txId, TPathId pathId, TOlapStoreInfo::TPtr storeInfo,
shardInfoProto->SetOwnerId(idx.GetOwnerId());
shardInfoProto->SetLocalId(idx.GetLocalId().GetValue());
}
+
+ ss->SetPartitioning(pathId, storeInfo);
}
class TConfigureParts: public TSubOperationState {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index 4b3532944f..9c6350e2a8 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -5824,6 +5824,23 @@ bool TSchemeShard::FillUniformPartitioning(TVector<TString>& rangeEnds, ui32 key
return true;
}
+void TSchemeShard::SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo) { // OLAP-store overload: reports the store's column shards to SysPartitionStatsCollector
+ const TVector<TShardIdx>& partitioning = storeInfo->ColumnShards; // shard list is read straight from the store info; nothing is modified on storeInfo here
+
+ if (AppData()->FeatureFlags.GetEnableSystemViews() && SysPartitionStatsCollector) { // no-op unless the EnableSystemViews flag is on and SysPartitionStatsCollector tests true (presumably a valid actor id — confirm)
+ TVector<std::pair<ui64, ui64>> shardIndices;
+ shardIndices.reserve(partitioning.size()); // final size is known up front: one entry per shard
+ for (auto& shardIdx : partitioning) { // partitioning is const, so shardIdx binds as a const reference
+ shardIndices.emplace_back(ui64(shardIdx.GetOwnerId()), ui64(shardIdx.GetLocalId())); // one (owner id, local id) pair per shard
+ }
+
+ auto path = TPath::Init(pathId, this);
+ auto ev = MakeHolder<NSysView::TEvSysView::TEvSetPartitioning>(GetDomainKey(pathId), pathId, path.PathString()); // event is built from the domain key, the path id and the path string
+ ev->ShardIndices.swap(shardIndices); // exchange contents with the event's ShardIndices instead of copying the pairs
+ Send(SysPartitionStatsCollector, ev.Release()); // Release() gives up the holder's pointer, which is handed to Send
+ }
+}
+
void TSchemeShard::SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning) {
if (AppData()->FeatureFlags.GetEnableSystemViews()) {
TVector<std::pair<ui64, ui64>> shardIndices;
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 2f6041ff42..6c54754ac9 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -515,6 +515,7 @@ public:
void DoShardsDeletion(const THashSet<TShardIdx>& shardIdx, const TActorContext& ctx);
+ void SetPartitioning(TPathId pathId, TOlapStoreInfo::TPtr storeInfo);
void SetPartitioning(TPathId pathId, TTableInfo::TPtr tableInfo, TVector<TTableShardInfo>&& newPartitioning);
auto BuildStatsForCollector(TPathId tableId, TShardIdx shardIdx, TTabletId datashardId,
TMaybe<ui32> nodeId, TMaybe<ui64> startTime, const TPartitionStats& stats);