diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 17:48:32 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 17:48:32 +0300 |
commit | 6b6fa4bd8c71c7b69c40f2a9c7cee8b77387d4e8 (patch) | |
tree | 3a54ee4ba82033d0e3fcf253b05dab04c09d19a2 | |
parent | ebd5d895f93c2ae9bfaf21f84fb0a9e623f4b104 (diff) | |
download | ydb-6b6fa4bd8c71c7b69c40f2a9c7cee8b77387d4e8.tar.gz |
encapsulate zero granule fetching logic into reader
7 files changed, 85 insertions, 133 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index fd7e8d47815..ccbb1217c75 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -9,27 +9,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP , ReadMetadata(readMetadata) , IndexedData(ReadMetadata, false, context) { - ui32 batchNo = 0; - for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) { - const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; - WaitCommitted.emplace(cmtBlob, batchNo); - } - IndexedData.InitRead(batchNo); - // Add cached batches without read - for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); - Y_VERIFY(!cmt.empty()); - - const NOlap::TCommittedBlob& cmtBlob = cmt.key(); - ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob); - } - // Read all remained committed blobs - for (const auto& [cmtBlob, _] : WaitCommitted) { - auto& blobId = cmtBlob.GetBlobId(); - IndexedData.AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize())); - } - + IndexedData.InitRead(); Y_VERIFY(ReadMetadata->IsSorted()); if (ReadMetadata->Empty()) { @@ -38,16 +18,7 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP } void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) { - const auto& blobId = blobRange.BlobId; - if (IndexedData.IsIndexedBlob(blobRange)) { - IndexedData.AddIndexed(blobRange, data); - } else { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); - Y_VERIFY(!cmt.empty()); - const NOlap::TCommittedBlob& cmtBlob = cmt.key(); - ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, data, cmtBlob); - } + IndexedData.AddData(blobRange, data); } NKikimr::NOlap::TPartialReadResult 
TColumnShardScanIterator::GetBatch() { @@ -87,11 +58,10 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { - WaitCommitted.clear(); IndexedData.Abort(); } - if (WaitCommitted.empty() && IndexedData.IsFinished()) { + if (IndexedData.IsFinished()) { Context.MutableProcessor().Stop(); } } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index c41ecc88507..4484ee85744 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -44,6 +44,12 @@ public: return ReadyResults.size(); } + virtual TString DebugString() const override { + return TStringBuilder() + << "indexed_data:(" << IndexedData.DebugString() << ")" + ; + } + virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override; virtual bool HasWaitingTasks() const override; diff --git a/ydb/core/tx/columnshard/counters/common/object_counter.h b/ydb/core/tx/columnshard/counters/common/object_counter.h index b3cea6c284d..2cef57b7a11 100644 --- a/ydb/core/tx/columnshard/counters/common/object_counter.h +++ b/ydb/core/tx/columnshard/counters/common/object_counter.h @@ -44,7 +44,7 @@ public: } Counter.Inc(); if (UseLogs) { - AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "create")("object_type", TypeName<TObject>())("count", Counter.Val()); + ACFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "create")("object_type", TypeName<TObject>())("count", Counter.Val()); } } ~TMonitoringObjectsCounter() { @@ -53,7 +53,7 @@ public: } Counter.Dec(); if (UseLogs) { - AFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "destroy")("object_type", TypeName<TObject>())("count", Counter.Val()); + ACFL_TRACE(NKikimrServices::OBJECTS_MONITORING)("event", "destroy")("object_type", TypeName<TObject>())("count", Counter.Val()); } } }; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 
9ebde609884..ab1f584a0e3 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -104,7 +104,6 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { - Y_VERIFY(IndexedBlobs.emplace(range).second); Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); if (batch.GetFetchedInfo().GetFilter()) { Context.GetCounters().PostFilterBytes->Add(range.Size); @@ -117,15 +116,28 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: } } -void TIndexedReadData::InitRead(ui32 inputBatch) { +void TIndexedReadData::RegisterZeroGranula() { + for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i) { + const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; + WaitCommitted.emplace(cmtBlob.GetBlobId(), cmtBlob); + } + for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { + AddNotIndexed(blobId, batch); + } + for (const auto& [blobId, _] : WaitCommitted) { + AddBlobToFetchInFront(0, TBlobRange(blobId, 0, blobId.BlobSize())); + } +} + +void TIndexedReadData::InitRead() { + RegisterZeroGranula(); + auto& indexInfo = ReadMetadata->GetIndexInfo(); Y_VERIFY(indexInfo.GetSortingKey()); Y_VERIFY(indexInfo.GetIndexKey() && indexInfo.GetIndexKey()->num_fields()); SortReplaceDescription = indexInfo.SortReplaceDescription(); - NotIndexed.resize(inputBatch); - Y_VERIFY(!GranulesContext); GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); ui64 portionsBytes = 0; @@ -161,10 +173,7 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da NIndexedReader::TBatch* portionBatch = nullptr; { auto it = IndexedBlobSubscriber.find(blobRange); - Y_VERIFY_DEBUG(it != IndexedBlobSubscriber.end()); - if (it == IndexedBlobSubscriber.end()) { - return; - } + Y_VERIFY(it != 
IndexedBlobSubscriber.end()); portionBatch = it->second; IndexedBlobSubscriber.erase(it); } @@ -203,7 +212,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t Y_VERIFY(SortReplaceDescription); auto& indexInfo = ReadMetadata->GetIndexInfo(); - if (NotIndexed.size() != ReadyNotIndexed) { + if (WaitCommitted.size()) { // Wait till we have all not indexed data so we could replace keys in granules return {}; } @@ -235,7 +244,6 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t GranulesContext->DrainNotIndexedBatches(nullptr); } NotIndexed.clear(); - ReadyNotIndexed = 0; } else { GranulesContext->DrainNotIndexedBatches(nullptr); } @@ -480,4 +488,19 @@ NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { } } +void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) { + auto it = WaitCommitted.find(blobRange.BlobId); + Y_VERIFY(it != WaitCommitted.end()); + auto batch = NArrow::DeserializeBatch(column, ReadMetadata->GetBlobSchema(it->second.GetSchemaSnapshot())); + NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot())); + WaitCommitted.erase(it); +} + +void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch) { + auto it = WaitCommitted.find(blobId); + Y_VERIFY(it != WaitCommitted.end()); + NotIndexed.emplace_back(MakeNotIndexedBatch(batch, it->second.GetSchemaSnapshot())); + WaitCommitted.erase(it); +} + } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 527b7d8a0cf..3638ebd0d7f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -21,6 +21,7 @@ class TIndexedReadData { private: TReadContext Context; std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; + THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> 
WaitCommitted; TFetchBlobsQueue FetchBlobsQueue; TFetchBlobsQueue PriorityBlobsQueue; @@ -28,15 +29,29 @@ private: bool OnePhaseReadMode = false; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch - THashSet<TBlobRange> IndexedBlobs; - ui32 ReadyNotIndexed{ 0 }; - std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; // outscope granules batch + THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; + std::shared_ptr<arrow::RecordBatch> NotIndexedOutscopeBatch; std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; + void AddNotIndexed(const TBlobRange& blobRange, const TString& column); + void AddNotIndexed(const TUnifiedBlobId& blobId, const std::shared_ptr<arrow::RecordBatch>& batch); + void RegisterZeroGranula(); + + void AddIndexed(const TBlobRange& blobRange, const TString& column); + bool IsIndexedBlob(const TBlobRange& blobRange) const { + return IndexedBlobSubscriber.contains(blobRange); + } public: TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const bool internalRead, const TReadContext& context); + TString DebugString() const { + return TStringBuilder() + << "internal:" << OnePhaseReadMode << ";" + << "wait_committed:" << WaitCommitted.size() << ";" + << "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");" + ; + } + const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { return Context.GetCounters(); } @@ -50,30 +65,21 @@ public: return *GranulesContext; } - /// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently - void InitRead(ui32 numNotIndexed); + void InitRead(); void Abort(); bool IsFinished() const; /// @returns batches and corresponding last keys in correct order (i.e. 
sorted by by PK) std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); - void AddNotIndexed(ui32 batchNo, TString blob, const TCommittedBlob& commitedBlob) { - auto batch = NArrow::DeserializeBatch(blob, ReadMetadata->GetBlobSchema(commitedBlob.GetSchemaSnapshot())); - AddNotIndexed(batchNo, batch, commitedBlob); - } - - void AddNotIndexed(ui32 batchNo, const std::shared_ptr<arrow::RecordBatch>& batch, const TCommittedBlob& commitedBlob) { - Y_VERIFY(batchNo < NotIndexed.size()); - Y_VERIFY(!NotIndexed[batchNo]); - ++ReadyNotIndexed; - NotIndexed[batchNo] = MakeNotIndexedBatch(batch, commitedBlob.GetSchemaSnapshot()); + void AddData(const TBlobRange& blobRange, const TString& data) { + if (IsIndexedBlob(blobRange)) { + AddIndexed(blobRange, data); + } else { + AddNotIndexed(blobRange, data); + } } - void AddIndexed(const TBlobRange& blobRange, const TString& column); - bool IsIndexedBlob(const TBlobRange& blobRange) const { - return IndexedBlobs.contains(blobRange); - } NOlap::TReadMetadata::TConstPtr GetReadMetadata() const { return ReadMetadata; } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 571f88f1698..64f5f60fed6 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -37,6 +37,12 @@ public: bool TryStartProcessGranule(const ui64 granuleId, const TBlobRange & range); TGranulesFillingContext(TReadMetadata::TConstPtr readMetadata, TIndexedReadData & owner, const bool internalReading); + TString DebugString() const { + return TStringBuilder() + << "sorting_policy:(" << SortingPolicy->DebugString() << ");" + ; + } + void OnBlobReady(const ui64 /*granuleId*/, const TBlobRange& /*range*/) noexcept { } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 6d54d0618d2..5c90910b8b4 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp 
+++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -14,7 +14,7 @@ private: size_t next = 1; for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { - bool lastOne = Finished() && (next == ready.size()); + const bool lastOne = IndexedData.IsFinished() && (next == ready.size()); SendResult(ctx, it->ResultBatch, lastOne); } } @@ -58,25 +58,7 @@ public: Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size); - if (IndexedBlobs.contains(event.BlobRange)) { - if (!WaitIndexed.contains(event.BlobRange)) { - return; // ignore duplicate parts - } - WaitIndexed.erase(event.BlobRange); - IndexedData.AddIndexed(event.BlobRange, event.Data); - } else if (CommittedBlobs.contains(blobId)) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); - if (cmt.empty()) { - return; // ignore duplicates - } - const NOlap::TCommittedBlob& cmtBlob = cmt.key(); - ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, event.Data, cmtBlob); - } else { - LOG_S_ERROR("TEvReadBlobRangeResult returned unexpected blob at tablet " - << TabletId << " (read)"); - return; - } + IndexedData.AddData(event.BlobRange, event.Data); BuildResult(ctx); DieFinished(ctx); @@ -93,9 +75,7 @@ public: void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) { Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS); SendResult(ctx, {}, true, status); - - WaitIndexed.clear(); - WaitCommitted.clear(); + IndexedData.Abort(); } void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false, @@ -151,12 +131,8 @@ public: ctx.Send(DstActor, chunkEvent.release()); } - bool Finished() const { - return WaitCommitted.empty() && WaitIndexed.empty(); - } - void DieFinished(const TActorContext& ctx) { - if (Finished()) { + if (IndexedData.IsFinished()) { LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << 
TabletId); Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie)); Die(ctx); @@ -164,34 +140,9 @@ public: } void Bootstrap(const TActorContext& ctx) { - ui32 notIndexed = 0; - for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++notIndexed) { - const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; - - CommittedBlobs.emplace(cmtBlob.GetBlobId()); - WaitCommitted.emplace(cmtBlob, notIndexed); - } - - IndexedData.InitRead(notIndexed); - while (IndexedData.HasMoreBlobs()) { - const auto blobRange = IndexedData.ExtractNextBlob(); - WaitIndexed.insert(blobRange); - IndexedBlobs.emplace(blobRange); - } - - // Add cached batches without read - for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { - auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob::BuildKeyBlob(blobId)); - Y_VERIFY(!cmt.empty()); + IndexedData.InitRead(); - const NOlap::TCommittedBlob& cmtBlob = cmt.key(); - ui32 batchNo = cmt.mapped(); - IndexedData.AddNotIndexed(batchNo, batch, cmtBlob); - } - - LOG_S_DEBUG("Starting read (" << WaitIndexed.size() << " indexed, " - << ReadMetadata->CommittedBlobs.size() << " committed, " - << WaitCommitted.size() << " not cached committed) at tablet " << TabletId); + LOG_S_DEBUG("Starting read (" << IndexedData.DebugString() << ") at tablet " << TabletId); bool earlyExit = false; if (Deadline != TInstant::Max()) { @@ -208,17 +159,11 @@ public: SendTimeouts(ctx); ctx.Send(SelfId(), new TEvents::TEvPoisonPill()); } else { - // TODO: Keep inflight - for (auto& [cmtBlob, batchNo] : WaitCommitted) { - auto& blobId = cmtBlob.GetBlobId(); - SendReadRequest(ctx, NBlobCache::TBlobRange(blobId, 0, blobId.BlobSize())); - } - for (auto&& blobRange : IndexedBlobs) { + while (IndexedData.HasMoreBlobs()) { + const auto blobRange = IndexedData.ExtractNextBlob(); SendReadRequest(ctx, blobRange); } - if (WaitCommitted.empty() && IndexedBlobs.empty()) { - BuildResult(ctx); - } + BuildResult(ctx); } Become(&TThis::StateWait); @@ -262,10 +207,6 
@@ private: TInstant Deadline; TActorId ColumnShardActorId; const ui64 RequestCookie; - THashSet<NBlobCache::TBlobRange> IndexedBlobs; - THashSet<TUnifiedBlobId> CommittedBlobs; - THashSet<NBlobCache::TBlobRange> WaitIndexed; - std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; ui32 ReturnedBatchNo; mutable TString SerializedSchema; |