aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-12-11 20:13:40 +0300
committerGitHub <noreply@github.com>2024-12-11 20:13:40 +0300
commit16418df6e17d4a6d93218df1887937387df07ee6 (patch)
treef7d64f6e63d76e87affadade3eb2bc4c58e6da47
parentbff00aea49e2e97a97dbbd535882b53c1b5c4d9d (diff)
downloadydb-16418df6e17d4a6d93218df1887937387df07ee6.tar.gz
commit processing fixes (#12519)
-rw-r--r--ydb/core/tx/columnshard/columnshard__progress_tx.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h2
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp24
-rw-r--r--ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp22
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h30
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h13
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.cpp28
-rw-r--r--ydb/core/tx/columnshard/transactions/tx_controller.h16
-rw-r--r--ydb/library/services/services.proto1
15 files changed, 111 insertions, 80 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
index 73a4a0200d9..44bb0a27f86 100644
--- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
+++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp
@@ -28,9 +28,12 @@ public:
}
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- NActors::TLogContextGuard logGuard =
- NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
- Y_ABORT_UNLESS(Self->ProgressTxInFlight);
+ NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tablet_id", Self->TabletID())(
+ "tx_state", "TTxProgressTx::Execute")("tx_current", Self->ProgressTxInFlight);
+ if (!Self->ProgressTxInFlight) {
+ AbortedThroughRemoveExpired = true;
+ return true;
+ }
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());
const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
@@ -45,15 +48,24 @@ public:
const auto plannedItem = Self->ProgressTxController->GetFirstPlannedTx();
if (!!plannedItem) {
PlannedQueueItem.emplace(plannedItem->PlanStep, plannedItem->TxId);
- ui64 step = plannedItem->PlanStep;
- ui64 txId = plannedItem->TxId;
+ const ui64 step = plannedItem->PlanStep;
+ const ui64 txId = plannedItem->TxId;
+ NActors::TLogContextGuard logGuardTx = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tx_id", txId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart");
TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId);
if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "BuildTxPrepareForProgress");
AbortedThroughRemoveExpired = true;
Self->ProgressTxInFlight = txId;
Self->Execute(txPrepare.release(), ctx);
return true;
+ } else if (TxOperator->IsInProgress()) {
+ AbortedThroughRemoveExpired = true;
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemContinue");
+ AFL_VERIFY(Self->ProgressTxInFlight == txId);
+ return true;
} else {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "PopFirstPlannedTx");
Self->ProgressTxController->PopFirstPlannedTx();
}
StartExecution = TMonotonic::Now();
@@ -80,8 +92,9 @@ public:
if (AbortedThroughRemoveExpired) {
return;
}
- NActors::TLogContextGuard logGuard =
- NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
+ NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)(
+ "tablet_id", Self->TabletID())(
+ "tx_state", "TTxProgressTx::Complete");
if (TxOperator) {
TxOperator->ProgressOnComplete(*Self, ctx);
Self->RescheduleWaitingReads();
@@ -104,11 +117,13 @@ public:
};
void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTx")("tablet_id", TabletID())("tx_id", continueTxId);
if (continueTxId) {
AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId);
}
if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTxStart")("tablet_id", TabletID())("tx_id", continueTxId)(
+ "tx_current", ProgressTxInFlight);
ProgressTxInFlight = continueTxId.value_or(0);
Execute(new TTxProgressTx(this), ctx);
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp
index e350fbb0571..bd6cf4925a4 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine.cpp
@@ -38,10 +38,10 @@ void IColumnEngine::FetchDataAccessors(const std::shared_ptr<TDataAccessorsReque
TSelectInfo::TStats TSelectInfo::Stats() const {
TStats out;
- out.Portions = PortionsOrderedPK.size();
+ out.Portions = Portions.size();
THashSet<TUnifiedBlobId> uniqBlob;
- for (auto& portionInfo : PortionsOrderedPK) {
+ for (auto& portionInfo : Portions) {
out.Rows += portionInfo->GetRecordsCount();
for (auto& blobId : portionInfo->GetBlobIds()) {
out.Bytes += blobId.BlobSize();
@@ -53,10 +53,10 @@ TSelectInfo::TStats TSelectInfo::Stats() const {
TString TSelectInfo::DebugString() const {
TStringBuilder result;
- result << "count:" << PortionsOrderedPK.size() << ";";
- if (PortionsOrderedPK.size()) {
+ result << "count:" << Portions.size() << ";";
+ if (Portions.size()) {
result << "portions:";
- for (auto& portionInfo : PortionsOrderedPK) {
+ for (auto& portionInfo : Portions) {
result << portionInfo->DebugString();
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 940a055963b..13d5c954920 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -46,7 +46,7 @@ struct TSelectInfo {
}
};
- std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
+ std::vector<std::shared_ptr<TPortionInfo>> Portions;
TStats Stats() const;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 518513b5635..30ad46d591d 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -502,18 +502,22 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
return out;
}
- if (withUncommitted) {
- for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
- AFL_VERIFY(portionInfo->HasInsertWriteId());
- AFL_VERIFY(!portionInfo->HasCommitSnapshot());
- const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
- "portion", portionInfo->DebugString());
- if (skipPortion) {
+ for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
+ AFL_VERIFY(portionInfo->HasInsertWriteId());
+ if (withUncommitted) {
+ if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
continue;
}
- out->PortionsOrderedPK.emplace_back(portionInfo);
+ } else if (!portionInfo->HasCommitSnapshot()) {
+ continue;
+ }
+ const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
+ "portion", portionInfo->DebugString());
+ if (skipPortion) {
+ continue;
}
+ out->Portions.emplace_back(portionInfo);
}
for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
@@ -525,7 +529,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
if (skipPortion) {
continue;
}
- out->PortionsOrderedPK.emplace_back(portionInfo);
+ out->Portions.emplace_back(portionInfo);
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
index ec712ef066c..56a14c9b23f 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
@@ -20,7 +20,7 @@ TConclusionStatus TReadMetadata::Init(
SelectInfo = dataAccessor.Select(readDescription, !!LockId);
if (LockId) {
- for (auto&& i : SelectInfo->PortionsOrderedPK) {
+ for (auto&& i : SelectInfo->Portions) {
if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
} else {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
index 17f56ef0ff3..b0242d486aa 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.h
@@ -21,7 +21,7 @@ public:
std::vector<TCommittedBlob> CommittedBlobs;
virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
- return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty();
+ return SelectInfo->Portions.empty() && CommittedBlobs.empty();
}
virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
index 036fdcb6655..8633f5d692a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/plain_read_data.cpp
@@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
- const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
+ const auto& portions = GetReadMetadata()->SelectInfo->Portions;
const auto& committed = GetReadMetadata()->CommittedBlobs;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
@@ -49,7 +49,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
auto& stats = GetReadMetadata()->ReadStats;
- stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
+ stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h
index eb1e302ca21..be603922a06 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/constructor/read_metadata.h
@@ -12,7 +12,7 @@ public:
virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
- return SelectInfo->PortionsOrderedPK.empty();
+ return SelectInfo->Portions.empty();
}
virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
index d794ff4a24a..eb8c21e291b 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.cpp
@@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
- const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
+ const auto& portions = GetReadMetadata()->SelectInfo->Portions;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
for (auto&& i : portions) {
@@ -26,7 +26,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);
auto& stats = GetReadMetadata()->ReadStats;
- stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
+ stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->InsertedPortionsBytes = insertedPortionsBytes;
diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
index 8201deaf4c5..475c51a93fa 100644
--- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
@@ -566,28 +566,28 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
ui64 txId = 0;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
{ // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
}
{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
}
@@ -657,7 +657,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}
// predicates
@@ -671,7 +671,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
{
@@ -683,7 +683,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 9);
}
}
@@ -841,7 +841,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}
// Cleanup
@@ -850,7 +850,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}
// TTL
@@ -866,7 +866,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
{
@@ -882,7 +882,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
}
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
index 40a2a6586ab..f53042bf0e2 100644
--- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
@@ -23,7 +23,6 @@ private:
std::set<ui64> WaitShardsBrokenFlags;
std::set<ui64> WaitShardsResultAck;
std::optional<bool> TxBroken;
- mutable TAtomicCounter ControlCounter = 0;
virtual NKikimrTxColumnShard::TCommitWriteTxBody SerializeToProto() const override {
NKikimrTxColumnShard::TCommitWriteTxBody result;
@@ -48,7 +47,7 @@ private:
virtual bool DoParseImpl(TColumnShard& /*owner*/, const NKikimrTxColumnShard::TCommitWriteTxBody& commitTxBody) override {
if (!commitTxBody.HasPrimaryTabletData()) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot read proto")("proto", commitTxBody.DebugString());
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("event", "cannot read proto")("proto", commitTxBody.DebugString());
return false;
}
auto& protoData = commitTxBody.GetPrimaryTabletData();
@@ -92,7 +91,7 @@ private:
copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
} else {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId);
}
return true;
}
@@ -101,11 +100,11 @@ private:
if (op->WaitShardsBrokenFlags.erase(TabletId)) {
op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag;
op->SendBrokenFlagAck(*Self, TabletId);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
"receive", TabletId);
op->InitializeRequests(*Self);
} else {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "repeated shard broken_flag info")("shard_id", TabletId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId);
}
}
@@ -132,17 +131,18 @@ private:
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
auto copy = *op;
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))("receive", TabletId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
+ "receive", TabletId);
AFL_VERIFY(copy.WaitShardsResultAck.erase(TabletId));
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
return true;
}
virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
if (!op->WaitShardsResultAck.erase(TabletId)) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
}
op->CheckFinished(*Self);
@@ -174,7 +174,7 @@ private:
void CheckFinished(TColumnShard& owner) {
if (WaitShardsResultAck.empty()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "finished");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "finished");
owner.EnqueueProgressTx(NActors::TActivationContext::AsActorContext(), GetTxId());
}
}
@@ -248,7 +248,7 @@ private:
if (op->WaitShardsBrokenFlags.empty()) {
AFL_VERIFY(op->WaitShardsResultAck.erase(Self->TabletID()));
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
"receive", Self->TabletID());
op->CheckFinished(*Self);
}
@@ -265,13 +265,11 @@ private:
InitializeRequests(owner);
}
+ virtual bool DoIsInProgress() const override {
+ return WaitShardsResultAck.size();
+ }
virtual std::unique_ptr<NTabletFlatExecutor::ITransaction> DoBuildTxPrepareForProgress(TColumnShard* owner) const override {
- if (WaitShardsResultAck.empty()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_prepare_for_progress")("lock_id", LockId);
- return nullptr;
- }
- AFL_VERIFY(ControlCounter.Inc() <= 1);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_progress_started")("lock_id", LockId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "prepare_for_progress_started")("lock_id", LockId);
return std::make_unique<TTxStartPreparation>(owner, GetTxId());
}
diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h
index ae249b07995..8bbf9d4d6f5 100644
--- a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h
+++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h
@@ -20,7 +20,6 @@ private:
bool NeedReceiveBroken = false;
bool ReceiveAck = false;
bool SelfBroken = false;
- mutable TAtomicCounter ControlCounter = 0;
std::optional<bool> TxBroken;
virtual NKikimrTxColumnShard::TCommitWriteTxBody SerializeToProto() const override {
@@ -38,7 +37,7 @@ private:
virtual bool DoParseImpl(TColumnShard& /*owner*/, const NKikimrTxColumnShard::TCommitWriteTxBody& commitTxBody) override {
if (!commitTxBody.HasSecondaryTabletData()) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot read proto")("proto", commitTxBody.DebugString());
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("event", "cannot read proto")("proto", commitTxBody.DebugString());
return false;
}
auto& protoData = commitTxBody.GetSecondaryTabletData();
@@ -187,13 +186,11 @@ private:
}
};
+ virtual bool DoIsInProgress() const override {
+ return !TxBroken && (NeedReceiveBroken || !ReceiveAck);
+ }
virtual std::unique_ptr<NTabletFlatExecutor::ITransaction> DoBuildTxPrepareForProgress(TColumnShard* owner) const override {
- if (TxBroken || (!NeedReceiveBroken && ReceiveAck)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_prepare_for_progress")("lock_id", LockId);
- return nullptr;
- }
- AFL_VERIFY(ControlCounter.Inc() <= 1);
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "prepare_for_progress_started")("lock_id", LockId);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "prepare_for_progress_started")("lock_id", LockId);
return std::make_unique<TTxStartPreparation>(owner, GetTxId());
}
diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
index afb1e8a33d5..a32ebbf0250 100644
--- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp
@@ -86,7 +86,7 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) {
return false;
}
}
- AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)(
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD_TX)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)(
"operators", Operators.size())("plan", PlanQueue.size())("dl_queue", DeadlineQueue.size());
return true;
}
@@ -277,10 +277,10 @@ TDuration TTxController::GetTxCompleteLag(ui64 timecastStep) const {
TTxController::EPlanResult TTxController::PlanTx(const ui64 planStep, const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
auto it = Operators.find(txId);
if (it == Operators.end()) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_plan_tx")("tx_id", txId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "skip_plan_tx")("tx_id", txId);
return EPlanResult::Skipped;
} else {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_TX)("event", "plan_tx")("tx_id", txId)("plan_step", it->second->MutableTxInfo().PlanStep);
}
auto& txInfo = it->second->MutableTxInfo();
if (txInfo.PlanStep == 0) {
@@ -308,12 +308,12 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
const TTxController::TTxInfo& txInfo, const TString& txBody, NTabletFlatExecutor::TTransactionContext& txc) {
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnExecute")("tx_info", txInfo.DebugString());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start");
std::shared_ptr<TTxController::ITransactionOperator> txOperator(
TTxController::ITransactionOperator::TFactory::Construct(txInfo.TxKind, txInfo));
AFL_VERIFY(!!txOperator);
if (!txOperator->Parse(Owner, txBody)) {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "cannot parse txOperator");
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot parse txOperator");
return txOperator;
}
Counters.OnStartProposeOnExecute(txOperator->GetOpType());
@@ -321,13 +321,13 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
auto txInfoPtr = GetTxInfo(txInfo.TxId);
if (!!txInfoPtr) {
if (!txOperator->CheckAllowUpdate(*txInfoPtr)) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "incorrect duplication")("actual_tx", txInfoPtr->DebugString());
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "incorrect duplication")("actual_tx", txInfoPtr->DebugString());
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR,
TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed");
txOperator->SetProposeStartInfo(proposeResult);
return txOperator;
} else {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "update duplication data")("deprecated_tx", txInfoPtr->DebugString());
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "update duplication data")("deprecated_tx", txInfoPtr->DebugString());
return UpdateTxSourceInfo(txOperator->GetTxInfo(), txc);
}
} else {
@@ -337,9 +337,9 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
} else {
RegisterTx(txOperator, txBody, txc);
}
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "registered");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "registered");
} else {
- AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", "problem on start")(
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_TX)("error", "problem on start")(
"message", txOperator->GetProposeStartInfoVerified().GetStatusMessage());
}
return txOperator;
@@ -349,7 +349,7 @@ std::shared_ptr<TTxController::ITransactionOperator> TTxController::StartPropose
void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::StartProposeOnComplete")("tx_id", txOperator.GetTxId());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start");
txOperator.StartProposeOnComplete(Owner, ctx);
Counters.OnStartProposeOnComplete(txOperator.GetOpType());
}
@@ -357,18 +357,18 @@ void TTxController::StartProposeOnComplete(ITransactionOperator& txOperator, con
void TTxController::FinishProposeOnExecute(const ui64 txId, NTabletFlatExecutor::TTransactionContext& txc) {
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnExecute")("tx_id", txId);
if (auto txOperator = GetTxOperatorOptional(txId)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start");
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start");
txOperator->FinishProposeOnExecute(Owner, txc);
Counters.OnFinishProposeOnExecute(txOperator->GetOpType());
} else {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot found txOperator in propose transaction base")("tx_id", txId);
}
}
void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, const TActorContext& ctx) {
NActors::TLogContextGuard lGuard =
NActors::TLogContextBuilder::Build()("method", "TTxController::FinishProposeOnComplete")("tx_id", txOperator.GetTxId());
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "start")("tx_info", txOperator.GetTxInfo().DebugString());
TTxController::TProposeResult proposeResult = txOperator.GetProposeStartInfoVerified();
AFL_VERIFY(!txOperator.IsFail());
txOperator.FinishProposeOnComplete(Owner, ctx);
@@ -379,7 +379,7 @@ void TTxController::FinishProposeOnComplete(ITransactionOperator& txOperator, co
void TTxController::FinishProposeOnComplete(const ui64 txId, const TActorContext& ctx) {
auto txOperator = GetTxOperatorOptional(txId);
if (!txOperator) {
- AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId);
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("error", "cannot found txOperator in propose transaction finish")("tx_id", txId);
return;
}
return FinishProposeOnComplete(*txOperator, ctx);
diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h
index e48f10d3796..a546f50001f 100644
--- a/ydb/core/tx/columnshard/transactions/tx_controller.h
+++ b/ydb/core/tx/columnshard/transactions/tx_controller.h
@@ -198,6 +198,8 @@ public:
std::optional<EStatus> Status = EStatus::Created;
private:
+ mutable TAtomicCounter PreparationsStarted = 0;
+
friend class TTxController;
virtual bool DoParse(TColumnShard& owner, const TString& data) = 0;
virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) = 0;
@@ -215,6 +217,9 @@ public:
return false;
}
+ virtual bool DoIsInProgress() const {
+ return false;
+ }
virtual std::unique_ptr<NTabletFlatExecutor::ITransaction> DoBuildTxPrepareForProgress(TColumnShard* /*owner*/) const {
return nullptr;
}
@@ -240,6 +245,10 @@ public:
using TFactory = NObjectFactory::TParametrizedObjectFactory<ITransactionOperator, NKikimrTxColumnShard::ETransactionKind, TTxInfo>;
using OpType = TString;
+ bool IsInProgress() const {
+ return DoIsInProgress();
+ }
+
bool PingTimeout(TColumnShard& owner, const TMonotonic now) {
return DoPingTimeout(owner, now);
}
@@ -257,6 +266,13 @@ public:
}
std::unique_ptr<NTabletFlatExecutor::ITransaction> BuildTxPrepareForProgress(TColumnShard* owner) const {
+ if (!IsInProgress()) {
+ return nullptr;
+ }
+ if (PreparationsStarted.Val()) {
+ return nullptr;
+ }
+ PreparationsStarted.Inc();
return DoBuildTxPrepareForProgress(owner);
}
diff --git a/ydb/library/services/services.proto b/ydb/library/services/services.proto
index 5bd248386be..ee0a8a7929d 100644
--- a/ydb/library/services/services.proto
+++ b/ydb/library/services/services.proto
@@ -302,6 +302,7 @@ enum EServiceKikimr {
TX_COLUMNSHARD_ACTUALIZATION = 850;
TX_COLUMNSHARD_COMPACTION = 851;
TX_COLUMNSHARD_WRITE = 852;
+ TX_COLUMNSHARD_TX = 853;
// System views
SYSTEM_VIEWS = 900;