diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-05-25 22:03:13 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-05-25 22:03:13 +0300 |
commit | 87ae86feb6850a95d5403ab12581ece07d1f32c0 (patch) | |
tree | 86add3518b35fed4e4093ab3dbc69e4c31833d1e | |
parent | 483dcbe8de1b955ec0556102f0d44ec0d461b8a0 (diff) | |
download | ydb-87ae86feb6850a95d5403ab12581ece07d1f32c0.tar.gz |
KIKIMR-14958: protect against invalid write metadata
ref:d2b74ffc407940cf3f048d013f83055e066f3d27
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_schema.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_ut_common.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.cpp | 20 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/insert_table.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/ut_logs_engine.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/write_actor.cpp | 15 |
6 files changed, 29 insertions, 20 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 229fc91c4c8..0a6eca77630 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -505,13 +505,13 @@ struct Schema : NIceDb::Schema { switch (recType) { case EInsertTableIds::Inserted: - inserted[TWriteId{data.WriteTxId}] = std::move(data); + inserted.emplace(TWriteId{data.WriteTxId}, std::move(data)); break; case EInsertTableIds::Committed: committed[data.PathId].emplace(data); break; case EInsertTableIds::Aborted: - aborted[TWriteId{data.WriteTxId}] = std::move(data); + aborted.emplace(TWriteId{data.WriteTxId}, std::move(data)); break; } diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index fc65951124f..0b38308548a 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -37,6 +37,8 @@ void TTester::Setup(TTestActorRuntime& runtime) { app.AddDomain(domain.Release()); SetupTabletServices(runtime, &app); + + runtime.UpdateCurrentTime(TInstant::Now()); } bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) { diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp index 3ae5d4f713e..88295ba805d 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.cpp +++ b/ydb/core/tx/columnshard/engines/insert_table.cpp @@ -6,14 +6,14 @@ namespace NKikimr::NOlap { -bool TInsertTable::Insert(IDbWrapper& dbTable, const TInsertedData& data) { +bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) { TWriteId writeId{data.WriteTxId}; if (Inserted.count(writeId)) { return false; } dbTable.Insert(data); - Inserted[writeId] = data; + Inserted.emplace(writeId, std::move(data)); return true; } @@ -51,14 +51,14 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 
metaShard, const THashSet<TWr Y_UNUSED(metaShard); for (auto writeId : writeIds) { - auto* data = Inserted.FindPtr(writeId); - Y_VERIFY(data, "Abort writeId %" PRIu64 " not found", (ui64)writeId); - - dbTable.EraseInserted(*data); - dbTable.Abort(*data); + // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId. + if (auto* data = Inserted.FindPtr(writeId)) { + dbTable.EraseInserted(*data); + dbTable.Abort(*data); - Aborted[writeId] = std::move(Inserted[writeId]); - Inserted.erase(writeId); + Aborted.emplace(writeId, std::move(*data)); + Inserted.erase(writeId); + } } } @@ -74,7 +74,7 @@ THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& n TInstant timeBorder = now - WaitCommitDelay; THashSet<TWriteId> toAbort; for (auto& [writeId, data] : Inserted) { - if (data.DirtyTime < timeBorder) { + if (data.DirtyTime && data.DirtyTime < timeBorder) { toAbort.insert(writeId); } } diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h index bd2ee798224..ab25ddd7213 100644 --- a/ydb/core/tx/columnshard/engines/insert_table.h +++ b/ydb/core/tx/columnshard/engines/insert_table.h @@ -15,7 +15,7 @@ struct TInsertedData { TString Metadata; TInstant DirtyTime; - TInsertedData() = default; + TInsertedData() = delete; // avoid invalid TInsertedData anywhere TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId, const TString& meta, const TInstant& writeTime) @@ -112,7 +112,7 @@ public: ui64 RawBytes{}; }; - bool Insert(IDbWrapper& dbTable, const TInsertedData& data); + bool Insert(IDbWrapper& dbTable, TInsertedData&& data); TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds); void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds); THashSet<TWriteId> AbortOld(IDbWrapper& dbTable, const TInstant& now); diff 
--git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index 42f15440077..fb3546293ce 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -20,7 +20,7 @@ public: }; void Insert(const TInsertedData& data) override { - Inserted[TWriteId{data.WriteTxId}] = data; + Inserted.emplace(TWriteId{data.WriteTxId}, data); } void Commit(const TInsertedData& data) override { @@ -28,7 +28,7 @@ public: } void Abort(const TInsertedData& data) override { - Aborted[TWriteId{data.WriteTxId}] = data; + Aborted.emplace(TWriteId{data.WriteTxId}, data); } void EraseInserted(const TInsertedData& data) override { diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index 1a96ba7b44e..e32d1f1f9a6 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -67,7 +67,6 @@ public: LOG_S_WARN("TEvWakeup: write timeout at tablet " << TabletId << " (write)"); SendResultAndDie(ctx, NKikimrProto::TIMEOUT); - return; } void SendResultAndDie(const TActorContext& ctx, NKikimrProto::EReplyStatus status) { @@ -147,12 +146,20 @@ public: record.SetData(data); // modify for TxWrite { // Update meta + ui64 dirtyTime = AppData(ctx)->TimeProvider->Now().Seconds(); + Y_VERIFY(dirtyTime); + NKikimrTxColumnShard::TLogicalMetadata outMeta; outMeta.SetNumRows(batch->num_rows()); outMeta.SetRawBytes(NArrow::GetBatchDataSize(batch)); - outMeta.SetDirtyWriteTimeSeconds(AppData(ctx)->TimeProvider->Now().Seconds()); - // TODO: Add FirstPkValue & LastPkValue if needed - Y_PROTOBUF_SUPPRESS_NODISCARD outMeta.SerializeToString(&meta); + outMeta.SetDirtyWriteTimeSeconds(dirtyTime); + + meta.clear(); + if (!outMeta.SerializeToString(&meta)) { + LOG_S_ERROR("Canot set metadata for writing blob at tablet " << TabletId); + SendResultAndDie(ctx, NKikimrProto::ERROR); + return; + } } 
record.MutableMeta()->SetLogicalMeta(meta); |