aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2022-09-06 12:58:27 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2022-09-06 12:58:27 +0300
commit e9326a3327ea548d69b98df2718079a2617d0315 (patch)
tree 299801801810746c7dd0ac59a595d44c9319e04f
parent 2a7533a248335565c1573eda193110fdf25781b3 (diff)
download ydb-e9326a3327ea548d69b98df2718079a2617d0315.tar.gz
prevent evicting duplication and ttl conflict
-rw-r--r-- ydb/core/tx/columnshard/blob_manager.cpp | 28
-rw-r--r-- ydb/core/tx/columnshard/blob_manager.h | 24
-rw-r--r-- ydb/core/tx/columnshard/columnshard__export.cpp | 33
-rw-r--r-- ydb/core/tx/columnshard/columnshard__write_index.cpp | 14
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.cpp | 14
-rw-r--r-- ydb/core/tx/columnshard/columnshard_impl.h | 11
-rw-r--r-- ydb/core/tx/columnshard/columnshard_txs.h | 20
-rw-r--r-- ydb/core/tx/columnshard/export_actor.cpp | 7
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp | 2
-rw-r--r-- ydb/core/tx/columnshard/s3_actor.cpp | 6
10 files changed, 95 insertions, 64 deletions
diff --git a/ydb/core/tx/columnshard/blob_manager.cpp b/ydb/core/tx/columnshard/blob_manager.cpp
index db29182c8c0..5d67ee6e56b 100644
--- a/ydb/core/tx/columnshard/blob_manager.cpp
+++ b/ydb/core/tx/columnshard/blob_manager.cpp
@@ -693,4 +693,32 @@ void TBlobManager::SetBlobInUse(const TUnifiedBlobId& blobId, bool inUse) {
NBlobCache::ForgetBlob(blobId);
}
+bool TBlobManager::IsEvicting(const TUnifiedBlobId& id) {
+ // Reports whether GetEvicted() returns the EVICTING state for this blob id.
+ TEvictMetadata ignoredMeta; // GetEvicted() takes a metadata argument; its contents are not used here
+ const auto state = GetEvicted(id, ignoredMeta).State;
+ return state == EEvictState::EVICTING;
+}
+
+bool TBlobManager::ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped /*= false*/) {
+ // Removes the entry matching `evict` from one of the two evicted-blob containers.
+ // On success the stored key is written back to `evict`, its metadata to `meta`, and true is returned;
+ // when the entry is absent nothing is modified and false is returned.
+ const auto extractFrom = [&evict, &meta](auto& blobs) {
+ if (!blobs.count(evict)) {
+ return false;
+ }
+ auto node = blobs.extract(evict);
+ if (node.empty()) {
+ return false;
+ }
+ evict = node.key();
+ meta = node.mapped();
+ return true;
+ };
+ // fromDropped selects DroppedEvictedBlobs; otherwise the entry is taken from EvictedBlobs.
+ return fromDropped ? extractFrom(DroppedEvictedBlobs) : extractFrom(EvictedBlobs);
+}
+
}
diff --git a/ydb/core/tx/columnshard/blob_manager.h b/ydb/core/tx/columnshard/blob_manager.h
index 0d30720126b..f1bd7af5b3e 100644
--- a/ydb/core/tx/columnshard/blob_manager.h
+++ b/ydb/core/tx/columnshard/blob_manager.h
@@ -226,6 +226,7 @@ public:
CountersUpdate = TBlobManagerCounters();
return res;
}
+ bool IsEvicting(const TUnifiedBlobId& id);
// Implementation of IBlobManager interface
TBlobBatch StartBlobBatch(ui32 channel = BLOB_CHANNEL) override;
@@ -255,28 +256,7 @@ private:
// Delete small blobs that were previously in use and could not be deleted
void PerformDelayedDeletes(IBlobManagerDb& db);
- bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false) {
- if (fromDropped) {
- if (DroppedEvictedBlobs.count(evict)) {
- auto node = DroppedEvictedBlobs.extract(evict);
- if (!node.empty()) {
- evict = node.key();
- meta = node.mapped();
- return true;
- }
- }
- } else {
- if (EvictedBlobs.count(evict)) {
- auto node = EvictedBlobs.extract(evict);
- if (!node.empty()) {
- evict = node.key();
- meta = node.mapped();
- return true;
- }
- }
- }
- return false;
- }
+ bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false);
};
}
diff --git a/ydb/core/tx/columnshard/columnshard__export.cpp b/ydb/core/tx/columnshard/columnshard__export.cpp
index d92e6e8b9a2..2caf42183a8 100644
--- a/ydb/core/tx/columnshard/columnshard__export.cpp
+++ b/ydb/core/tx/columnshard/columnshard__export.cpp
@@ -1,16 +1,17 @@
#include "columnshard_impl.h"
#include "blob_manager_db.h"
+#include "columnshard_schema.h"
namespace NKikimr::NColumnShard {
using namespace NTabletFlatExecutor;
-class TTxExport : public TTransactionBase<TColumnShard> {
+class TTxExportFinish: public TTransactionBase<TColumnShard> {
public:
- TTxExport(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev)
+ TTxExportFinish(TColumnShard* self, TEvPrivate::TEvExport::TPtr& ev)
: TBase(self)
- , Ev(ev)
- {}
+ , Ev(ev) {
+ }
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
@@ -22,9 +23,9 @@ private:
};
-bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) {
+bool TTxExportFinish::Execute(TTransactionContext& txc, const TActorContext&) {
Y_VERIFY(Ev);
- LOG_S_DEBUG("TTxExport.Execute at tablet " << Self->TabletID());
+ LOG_S_DEBUG("TTxExportFinish.Execute at tablet " << Self->TabletID());
txc.DB.NoMoreReadsForTx();
//NIceDb::TNiceDb db(txc.DB);
@@ -92,13 +93,13 @@ bool TTxExport::Execute(TTransactionContext& txc, const TActorContext&) {
return true;
}
-void TTxExport::Complete(const TActorContext& ctx) {
+void TTxExportFinish::Complete(const TActorContext& ctx) {
Y_VERIFY(Ev);
- LOG_S_DEBUG("TTxExport.Complete at tablet " << Self->TabletID());
+ LOG_S_DEBUG("TTxExportFinish.Complete at tablet " << Self->TabletID());
auto& msg = *Ev->Get();
Y_VERIFY(!msg.TierName.empty());
-
+ Self->ActiveEviction = false;
if (!BlobsToForget.empty()) {
Self->ForgetBlobs(ctx, msg.TierName, std::move(BlobsToForget));
}
@@ -107,26 +108,28 @@ void TTxExport::Complete(const TActorContext& ctx) {
void TColumnShard::Handle(TEvPrivate::TEvExport::TPtr& ev, const TActorContext& ctx) {
auto status = ev->Get()->Status;
+
+ // Export state machine: UNKNOWN starts an export (ExportBlobs), OK applies its result (TTxExportFinish),
+ // ERROR aborts it. ActiveEviction is raised only when an export starts, so the ERROR reset below is not overwritten.
+ Y_VERIFY(!ActiveTtl, "TTL already in progress at tablet %lu", TabletID());
+ Y_VERIFY(!ActiveEviction || status != NKikimrProto::UNKNOWN, "Eviction in progress at tablet %lu", TabletID());
ui64 exportNo = ev->Get()->ExportNo;
auto& tierName = ev->Get()->TierName;
- bool error = status == NKikimrProto::ERROR;
- if (error) {
+ if (status == NKikimrProto::ERROR) {
LOG_S_WARN("Export (fail): " << exportNo << " tier '" << tierName << "' error: "
- << ev->Get()->ErrorStr << "' at tablet " << TabletID());
+ << ev->Get()->SerializeErrorsToString() << "' at tablet " << TabletID());
+ ActiveEviction = false; // the failed export is over; allow the next eviction to start
} else if (status == NKikimrProto::UNKNOWN) {
LOG_S_DEBUG("Export (write): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
-
auto& tierBlobs = ev->Get()->Blobs;
Y_VERIFY(tierBlobs.size());
+ ActiveEviction = true; // a new export starts here; cleared on ERROR above or in TTxExportFinish::Complete
ExportBlobs(ctx, exportNo, tierName, std::move(tierBlobs));
} else if (status == NKikimrProto::OK) {
LOG_S_DEBUG("Export (apply): " << exportNo << " tier '" << tierName << "' at tablet " << TabletID());
-
- Execute(new TTxExport(this, ev), ctx);
+ Execute(new TTxExportFinish(this, ev), ctx);
} else {
Y_VERIFY(false);
}
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__write_index.cpp b/ydb/core/tx/columnshard/columnshard__write_index.cpp
index 8f35ec52a4a..e0233685047 100644
--- a/ydb/core/tx/columnshard/columnshard__write_index.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write_index.cpp
@@ -25,7 +25,7 @@ public:
private:
TEvPrivate::TEvWriteIndex::TPtr Ev;
THashMap<TUnifiedBlobId, TString> BlobsToExport;
- THashMap<TString, THashMap<TUnifiedBlobId, TString>> ExportTierBlobs;
+ THashMap<TString, THashSet<TUnifiedBlobId>> ExportTierBlobs;
THashMap<TString, std::vector<NOlap::TEvictedBlob>> TierBlobsToForget;
ui64 ExportNo = 0;
};
@@ -217,7 +217,7 @@ bool TTxWriteIndex::Execute(TTransactionContext& txc, const TActorContext& ctx)
if (BlobsToExport.size()) {
size_t numBlobs = BlobsToExport.size();
for (auto& [blobId, tierName] : BlobsToExport) {
- ExportTierBlobs[tierName].emplace(blobId, TString{});
+ ExportTierBlobs[tierName].emplace(blobId);
}
BlobsToExport.clear();
@@ -280,7 +280,15 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
for (auto& [tierName, blobIds] : ExportTierBlobs) {
Y_VERIFY(ExportNo);
- ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobIds)));
+
+ TEvPrivate::TEvExport::TBlobDataMap blobsData;
+ for (auto&& i : blobIds) {
+ TEvPrivate::TEvExport::TExportBlobInfo info;
+ info.Evicting = Self->BlobManager->IsEvicting(i);
+ blobsData.emplace(i, std::move(info));
+ }
+
+ ctx.Send(Self->SelfId(), new TEvPrivate::TEvExport(ExportNo, tierName, std::move(blobsData)));
++ExportNo;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 99b9ee07710..d469f0bc599 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -861,12 +861,13 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta
}
}
-TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) {
- if (!S3Actors.count(tierName)) {
+TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString& phase) const {
+ auto it = S3Actors.find(tierName);
+ if (it == S3Actors.end()) {
LOG_S_ERROR("No S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
return {};
}
- auto s3 = S3Actors[tierName];
+ auto s3 = it->second;
if (!s3) {
LOG_S_ERROR("Not started S3 actor for tier '" << tierName << "' (on " << phase << ") at tablet " << TabletID());
return {};
@@ -875,15 +876,14 @@ TActorId TColumnShard::GetS3ActorForTier(const TString& tierName, const TString&
}
void TColumnShard::ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
- THashMap<TUnifiedBlobId, TString>&& blobsIds) {
+ TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const { // NOTE(review): if no S3 actor is found for the tier nothing is registered and the caller is not told - confirm the eviction flag cannot stay set in that case
if (auto s3 = GetS3ActorForTier(tierName, "export")) {
- auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsIds));
+ auto event = std::make_unique<TEvPrivate::TEvExport>(exportNo, tierName, s3, std::move(blobsInfo)); // blobsInfo is moved into the event, which is then handed to a newly registered export actor
ctx.Register(CreateExportActor(TabletID(), ctx.SelfID, event.release()));
}
}
-void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName,
- std::vector<NOlap::TEvictedBlob>&& blobs) {
+void TColumnShard::ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const {
if (auto s3 = GetS3ActorForTier(tierName, "forget")) {
auto forget = std::make_unique<TEvPrivate::TEvForget>();
forget->Evicted = std::move(blobs);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 5ec77f71a9c..b7e49660e9c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -34,7 +34,7 @@ IActor* CreateReadActor(ui64 tabletId,
const TActorId& columnShardActorId,
ui64 requestCookie);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
-IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
+IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
#ifndef KIKIMR_DISABLE_S3_OPS
IActor* CreateS3Actor(ui64 tabletId, const TActorId& parent, const TString& tierName);
#endif
@@ -85,7 +85,7 @@ class TColumnShard
friend class TTxRead;
friend class TTxScan;
friend class TTxWriteIndex;
- friend class TTxExport;
+ friend class TTxExportFinish;
friend class TTxForget;
friend class TTxRunGC;
friend class TTxProcessGCResult;
@@ -382,6 +382,7 @@ private:
bool ActiveIndexingOrCompaction = false;
bool ActiveCleanup = false;
bool ActiveTtl = false;
+ bool ActiveEviction = false;
std::unique_ptr<TBlobManager> BlobManager;
TInFlightReadsTracker InFlightReadsTracker;
TSettings Settings;
@@ -442,10 +443,10 @@ private:
NOlap::TIndexInfo ConvertSchema(const NKikimrSchemeOp::TColumnTableSchema& schema);
void MapExternBlobs(const TActorContext& ctx, NOlap::TReadMetadata& metadata);
- TActorId GetS3ActorForTier(const TString& tierName, const TString& phase);
+ TActorId GetS3ActorForTier(const TString& tierName, const TString& phase) const;
void ExportBlobs(const TActorContext& ctx, ui64 exportNo, const TString& tierName,
- THashMap<TUnifiedBlobId, TString>&& blobsIds);
- void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs);
+ TEvPrivate::TEvExport::TBlobDataMap&& blobsInfo) const;
+ void ForgetBlobs(const TActorContext& ctx, const TString& tierName, std::vector<NOlap::TEvictedBlob>&& blobs) const;
bool GetExportedBlob(const TActorContext& ctx, TActorId dst, ui64 cookie, const TString& tierName,
NOlap::TEvictedBlob&& evicted, std::vector<NOlap::TBlobRange>&& ranges);
ui32 InitS3Actors(const TActorContext& ctx, bool init);
diff --git a/ydb/core/tx/columnshard/columnshard_txs.h b/ydb/core/tx/columnshard/columnshard_txs.h
index 4f6a0e5372b..4616ae298ab 100644
--- a/ydb/core/tx/columnshard/columnshard_txs.h
+++ b/ydb/core/tx/columnshard/columnshard_txs.h
@@ -125,15 +125,19 @@ struct TEvPrivate {
};
struct TEvExport : public TEventLocal<TEvExport, EvExport> {
- using TBlobDataMap = THashMap<TUnifiedBlobId, TString>;
+ struct TExportBlobInfo { // per-blob entry of TBlobDataMap, keyed by the blob's TUnifiedBlobId
+ TString Data; // blob payload; default-constructed empty, the export actor assigns the read blob data to it
+ bool Evicting = false; // set by TTxWriteIndex::Complete from TBlobManager::IsEvicting() for this blob
+ };
+ using TBlobDataMap = THashMap<TUnifiedBlobId, TExportBlobInfo>;
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
- ui64 ExportNo{};
+ ui64 ExportNo = 0;
TString TierName;
TActorId DstActor;
TBlobDataMap Blobs;
THashMap<TUnifiedBlobId, TUnifiedBlobId> SrcToDstBlobs;
- TString ErrorStr;
+ TMap<TString, TString> ErrorStrings;
explicit TEvExport(ui64 exportNo, const TString& tierName, TBlobDataMap&& tierBlobs)
: ExportNo(exportNo)
@@ -156,9 +160,17 @@ struct TEvPrivate {
Y_VERIFY(DstActor);
Y_VERIFY(!Blobs.empty());
}
+
+ TString SerializeErrorsToString() const {
+ // Joins every ErrorStrings entry as "key=message;" in the map's iteration order; empty map gives an empty string.
+ TStringBuilder result;
+ for (const auto& [key, message] : ErrorStrings) {
+ result << key << "=" << message << ";";
+ }
+ return result;
+ }
};
- struct TEvForget : public TEventLocal<TEvForget, EvForget> {
+ struct TEvForget: public TEventLocal<TEvForget, EvForget> {
NKikimrProto::EReplyStatus Status = NKikimrProto::UNKNOWN;
std::vector<NOlap::TEvictedBlob> Evicted;
TString ErrorStr;
diff --git a/ydb/core/tx/columnshard/export_actor.cpp b/ydb/core/tx/columnshard/export_actor.cpp
index 4038f1cde3a..bba0020fe3b 100644
--- a/ydb/core/tx/columnshard/export_actor.cpp
+++ b/ydb/core/tx/columnshard/export_actor.cpp
@@ -22,8 +22,7 @@ public:
}
void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
- LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size()
- << ") at tablet " << TabletId << " (export)");
+ LOG_S_DEBUG("TEvReadBlobRangeResult (waiting " << BlobsToRead.size() << ") at tablet " << TabletId << " (export)");
auto& event = *ev->Get();
const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
@@ -49,7 +48,7 @@ public:
BlobsToRead.erase(blobId);
Y_VERIFY(Event);
- Event->Blobs[blobId] = blobData;
+ Event->Blobs[blobId].Data = blobData;
if (BlobsToRead.empty()) {
SendResultAndDie(ctx);
@@ -99,7 +98,7 @@ private:
}
};
-IActor* CreateExportActor(ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) {
+IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev) { // factory: returns a newly allocated TExportActor built from the tablet id, destination actor and export event
return new TExportActor(tabletId, dstActor, ev);
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 2fff0b14416..ab49a66f592 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -35,7 +35,7 @@ public:
auto& event = *ev->Get();
const TUnifiedBlobId& blobId = event.BlobRange.BlobId;
- Y_VERIFY(event.Data.size() == event.BlobRange.Size);
+ Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size);
if (IndexedBlobs.count(event.BlobRange)) {
if (!WaitIndexed.count(event.BlobRange)) {
diff --git a/ydb/core/tx/columnshard/s3_actor.cpp b/ydb/core/tx/columnshard/s3_actor.cpp
index bd71bafdc76..29c2119ffd1 100644
--- a/ydb/core/tx/columnshard/s3_actor.cpp
+++ b/ydb/core/tx/columnshard/s3_actor.cpp
@@ -27,7 +27,7 @@ struct TS3Export {
: Event(ev.Release())
{}
- THashMap<TUnifiedBlobId, TString>& Blobs() {
+ TEvPrivate::TEvExport::TBlobDataMap& Blobs() {
return Event->Blobs;
}
@@ -123,7 +123,7 @@ public:
ex.KeysToWrite.emplace(key);
ExportingKeys[key] = exportNo;
- SendPutObject(key, std::move(blob));
+ SendPutObject(key, std::move(blob.Data));
}
}
@@ -204,7 +204,7 @@ public:
if (!errStr.empty()) {
ex.Event->Status = NKikimrProto::ERROR;
- ex.Event->ErrorStr = errStr;
+ ex.Event->ErrorStrings.emplace(key, errStr);
Send(ShardActor, ex.Event.release());
Exports.erase(exportNo);
} else if (ex.KeysToWrite.empty()) {