| author    | ivanmorozov <[email protected]> | 2023-09-01 14:08:07 +0300 |
|-----------|------------------------------|---------------------------|
| committer | ivanmorozov <[email protected]> | 2023-09-01 14:31:36 +0300 |
| commit    | c6ec06e7c77e4029e41a21fa12f2e38d60eece59 (patch) | |
| tree      | 2f91624b5fbd01a3c0ec30c3de30fdfae0e3a415 | |
| parent    | bf418335042882be6345a85cce0fa92e8c95537a (diff) | |
KIKIMR-19213: reading through interface
41 files changed, 676 insertions, 368 deletions
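At a glance, the commit replaces the scan iterator's concrete `TIndexedReadData` member with an abstract `NOlap::IDataReader` obtained via `ReadMetadata->BuildReader(context, ReadMetadata)`, and switches blob extraction from an "invalid `TBlobRange`" sentinel to `std::optional`. The following is a minimal sketch of how a caller drives the reader through the new interface, assuming only the methods visible in the hunks below; the driving loop and the `fetchBlob` callback are illustrative and not part of the commit:

```cpp
// Sketch only: exercises the IDataReader interface introduced by this commit.
// The real driver is TColumnShardScanIterator / the scan actor shown in the diff.
#include <functional>
#include <memory>

#include <ydb/core/tx/columnshard/engines/reader/read_context.h> // IDataReader, TReadContext

using namespace NKikimr::NOlap;

void DriveReader(const std::shared_ptr<IDataReader>& reader,
                 const std::function<TString(const TBlobRange&)>& fetchBlob) {
    while (!reader->IsFinished()) {
        // ExtractNextBlob() now returns std::optional<TBlobRange> instead of a
        // blob range with an invalid BlobId, so emptiness is checked explicitly.
        if (auto blobRange = reader->ExtractNextBlob(/*hasReadyResults=*/false)) {
            reader->AddData(*blobRange, fetchBlob(*blobRange));
        }
        // Ready batches are pulled through the same interface.
        for (auto&& result : reader->ExtractReadyResults(/*maxRowsInBatch=*/5000)) {
            Y_UNUSED(result); // handed off to the scan actor in the real code path
        }
    }
}
```

`TColumnShardScanIterator` follows exactly this pattern in the hunks below: it builds the reader once and from then on interacts with it only through the interface's public wrappers, which delegate to the virtual `Do*` entry points implemented by `TIndexedReadData`.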
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index c73a20a2332..6ff5f6e3b38 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -8,31 +8,30 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP : Context(context) , ReadyResults(context.GetCounters()) , ReadMetadata(readMetadata) - , IndexedData(ReadMetadata, false, context) { - IndexedData.InitRead(); + IndexedData = ReadMetadata->BuildReader(context, ReadMetadata); Y_VERIFY(ReadMetadata->IsSorted()); if (ReadMetadata->Empty()) { - IndexedData.Abort(); + IndexedData->Abort(); } } void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) { - IndexedData.AddData(blobRange, data); + IndexedData->AddData(blobRange, data); } -NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { +NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { FillReadyResults(); return ReadyResults.pop_front(); } -NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { - return IndexedData.ExtractNextBlob(ReadyResults.size()); +std::optional<NBlobCache::TBlobRange> TColumnShardScanIterator::GetNextBlobToRead() { + return IndexedData->ExtractNextBlob(ReadyResults.size()); } void TColumnShardScanIterator::FillReadyResults() { - auto ready = IndexedData.GetReadyResults(MaxRowsInBatch); + auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch); i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead; for (size_t i = 0; i < ready.size() && limitLeft; ++i) { if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) { @@ -49,10 +48,10 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { - IndexedData.Abort(); + IndexedData->Abort(); } - if (IndexedData.IsFinished()) { + if (IndexedData->IsFinished()) { Context.MutableProcessor().Stop(); } } @@ -62,15 +61,15 @@ bool TColumnShardScanIterator::HasWaitingTasks() const { } TColumnShardScanIterator::~TColumnShardScanIterator() { - IndexedData.Abort(); + IndexedData->Abort(); ReadMetadata->ReadStats->PrintToLog(); } void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { - if (!task->IsDataProcessed() || Context.GetProcessor().IsStopped() || !task->IsSameProcessor(Context.GetProcessor())) { + if (!task->IsDataProcessed() || Context.GetProcessor().IsStopped() || !task->IsSameProcessor(Context.GetProcessor()) || IndexedData->IsFinished()) { return; } - Y_VERIFY(task->Apply(IndexedData.GetGranulesContext())); + Y_VERIFY(task->Apply(*IndexedData)); } } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 88672e6cb25..6d8a77a2af6 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -76,7 +76,7 @@ private: NOlap::TReadContext Context; TReadyResults ReadyResults; NOlap::TReadMetadata::TConstPtr ReadMetadata; - NOlap::TIndexedReadData IndexedData; + std::shared_ptr<NOlap::IDataReader> IndexedData; ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; public: @@ -90,8 +90,7 @@ public: virtual TString DebugString() const override { return TStringBuilder() << "ready_results:(" << ReadyResults.DebugString() << ");" - << "has_buffer:" << IndexedData.GetMemoryAccessor()->HasBuffer() << ";" - << "indexed_data:(" << IndexedData.DebugString() << ")" + << 
"indexed_data:(" << IndexedData->DebugString() << ")" ; } @@ -102,12 +101,12 @@ public: void AddData(const TBlobRange& blobRange, TString data) override; bool Finished() const override { - return IndexedData.IsFinished() && ReadyResults.empty(); + return IndexedData->IsFinished() && ReadyResults.empty(); } NOlap::TPartialReadResult GetBatch() override; - TBlobRange GetNextBlobToRead() override; + std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() override; private: void FillReadyResults(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 4d7a80c953a..f7e67460f83 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -109,7 +109,7 @@ public: Y_VERIFY(!ScanIterator); MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result"); - NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor, false); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // propagate self actor id // TODO: FlagSubscribeOnSession ? @@ -149,12 +149,13 @@ private: THashMap<TUnifiedBlobId, std::vector<NBlobCache::TBlobRange>> ranges; while (InFlightGuard.CanTake()) { auto blobRange = ScanIterator->GetNextBlobToRead(); - if (!blobRange.BlobId.IsValid()) { + if (!blobRange) { break; } - InFlightGuard.Take(blobRange.Size); + Y_VERIFY(blobRange->BlobId.IsValid()); + InFlightGuard.Take(blobRange->Size); ++InFlightReads; - ranges[blobRange.BlobId].emplace_back(blobRange); + ranges[blobRange->BlobId].emplace_back(*blobRange); } if (!InFlightGuard.CanTake()) { ScanCountersPool.OnReadingOverloaded(); @@ -191,7 +192,9 @@ private: ACFL_DEBUG("event", "TEvTaskProcessedResult"); auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()); Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult())); - ScanIterator->Apply(t); + if (!ScanIterator->Finished()) { + ScanIterator->Apply(t); + } if (!ScanIterator->HasWaitingTasks() && !NoTasksStartInstant) { NoTasksStartInstant = Now(); } @@ -372,7 +375,7 @@ private: // The loop has finished without any progress! LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " is hanging" - << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); + << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " debug: " << ScanIterator->DebugString()); Y_VERIFY_DEBUG(false); } @@ -445,7 +448,7 @@ private: return Finish(); } - NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor, false); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // Used in TArrowToYdbConverter ResultYqlSchema.clear(); diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index f7a1133d18e..464f5b0fccd 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -20,11 +20,73 @@ private: std::shared_ptr<arrow::RecordBatch> LastReadKey; public: + ui64 GetRecordsCount() const { + return ResultBatch ? 
ResultBatch->num_rows() : 0; + } + + void InitDirection(const bool reverse) { + if (reverse && ResultBatch && ResultBatch->num_rows()) { + auto permutation = NArrow::MakePermutation(ResultBatch->num_rows(), true); + ResultBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(ResultBatch, permutation)).record_batch(); + } + } + + void StripColumns(const std::shared_ptr<arrow::Schema>& schema) { + if (ResultBatch) { + ResultBatch = NArrow::ExtractColumns(ResultBatch, schema); + } + } + + void BuildLastKey(const std::shared_ptr<arrow::Schema>& schema) { + Y_VERIFY(!LastReadKey); + if (ResultBatch && ResultBatch->num_rows()) { + auto keyColumns = NArrow::ExtractColumns(ResultBatch, schema); + Y_VERIFY(keyColumns); + LastReadKey = keyColumns->Slice(keyColumns->num_rows() - 1); + } + } + + static std::vector<TPartialReadResult> SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult) { + std::vector<TPartialReadResult> result; + std::shared_ptr<arrow::RecordBatch> currentBatch; + for (auto&& i : resultsExt) { + std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch; + while (currentBatchSplitting && currentBatchSplitting->num_rows()) { + const ui32 currentRecordsCount = currentBatch ? currentBatch->num_rows() : 0; + if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) { + if (!currentBatch) { + currentBatch = currentBatchSplitting; + } else { + currentBatch = NArrow::CombineBatches({currentBatch, currentBatchSplitting}); + } + currentBatchSplitting = nullptr; + } else { + auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount); + if (!currentBatch) { + currentBatch = currentSlice; + } else { + currentBatch = NArrow::CombineBatches({currentBatch, currentSlice}); + } + result.emplace_back(TPartialReadResult(nullptr, currentBatch)); + currentBatch = nullptr; + currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount); + } + } + } + if (currentBatch && currentBatch->num_rows()) { + result.emplace_back(TPartialReadResult(nullptr, currentBatch)); + } + return result; + } + void Slice(const ui32 offset, const ui32 length) { ResultBatch = ResultBatch->Slice(offset, length); } void ApplyProgram(const NOlap::TProgramContainer& program) { + if (!program.HasProgram()) { + return; + } Y_VERIFY(!MemoryGuardInternal); auto status = program.ApplyProgram(ResultBatch); if (!status.ok()) { @@ -82,7 +144,7 @@ public: virtual bool HasWaitingTasks() const = 0; virtual bool Finished() const = 0; virtual NOlap::TPartialReadResult GetBatch() = 0; - virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); } + virtual std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() { return {}; } virtual TString DebugString() const { return "NO_DATA"; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index dfed51f0269..d9e36b4838a 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -859,8 +859,8 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta } THashSet<TUnifiedBlobId> uniqBlobs; - for (auto& portion : metadata.SelectInfo->Portions) { - for (auto& rec : portion.Records) { + for (auto& portion : metadata.SelectInfo->PortionsOrderedPK) { + for (auto& rec : portion->Records) { uniqBlobs.insert(rec.BlobRange.BlobId); } } diff --git a/ydb/core/tx/columnshard/engines/column_engine.h 
b/ydb/core/tx/columnshard/engines/column_engine.h index 027c9c5bbc2..3fcff293c1c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -38,20 +38,20 @@ struct TSelectInfo { }; std::vector<TGranuleRecord> Granules; // ordered by key (ascending) - std::vector<TPortionInfo> Portions; + std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK; - NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>> GetPortionsOrdered(const bool reverse) const { - return NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>>(Portions, reverse); + NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsOrdered(const bool reverse) const { + return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse); } NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>> GetGranulesOrdered(const bool reverse) const { return NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>>(Granules, reverse); } - size_t NumRecords() const { + size_t NumChunks() const { size_t records = 0; - for (auto& portionInfo : Portions) { - records += portionInfo.NumRecords(); + for (auto& portionInfo : PortionsOrderedPK) { + records += portionInfo->NumChunks(); } return records; } @@ -59,13 +59,13 @@ struct TSelectInfo { TStats Stats() const { TStats out; out.Granules = Granules.size(); - out.Portions = Portions.size(); + out.Portions = PortionsOrderedPK.size(); THashSet<TUnifiedBlobId> uniqBlob; - for (auto& portionInfo : Portions) { - out.Records += portionInfo.NumRecords(); - out.Rows += portionInfo.NumRows(); - for (auto& rec : portionInfo.Records) { + for (auto& portionInfo : PortionsOrderedPK) { + out.Records += portionInfo->NumChunks(); + out.Rows += portionInfo->NumRows(); + for (auto& rec : portionInfo->Records) { uniqBlob.insert(rec.BlobRange.BlobId); } } @@ -84,10 +84,10 @@ struct TSelectInfo { } out << "; "; } - if (info.Portions.size()) { + if (info.PortionsOrderedPK.size()) { out << "portions:"; - for (auto& portionInfo : info.Portions) { - out << portionInfo; + for (auto& portionInfo : info.PortionsOrderedPK) { + out << portionInfo->DebugString(); } } return out; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 70375672d3f..3451c1cbc3c 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -339,7 +339,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( auto spg = Granules[granule]; Y_VERIFY(spg); for (auto& [portion, info] : spg->GetPortions()) { - affectedRecords += info->NumRecords(); + affectedRecords += info->NumChunks(); changes->PortionsToDrop.push_back(*info); dropPortions.insert(portion); } @@ -380,7 +380,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup( if (!portionInfo) { it = CleanupPortions.erase(it); } else if (portionInfo->CheckForCleanup(snapshot)) { - affectedRecords += portionInfo->NumRecords(); + affectedRecords += portionInfo->NumChunks(); changes->PortionsToDrop.push_back(*portionInfo); it = CleanupPortions.erase(it); if (affectedRecords > maxRecords) { @@ -484,7 +484,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering } } if (!keep && context.AllowDrop) { - dropBlobs += info->NumRecords(); + dropBlobs += 
info->NumBlobs(); context.Changes->PortionsToDrop.push_back(*info); SignalCounters.OnPortionToDrop(info->BlobsBytes()); } @@ -658,31 +658,8 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up } } -static TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> GroupPortionsBySnapshot(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSnapshot& snapshot) { - TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> out; - for (const auto& [portion, portionInfo] : portions) { - if (portionInfo->Empty()) { - continue; - } - - TSnapshot recSnapshot = portionInfo->GetMinSnapshot(); - TSnapshot recXSnapshot = portionInfo->GetRemoveSnapshot(); - - bool visible = (recSnapshot <= snapshot); - if (recXSnapshot.GetPlanStep()) { - visible = visible && snapshot < recXSnapshot; - } - - if (visible) { - out[recSnapshot].push_back(portionInfo); - } - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo->DebugString())("visible", visible)("snapshot", snapshot.DebugString()); - } - return out; -} - std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot snapshot, - const THashSet<ui32>& columnIds, + const THashSet<ui32>& /*columnIds*/, const TPKRangesFilter& pkRangesFilter) const { auto out = std::make_shared<TSelectInfo>(); @@ -695,7 +672,6 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot return out; } out->Granules.reserve(pathGranules.size()); - // TODO: out.Portions.reserve() std::optional<TMarksMap::const_iterator> previousIterator; const bool compositeMark = UseCompositeMarks(); @@ -737,25 +713,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot Y_VERIFY(it != Granules.end()); auto& spg = it->second; Y_VERIFY(spg); - auto& portions = spg->GetPortions(); bool granuleHasDataForSnaphsot = false; - TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> orderedPortions = GroupPortionsBySnapshot(portions, snapshot); - for (auto& [snap, vec] : orderedPortions) { - for (const auto& portionInfo : vec) { - TPortionInfo outPortion = portionInfo->CopyWithFilteredColumns(columnIds); - Y_VERIFY(outPortion.Produced()); - if (!pkRangesFilter.IsPortionInUsage(outPortion, VersionedIndex.GetLastSchema()->GetIndexInfo())) { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped") - ("granule", granule)("portion", portionInfo->GetPortion()); - continue; - } else { - AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected") - ("granule", granule)("portion", portionInfo->GetPortion()); - } - out->Portions.emplace_back(std::move(outPortion)); - granuleHasDataForSnaphsot = true; + std::vector<std::shared_ptr<TPortionInfo>> orderedPortions = spg->GroupOrderedPortionsByPK(snapshot); + for (const auto& portionInfo : orderedPortions) { + Y_VERIFY(portionInfo->Produced()); + if (!pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo())) { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped") + ("granule", granule)("portion", portionInfo->DebugString()); + continue; + } else { + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected") + ("granule", granule)("portion", portionInfo->DebugString()); } + out->PortionsOrderedPK.emplace_back(portionInfo); + granuleHasDataForSnaphsot = true; } if (granuleHasDataForSnaphsot) { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp 
b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 4ae43fe7b31..cdfb768fd01 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -79,23 +79,28 @@ void TIndexedReadData::InitRead() { ui64 portionsBytes = 0; std::set<ui64> granulesReady; ui64 prevGranule = 0; - for (auto& portionInfo : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) { + THashSet<ui32> columnIdsHash; + for (auto&& i : ReadMetadata->GetAllColumns()) { + columnIdsHash.emplace(i); + } + for (auto& portionSorted : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) { + auto portionInfo = portionSorted->CopyWithFilteredColumns(columnIdsHash); portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.GetGranule()); - granule->RegisterBatchForFetching(portionInfo); if (prevGranule != portionInfo.GetGranule()) { Y_VERIFY(granulesReady.emplace(portionInfo.GetGranule()).second); prevGranule = portionInfo.GetGranule(); } + granule->RegisterBatchForFetching(std::move(portionInfo)); } GranulesContext->PrepareForStart(); Context.GetCounters().PortionBytes->Add(portionsBytes); auto& stats = ReadMetadata->ReadStats; stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); - stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size(); + stats->IndexPortions = ReadMetadata->SelectInfo->PortionsOrderedPK.size(); stats->IndexBatches = ReadMetadata->NumIndexedBlobs(); stats->CommittedBatches = ReadMetadata->CommittedBlobs.size(); stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount(); @@ -148,7 +153,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>& return preparedBatch; } -std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) { +std::vector<TPartialReadResult> TIndexedReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) { Y_VERIFY(SortReplaceDescription); auto& indexInfo = ReadMetadata->GetIndexInfo(); @@ -188,14 +193,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t GranulesContext->DrainNotIndexedBatches(nullptr); } - // Extract ready to out granules: ready granules that are not blocked by other (not ready) granules - Y_VERIFY(GranulesContext); - auto out = ReadyToOut(maxRowsInBatch); - const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty) - if (requireResult && out.empty()) { - out.push_back(TPartialReadResult(nullptr, NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema()))); - } - return out; + return ReadyToOut(maxRowsInBatch); } /// @return batches that are not blocked by others @@ -344,21 +342,18 @@ std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::ve return out; } -TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, - const bool internalRead, const TReadContext& context) - : Context(context) - , ReadMetadata(readMetadata) - , OnePhaseReadMode(internalRead) +TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context) + : TBase(context, readMetadata) + , OnePhaseReadMode(context.GetIsInternalRead()) { - Y_VERIFY(ReadMetadata->SelectInfo); } -bool TIndexedReadData::IsFinished() const { +bool TIndexedReadData::DoIsFinished() const { 
Y_VERIFY(GranulesContext); return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished(); } -void TIndexedReadData::Abort() { +void TIndexedReadData::DoAbort() { Y_VERIFY(GranulesContext); Context.MutableProcessor().Stop(); FetchBlobsQueue.Stop(); @@ -368,36 +363,36 @@ void TIndexedReadData::Abort() { WaitCommitted.clear(); } -NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob(const bool hasReadyResults) { +std::optional<TBlobRange> TIndexedReadData::DoExtractNextBlob(const bool hasReadyResults) { Y_VERIFY(GranulesContext); while (auto* f = PriorityBlobsQueue.front()) { - if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) { - ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId()); + if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) { + ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId()); PriorityBlobsQueue.pop_front(); continue; } - GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange()); + GranulesContext->ForceStartProcessGranule(f->GetObjectId(), f->GetRange()); Context.GetCounters().OnPriorityFetch(f->GetRange().Size); return PriorityBlobsQueue.pop_front(); } while (auto* f = FetchBlobsQueue.front()) { - if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) { - ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId()); + if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) { + ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId()); FetchBlobsQueue.pop_front(); continue; } - if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange(), hasReadyResults)) { + if (GranulesContext->TryStartProcessGranule(f->GetObjectId(), f->GetRange(), hasReadyResults)) { Context.GetCounters().OnGeneralFetch(f->GetRange().Size); return FetchBlobsQueue.pop_front(); } else { Context.GetCounters().OnProcessingOverloaded(); - return TBlobRange(); + return {}; } } - return TBlobRange(); + return {}; } void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) { @@ -415,7 +410,7 @@ void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::sh WaitCommitted.erase(it); } -void TIndexedReadData::AddData(const TBlobRange& blobRange, const TString& data) { +void TIndexedReadData::DoAddData(const TBlobRange& blobRange, const TString& data) { if (GranulesContext->IsFinished()) { ACFL_DEBUG("event", "AddData on GranulesContextFinished"); return; diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index c1b31db0ddc..13fa7e27b3f 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -17,15 +17,14 @@ class TScanIteratorBase; namespace NKikimr::NOlap { -class TIndexedReadData { +class TIndexedReadData: public IDataReader, TNonCopyable { private: - TReadContext Context; + using TBase = IDataReader; std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> WaitCommitted; TFetchBlobsQueue FetchBlobsQueue; TFetchBlobsQueue PriorityBlobsQueue; - NOlap::TReadMetadata::TConstPtr ReadMetadata; bool OnePhaseReadMode = false; std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; @@ -41,28 +40,24 @@ private: bool IsIndexedBlob(const TBlobRange& blobRange) const { return 
IndexedBlobSubscriber.contains(blobRange); } -public: - TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const bool internalRead, const TReadContext& context); - - TString DebugString() const { +protected: + virtual TString DoDebugString() const override { return TStringBuilder() - << "internal:" << OnePhaseReadMode << ";" << "wait_committed:" << WaitCommitted.size() << ";" << "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");" ; } - const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const { - return Context.GetMemoryAccessor(); - } + /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK) + virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override; - const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { - return Context.GetCounters(); - } + virtual void DoAbort() override; + virtual bool DoIsFinished() const override; - const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept { - return Context.GetProcessor(); - } + virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override; + virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override; +public: + TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context); NIndexedReader::TGranulesFillingContext& GetGranulesContext() { Y_VERIFY(GranulesContext); @@ -70,13 +65,6 @@ public: } void InitRead(); - void Abort(); - bool IsFinished() const; - - /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK) - std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); - - void AddData(const TBlobRange& blobRange, const TString& data); NOlap::TReadMetadata::TConstPtr GetReadMetadata() const { return ReadMetadata; @@ -93,12 +81,6 @@ public: PriorityBlobsQueue.emplace_back(granuleId, range); } - bool HasMoreBlobs() const { - return FetchBlobsQueue.size() || PriorityBlobsQueue.size(); - } - - TBlobRange ExtractNextBlob(const bool hasReadyResults); - private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const; diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h index dd4a0248415..99f7ca0983e 100644 --- a/ydb/core/tx/columnshard/engines/portions/meta.h +++ b/ydb/core/tx/columnshard/engines/portions/meta.h @@ -2,6 +2,7 @@ #include <ydb/core/tx/columnshard/common/portion.h> #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/core/protos/tx_columnshard.pb.h> +#include <ydb/library/accessor/accessor.h> #include <util/stream/output.h> namespace NKikimr::NOlap { diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 759ee9fae28..026076f3fb6 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -116,6 +116,13 @@ TString TPortionInfo::DebugString() const { sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");"; } sb << "chunks:(" << Records.size() << ");"; + if (IS_TRACE_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) { + std::set<TString> blobIds; + for (auto&& i : Records) { + blobIds.emplace(::ToString(i.BlobRange.BlobId)); + } + sb << "blobs:" << JoinSeq(",", blobIds) << ";blobs_count:" << blobIds.size() << ";"; 
+ } return sb << ")"; } @@ -155,6 +162,25 @@ bool TPortionInfo::HasPkMinMax() const { return result; } +std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const { + std::vector<const TColumnRecord*> result; + for (auto&& c : Records) { + if (c.ColumnId == columnId) { + Y_VERIFY(c.Chunk == result.size()); + result.emplace_back(&c); + } + } + return result; +} + +size_t TPortionInfo::NumBlobs() const { + THashSet<TUnifiedBlobId> blobIds; + for (auto&& i : Records) { + blobIds.emplace(i.BlobRange.BlobId); + } + return blobIds.size(); +} + std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const { Y_VERIFY(!Blobs.empty()); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index 88d90b2f3a2..4905b075bcf 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -23,6 +23,8 @@ private: public: static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024; + std::vector<const TColumnRecord*> GetColumnChunksPointers(const ui32 columnId) const; + void ResetMeta() { Meta = TPortionMeta(); } @@ -54,7 +56,8 @@ public: bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; } bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ } bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); } - size_t NumRecords() const { return Records.size(); } + size_t NumChunks() const { return Records.size(); } + size_t NumBlobs() const; TPortionInfo CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const; @@ -172,6 +175,20 @@ public: return sum; } + bool IsVisible(const TSnapshot& snapshot) const { + if (Empty()) { + return false; + } + + bool visible = (MinSnapshot <= snapshot); + if (visible && RemoveSnapshot.Valid()) { + visible = snapshot < RemoveSnapshot; + } + + AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)("snapshot", snapshot.DebugString()); + return visible; + } + void UpdateRecordsMeta(TPortionMeta::EProduced produced) { Meta.Produced = produced; } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp index 9436f7a217f..5e5823dd1b9 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.cpp +++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp @@ -7,21 +7,21 @@ namespace NKikimr::NOlap::NIndexedReader { -TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo, const ui64 predictedBatchSize) +TBatch::TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfoExt, const ui64 predictedBatchSize) : BatchAddress(address) - , Portion(portionInfo.GetPortion()) + , Portion(portionInfoExt.GetPortion()) , Granule(owner.GetGranuleId()) , PredictedBatchSize(predictedBatchSize) , Owner(&owner) - , PortionInfo(&portionInfo) + , PortionInfo(std::move(portionInfoExt)) { - Y_VERIFY(Granule == PortionInfo->GetGranule()); - Y_VERIFY(portionInfo.Records.size()); + Y_VERIFY(Granule == PortionInfo.GetGranule()); + Y_VERIFY(PortionInfo.Records.size()); - if (portionInfo.CanIntersectOthers()) { + if (PortionInfo.CanIntersectOthers()) { ACFL_TRACE("event", "intersect_portion"); Owner->SetDuplicationsAvailable(true); - if (portionInfo.CanHaveDups()) { + if (PortionInfo.CanHaveDups()) { ACFL_TRACE("event", "dup_portion"); 
DuplicationsAvailableFlag = true; } @@ -30,10 +30,10 @@ TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) { Y_VERIFY(WaitIndexed.empty()); - Y_VERIFY(PortionInfo->Produced()); + Y_VERIFY(PortionInfo.Produced()); Y_VERIFY(!FetchedInfo.GetFilteredBatch()); - auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); + auto blobSchema = readMetadata->GetLoadSchema(PortionInfo.GetMinSnapshot()); auto readSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot()); ISnapshotSchema::TPtr resultSchema; if (CurrentColumnIds) { @@ -41,7 +41,7 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard } else { resultSchema = readSchema; } - auto batchConstructor = PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Data); + auto batchConstructor = PortionInfo.PrepareForAssemble(*blobSchema, *resultSchema, Data); Data.clear(); if (!FetchedInfo.GetFilter()) { return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, @@ -68,7 +68,7 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const { ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) { ui64 result = 0; - for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { + for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { if (!columnIds.contains(rec.ColumnId)) { continue; } @@ -94,7 +94,7 @@ void TBatch::ResetCommon(const std::set<ui32>& columnIds) { void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) { Y_VERIFY(!FetchedInfo.GetFilter()); ResetCommon(columnIds); - for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { + for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { continue; } @@ -109,7 +109,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) { Y_VERIFY(FetchedInfo.GetFilter()); ResetCommon(columnIds); std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects; - for (const NOlap::TColumnRecord& rec : PortionInfo->Records) { + for (const NOlap::TColumnRecord& rec : PortionInfo.Records) { if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) { continue; } @@ -187,20 +187,20 @@ std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, c void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const { auto indexKey = indexInfo.GetIndexKey(); - Y_VERIFY(PortionInfo->Valid()); + Y_VERIFY(PortionInfo.Valid()); if (!FirstPK) { - const NArrow::TReplaceKey& minRecord = PortionInfo->IndexKeyStart(); + const NArrow::TReplaceKey& minRecord = PortionInfo.IndexKeyStart(); auto batch = minRecord.ToBatch(indexKey); Y_VERIFY(batch); - FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), false); - ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), true); + FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false); + ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true); } if (!LastPK) { - const NArrow::TReplaceKey& maxRecord = PortionInfo->IndexKeyEnd(); + const NArrow::TReplaceKey& 
maxRecord = PortionInfo.IndexKeyEnd(); auto batch = maxRecord.ToBatch(indexKey); Y_VERIFY(batch); - LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), false); - ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), true); + LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false); + ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true); } if (reverse) { from = *ReverseFirstPK; @@ -216,7 +216,7 @@ bool TBatch::CheckReadyForAssemble() { auto& context = Owner->GetOwner(); auto processor = context.GetTasksProcessor(); if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) { - processor.Add(context, assembleBatchTask); + processor.Add(context.GetOwner(), assembleBatchTask); } return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h index fe5593efaf4..338a6107185 100644 --- a/ydb/core/tx/columnshard/engines/reader/batch.h +++ b/ydb/core/tx/columnshard/engines/reader/batch.h @@ -91,7 +91,7 @@ private: YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo); THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data; TGranule* Owner; - const TPortionInfo* PortionInfo = nullptr; + const TPortionInfo PortionInfo; YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds); std::set<ui32> AskedColumnIds; @@ -117,7 +117,7 @@ public: } bool AllowEarlyFilter() const { - return PortionInfo->AllowEarlyFilter(); + return PortionInfo.AllowEarlyFilter(); } const TBatchAddress& GetBatchAddress() const { return BatchAddress; @@ -131,7 +131,7 @@ public: return GetUsefulBytes(FetchedBytes); } - TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo, const ui64 predictedBatchSize); + TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfo, const ui64 predictedBatchSize); bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData); bool AskedColumnsAlready(const std::set<ui32>& columnIds) const; @@ -155,7 +155,7 @@ public: } const TPortionInfo& GetPortionInfo() const { - return *PortionInfo; + return PortionInfo; } }; } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp index 86151582972..2116c747330 100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp @@ -12,7 +12,7 @@ bool IDataTasksProcessor::ITask::DoExecute() { } } -bool IDataTasksProcessor::ITask::Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const { +bool IDataTasksProcessor::ITask::Apply(NOlap::IDataReader& indexedDataRead) const { if (OwnerOperator) { OwnerOperator->ReplyReceived(); if (OwnerOperator->IsStopped()) { @@ -42,12 +42,12 @@ bool IDataTasksProcessor::Add(ITask::TPtr task) { } -void TDataTasksProcessorContainer::Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task) { +void TDataTasksProcessorContainer::Add(NOlap::IDataReader& reader, IDataTasksProcessor::ITask::TPtr task) { if (Object) { Object->Add(task); } else { task->Execute(nullptr); - task->Apply(context); + task->Apply(reader); } } diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h index afe942fbe1c..c1fba77d407 
100644 --- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h +++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h @@ -2,8 +2,8 @@ #include <ydb/core/tx/conveyor/usage/abstract.h> #include <ydb/library/accessor/accessor.h> -namespace NKikimr::NOlap::NIndexedReader { -class TGranulesFillingContext; +namespace NKikimr::NOlap { +class IDataReader; } namespace NKikimr::NColumnShard { @@ -23,7 +23,7 @@ public: bool DataProcessed = false; protected: TDataTasksProcessorContainer GetTasksProcessorContainer() const; - virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0; + virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const = 0; virtual bool DoExecuteImpl() = 0; virtual bool DoExecute() override final; @@ -37,7 +37,7 @@ public: using TPtr = std::shared_ptr<ITask>; virtual ~ITask() = default; - bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const; + bool Apply(NOlap::IDataReader& indexedDataRead) const; bool IsDataProcessed() const noexcept { return DataProcessed; @@ -99,7 +99,7 @@ public: return Object; } - void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task); + void Add(NOlap::IDataReader& reader, IDataTasksProcessor::ITask::TPtr task); }; } diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index e76f2f69937..04e6cd01e70 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -32,6 +32,10 @@ private: bool CheckBufferAvailable() const; public: + TIndexedReadData& GetOwner() { + return Owner; + } + std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const { return GranulesLiveContext; } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp index c278bac23f8..972046bffac 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp @@ -54,11 +54,12 @@ bool TAssembleFilter::DoExecuteImpl() { return true; } -bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const { +bool TAssembleFilter::DoApply(IDataReader& owner) const { Y_VERIFY(OriginalCount); - owner.GetCounters().OriginalRowsCount->Add(OriginalCount); - owner.GetCounters().AssembleFilterCount->Add(1); - TBatch* batch = owner.GetBatchInfo(BatchAddress); + auto& reader = owner.GetMeAs<TIndexedReadData>(); + reader.GetCounters().OriginalRowsCount->Add(OriginalCount); + reader.GetCounters().AssembleFilterCount->Add(1); + TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress); if (batch) { batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter); } diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h index 64232c4be1b..0422a62051d 100644 --- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h @@ -25,7 +25,7 @@ namespace NKikimr::NOlap::NIndexedReader { std::set<ui32> FilterColumnIds; IOrderPolicy::TPtr BatchesOrderPolicy; protected: - virtual bool DoApply(TGranulesFillingContext& owner) const override; + virtual bool DoApply(IDataReader& owner) const override; virtual bool DoExecuteImpl() override; public: diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp 
b/ydb/core/tx/columnshard/engines/reader/granule.cpp index d20b7405cc2..ed8f5c4053f 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp @@ -38,7 +38,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco CheckReady(); } -TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) { +TBatch& TGranule::RegisterBatchForFetching(TPortionInfo&& portionInfo) { const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns()); RawDataSize += batchSize; const ui64 filtersSize = portionInfo.NumRows() * (8 + 8); @@ -50,7 +50,7 @@ TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) { Y_VERIFY(!ReadyFlag); ui32 batchGranuleIdx = Batches.size(); WaitBatches.emplace(batchGranuleIdx); - Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo, batchSize); + Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, std::move(portionInfo), batchSize); Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second); Owner->OnNewBatch(Batches.back()); return Batches.back(); @@ -149,7 +149,7 @@ void TGranule::CheckReady() { ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal); std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches); auto processor = Owner->GetTasksProcessor(); - processor.Add(*Owner, std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject())); + processor.Add(Owner->GetOwner(), std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject())); } } } diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h index 89e0ae19879..41de01f2bed 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule.h +++ b/ydb/core/tx/columnshard/engines/reader/granule.h @@ -151,7 +151,7 @@ public: void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch); void OnBlobReady(const TBlobRange& range) noexcept; bool OnFilterReady(TBatch& batchInfo); - TBatch& RegisterBatchForFetching(const TPortionInfo& portionInfo); + TBatch& RegisterBatchForFetching(TPortionInfo&& portionInfo); void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const; }; diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp index db05b35257d..14ad92c4362 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp +++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp @@ -1,4 +1,5 @@ #include "granule_preparation.h" +#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> namespace NKikimr::NOlap::NIndexedReader { @@ -69,8 +70,9 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::Specia return out; } -bool TTaskGranulePreparation::DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const { - indexedDataRead.GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule)); +bool TTaskGranulePreparation::DoApply(IDataReader& indexedDataRead) const { + auto& readData = indexedDataRead.GetMeAs<TIndexedReadData>(); + 
readData.GetGranulesContext().GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule)); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h index 7b893f89fbd..a7ff39f854b 100644 --- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h +++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h @@ -24,7 +24,7 @@ private: const THashSet<const void*>& batchesToDedup); protected: - virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const override; + virtual bool DoApply(IDataReader& indexedDataRead) const override; virtual bool DoExecuteImpl() override; public: diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp index 22890630f81..8ae30314fa1 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp @@ -115,7 +115,7 @@ std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(TResultCo TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata) : TBase(readMetadata) - , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted()) + , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), nullptr, readMetadata->IsDescSorted()) { CurrentItemsLimit = ReadMetadata->Limit; } diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp index c48aa234633..b87bc406ef3 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp @@ -22,8 +22,9 @@ bool TAssembleBatch::DoExecuteImpl() { return true; } -bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const { - TBatch* batch = owner.GetBatchInfo(BatchAddress); +bool TAssembleBatch::DoApply(IDataReader& owner) const { + auto& reader = owner.GetMeAs<TIndexedReadData>(); + TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress); if (batch) { batch->InitBatch(FullBatch); } diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h index 4a2350c18a6..62e8d84bd41 100644 --- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h @@ -22,7 +22,7 @@ private: const TBatchAddress BatchAddress; protected: - virtual bool DoApply(TGranulesFillingContext& owner) const override; + virtual bool DoApply(IDataReader& owner) const override; virtual bool DoExecuteImpl() override; public: virtual TString GetTaskClassIdentifier() const override { diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp index fc857a54291..e71f26a9d4c 100644 --- a/ydb/core/tx/columnshard/engines/reader/queue.cpp +++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp @@ -2,19 +2,4 @@ namespace NKikimr::NOlap { -NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() { - if (!StoppedFlag && IteratorBlobsSequential.size()) { - auto result = IteratorBlobsSequential.front(); - IteratorBlobsSequential.pop_front(); - return result.GetRange(); - } else { - return TBlobRange(); - } -} - -void 
TFetchBlobsQueue::emplace_back(const ui64 granuleId, const TBlobRange& range) { - Y_VERIFY(!StoppedFlag); - IteratorBlobsSequential.emplace_back(granuleId, range); -} - } diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h index 7c5415000af..c3f0ee8b13f 100644 --- a/ydb/core/tx/columnshard/engines/reader/queue.h +++ b/ydb/core/tx/columnshard/engines/reader/queue.h @@ -6,31 +6,32 @@ namespace NKikimr::NOlap { class TBatchBlobRange { private: - const ui64 GranuleId; + const ui64 ObjectId; const TBlobRange Range; public: - ui64 GetGranuleId() const { - return GranuleId; + ui64 GetObjectId() const { + return ObjectId; } const TBlobRange& GetRange() const { return Range; } - TBatchBlobRange(const ui64 granuleId, const TBlobRange range) - : GranuleId(granuleId) + TBatchBlobRange(const ui64 objectId, const TBlobRange range) + : ObjectId(objectId) , Range(range) { - + Y_VERIFY(range.BlobId.IsValid()); } }; -class TFetchBlobsQueue { +template <class TFetchTask> +class TFetchBlobsQueueImpl { private: bool StoppedFlag = false; - std::deque<TBatchBlobRange> IteratorBlobsSequential; + std::deque<TFetchTask> IteratorBlobsSequential; public: - const std::deque<TBatchBlobRange>& GetIteratorBlobsSequential() const noexcept { + const std::deque<TFetchTask>& GetIteratorBlobsSequential() const noexcept { return IteratorBlobsSequential; } @@ -51,15 +52,30 @@ public: return IteratorBlobsSequential.size(); } - const TBatchBlobRange* front() const { + const TFetchTask* front() const { if (!IteratorBlobsSequential.size()) { return nullptr; } return &IteratorBlobsSequential.front(); } - TBlobRange pop_front(); - void emplace_back(const ui64 granuleId, const TBlobRange& range); + std::optional<TBlobRange> pop_front() { + if (!StoppedFlag && IteratorBlobsSequential.size()) { + auto result = IteratorBlobsSequential.front(); + IteratorBlobsSequential.pop_front(); + return result.GetRange(); + } else { + return {}; + } + } + + void emplace_back(const ui64 objectId, const TBlobRange& range) { + Y_VERIFY(!StoppedFlag); + IteratorBlobsSequential.emplace_back(objectId, range); + } + }; +using TFetchBlobsQueue = TFetchBlobsQueueImpl<TBatchBlobRange>; + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/read_context.cpp index 33f87457f19..a57a2f82a77 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_context.cpp @@ -1,14 +1,16 @@ #include "read_context.h" +#include "read_metadata.h" #include <library/cpp/actors/core/events.h> namespace NKikimr::NOlap { TReadContext::TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, const NColumnShard::TConcreteScanCounters& counters, - std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor) + std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor, const bool isInternalRead) : Processor(processor) , Counters(counters) , MemoryAccessor(memoryAccessor) + , IsInternalRead(isInternalRead) { } @@ -17,4 +19,13 @@ void TActorBasedMemoryAccesor::DoOnBufferReady() { OwnerId.Send(OwnerId, new NActors::TEvents::TEvWakeup(1)); } + +IDataReader::IDataReader(const TReadContext& context, NOlap::TReadMetadata::TConstPtr readMetadata) + : Context(context) + , ReadMetadata(readMetadata) +{ + Y_VERIFY(ReadMetadata); + Y_VERIFY(ReadMetadata->SelectInfo); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 
5ab203f7981..360f2050a95 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -1,11 +1,15 @@ #pragma once #include "conveyor_task.h" +#include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/columnshard__scan.h> #include <ydb/core/tx/columnshard/counters/scan.h> #include <ydb/core/tx/columnshard/resources/memory.h> #include <library/cpp/actors/core/actor.h> namespace NKikimr::NOlap { +struct TReadMetadata; + class TActorBasedMemoryAccesor: public TScanMemoryLimiter::IMemoryAccessor { private: using TBase = TScanMemoryLimiter::IMemoryAccessor; @@ -25,6 +29,7 @@ private: YDB_ACCESSOR_DEF(NColumnShard::TDataTasksProcessorContainer, Processor); const NColumnShard::TConcreteScanCounters Counters; YDB_READONLY_DEF(std::shared_ptr<NOlap::TActorBasedMemoryAccesor>, MemoryAccessor); + YDB_READONLY(bool, IsInternalRead, false); public: const NColumnShard::TConcreteScanCounters& GetCounters() const { return Counters; @@ -32,14 +37,96 @@ public: TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, const NColumnShard::TConcreteScanCounters& counters, - std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor + std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor, const bool isInternalRead ); - TReadContext(const NColumnShard::TConcreteScanCounters& counters) + TReadContext(const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead) : Counters(counters) + , IsInternalRead(isInternalRead) { } }; +class IDataReader { +protected: + TReadContext Context; + std::shared_ptr<const TReadMetadata> ReadMetadata; + virtual void DoAddData(const TBlobRange& blobRange, const TString& data) = 0; + virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) = 0; + virtual TString DoDebugString() const = 0; + virtual void DoAbort() = 0; + virtual bool DoIsFinished() const = 0; + virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0; +public: + IDataReader(const TReadContext& context, std::shared_ptr<const TReadMetadata> readMetadata); + virtual ~IDataReader() = default; + + const std::shared_ptr<const TReadMetadata>& GetReadMetadata() const { + return ReadMetadata; + } + + const TReadContext& GetContext() const { + return Context; + } + + TReadContext& GetContext() { + return Context; + } + + const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const { + return Context.GetMemoryAccessor(); + } + + const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { + return Context.GetCounters(); + } + + const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept { + return Context.GetProcessor(); + } + + void Abort() { + return DoAbort(); + } + + template <class T> + T& GetMeAs() { + auto result = dynamic_cast<T*>(this); + Y_VERIFY(result); + return *result; + } + + template <class T> + const T& GetMeAs() const { + auto result = dynamic_cast<const T*>(this); + Y_VERIFY(result); + return *result; + } + + std::vector<TPartialReadResult> ExtractReadyResults(const int64_t maxRowsInBatch) { + return DoExtractReadyResults(maxRowsInBatch); + } + + bool IsFinished() const { + return DoIsFinished(); + } + + TString DebugString() const { + TStringBuilder sb; + sb << "internal:" << Context.GetIsInternalRead() << ";" + << "has_buffer:" << (GetMemoryAccessor() ? 
GetMemoryAccessor()->HasBuffer() : true) << ";" + ; + sb << DoDebugString(); + return sb; + } + + void AddData(const TBlobRange& blobRange, const TString& data) { + DoAddData(blobRange, data); + } + std::optional<TBlobRange> ExtractNextBlob(const bool hasReadyResults) { + return DoExtractNextBlob(hasReadyResults); + } +}; + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp index 9acaa5bf9a8..ef61d957d83 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp @@ -3,32 +3,13 @@ namespace NKikimr::NOlap::NIndexedReader { -bool TSortableBatchPosition::IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { - if (Fields.size() != (size_t)schema->num_fields()) { - return false; - } - for (ui32 i = 0; i < Fields.size(); ++i) { - if (Fields[i]->type() != schema->field(i)->type()) { - return false; - } - if (Fields[i]->name() != schema->field(i)->name()) { - return false; - } - } - return true; -} - NJson::TJsonValue TSortableBatchPosition::DebugJson() const { NJson::TJsonValue result; result["reverse"] = ReverseSort; result["records_count"] = RecordsCount; result["position"] = Position; - Y_VERIFY(Columns.size() == Fields.size()); - for (ui32 i = 0; i < Columns.size(); ++i) { - auto& jsonColumn = result["columns"].AppendValue(NJson::JSON_MAP); - jsonColumn["name"] = Fields[i]->name(); - jsonColumn["value"] = NArrow::DebugString(Columns[i], Position); - } + result["sorting"] = Sorting.DebugJson(Position); + result["data"] = Data.DebugJson(Position); return result; } @@ -105,7 +86,8 @@ std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow:: void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) { Y_VERIFY(point); - Y_VERIFY(point->IsSameSchema(SortSchema)); + Y_VERIFY(point->IsSameSortingSchema(SortSchema)); + Y_VERIFY(point->IsReverseSort() == Reverse); Y_VERIFY(++ControlPoints == 1); SortHeap.emplace_back(TBatchIterator(*point)); @@ -133,11 +115,11 @@ void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::s void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) { if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), Reverse, poolId)); + SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId)); } else if (filter->IsTotalDenyFilter()) { return; } else { - SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), Reverse, poolId)); + SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? 
DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId)); } if (restoreHeap) { std::push_heap(SortHeap.begin(), SortHeap.end()); @@ -152,11 +134,24 @@ void TMergePartialStream::RemoveControlPoint() { SortHeap.pop_back(); } -bool TMergePartialStream::DrainCurrent() { +bool TMergePartialStream::DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder, const std::optional<TSortableBatchPosition>& readTo, const bool includeFinish) { if (SortHeap.empty()) { return false; } while (SortHeap.size()) { + if (readTo) { + auto position = TBatchIterator::TPosition(SortHeap.front()); + if (includeFinish) { + if (position.GetKeyColumns().Compare(*readTo) == std::partial_ordering::greater) { + return true; + } + } else { + if (position.GetKeyColumns().Compare(*readTo) != std::partial_ordering::less) { + return true; + } + } + } + auto currentPosition = DrainCurrentPosition(); if (currentPosition.IsControlPoint()) { return false; @@ -165,10 +160,15 @@ bool TMergePartialStream::DrainCurrent() { continue; } if (CurrentKeyColumns) { - Y_VERIFY(CurrentKeyColumns->Compare(currentPosition.GetKeyColumns()) != std::partial_ordering::greater); + AFL_VERIFY(CurrentKeyColumns->Compare(currentPosition.GetKeyColumns()) == std::partial_ordering::less)("merge_debug", DebugJson()); } CurrentKeyColumns = currentPosition.GetKeyColumns(); - return true; + if (builder) { + builder->AddRecord(*CurrentKeyColumns); + } + if (!readTo) { + return true; + } } return false; } @@ -185,4 +185,24 @@ NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { return result; } +TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { + for (auto&& i : columns) { + auto c = batch->GetColumnByName(i); + AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); + Columns.emplace_back(c); + auto f = batch->schema()->GetFieldByName(i); + Fields.emplace_back(f); + } +} + +void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) { + Y_VERIFY(position.GetData().GetColumns().size() == Builders.size()); + Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); +// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson()); + for (ui32 i = 0; i < position.GetData().GetColumns().size(); ++i) { + NArrow::Append(*Builders[i], *position.GetData().GetColumns()[i], position.GetPosition()); + } + ++RecordsCount; +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h index c3d3f4f47c6..211beadf74f 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h +++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h @@ -3,35 +3,89 @@ #include <ydb/core/formats/arrow/arrow_filter.h> #include <ydb/core/formats/arrow/arrow_helpers.h> #include <ydb/core/tx/columnshard/engines/index_info.h> +#include <ydb/core/formats/arrow/switch/switch_type.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> #include <util/generic/hash.h> +#include <util/string/join.h> #include <set> namespace NKikimr::NOlap::NIndexedReader { +class TRecordBatchBuilder; + +class TSortableScanData { +private: + YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns); + YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); +public: + TSortableScanData() = default; + TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const 
std::vector<std::string>& columns); + + bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { + if (Fields.size() != (size_t)schema->num_fields()) { + return false; + } + for (ui32 i = 0; i < Fields.size(); ++i) { + if (Fields[i]->type() != schema->field(i)->type()) { + return false; + } + if (Fields[i]->name() != schema->field(i)->name()) { + return false; + } + } + return true; + } + + NJson::TJsonValue DebugJson(const i32 position) const { + NJson::TJsonValue result = NJson::JSON_MAP; + for (ui32 i = 0; i < Columns.size(); ++i) { + auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP); + jsonColumn["name"] = Fields[i]->name(); + if (position >= 0 && position < Columns[i]->length()) { + jsonColumn["value"] = NArrow::DebugString(Columns[i], position); + } + } + return result; + } + + std::vector<std::string> GetFieldNames() const { + std::vector<std::string> result; + for (auto&& i : Fields) { + result.emplace_back(i->name()); + } + return result; + } +}; + class TSortableBatchPosition { protected: - i64 Position = 0; + + YDB_READONLY(i64, Position, 0); i64 RecordsCount = 0; bool ReverseSort = false; - std::vector<std::shared_ptr<arrow::Array>> Columns; - std::vector<std::shared_ptr<arrow::Field>> Fields; + TSortableScanData Sorting; + TSortableScanData Data; std::shared_ptr<arrow::RecordBatch> Batch; static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include); public: TSortableBatchPosition() = default; - NJson::TJsonValue DebugJson() const; + const TSortableScanData& GetData() const { + return Data; + } - bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const; + bool IsReverseSort() const { + return ReverseSort; + } + NJson::TJsonValue DebugJson() const; TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const { - std::vector<std::string> fieldNames; - for (auto&& i : Fields) { - fieldNames.emplace_back(i->name()); - } - return TSortableBatchPosition(batch, position, fieldNames, ReverseSort); + return TSortableBatchPosition(batch, position, Sorting.GetFieldNames(), Data.GetFieldNames(), ReverseSort); + } + + bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) { + return Sorting.IsSameSchema(schema); } static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) { @@ -47,28 +101,23 @@ public: return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1); } - TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& columns, const bool reverseSort) + TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort) : Position(position) , RecordsCount(batch->num_rows()) , ReverseSort(reverseSort) + , Sorting(batch, sortingColumns) + , Data(batch, dataColumns) , Batch(batch) { Y_VERIFY(batch->num_rows()); Y_VERIFY_DEBUG(batch->ValidateFull().ok()); - for (auto&& i : columns) { - auto c = batch->GetColumnByName(i); - Y_VERIFY(c); - Columns.emplace_back(c); - auto f = batch->schema()->GetFieldByName(i); - Fields.emplace_back(f); - } - Y_VERIFY(Columns.size()); + Y_VERIFY(Sorting.GetColumns().size()); } std::partial_ordering 
Compare(const TSortableBatchPosition& item) const { Y_VERIFY(item.ReverseSort == ReverseSort); - Y_VERIFY_DEBUG(item.Columns.size() == Columns.size()); - const auto directResult = NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position); + Y_VERIFY(item.Sorting.GetColumns().size() == Sorting.GetColumns().size()); + const auto directResult = NArrow::ColumnsCompare(Sorting.GetColumns(), Position, item.Sorting.GetColumns(), item.Position); if (ReverseSort) { if (directResult == std::partial_ordering::less) { return std::partial_ordering::greater; @@ -86,6 +135,14 @@ public: return Compare(item) == std::partial_ordering::less; } + bool operator==(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::equivalent; + } + + bool operator!=(const TSortableBatchPosition& item) const { + return Compare(item) != std::partial_ordering::equivalent; + } + bool NextPosition(const i64 delta) { return InitPosition(Position + delta); } @@ -144,10 +201,10 @@ private: } TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, - const std::vector<std::string>& keyColumns, const bool reverseSort, const std::optional<ui32> poolId) + const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::optional<ui32> poolId) : ControlPointFlag(false) - , KeyColumns(batch, 0, keyColumns, reverseSort) - , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), false) + , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort) + , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), {}, false) , RecordsCount(batch->num_rows()) , ReverseSortKff(reverseSort ? -1 : 1) , PoolId(poolId) @@ -205,7 +262,7 @@ private: if (!FilterIterator) { return false; } - return FilterIterator->GetCurrentAcceptance(); + return !FilterIterator->GetCurrentAcceptance(); } bool Next() { @@ -248,6 +305,17 @@ private: } }; + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + if (CurrentKeyColumns) { + result["current"] = CurrentKeyColumns->DebugJson(); + } + for (auto&& i : SortHeap) { + result["heap"].AppendValue(i.DebugJson()); + } + return result; + } + bool NextInHeap(const bool needPop) { if (SortHeap.empty()) { return false; @@ -280,6 +348,7 @@ private: THashMap<ui32, std::deque<TIteratorData>> BatchPools; std::vector<TBatchIterator> SortHeap; std::shared_ptr<arrow::Schema> SortSchema; + std::shared_ptr<arrow::Schema> DataSchema; const bool Reverse; ui32 ControlPoints = 0; @@ -303,10 +372,13 @@ private: void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap); public: - TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse) + TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse) : SortSchema(sortSchema) + , DataSchema(dataSchema) , Reverse(reverse) { + Y_VERIFY(SortSchema); Y_VERIFY(SortSchema->num_fields()); + Y_VERIFY(!DataSchema || DataSchema->num_fields()); } bool IsValid() const { @@ -321,6 +393,10 @@ public: return it->second.size(); } + const std::optional<TSortableBatchPosition>& GetCurrentKeyColumns() const { + return CurrentKeyColumns; + } + void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point); void RemoveControlPoint(); @@ -335,8 +411,46 @@ public: return SortHeap.empty(); } - bool DrainCurrent(); + bool 
DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder = nullptr, const std::optional<TSortableBatchPosition>& readTo = {}, const bool includeFinish = false); }; +class TRecordBatchBuilder { +private: + std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; + std::vector<std::shared_ptr<arrow::Field>> Fields; + YDB_READONLY(ui32, RecordsCount, 0); + + bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2) { + if (f1.size() != f2.size()) { + return false; + } + for (ui32 i = 0; i < f1.size(); ++i) { + if (!f1[i]->Equals(f2[i])) { + return false; + } + } + return true; + } + +public: + TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow::Field>>& fields) + : Fields(fields) { + Y_VERIFY(Fields.size()); + for (auto&& f : fields) { + Builders.emplace_back(NArrow::MakeBuilder(f)); + } + } + + std::shared_ptr<arrow::RecordBatch> Finalize() { + auto schema = std::make_shared<arrow::Schema>(Fields); + std::vector<std::shared_ptr<arrow::Array>> columns; + for (auto&& i : Builders) { + columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish())); + } + return arrow::RecordBatch::Make(schema, columns.front()->length(), columns); + } + + void AddRecord(const TSortableBatchPosition& position); +}; } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index c8c8317dbee..332f960e4c3 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -225,4 +225,16 @@ std::shared_ptr<NIndexedReader::IOrderPolicy> TReadMetadata::BuildSortingPolicy( return result; } +std::shared_ptr<NKikimr::NOlap::IDataReader> TReadMetadata::BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const { +// return std::make_shared<NPlainReader::TPlainReadData>(self, context); + auto result = std::make_shared<TIndexedReadData>(self, context); + result->InitRead(); + return result; +} + +NIndexedReader::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const { + return NIndexedReader::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0, + GetReplaceKey()->field_names(), {}, IsDescSorted()); +} + } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index cd582350cb8..1aa23d03664 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -144,6 +144,9 @@ private: public: using TConstPtr = std::shared_ptr<const TReadMetadata>; + NIndexedReader::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const; + std::shared_ptr<IDataReader> BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const; + const std::vector<ui32>& GetAllColumns() const { return AllColumns; } @@ -225,7 +228,7 @@ public: bool Empty() const { Y_VERIFY(SelectInfo); - return SelectInfo->Portions.empty() && CommittedBlobs.empty(); + return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty(); } std::shared_ptr<arrow::Schema> GetSortingKey() const { @@ -253,9 +256,9 @@ public: return ResultIndexSchema->GetIndexInfo().GetPrimaryKey(); } - size_t NumIndexedRecords() const { + size_t NumIndexedChunks() const { Y_VERIFY(SelectInfo); - return SelectInfo->NumRecords(); + return SelectInfo->NumChunks(); } size_t NumIndexedBlobs() const { @@ -267,7 +270,7 @@ public: void 
Dump(IOutputStream& out) const override { out << "columns: " << GetSchemaColumnsCount() - << " index records: " << NumIndexedRecords() + << " index chunks: " << NumIndexedChunks() << " index blobs: " << NumIndexedBlobs() << " committed blobs: " << CommittedBlobs.size() // << " with program steps: " << (Program ? Program->Steps.size() : 0) diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp index edeb20565bc..8d0ebfccf8e 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp @@ -28,7 +28,7 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps auto& resultField = resultArrowSchema->fields()[i]; auto columnId = GetIndexInfo().GetColumnId(resultField->name()); auto oldColumnIndex = dataSchema.GetFieldIndex(columnId); - if (oldColumnIndex >= 0) { // ClumnExists + if (oldColumnIndex >= 0) { // ColumnExists auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex); Y_VERIFY(oldColumnInfo); auto columnData = batch->GetColumnByName(oldColumnInfo->name()); diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp index 96efd36cf5d..e82dc955bb9 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.cpp +++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp @@ -45,7 +45,7 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) { it = Portions.emplace(portionNew->GetPortion(), portionNew).first; } else { OnBeforeChangePortion(it->second); - *it->second = info; + it->second = std::make_shared<TPortionInfo>(info); } OnAfterChangePortion(it->second); } diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h index d00e16b498d..8f7dae1afe4 100644 --- a/ydb/core/tx/columnshard/engines/storage/granule.h +++ b/ydb/core/tx/columnshard/engines/storage/granule.h @@ -300,6 +300,20 @@ private: void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter); void OnAdditiveSummaryChange() const; public: + std::vector<std::shared_ptr<TPortionInfo>> GroupOrderedPortionsByPK(const TSnapshot& snapshot) const { + std::vector<std::shared_ptr<TPortionInfo>> portions; + for (auto&& i : Portions) { + if (i.second->IsVisible(snapshot)) { + portions.emplace_back(i.second); + } + } + const auto pred = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) { + return l->IndexKeyStart() < r->IndexKeyStart(); + }; + std::sort(portions.begin(), portions.end(), pred); + return portions; + } + NOlap::TSerializationStats BuildSerializationStats(ISnapshotSchema::TPtr schema) const { NOlap::TSerializationStats result; for (auto&& i : GetHardSummary().GetColumnIdsSortedBySizeDescending()) { diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp index c5284b9432f..b7492e49b36 100644 --- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp @@ -447,33 +447,29 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { ui64 planStep = 1; ui64 txId = 0; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } { // select from snap 
between insert (greater txId) ui64 planStep = 1; ui64 txId = 2; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } { // select from snap after insert (greater planStep) ui64 planStep = 2; ui64 txId = 1; auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), 1); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSpecialColumnNames().size()); } { // select another pathId ui64 planStep = 2; ui64 txId = 1; auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0); } } @@ -529,8 +525,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); - UNIT_ASSERT(ok); +// bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); +// UNIT_ASSERT(ok); // read @@ -544,8 +540,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } // predicates @@ -559,21 +554,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { NOlap::TPKRangesFilter pkFilter(false); Y_VERIFY(pkFilter.Add(gt10k, nullptr, nullptr)); auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } { ui64 txId = 1; - std::shared_ptr<TPredicate> lt10k = MakePredicate(9999, NArrow::EOperation::Less); // TODO: better border checks + std::shared_ptr<TPredicate> lt10k = MakePredicate(8999, NArrow::EOperation::Less); // TODO: better border checks if (key[0].second == TTypeInfo(NTypeIds::Utf8)) { - lt10k = MakeStrPredicate("09999", NArrow::EOperation::Less); + lt10k = MakeStrPredicate("08999", NArrow::EOperation::Less); } NOlap::TPKRangesFilter pkFilter(false); Y_VERIFY(pkFilter.Add(nullptr, lt10k, nullptr)); auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9); } } @@ -710,8 +703,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // compact planStep = 2; - bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); - UNIT_ASSERT(ok); +// bool ok = Compact(engine, db, 
TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4}); +// UNIT_ASSERT(ok); // read planStep = 3; @@ -722,18 +715,16 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } // Cleanup - Cleanup(engine, db, TSnapshot(planStep, 1), 20); + Cleanup(engine, db, TSnapshot(planStep, 1), 0); { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20); } // TTL @@ -742,26 +733,24 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { NOlap::TTiering tiering; tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp"); pathTtls.emplace(pathId, std::move(tiering)); - Ttl(engine, db, pathTtls, 2); + Ttl(engine, db, pathTtls, 10); // read + load + read { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } // load engine.Load(db, lostBlobs); - UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 1); + UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0); { // full scan ui64 txId = 1; auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false)); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2); - UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10); } } } diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h index 8d33977ca19..6edf647be93 100644 --- a/ydb/core/tx/columnshard/inflight_request_tracker.h +++ b/ydb/core/tx/columnshard/inflight_request_tracker.h @@ -53,8 +53,8 @@ public: continue; } - for (const auto& portion : readMeta->SelectInfo->Portions) { - const ui64 portionId = portion.GetPortion(); + for (const auto& portion : readMeta->SelectInfo->PortionsOrderedPK) { + const ui64 portionId = portion->GetPortion(); auto it = PortionUseCount.find(portionId); Y_VERIFY(it != PortionUseCount.end(), "Portion id %" PRIu64 " not found in request %" PRIu64, portionId, cookie); if (it->second == 1) { @@ -62,7 +62,7 @@ public: } else { it->second--; } - for (auto& rec : portion.Records) { + for (auto& rec : portion->Records) { if (blobTracker.SetBlobInUse(rec.BlobRange.BlobId, false)) { freedBlobs.emplace(rec.BlobRange.BlobId); } @@ -105,10 +105,10 @@ private: Y_VERIFY(selectInfo); SelectStatsDelta += selectInfo->Stats(); - for (const auto& portion : readMeta->SelectInfo->Portions) { - const ui64 portionId = portion.GetPortion(); + for (const auto& portion : readMeta->SelectInfo->PortionsOrderedPK) { + const ui64 portionId = portion->GetPortion(); PortionUseCount[portionId]++; - for (auto& rec : portion.Records) { + for (auto& rec : portion->Records) { 
blobTracker.SetBlobInUse(rec.BlobRange.BlobId, true); } } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index fb5d78cacab..d4c88590ff0 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -9,13 +9,18 @@ namespace { class TReadActor : public TActorBootstrapped<TReadActor> { private: void BuildResult(const TActorContext& ctx) { - auto ready = IndexedData.GetReadyResults(Max<i64>()); + auto ready = IndexedData->ExtractReadyResults(Max<i64>()); LOG_S_TRACE("Ready results with " << ready.size() << " batches at tablet " << TabletId << " (read)"); - - size_t next = 1; - for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { - const bool lastOne = IndexedData.IsFinished() && (next == ready.size()); - SendResult(ctx, it->GetResultBatch(), lastOne); + if (ready.empty()) { + if (IndexedData->IsFinished()) { + SendResult(ctx, nullptr, true); + } + } else { + size_t next = 1; + for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { + const bool lastOne = IndexedData->IsFinished() && (next == ready.size()); + SendResult(ctx, it->GetResultBatch(), lastOne); + } } } public: @@ -35,11 +40,11 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) , Result(std::move(event)) , ReadMetadata(readMetadata) - , IndexedData(ReadMetadata, true, NOlap::TReadContext(counters)) , Deadline(deadline) , ColumnShardActorId(columnShardActorId) , RequestCookie(requestCookie) , ReturnedBatchNo(0) + , Counters(counters) {} void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) { @@ -58,7 +63,7 @@ public: Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size); - IndexedData.AddData(event.BlobRange, event.Data); + IndexedData->AddData(event.BlobRange, event.Data); BuildResult(ctx); DieFinished(ctx); @@ -75,7 +80,7 @@ public: void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) { Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS); SendResult(ctx, {}, true, status); - IndexedData.Abort(); + IndexedData->Abort(); } void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false, @@ -84,16 +89,15 @@ public: auto& proto = Proto(chunkEvent.get()); TString data; - if (batch) { + if (batch && batch->num_rows()) { data = NArrow::SerializeBatchNoCompression(batch); auto metadata = proto.MutableMeta(); metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW); metadata->SetSchema(GetSerializedSchema(batch)); - } - - if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { - Y_VERIFY(!data.empty()); + if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) { + Y_VERIFY(!data.empty()); + } } proto.SetBatch(ReturnedBatchNo); @@ -132,7 +136,7 @@ public: } void DieFinished(const TActorContext& ctx) { - if (IndexedData.IsFinished()) { + if (IndexedData->IsFinished()) { LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << TabletId); Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie)); Die(ctx); @@ -140,9 +144,8 @@ public: } void Bootstrap(const TActorContext& ctx) { - IndexedData.InitRead(); - - LOG_S_DEBUG("Starting read (" << IndexedData.DebugString() << ") at tablet " << TabletId); + IndexedData = ReadMetadata->BuildReader(NOlap::TReadContext(Counters, true), ReadMetadata); + LOG_S_DEBUG("Starting read (" << IndexedData->DebugString() << ") at tablet " << TabletId); 
bool earlyExit = false; if (Deadline != TInstant::Max()) { @@ -159,9 +162,9 @@ public: SendTimeouts(ctx); ctx.Send(SelfId(), new TEvents::TEvPoisonPill()); } else { - while (IndexedData.HasMoreBlobs()) { - const auto blobRange = IndexedData.ExtractNextBlob(false); - SendReadRequest(ctx, blobRange); + while (const auto blobRange = IndexedData->ExtractNextBlob(false)) { + Y_VERIFY(blobRange->BlobId.IsValid()); + SendReadRequest(ctx, *blobRange); } BuildResult(ctx); } @@ -203,11 +206,12 @@ private: TActorId BlobCacheActorId; std::unique_ptr<TEvColumnShard::TEvReadResult> Result; NOlap::TReadMetadata::TConstPtr ReadMetadata; - NOlap::TIndexedReadData IndexedData; + std::shared_ptr<NOlap::IDataReader> IndexedData; TInstant Deadline; TActorId ColumnShardActorId; const ui64 RequestCookie; ui32 ReturnedBatchNo; + const TConcreteScanCounters Counters; mutable TString SerializedSchema; TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const { diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 39fdfc99760..91e28122c08 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1491,7 +1491,6 @@ void TestReadWithProgram(const TestTableDescription& table = {}) UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); auto& meta = resRead.GetMeta(); auto& schema = meta.GetSchema(); @@ -1501,11 +1500,12 @@ void TestReadWithProgram(const TestTableDescription& table = {}) switch (i) { case 1: + UNIT_ASSERT(resRead.GetData().size() > 0); UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"})); UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true)); break; case 2: - UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}, 0)); + UNIT_ASSERT(resRead.GetData().size() == 0); break; default: break; @@ -2355,16 +2355,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { std::optional<TBorder> From; std::optional<TBorder> To; std::optional<ui32> ExpectedCount; - bool DataReadOnEmpty = false; - TTestCaseOptions() - : DataReadOnEmpty(false) - {} + TTestCaseOptions() = default; TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; } TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; } TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; } - TTestCaseOptions& SetDataReadOnEmpty(bool flag) { DataReadOnEmpty = flag; return *this; } TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const { std::vector<TString> mem; @@ -2422,13 +2418,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { UNIT_ASSERT(event); auto& resRead = Proto(event); - Cerr << "[" << __LINE__ << "] " << Owner.YdbPk[0].second.GetTypeId() << " " - << resRead.GetBatch() << " " << resRead.GetData().size() << "\n"; + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("0_type_id", Owner.YdbPk[0].second.GetTypeId())("batch", resRead.GetBatch())("data_size", resRead.GetData().size()); UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - if (ExpectedCount && !*ExpectedCount && !DataReadOnEmpty) { + if (ExpectedCount && 
!*ExpectedCount) { UNIT_ASSERT(!resRead.GetBatch()); UNIT_ASSERT(resRead.GetFinished()); UNIT_ASSERT(!resRead.GetData().size()); @@ -2485,13 +2480,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } ~TTestCase() { - try { - Execute(); - Cerr << "TEST CASE " << TestCaseName << " FINISHED" << Endl; - } catch (...) { - Cerr << "TEST CASE " << TestCaseName << " FAILED" << Endl; - throw; - } + Execute(); + Cerr << "TEST CASE " << TestCaseName << " FINISHED" << Endl; } }; @@ -2620,14 +2610,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TTabletReadPredicateTest testAgent(runtime, planStep, txId, table.Pk); testAgent.Test(":1)").SetTo(TBorder(val1, false)).SetExpectedCount(1); testAgent.Test(":1]").SetTo(TBorder(val1, true)).SetExpectedCount(2); - testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0).SetDataReadOnEmpty(true); + testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0); testAgent.Test(":0]").SetTo(TBorder(val0, true)).SetExpectedCount(1); testAgent.Test("[0:0]").SetFrom(TBorder(val0, true)).SetTo(TBorder(val0, true)).SetExpectedCount(1); testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1); - testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true); - testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true)) - .SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0); + testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0); + testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true)).SetExpectedCount(0); // VERIFIED AS INCORRECT INTERVAL (its good) // testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0); @@ -2642,7 +2631,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } else { testAgent.Test("(numRows:").SetFrom(TBorder(valNumRows, false)).SetExpectedCount(0); - testAgent.Test("(numRows-1:").SetFrom(TBorder(valNumRows_1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true); + testAgent.Test("(numRows-1:").SetFrom(TBorder(valNumRows_1, false)).SetExpectedCount(0); testAgent.Test("(numRows-2:").SetFrom(TBorder(valNumRows_2, false)).SetExpectedCount(1); testAgent.Test("[numRows-1:").SetFrom(TBorder(valNumRows_1, true)).SetExpectedCount(1); } |
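
The scan iterator and TReadActor now drive reading entirely through the new NOlap::IDataReader interface produced by TReadMetadata::BuildReader: ask for the next blob range with ExtractNextBlob, feed the fetched bytes back with AddData, and drain completed batches with ExtractReadyResults. Below is a minimal caller-side sketch of that loop. It is illustrative only: ReadEverything and fetchBlob are hypothetical names, and the real callers in this patch (columnshard__index_scan.cpp, read_actor.cpp) fetch blobs asynchronously through the blob cache rather than via a synchronous callback.

#include <ydb/core/tx/columnshard/engines/reader/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/read_metadata.h>

#include <functional>

using namespace NKikimr::NOlap;

// Hypothetical helper: drains an IDataReader to completion with a synchronous blob fetcher.
std::vector<TPartialReadResult> ReadEverything(
        TReadMetadata::TConstPtr readMetadata, const TReadContext& context,
        const std::function<TString(const TBlobRange&)>& fetchBlob) {
    std::shared_ptr<IDataReader> reader = readMetadata->BuildReader(context, readMetadata);
    std::vector<TPartialReadResult> out;
    // The reader hands out blob ranges until it returns std::nullopt.
    while (auto blobRange = reader->ExtractNextBlob(/*hasReadyResults=*/false)) {
        reader->AddData(*blobRange, fetchBlob(*blobRange));
        // Collect whatever became ready after feeding this blob.
        for (auto&& result : reader->ExtractReadyResults(Max<i64>())) {
            out.emplace_back(std::move(result));
        }
    }
    // Pick up any tail that only became ready after the last blob was fed.
    for (auto&& result : reader->ExtractReadyResults(Max<i64>())) {
        out.emplace_back(std::move(result));
    }
    return out;
}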
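
TSortableBatchPosition now splits its columns into Sorting and Data sets (TSortableScanData), and TMergePartialStream::DrainCurrent can stream merged rows straight into a TRecordBatchBuilder, optionally stopping at a readTo position. The sketch below is a minimal merge under stated assumptions: a non-empty dataSchema, non-empty source batches already sorted by sortSchema, and strictly increasing keys across all sources (the new AFL_VERIFY in DrainCurrent rejects duplicate keys); MergeSorted is a hypothetical wrapper name.

#include <ydb/core/tx/columnshard/engines/reader/read_filter_merger.h>

using namespace NKikimr::NOlap::NIndexedReader;

// Hypothetical wrapper: merges pre-sorted batches into one RecordBatch holding the dataSchema columns.
std::shared_ptr<arrow::RecordBatch> MergeSorted(
        const std::vector<std::shared_ptr<arrow::RecordBatch>>& sources,
        std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema) {
    TMergePartialStream merger(sortSchema, dataSchema, /*reverse=*/false);
    for (auto&& batch : sources) {
        merger.AddPoolSource(/*poolId=*/{}, batch, /*filter=*/nullptr);
    }
    auto builder = std::make_shared<TRecordBatchBuilder>(dataSchema->fields());
    // Without a readTo bound, DrainCurrent appends one row (the next in key order) per
    // successful call and returns false once the sort heap is exhausted.
    while (merger.DrainCurrent(builder)) {
    }
    return builder->Finalize();
}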