diff options
author | chertus <azuikov@ydb.tech> | 2023-06-30 18:23:22 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-06-30 18:23:22 +0300 |
commit | 83e9195e30a0c2891f89d3cddf69b1c863cf3c52 (patch) | |
tree | 1739d19b1e9cc39781721385659942282fb426b1 | |
parent | b454398de8bd6fda1251382513891ede660e6755 (diff) | |
download | ydb-83e9195e30a0c2891f89d3cddf69b1c863cf3c52.tar.gz |
fix TCompactedBlobsConstructor
6 files changed, 82 insertions, 63 deletions
diff --git a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h index 7d4f720b484..d35d27d7109 100644 --- a/ydb/core/tx/columnshard/engines/writer/blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/blob_constructor.h @@ -32,9 +32,13 @@ public: virtual ~IBlobConstructor() {} virtual const TString& GetBlob() const = 0; virtual bool RegisterBlobId(const TUnifiedBlobId& blobId) = 0; - virtual EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) = 0; + virtual EStatus BuildNext() = 0; + virtual NColumnShard::TUsage& GetResourceUsage() = 0; - virtual TAutoPtr<NActors::IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) = 0; + virtual TAutoPtr<NActors::IEventBase> BuildResult( + NKikimrProto::EReplyStatus status, + NColumnShard::TBlobBatch&& blobBatch, + THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) = 0; }; } diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp index 2c8c412fd56..06ab340ddc6 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.cpp @@ -12,9 +12,15 @@ TCompactedBlobsConstructor::TCompactedBlobsConstructor(TAutoPtr<NColumnShard::TE , Blobs(WriteIndexEv->Blobs) , BlobGrouppingEnabled(blobGrouppingEnabled) , CacheData(WriteIndexEv->CacheData) + , IsEviction(IndexChanges.PortionsToEvict.size() > 0) { - auto indexChanges = WriteIndexEv->IndexChanges; - LastPortion = indexChanges->AppendedPortions.size() + indexChanges->PortionsToEvict.size(); + if (IsEviction) { + Y_VERIFY(IndexChanges.AppendedPortions.empty()); + LastPortion = IndexChanges.PortionsToEvict.size(); + } else { + Y_VERIFY(IndexChanges.PortionsToEvict.empty()); + LastPortion = IndexChanges.AppendedPortions.size(); + } Y_VERIFY(Blobs.size() > 0); } @@ -26,7 +32,7 @@ bool TCompactedBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { Y_VERIFY(AccumulatedBlob.size() > 0); Y_VERIFY(RecordsInBlob.size() > 0); - auto& portionInfo = PortionUpdates.back(); + auto& portionInfo = PortionUpdates.back(); LOG_S_TRACE("Write Index Blob " << blobId << " with " << RecordsInBlob.size() << " records"); for (const auto& rec : RecordsInBlob) { size_t i = rec.first; @@ -45,37 +51,32 @@ bool TCompactedBlobsConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { return true; } -IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& /*appData*/) { - Y_UNUSED(resourceUsage); - if (CurrentPortion == LastPortion) { - Y_VERIFY(CurrentBlob == Blobs.size()); - return EStatus::Finished; - } - +IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext() { AccumulatedBlob.clear(); RecordsInBlob.clear(); - if (CurrentPortionRecord == 0) { - PortionUpdates.push_back(GetPortionInfo(CurrentPortion)); - // There could be eviction mix between normal eviction and eviction without data changes - // TODO: better portions to blobs mathching - const bool eviction = IndexChanges.PortionsToEvict.size() > 0; - if (eviction) { - while (CurrentPortion < LastPortion && !IndexChanges.PortionsToEvict[CurrentPortion].second.DataChanges) { - ++CurrentPortion; - PortionUpdates.push_back(GetPortionInfo(CurrentPortion)); - continue; - } - if (CurrentPortion == LastPortion) { - return EStatus::Finished; + if (IsEviction && CurrentPortionRecord == 0) { + // Skip portions without data changes + for (; CurrentPortion < LastPortion; ++CurrentPortion) { + if (IndexChanges.PortionsToEvict[CurrentPortion].second.DataChanges) { + break; } + PortionUpdates.push_back(GetPortionInfo(CurrentPortion)); } } - auto& portionInfo = GetPortionInfo(CurrentPortion); + if (CurrentPortion == LastPortion) { + Y_VERIFY(CurrentBlob == Blobs.size()); + return EStatus::Finished; + } + + const auto& portionInfo = GetPortionInfo(CurrentPortion); + if (CurrentPortionRecord == 0) { + PortionUpdates.push_back(portionInfo); + } NOlap::TPortionInfo& newPortionInfo = PortionUpdates.back(); - const auto& records = portionInfo.Records; + const auto& records = portionInfo.Records; for (; CurrentPortionRecord < records.size(); ++CurrentPortionRecord, ++CurrentBlob) { Y_VERIFY(CurrentBlob < Blobs.size()); const TString& currentBlob = Blobs[CurrentBlob]; @@ -99,11 +100,14 @@ IBlobConstructor::EStatus TCompactedBlobsConstructor::BuildNext(NColumnShard::TU return AccumulatedBlob.empty() ? EStatus::Finished : EStatus::Ok; } -TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) { +TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::EReplyStatus status, + NColumnShard::TBlobBatch&& blobBatch, + THashSet<ui32>&& yellowMoveChannels, + THashSet<ui32>&& yellowStopChannels) +{ for (ui64 index = 0; index < PortionUpdates.size(); ++index) { const auto& portionInfo = PortionUpdates[index]; - const bool eviction = IndexChanges.PortionsToEvict.size() > 0; - if (eviction) { + if (IsEviction) { Y_VERIFY(index < IndexChanges.PortionsToEvict.size()); WriteIndexEv->IndexChanges->PortionsToEvict[index].first = portionInfo; } else { @@ -112,15 +116,14 @@ TAutoPtr<IEventBase> TCompactedBlobsConstructor::BuildResult(NKikimrProto::ERepl } } - WriteIndexEv->ResourceUsage.Add(resourceUsage); + WriteIndexEv->ResourceUsage.Add(ResourceUsage); WriteIndexEv->SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels)); WriteIndexEv->BlobBatch = std::move(blobBatch); return WriteIndexEv.Release(); } const NOlap::TPortionInfo& TCompactedBlobsConstructor::GetPortionInfo(const ui64 index) const { - bool eviction = IndexChanges.PortionsToEvict.size() > 0; - if (eviction) { + if (IsEviction) { Y_VERIFY(index < IndexChanges.PortionsToEvict.size()); return IndexChanges.PortionsToEvict[index].first; } else { diff --git a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h index aebb3d87a92..4b84584886a 100644 --- a/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/compacted_blob_constructor.h @@ -9,30 +9,36 @@ namespace NKikimr::NOlap { class TCompactedBlobsConstructor : public IBlobConstructor { TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> WriteIndexEv; + NColumnShard::TUsage ResourceUsage; const NOlap::TColumnEngineChanges& IndexChanges; const std::vector<TString>& Blobs; - const bool BlobGrouppingEnabled; const bool CacheData; + const bool IsEviction; TString AccumulatedBlob; std::vector<std::pair<size_t, TString>> RecordsInBlob; + std::vector<NOlap::TPortionInfo> PortionUpdates; ui64 CurrentPortion = 0; ui64 LastPortion = 0; - ui64 CurrentBlob = 0; ui64 CurrentPortionRecord = 0; - TVector<NOlap::TPortionInfo> PortionUpdates; - public: TCompactedBlobsConstructor(TAutoPtr<NColumnShard::TEvPrivate::TEvWriteIndex> writeIndexEv, bool blobGrouppingEnabled); const TString& GetBlob() const override; bool RegisterBlobId(const TUnifiedBlobId& blobId) override; - EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& /*appData*/) override; + EStatus BuildNext() override; + + NColumnShard::TUsage& GetResourceUsage() override { + return ResourceUsage; + } - TAutoPtr<IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) override; + TAutoPtr<IEventBase> BuildResult( + NKikimrProto::EReplyStatus status, + NColumnShard::TBlobBatch&& blobBatch, + THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) override; private: const NOlap::TPortionInfo& GetPortionInfo(const ui64 index) const; diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp index cbaf9b21f5a..3a663d4215d 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp @@ -15,7 +15,7 @@ const TString& TIndexedBlobConstructor::GetBlob() const { return DataPrepared; } -IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) { +IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext() { if (!!DataPrepared) { return EStatus::Finished; } @@ -37,7 +37,7 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag // Heavy operations inside. We cannot run them in tablet event handler. TString strError; { - NColumnShard::TCpuGuard guard(resourceUsage); + NColumnShard::TCpuGuard guard(ResourceUsage); Batch = SnapshotSchema->PrepareForInsert(data, serializedScheme, strError); } if (!Batch) { @@ -46,7 +46,7 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag } { - NColumnShard::TCpuGuard guard(resourceUsage); + NColumnShard::TCpuGuard guard(ResourceUsage); DataPrepared = NArrow::SerializeBatchNoCompression(Batch); } @@ -54,8 +54,8 @@ IBlobConstructor::EStatus TIndexedBlobConstructor::BuildNext(NColumnShard::TUsag AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_data_too_big")("write_id", writeId)("path_id", pathId); return EStatus::Error; } - - ui64 dirtyTime = appData.TimeProvider->Now().Seconds(); + + ui64 dirtyTime = TAppData::TimeProvider->Now().Seconds(); Y_VERIFY(dirtyTime); NKikimrTxColumnShard::TLogicalMetadata outMeta; outMeta.SetNumRows(Batch->num_rows()); @@ -78,14 +78,18 @@ bool TIndexedBlobConstructor::RegisterBlobId(const TUnifiedBlobId& blobId) { return true; } -TAutoPtr<IEventBase> TIndexedBlobConstructor::BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) { +TAutoPtr<IEventBase> TIndexedBlobConstructor::BuildResult(NKikimrProto::EReplyStatus status, + NColumnShard::TBlobBatch&& blobBatch, + THashSet<ui32>&& yellowMoveChannels, + THashSet<ui32>&& yellowStopChannels) +{ WriteEv->WrittenBatch = Batch; auto& record = Proto(WriteEv.Get()); record.SetData(DataPrepared); // modify for TxWrite record.MutableMeta()->SetLogicalMeta(MetaString); - WriteEv->ResourceUsage.Add(resourceUsage); + WriteEv->ResourceUsage.Add(ResourceUsage); WriteEv->SetPutStatus(status, std::move(yellowMoveChannels), std::move(yellowStopChannels)); WriteEv->BlobBatch = std::move(blobBatch); return WriteEv.Release(); diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h index 7ff9e385ab9..bba47550ada 100644 --- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h +++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h @@ -10,6 +10,7 @@ namespace NKikimr::NOlap { class TIndexedBlobConstructor : public IBlobConstructor { TAutoPtr<TEvColumnShard::TEvWrite> WriteEv; NOlap::ISnapshotSchema::TPtr SnapshotSchema; + NColumnShard::TUsage ResourceUsage; TString DataPrepared; TString MetaString; @@ -20,10 +21,17 @@ public: TIndexedBlobConstructor(TAutoPtr<TEvColumnShard::TEvWrite> writeEv, NOlap::ISnapshotSchema::TPtr snapshotSchema); const TString& GetBlob() const override; - EStatus BuildNext(NColumnShard::TUsage& resourceUsage, const TAppData& appData) override; + EStatus BuildNext() override; bool RegisterBlobId(const TUnifiedBlobId& blobId) override; - TAutoPtr<NActors::IEventBase> BuildResult(NKikimrProto::EReplyStatus status, NColumnShard::TBlobBatch&& blobBatch, THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels, const NColumnShard::TUsage& resourceUsage) override; + NColumnShard::TUsage& GetResourceUsage() override { + return ResourceUsage; + } + + TAutoPtr<NActors::IEventBase> BuildResult( + NKikimrProto::EReplyStatus status, + NColumnShard::TBlobBatch&& blobBatch, + THashSet<ui32>&& yellowMoveChannels, THashSet<ui32>&& yellowStopChannels) override; }; } diff --git a/ydb/core/tx/columnshard/write_actor.cpp b/ydb/core/tx/columnshard/write_actor.cpp index aaba96fe98b..7b85892a59b 100644 --- a/ydb/core/tx/columnshard/write_actor.cpp +++ b/ydb/core/tx/columnshard/write_actor.cpp @@ -14,11 +14,10 @@ namespace { class TWriteActor : public TActorBootstrapped<TWriteActor> { ui64 TabletId; TActorId DstActor; - TUsage ResourceUsage; TBlobBatch BlobBatch; NOlap::IBlobConstructor::TPtr BlobsConstructor; - + THashSet<ui32> YellowMoveChannels; THashSet<ui32> YellowStopChannels; TInstant Deadline; @@ -37,7 +36,7 @@ public: , Deadline(deadline) , MaxSmallBlobSize(maxSmallBlobSize) {} - + void Handle(TEvBlobStorage::TEvPutResult::TPtr& ev, const TActorContext& ctx) { TEvBlobStorage::TEvPutResult* msg = ev->Get(); auto status = msg->Status; @@ -76,10 +75,9 @@ public: putStatus = NKikimrProto::TIMEOUT; } } - auto ev = BlobsConstructor->BuildResult(putStatus, std::move(BlobBatch), + auto ev = BlobsConstructor->BuildResult(putStatus, std::move(BlobBatch), std::move(YellowMoveChannels), - std::move(YellowStopChannels), - ResourceUsage); + std::move(YellowStopChannels)); ctx.Send(DstActor, ev.Release()); Die(ctx); } @@ -94,20 +92,16 @@ public: const TDuration timeout = Deadline - now; ctx.Schedule(timeout, new TEvents::TEvWakeup()); } - + auto status = NOlap::IBlobConstructor::EStatus::Finished; - while (true) { - status = BlobsConstructor->BuildNext(ResourceUsage, *AppData(ctx)); - if (status != NOlap::IBlobConstructor::EStatus::Ok) { - break; - } + while (BlobsConstructor->BuildNext() == NOlap::IBlobConstructor::EStatus::Ok) { auto blobId = SendWriteBlobRequest(BlobsConstructor->GetBlob(), ctx); BlobsConstructor->RegisterBlobId(blobId); - } if (status != NOlap::IBlobConstructor::EStatus::Finished) { return SendResultAndDie(ctx, NKikimrProto::ERROR); } + if (BlobBatch.AllBlobWritesCompleted()) { return SendResultAndDie(ctx, NKikimrProto::OK); } @@ -122,10 +116,10 @@ public: break; } } - + private: TUnifiedBlobId SendWriteBlobRequest(const TString& data, const TActorContext& ctx) { - ResourceUsage.Network += data.size(); + BlobsConstructor->GetResourceUsage().Network += data.size(); if (MaxSmallBlobSize && data.size() <= *MaxSmallBlobSize) { TUnifiedBlobId smallBlobId = BlobBatch.AddSmallBlob(data); Y_VERIFY(smallBlobId.IsSmallBlob()); |