aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-05-25 22:03:13 +0300
committerArtem Zuikov <chertus@gmail.com>2022-05-25 22:03:13 +0300
commit87ae86feb6850a95d5403ab12581ece07d1f32c0 (patch)
tree86add3518b35fed4e4093ab3dbc69e4c31833d1e
parent483dcbe8de1b955ec0556102f0d44ec0d461b8a0 (diff)
downloadydb-87ae86feb6850a95d5403ab12581ece07d1f32c0.tar.gz
KIKIMR-14958: protection for wrong write metadata
ref:d2b74ffc407940cf3f048d013f83055e066f3d27
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_ut_common.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.cpp20
-rw-r--r--ydb/core/tx/columnshard/engines/insert_table.h4
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp4
-rw-r--r--ydb/core/tx/columnshard/write_actor.cpp15
6 files changed, 29 insertions, 20 deletions
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index 229fc91c4c8..0a6eca77630 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -505,13 +505,13 @@ struct Schema : NIceDb::Schema {
switch (recType) {
case EInsertTableIds::Inserted:
- inserted[TWriteId{data.WriteTxId}] = std::move(data);
+ inserted.emplace(TWriteId{data.WriteTxId}, std::move(data));
break;
case EInsertTableIds::Committed:
committed[data.PathId].emplace(data);
break;
case EInsertTableIds::Aborted:
- aborted[TWriteId{data.WriteTxId}] = std::move(data);
+ aborted.emplace(TWriteId{data.WriteTxId}, std::move(data));
break;
}
diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
index fc65951124f..0b38308548a 100644
--- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp
@@ -37,6 +37,8 @@ void TTester::Setup(TTestActorRuntime& runtime) {
app.AddDomain(domain.Release());
SetupTabletServices(runtime, &app);
+
+ runtime.UpdateCurrentTime(TInstant::Now());
}
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) {
diff --git a/ydb/core/tx/columnshard/engines/insert_table.cpp b/ydb/core/tx/columnshard/engines/insert_table.cpp
index 3ae5d4f713e..88295ba805d 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.cpp
+++ b/ydb/core/tx/columnshard/engines/insert_table.cpp
@@ -6,14 +6,14 @@
namespace NKikimr::NOlap {
-bool TInsertTable::Insert(IDbWrapper& dbTable, const TInsertedData& data) {
+bool TInsertTable::Insert(IDbWrapper& dbTable, TInsertedData&& data) {
TWriteId writeId{data.WriteTxId};
if (Inserted.count(writeId)) {
return false;
}
dbTable.Insert(data);
- Inserted[writeId] = data;
+ Inserted.emplace(writeId, std::move(data));
return true;
}
@@ -51,14 +51,14 @@ void TInsertTable::Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWr
Y_UNUSED(metaShard);
for (auto writeId : writeIds) {
- auto* data = Inserted.FindPtr(writeId);
- Y_VERIFY(data, "Abort writeId %" PRIu64 " not found", (ui64)writeId);
-
- dbTable.EraseInserted(*data);
- dbTable.Abort(*data);
+ // There could be inconsistency with txs and writes in case of bugs. So we could find no record for writeId.
+ if (auto* data = Inserted.FindPtr(writeId)) {
+ dbTable.EraseInserted(*data);
+ dbTable.Abort(*data);
- Aborted[writeId] = std::move(Inserted[writeId]);
- Inserted.erase(writeId);
+ Aborted.emplace(writeId, std::move(*data));
+ Inserted.erase(writeId);
+ }
}
}
@@ -74,7 +74,7 @@ THashSet<TWriteId> TInsertTable::AbortOld(IDbWrapper& dbTable, const TInstant& n
TInstant timeBorder = now - WaitCommitDelay;
THashSet<TWriteId> toAbort;
for (auto& [writeId, data] : Inserted) {
- if (data.DirtyTime < timeBorder) {
+ if (data.DirtyTime && data.DirtyTime < timeBorder) {
toAbort.insert(writeId);
}
}
diff --git a/ydb/core/tx/columnshard/engines/insert_table.h b/ydb/core/tx/columnshard/engines/insert_table.h
index bd2ee798224..ab25ddd7213 100644
--- a/ydb/core/tx/columnshard/engines/insert_table.h
+++ b/ydb/core/tx/columnshard/engines/insert_table.h
@@ -15,7 +15,7 @@ struct TInsertedData {
TString Metadata;
TInstant DirtyTime;
- TInsertedData() = default;
+ TInsertedData() = delete; // avoid invalid TInsertedData anywhere
TInsertedData(ui64 shardOrPlan, ui64 writeTxId, ui64 pathId, TString dedupId, const TUnifiedBlobId& blobId,
const TString& meta, const TInstant& writeTime)
@@ -112,7 +112,7 @@ public:
ui64 RawBytes{};
};
- bool Insert(IDbWrapper& dbTable, const TInsertedData& data);
+ bool Insert(IDbWrapper& dbTable, TInsertedData&& data);
TCounters Commit(IDbWrapper& dbTable, ui64 planStep, ui64 txId, ui64 metaShard, const THashSet<TWriteId>& writeIds);
void Abort(IDbWrapper& dbTable, ui64 metaShard, const THashSet<TWriteId>& writeIds);
THashSet<TWriteId> AbortOld(IDbWrapper& dbTable, const TInstant& now);
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index 42f15440077..fb3546293ce 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -20,7 +20,7 @@ public:
};
void Insert(const TInsertedData& data) override {
- Inserted[TWriteId{data.WriteTxId}] = data;
+ Inserted.emplace(TWriteId{data.WriteTxId}, data);
}
void Commit(const TInsertedData& data) override {
@@ -28,7 +28,7 @@ public:
}
void Abort(const TInsertedData& data) override {
- Aborted[TWriteId{data.WriteTxId}] = data;
+ Aborted.emplace(TWriteId{data.WriteTxId}, data);
}
void EraseInserted(const TInsertedData& data) override {
diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp
index 1a96ba7b44e..e32d1f1f9a6 100644
--- a/ydb/core/tx/columnshard/write_actor.cpp
+++ b/ydb/core/tx/columnshard/write_actor.cpp
@@ -67,7 +67,6 @@ public:
LOG_S_WARN("TEvWakeup: write timeout at tablet " << TabletId << " (write)");
SendResultAndDie(ctx, NKikimrProto::TIMEOUT);
- return;
}
void SendResultAndDie(const TActorContext& ctx, NKikimrProto::EReplyStatus status) {
@@ -147,12 +146,20 @@ public:
record.SetData(data); // modify for TxWrite
{ // Update meta
+ ui64 dirtyTime = AppData(ctx)->TimeProvider->Now().Seconds();
+ Y_VERIFY(dirtyTime);
+
NKikimrTxColumnShard::TLogicalMetadata outMeta;
outMeta.SetNumRows(batch->num_rows());
outMeta.SetRawBytes(NArrow::GetBatchDataSize(batch));
- outMeta.SetDirtyWriteTimeSeconds(AppData(ctx)->TimeProvider->Now().Seconds());
- // TODO: Add FirstPkValue & LastPkValue if needed
- Y_PROTOBUF_SUPPRESS_NODISCARD outMeta.SerializeToString(&meta);
+ outMeta.SetDirtyWriteTimeSeconds(dirtyTime);
+
+ meta.clear();
+ if (!outMeta.SerializeToString(&meta)) {
+ LOG_S_ERROR("Canot set metadata for writing blob at tablet " << TabletId);
+ SendResultAndDie(ctx, NKikimrProto::ERROR);
+ return;
+ }
}
record.MutableMeta()->SetLogicalMeta(meta);