summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-09-01 14:08:07 +0300
committerivanmorozov <[email protected]>2023-09-01 14:31:36 +0300
commitc6ec06e7c77e4029e41a21fa12f2e38d60eece59 (patch)
tree2f91624b5fbd01a3c0ec30c3de30fdfae0e3a415
parentbf418335042882be6345a85cce0fa92e8c95537a (diff)
KIKIMR-19213: reading through interface
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp25
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h9
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp17
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h64
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine.h28
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp60
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp55
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h42
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h1
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp26
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h19
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp42
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h8
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.h10
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filling_context.h4
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp9
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule_preparation.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/queue.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/queue.h40
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_context.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_context.h91
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp74
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_filter_merger.h168
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.cpp12
-rw-r--r--ydb/core/tx/columnshard/engines/reader/read_metadata.h11
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/storage/granule.h14
-rw-r--r--ydb/core/tx/columnshard/engines/ut_logs_engine.cpp53
-rw-r--r--ydb/core/tx/columnshard/inflight_request_tracker.h12
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp48
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp33
41 files changed, 676 insertions, 368 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index c73a20a2332..6ff5f6e3b38 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -8,31 +8,30 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
: Context(context)
, ReadyResults(context.GetCounters())
, ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata, false, context)
{
- IndexedData.InitRead();
+ IndexedData = ReadMetadata->BuildReader(context, ReadMetadata);
Y_VERIFY(ReadMetadata->IsSorted());
if (ReadMetadata->Empty()) {
- IndexedData.Abort();
+ IndexedData->Abort();
}
}
void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) {
- IndexedData.AddData(blobRange, data);
+ IndexedData->AddData(blobRange, data);
}
-NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
+NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
FillReadyResults();
return ReadyResults.pop_front();
}
-NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
- return IndexedData.ExtractNextBlob(ReadyResults.size());
+std::optional<NBlobCache::TBlobRange> TColumnShardScanIterator::GetNextBlobToRead() {
+ return IndexedData->ExtractNextBlob(ReadyResults.size());
}
void TColumnShardScanIterator::FillReadyResults() {
- auto ready = IndexedData.GetReadyResults(MaxRowsInBatch);
+ auto ready = IndexedData->ExtractReadyResults(MaxRowsInBatch);
i64 limitLeft = ReadMetadata->Limit == 0 ? INT64_MAX : ReadMetadata->Limit - ItemsRead;
for (size_t i = 0; i < ready.size() && limitLeft; ++i) {
if (ready[i].GetResultBatch()->num_rows() == 0 && !ready[i].GetLastReadKey()) {
@@ -49,10 +48,10 @@ void TColumnShardScanIterator::FillReadyResults() {
}
if (limitLeft == 0) {
- IndexedData.Abort();
+ IndexedData->Abort();
}
- if (IndexedData.IsFinished()) {
+ if (IndexedData->IsFinished()) {
Context.MutableProcessor().Stop();
}
}
@@ -62,15 +61,15 @@ bool TColumnShardScanIterator::HasWaitingTasks() const {
}
TColumnShardScanIterator::~TColumnShardScanIterator() {
- IndexedData.Abort();
+ IndexedData->Abort();
ReadMetadata->ReadStats->PrintToLog();
}
void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
- if (!task->IsDataProcessed() || Context.GetProcessor().IsStopped() || !task->IsSameProcessor(Context.GetProcessor())) {
+ if (!task->IsDataProcessed() || Context.GetProcessor().IsStopped() || !task->IsSameProcessor(Context.GetProcessor()) || IndexedData->IsFinished()) {
return;
}
- Y_VERIFY(task->Apply(IndexedData.GetGranulesContext()));
+ Y_VERIFY(task->Apply(*IndexedData));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 88672e6cb25..6d8a77a2af6 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -76,7 +76,7 @@ private:
NOlap::TReadContext Context;
TReadyResults ReadyResults;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
- NOlap::TIndexedReadData IndexedData;
+ std::shared_ptr<NOlap::IDataReader> IndexedData;
ui64 ItemsRead = 0;
const i64 MaxRowsInBatch = 5000;
public:
@@ -90,8 +90,7 @@ public:
virtual TString DebugString() const override {
return TStringBuilder()
<< "ready_results:(" << ReadyResults.DebugString() << ");"
- << "has_buffer:" << IndexedData.GetMemoryAccessor()->HasBuffer() << ";"
- << "indexed_data:(" << IndexedData.DebugString() << ")"
+ << "indexed_data:(" << IndexedData->DebugString() << ")"
;
}
@@ -102,12 +101,12 @@ public:
void AddData(const TBlobRange& blobRange, TString data) override;
bool Finished() const override {
- return IndexedData.IsFinished() && ReadyResults.empty();
+ return IndexedData->IsFinished() && ReadyResults.empty();
}
NOlap::TPartialReadResult GetBatch() override;
- TBlobRange GetNextBlobToRead() override;
+ std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() override;
private:
void FillReadyResults();
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 4d7a80c953a..f7e67460f83 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -109,7 +109,7 @@ public:
Y_VERIFY(!ScanIterator);
MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result");
- NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor);
+ NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor, false);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
@@ -149,12 +149,13 @@ private:
THashMap<TUnifiedBlobId, std::vector<NBlobCache::TBlobRange>> ranges;
while (InFlightGuard.CanTake()) {
auto blobRange = ScanIterator->GetNextBlobToRead();
- if (!blobRange.BlobId.IsValid()) {
+ if (!blobRange) {
break;
}
- InFlightGuard.Take(blobRange.Size);
+ Y_VERIFY(blobRange->BlobId.IsValid());
+ InFlightGuard.Take(blobRange->Size);
++InFlightReads;
- ranges[blobRange.BlobId].emplace_back(blobRange);
+ ranges[blobRange->BlobId].emplace_back(*blobRange);
}
if (!InFlightGuard.CanTake()) {
ScanCountersPool.OnReadingOverloaded();
@@ -191,7 +192,9 @@ private:
ACFL_DEBUG("event", "TEvTaskProcessedResult");
auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
- ScanIterator->Apply(t);
+ if (!ScanIterator->Finished()) {
+ ScanIterator->Apply(t);
+ }
if (!ScanIterator->HasWaitingTasks() && !NoTasksStartInstant) {
NoTasksStartInstant = Now();
}
@@ -372,7 +375,7 @@ private:
// The loop has finished without any progress!
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " is hanging"
- << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
+ << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId << " debug: " << ScanIterator->DebugString());
Y_VERIFY_DEBUG(false);
}
@@ -445,7 +448,7 @@ private:
return Finish();
}
- NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor);
+ NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool, MemoryAccessor, false);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// Used in TArrowToYdbConverter
ResultYqlSchema.clear();
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index f7a1133d18e..464f5b0fccd 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -20,11 +20,73 @@ private:
std::shared_ptr<arrow::RecordBatch> LastReadKey;
public:
+ ui64 GetRecordsCount() const {
+ return ResultBatch ? ResultBatch->num_rows() : 0;
+ }
+
+ void InitDirection(const bool reverse) {
+ if (reverse && ResultBatch && ResultBatch->num_rows()) {
+ auto permutation = NArrow::MakePermutation(ResultBatch->num_rows(), true);
+ ResultBatch = NArrow::TStatusValidator::GetValid(arrow::compute::Take(ResultBatch, permutation)).record_batch();
+ }
+ }
+
+ void StripColumns(const std::shared_ptr<arrow::Schema>& schema) {
+ if (ResultBatch) {
+ ResultBatch = NArrow::ExtractColumns(ResultBatch, schema);
+ }
+ }
+
+ void BuildLastKey(const std::shared_ptr<arrow::Schema>& schema) {
+ Y_VERIFY(!LastReadKey);
+ if (ResultBatch && ResultBatch->num_rows()) {
+ auto keyColumns = NArrow::ExtractColumns(ResultBatch, schema);
+ Y_VERIFY(keyColumns);
+ LastReadKey = keyColumns->Slice(keyColumns->num_rows() - 1);
+ }
+ }
+
+ static std::vector<TPartialReadResult> SplitResults(const std::vector<TPartialReadResult>& resultsExt, const ui32 maxRecordsInResult) {
+ std::vector<TPartialReadResult> result;
+ std::shared_ptr<arrow::RecordBatch> currentBatch;
+ for (auto&& i : resultsExt) {
+ std::shared_ptr<arrow::RecordBatch> currentBatchSplitting = i.ResultBatch;
+ while (currentBatchSplitting && currentBatchSplitting->num_rows()) {
+ const ui32 currentRecordsCount = currentBatch ? currentBatch->num_rows() : 0;
+ if (currentRecordsCount + currentBatchSplitting->num_rows() < maxRecordsInResult) {
+ if (!currentBatch) {
+ currentBatch = currentBatchSplitting;
+ } else {
+ currentBatch = NArrow::CombineBatches({currentBatch, currentBatchSplitting});
+ }
+ currentBatchSplitting = nullptr;
+ } else {
+ auto currentSlice = currentBatchSplitting->Slice(0, maxRecordsInResult - currentRecordsCount);
+ if (!currentBatch) {
+ currentBatch = currentSlice;
+ } else {
+ currentBatch = NArrow::CombineBatches({currentBatch, currentSlice});
+ }
+ result.emplace_back(TPartialReadResult(nullptr, currentBatch));
+ currentBatch = nullptr;
+ currentBatchSplitting = currentBatchSplitting->Slice(maxRecordsInResult - currentRecordsCount);
+ }
+ }
+ }
+ if (currentBatch && currentBatch->num_rows()) {
+ result.emplace_back(TPartialReadResult(nullptr, currentBatch));
+ }
+ return result;
+ }
+
void Slice(const ui32 offset, const ui32 length) {
ResultBatch = ResultBatch->Slice(offset, length);
}
void ApplyProgram(const NOlap::TProgramContainer& program) {
+ if (!program.HasProgram()) {
+ return;
+ }
Y_VERIFY(!MemoryGuardInternal);
auto status = program.ApplyProgram(ResultBatch);
if (!status.ok()) {
@@ -82,7 +144,7 @@ public:
virtual bool HasWaitingTasks() const = 0;
virtual bool Finished() const = 0;
virtual NOlap::TPartialReadResult GetBatch() = 0;
- virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); }
+ virtual std::optional<NBlobCache::TBlobRange> GetNextBlobToRead() { return {}; }
virtual TString DebugString() const {
return "NO_DATA";
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index dfed51f0269..d9e36b4838a 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -859,8 +859,8 @@ void TColumnShard::MapExternBlobs(const TActorContext& /*ctx*/, NOlap::TReadMeta
}
THashSet<TUnifiedBlobId> uniqBlobs;
- for (auto& portion : metadata.SelectInfo->Portions) {
- for (auto& rec : portion.Records) {
+ for (auto& portion : metadata.SelectInfo->PortionsOrderedPK) {
+ for (auto& rec : portion->Records) {
uniqBlobs.insert(rec.BlobRange.BlobId);
}
}
diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h
index 027c9c5bbc2..3fcff293c1c 100644
--- a/ydb/core/tx/columnshard/engines/column_engine.h
+++ b/ydb/core/tx/columnshard/engines/column_engine.h
@@ -38,20 +38,20 @@ struct TSelectInfo {
};
std::vector<TGranuleRecord> Granules; // ordered by key (ascending)
- std::vector<TPortionInfo> Portions;
+ std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
- NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>> GetPortionsOrdered(const bool reverse) const {
- return NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>>(Portions, reverse);
+ NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>> GetPortionsOrdered(const bool reverse) const {
+ return NColumnShard::TContainerAccessorWithDirection<std::vector<std::shared_ptr<TPortionInfo>>>(PortionsOrderedPK, reverse);
}
NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>> GetGranulesOrdered(const bool reverse) const {
return NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>>(Granules, reverse);
}
- size_t NumRecords() const {
+ size_t NumChunks() const {
size_t records = 0;
- for (auto& portionInfo : Portions) {
- records += portionInfo.NumRecords();
+ for (auto& portionInfo : PortionsOrderedPK) {
+ records += portionInfo->NumChunks();
}
return records;
}
@@ -59,13 +59,13 @@ struct TSelectInfo {
TStats Stats() const {
TStats out;
out.Granules = Granules.size();
- out.Portions = Portions.size();
+ out.Portions = PortionsOrderedPK.size();
THashSet<TUnifiedBlobId> uniqBlob;
- for (auto& portionInfo : Portions) {
- out.Records += portionInfo.NumRecords();
- out.Rows += portionInfo.NumRows();
- for (auto& rec : portionInfo.Records) {
+ for (auto& portionInfo : PortionsOrderedPK) {
+ out.Records += portionInfo->NumChunks();
+ out.Rows += portionInfo->NumRows();
+ for (auto& rec : portionInfo->Records) {
uniqBlob.insert(rec.BlobRange.BlobId);
}
}
@@ -84,10 +84,10 @@ struct TSelectInfo {
}
out << "; ";
}
- if (info.Portions.size()) {
+ if (info.PortionsOrderedPK.size()) {
out << "portions:";
- for (auto& portionInfo : info.Portions) {
- out << portionInfo;
+ for (auto& portionInfo : info.PortionsOrderedPK) {
+ out << portionInfo->DebugString();
}
}
return out;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index 70375672d3f..3451c1cbc3c 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -339,7 +339,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
auto spg = Granules[granule];
Y_VERIFY(spg);
for (auto& [portion, info] : spg->GetPortions()) {
- affectedRecords += info->NumRecords();
+ affectedRecords += info->NumChunks();
changes->PortionsToDrop.push_back(*info);
dropPortions.insert(portion);
}
@@ -380,7 +380,7 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
if (!portionInfo) {
it = CleanupPortions.erase(it);
} else if (portionInfo->CheckForCleanup(snapshot)) {
- affectedRecords += portionInfo->NumRecords();
+ affectedRecords += portionInfo->NumChunks();
changes->PortionsToDrop.push_back(*portionInfo);
it = CleanupPortions.erase(it);
if (affectedRecords > maxRecords) {
@@ -484,7 +484,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
}
}
if (!keep && context.AllowDrop) {
- dropBlobs += info->NumRecords();
+ dropBlobs += info->NumBlobs();
context.Changes->PortionsToDrop.push_back(*info);
SignalCounters.OnPortionToDrop(info->BlobsBytes());
}
@@ -658,31 +658,8 @@ bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool up
}
}
-static TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> GroupPortionsBySnapshot(const THashMap<ui64, std::shared_ptr<TPortionInfo>>& portions, const TSnapshot& snapshot) {
- TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> out;
- for (const auto& [portion, portionInfo] : portions) {
- if (portionInfo->Empty()) {
- continue;
- }
-
- TSnapshot recSnapshot = portionInfo->GetMinSnapshot();
- TSnapshot recXSnapshot = portionInfo->GetRemoveSnapshot();
-
- bool visible = (recSnapshot <= snapshot);
- if (recXSnapshot.GetPlanStep()) {
- visible = visible && snapshot < recXSnapshot;
- }
-
- if (visible) {
- out[recSnapshot].push_back(portionInfo);
- }
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "GroupPortionsBySnapshot")("analyze_portion", portionInfo->DebugString())("visible", visible)("snapshot", snapshot.DebugString());
- }
- return out;
-}
-
std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot snapshot,
- const THashSet<ui32>& columnIds,
+ const THashSet<ui32>& /*columnIds*/,
const TPKRangesFilter& pkRangesFilter) const
{
auto out = std::make_shared<TSelectInfo>();
@@ -695,7 +672,6 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
return out;
}
out->Granules.reserve(pathGranules.size());
- // TODO: out.Portions.reserve()
std::optional<TMarksMap::const_iterator> previousIterator;
const bool compositeMark = UseCompositeMarks();
@@ -737,25 +713,21 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(ui64 pathId, TSnapshot
Y_VERIFY(it != Granules.end());
auto& spg = it->second;
Y_VERIFY(spg);
- auto& portions = spg->GetPortions();
bool granuleHasDataForSnaphsot = false;
- TMap<TSnapshot, std::vector<std::shared_ptr<TPortionInfo>>> orderedPortions = GroupPortionsBySnapshot(portions, snapshot);
- for (auto& [snap, vec] : orderedPortions) {
- for (const auto& portionInfo : vec) {
- TPortionInfo outPortion = portionInfo->CopyWithFilteredColumns(columnIds);
- Y_VERIFY(outPortion.Produced());
- if (!pkRangesFilter.IsPortionInUsage(outPortion, VersionedIndex.GetLastSchema()->GetIndexInfo())) {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")
- ("granule", granule)("portion", portionInfo->GetPortion());
- continue;
- } else {
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")
- ("granule", granule)("portion", portionInfo->GetPortion());
- }
- out->Portions.emplace_back(std::move(outPortion));
- granuleHasDataForSnaphsot = true;
+ std::vector<std::shared_ptr<TPortionInfo>> orderedPortions = spg->GroupOrderedPortionsByPK(snapshot);
+ for (const auto& portionInfo : orderedPortions) {
+ Y_VERIFY(portionInfo->Produced());
+ if (!pkRangesFilter.IsPortionInUsage(*portionInfo, VersionedIndex.GetLastSchema()->GetIndexInfo())) {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_skipped")
+ ("granule", granule)("portion", portionInfo->DebugString());
+ continue;
+ } else {
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "portion_selected")
+ ("granule", granule)("portion", portionInfo->DebugString());
}
+ out->PortionsOrderedPK.emplace_back(portionInfo);
+ granuleHasDataForSnaphsot = true;
}
if (granuleHasDataForSnaphsot) {
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 4ae43fe7b31..cdfb768fd01 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -79,23 +79,28 @@ void TIndexedReadData::InitRead() {
ui64 portionsBytes = 0;
std::set<ui64> granulesReady;
ui64 prevGranule = 0;
- for (auto& portionInfo : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) {
+ THashSet<ui32> columnIdsHash;
+ for (auto&& i : ReadMetadata->GetAllColumns()) {
+ columnIdsHash.emplace(i);
+ }
+ for (auto& portionSorted : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) {
+ auto portionInfo = portionSorted->CopyWithFilteredColumns(columnIdsHash);
portionsBytes += portionInfo.BlobsBytes();
Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata);
NIndexedReader::TGranule::TPtr granule = GranulesContext->UpsertGranule(portionInfo.GetGranule());
- granule->RegisterBatchForFetching(portionInfo);
if (prevGranule != portionInfo.GetGranule()) {
Y_VERIFY(granulesReady.emplace(portionInfo.GetGranule()).second);
prevGranule = portionInfo.GetGranule();
}
+ granule->RegisterBatchForFetching(std::move(portionInfo));
}
GranulesContext->PrepareForStart();
Context.GetCounters().PortionBytes->Add(portionsBytes);
auto& stats = ReadMetadata->ReadStats;
stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size();
- stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size();
+ stats->IndexPortions = ReadMetadata->SelectInfo->PortionsOrderedPK.size();
stats->IndexBatches = ReadMetadata->NumIndexedBlobs();
stats->CommittedBatches = ReadMetadata->CommittedBlobs.size();
stats->SchemaColumns = ReadMetadata->GetSchemaColumnsCount();
@@ -148,7 +153,7 @@ TIndexedReadData::MakeNotIndexedBatch(const std::shared_ptr<arrow::RecordBatch>&
return preparedBatch;
}
-std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxRowsInBatch) {
+std::vector<TPartialReadResult> TIndexedReadData::DoExtractReadyResults(const int64_t maxRowsInBatch) {
Y_VERIFY(SortReplaceDescription);
auto& indexInfo = ReadMetadata->GetIndexInfo();
@@ -188,14 +193,7 @@ std::vector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t
GranulesContext->DrainNotIndexedBatches(nullptr);
}
- // Extract ready to out granules: ready granules that are not blocked by other (not ready) granules
- Y_VERIFY(GranulesContext);
- auto out = ReadyToOut(maxRowsInBatch);
- const bool requireResult = GranulesContext->IsFinished(); // not indexed or the last indexed read (even if it's empty)
- if (requireResult && out.empty()) {
- out.push_back(TPartialReadResult(nullptr, NArrow::MakeEmptyBatch(ReadMetadata->GetResultSchema())));
- }
- return out;
+ return ReadyToOut(maxRowsInBatch);
}
/// @return batches that are not blocked by others
@@ -344,21 +342,18 @@ std::vector<TPartialReadResult> TIndexedReadData::MakeResult(std::vector<std::ve
return out;
}
-TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata,
- const bool internalRead, const TReadContext& context)
- : Context(context)
- , ReadMetadata(readMetadata)
- , OnePhaseReadMode(internalRead)
+TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context)
+ : TBase(context, readMetadata)
+ , OnePhaseReadMode(context.GetIsInternalRead())
{
- Y_VERIFY(ReadMetadata->SelectInfo);
}
-bool TIndexedReadData::IsFinished() const {
+bool TIndexedReadData::DoIsFinished() const {
Y_VERIFY(GranulesContext);
return NotIndexed.empty() && FetchBlobsQueue.empty() && PriorityBlobsQueue.empty() && GranulesContext->IsFinished();
}
-void TIndexedReadData::Abort() {
+void TIndexedReadData::DoAbort() {
Y_VERIFY(GranulesContext);
Context.MutableProcessor().Stop();
FetchBlobsQueue.Stop();
@@ -368,36 +363,36 @@ void TIndexedReadData::Abort() {
WaitCommitted.clear();
}
-NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob(const bool hasReadyResults) {
+std::optional<TBlobRange> TIndexedReadData::DoExtractNextBlob(const bool hasReadyResults) {
Y_VERIFY(GranulesContext);
while (auto* f = PriorityBlobsQueue.front()) {
- if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) {
- ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId());
+ if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) {
+ ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId());
PriorityBlobsQueue.pop_front();
continue;
}
- GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange());
+ GranulesContext->ForceStartProcessGranule(f->GetObjectId(), f->GetRange());
Context.GetCounters().OnPriorityFetch(f->GetRange().Size);
return PriorityBlobsQueue.pop_front();
}
while (auto* f = FetchBlobsQueue.front()) {
- if (!GranulesContext->IsGranuleActualForProcessing(f->GetGranuleId())) {
- ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetGranuleId());
+ if (!GranulesContext->IsGranuleActualForProcessing(f->GetObjectId())) {
+ ACFL_DEBUG("event", "!IsGranuleActualForProcessing")("granule_id", f->GetObjectId());
FetchBlobsQueue.pop_front();
continue;
}
- if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange(), hasReadyResults)) {
+ if (GranulesContext->TryStartProcessGranule(f->GetObjectId(), f->GetRange(), hasReadyResults)) {
Context.GetCounters().OnGeneralFetch(f->GetRange().Size);
return FetchBlobsQueue.pop_front();
} else {
Context.GetCounters().OnProcessingOverloaded();
- return TBlobRange();
+ return {};
}
}
- return TBlobRange();
+ return {};
}
void TIndexedReadData::AddNotIndexed(const TBlobRange& blobRange, const TString& column) {
@@ -415,7 +410,7 @@ void TIndexedReadData::AddNotIndexed(const TUnifiedBlobId& blobId, const std::sh
WaitCommitted.erase(it);
}
-void TIndexedReadData::AddData(const TBlobRange& blobRange, const TString& data) {
+void TIndexedReadData::DoAddData(const TBlobRange& blobRange, const TString& data) {
if (GranulesContext->IsFinished()) {
ACFL_DEBUG("event", "AddData on GranulesContextFinished");
return;
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index c1b31db0ddc..13fa7e27b3f 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -17,15 +17,14 @@ class TScanIteratorBase;
namespace NKikimr::NOlap {
-class TIndexedReadData {
+class TIndexedReadData: public IDataReader, TNonCopyable {
private:
- TReadContext Context;
+ using TBase = IDataReader;
std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext;
THashMap<TUnifiedBlobId, NOlap::TCommittedBlob> WaitCommitted;
TFetchBlobsQueue FetchBlobsQueue;
TFetchBlobsQueue PriorityBlobsQueue;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
bool OnePhaseReadMode = false;
std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
@@ -41,28 +40,24 @@ private:
bool IsIndexedBlob(const TBlobRange& blobRange) const {
return IndexedBlobSubscriber.contains(blobRange);
}
-public:
- TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const bool internalRead, const TReadContext& context);
-
- TString DebugString() const {
+protected:
+ virtual TString DoDebugString() const override {
return TStringBuilder()
- << "internal:" << OnePhaseReadMode << ";"
<< "wait_committed:" << WaitCommitted.size() << ";"
<< "granules_context:(" << (GranulesContext ? GranulesContext->DebugString() : "NO") << ");"
;
}
- const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const {
- return Context.GetMemoryAccessor();
- }
+ /// @returns batches and corresponding last keys in correct order (i.e. sorted by PK)
+ virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) override;
- const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept {
- return Context.GetCounters();
- }
+ virtual void DoAbort() override;
+ virtual bool DoIsFinished() const override;
- const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept {
- return Context.GetProcessor();
- }
+ virtual void DoAddData(const TBlobRange& blobRange, const TString& data) override;
+ virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) override;
+public:
+ TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const TReadContext& context);
NIndexedReader::TGranulesFillingContext& GetGranulesContext() {
Y_VERIFY(GranulesContext);
@@ -70,13 +65,6 @@ public:
}
void InitRead();
- void Abort();
- bool IsFinished() const;
-
- /// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
- std::vector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
-
- void AddData(const TBlobRange& blobRange, const TString& data);
NOlap::TReadMetadata::TConstPtr GetReadMetadata() const {
return ReadMetadata;
@@ -93,12 +81,6 @@ public:
PriorityBlobsQueue.emplace_back(granuleId, range);
}
- bool HasMoreBlobs() const {
- return FetchBlobsQueue.size() || PriorityBlobsQueue.size();
- }
-
- TBlobRange ExtractNextBlob(const bool hasReadyResults);
-
private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, const TSnapshot& snapshot) const;
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index dd4a0248415..99f7ca0983e 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -2,6 +2,7 @@
#include <ydb/core/tx/columnshard/common/portion.h>
#include <ydb/core/formats/arrow/replace_key.h>
#include <ydb/core/protos/tx_columnshard.pb.h>
+#include <ydb/library/accessor/accessor.h>
#include <util/stream/output.h>
namespace NKikimr::NOlap {
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 759ee9fae28..026076f3fb6 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -116,6 +116,13 @@ TString TPortionInfo::DebugString() const {
sb << "remove_snapshot:(" << RemoveSnapshot.DebugString() << ");";
}
sb << "chunks:(" << Records.size() << ");";
+ if (IS_TRACE_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD)) {
+ std::set<TString> blobIds;
+ for (auto&& i : Records) {
+ blobIds.emplace(::ToString(i.BlobRange.BlobId));
+ }
+ sb << "blobs:" << JoinSeq(",", blobIds) << ";blobs_count:" << blobIds.size() << ";";
+ }
return sb << ")";
}
@@ -155,6 +162,25 @@ bool TPortionInfo::HasPkMinMax() const {
return result;
}
+std::vector<const NKikimr::NOlap::TColumnRecord*> TPortionInfo::GetColumnChunksPointers(const ui32 columnId) const {
+ std::vector<const TColumnRecord*> result;
+ for (auto&& c : Records) {
+ if (c.ColumnId == columnId) {
+ Y_VERIFY(c.Chunk == result.size());
+ result.emplace_back(&c);
+ }
+ }
+ return result;
+}
+
+size_t TPortionInfo::NumBlobs() const {
+ THashSet<TUnifiedBlobId> blobIds;
+ for (auto&& i : Records) {
+ blobIds.emplace(i.BlobRange.BlobId);
+ }
+ return blobIds.size();
+}
+
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_VERIFY(!Blobs.empty());
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 88d90b2f3a2..4905b075bcf 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -23,6 +23,8 @@ private:
public:
static constexpr const ui32 BLOB_BYTES_LIMIT = 8 * 1024 * 1024;
+ std::vector<const TColumnRecord*> GetColumnChunksPointers(const ui32 columnId) const;
+
void ResetMeta() {
Meta = TPortionMeta();
}
@@ -54,7 +56,8 @@ public:
bool IsEvicted() const { return Meta.GetProduced() == TPortionMeta::EProduced::EVICTED; }
bool CanHaveDups() const { return !Produced(); /* || IsInserted(); */ }
bool CanIntersectOthers() const { return !Valid() || IsInserted() || IsEvicted(); }
- size_t NumRecords() const { return Records.size(); }
+ size_t NumChunks() const { return Records.size(); }
+ size_t NumBlobs() const;
TPortionInfo CopyWithFilteredColumns(const THashSet<ui32>& columnIds) const;
@@ -172,6 +175,20 @@ public:
return sum;
}
+ bool IsVisible(const TSnapshot& snapshot) const { // Decides whether this portion is visible at the given snapshot.
+ if (Empty()) { // An empty portion is never reported as visible.
+ return false;
+ }
+
+ bool visible = (MinSnapshot <= snapshot); // Lower bound is inclusive: visible from MinSnapshot onwards.
+ if (visible && RemoveSnapshot.Valid()) {
+ visible = snapshot < RemoveSnapshot; // Upper bound is exclusive: not visible at RemoveSnapshot itself.
+ }
+
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "IsVisible")("analyze_portion", DebugString())("visible", visible)("snapshot", snapshot.DebugString()); // Trace record of the decision and its inputs.
+ return visible;
+ }
+
void UpdateRecordsMeta(TPortionMeta::EProduced produced) {
Meta.Produced = produced;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
index 9436f7a217f..5e5823dd1b9 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -7,21 +7,21 @@
namespace NKikimr::NOlap::NIndexedReader {
-TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo, const ui64 predictedBatchSize)
+TBatch::TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfoExt, const ui64 predictedBatchSize)
: BatchAddress(address)
- , Portion(portionInfo.GetPortion())
+ , Portion(portionInfoExt.GetPortion())
, Granule(owner.GetGranuleId())
, PredictedBatchSize(predictedBatchSize)
, Owner(&owner)
- , PortionInfo(&portionInfo)
+ , PortionInfo(std::move(portionInfoExt))
{
- Y_VERIFY(Granule == PortionInfo->GetGranule());
- Y_VERIFY(portionInfo.Records.size());
+ Y_VERIFY(Granule == PortionInfo.GetGranule());
+ Y_VERIFY(PortionInfo.Records.size());
- if (portionInfo.CanIntersectOthers()) {
+ if (PortionInfo.CanIntersectOthers()) {
ACFL_TRACE("event", "intersect_portion");
Owner->SetDuplicationsAvailable(true);
- if (portionInfo.CanHaveDups()) {
+ if (PortionInfo.CanHaveDups()) {
ACFL_TRACE("event", "dup_portion");
DuplicationsAvailableFlag = true;
}
@@ -30,10 +30,10 @@ TBatch::TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo
NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
Y_VERIFY(WaitIndexed.empty());
- Y_VERIFY(PortionInfo->Produced());
+ Y_VERIFY(PortionInfo.Produced());
Y_VERIFY(!FetchedInfo.GetFilteredBatch());
- auto blobSchema = readMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot());
+ auto blobSchema = readMetadata->GetLoadSchema(PortionInfo.GetMinSnapshot());
auto readSchema = readMetadata->GetLoadSchema(readMetadata->GetSnapshot());
ISnapshotSchema::TPtr resultSchema;
if (CurrentColumnIds) {
@@ -41,7 +41,7 @@ NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard
} else {
resultSchema = readSchema;
}
- auto batchConstructor = PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Data);
+ auto batchConstructor = PortionInfo.PrepareForAssemble(*blobSchema, *resultSchema, Data);
Data.clear();
if (!FetchedInfo.GetFilter()) {
return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata,
@@ -68,7 +68,7 @@ bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
ui64 TBatch::GetFetchBytes(const std::set<ui32>& columnIds) {
ui64 result = 0;
- for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
if (!columnIds.contains(rec.ColumnId)) {
continue;
}
@@ -94,7 +94,7 @@ void TBatch::ResetCommon(const std::set<ui32>& columnIds) {
void TBatch::ResetNoFilter(const std::set<ui32>& columnIds) {
Y_VERIFY(!FetchedInfo.GetFilter());
ResetCommon(columnIds);
- for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
@@ -109,7 +109,7 @@ void TBatch::ResetWithFilter(const std::set<ui32>& columnIds) {
Y_VERIFY(FetchedInfo.GetFilter());
ResetCommon(columnIds);
std::map<ui32, std::map<ui16, const TColumnRecord*>> orderedObjects;
- for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ for (const NOlap::TColumnRecord& rec : PortionInfo.Records) {
if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
continue;
}
@@ -187,20 +187,20 @@ std::shared_ptr<TSortableBatchPosition> TBatch::GetFirstPK(const bool reverse, c
void TBatch::GetPKBorders(const bool reverse, const TIndexInfo& indexInfo, std::shared_ptr<TSortableBatchPosition>& from, std::shared_ptr<TSortableBatchPosition>& to) const {
auto indexKey = indexInfo.GetIndexKey();
- Y_VERIFY(PortionInfo->Valid());
+ Y_VERIFY(PortionInfo.Valid());
if (!FirstPK) {
- const NArrow::TReplaceKey& minRecord = PortionInfo->IndexKeyStart();
+ const NArrow::TReplaceKey& minRecord = PortionInfo.IndexKeyStart();
auto batch = minRecord.ToBatch(indexKey);
Y_VERIFY(batch);
- FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), false);
- ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), true);
+ FirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false);
+ ReverseLastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true);
}
if (!LastPK) {
- const NArrow::TReplaceKey& maxRecord = PortionInfo->IndexKeyEnd();
+ const NArrow::TReplaceKey& maxRecord = PortionInfo.IndexKeyEnd();
auto batch = maxRecord.ToBatch(indexKey);
Y_VERIFY(batch);
- LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), false);
- ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), true);
+ LastPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), false);
+ ReverseFirstPK = std::make_shared<TSortableBatchPosition>(batch, 0, indexKey->field_names(), std::vector<std::string>(), true);
}
if (reverse) {
from = *ReverseFirstPK;
@@ -216,7 +216,7 @@ bool TBatch::CheckReadyForAssemble() {
auto& context = Owner->GetOwner();
auto processor = context.GetTasksProcessor();
if (auto assembleBatchTask = AssembleTask(processor.GetObject(), context.GetReadMetadata())) {
- processor.Add(context, assembleBatchTask);
+ processor.Add(context.GetOwner(), assembleBatchTask);
}
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
index fe5593efaf4..338a6107185 100644
--- a/ydb/core/tx/columnshard/engines/reader/batch.h
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -91,7 +91,7 @@ private:
YDB_READONLY_DEF(TBatchFetchedInfo, FetchedInfo);
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Data;
TGranule* Owner;
- const TPortionInfo* PortionInfo = nullptr;
+ const TPortionInfo PortionInfo;
YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
std::set<ui32> AskedColumnIds;
@@ -117,7 +117,7 @@ public:
}
bool AllowEarlyFilter() const {
- return PortionInfo->AllowEarlyFilter();
+ return PortionInfo.AllowEarlyFilter();
}
const TBatchAddress& GetBatchAddress() const {
return BatchAddress;
@@ -131,7 +131,7 @@ public:
return GetUsefulBytes(FetchedBytes);
}
- TBatch(const TBatchAddress& address, TGranule& owner, const TPortionInfo& portionInfo, const ui64 predictedBatchSize);
+ TBatch(const TBatchAddress& address, TGranule& owner, TPortionInfo&& portionInfo, const ui64 predictedBatchSize);
bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
@@ -155,7 +155,7 @@ public:
}
const TPortionInfo& GetPortionInfo() const {
- return *PortionInfo;
+ return PortionInfo;
}
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
index 86151582972..2116c747330 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -12,7 +12,7 @@ bool IDataTasksProcessor::ITask::DoExecute() {
}
}
-bool IDataTasksProcessor::ITask::Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const {
+bool IDataTasksProcessor::ITask::Apply(NOlap::IDataReader& indexedDataRead) const {
if (OwnerOperator) {
OwnerOperator->ReplyReceived();
if (OwnerOperator->IsStopped()) {
@@ -42,12 +42,12 @@ bool IDataTasksProcessor::Add(ITask::TPtr task) {
}
-void TDataTasksProcessorContainer::Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task) {
+void TDataTasksProcessorContainer::Add(NOlap::IDataReader& reader, IDataTasksProcessor::ITask::TPtr task) {
if (Object) {
Object->Add(task);
} else {
task->Execute(nullptr);
- task->Apply(context);
+ task->Apply(reader);
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
index afe942fbe1c..c1fba77d407 100644
--- a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -2,8 +2,8 @@
#include <ydb/core/tx/conveyor/usage/abstract.h>
#include <ydb/library/accessor/accessor.h>
-namespace NKikimr::NOlap::NIndexedReader {
-class TGranulesFillingContext;
+namespace NKikimr::NOlap {
+class IDataReader;
}
namespace NKikimr::NColumnShard {
@@ -23,7 +23,7 @@ public:
bool DataProcessed = false;
protected:
TDataTasksProcessorContainer GetTasksProcessorContainer() const;
- virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const = 0;
+ virtual bool DoApply(NOlap::IDataReader& indexedDataRead) const = 0;
virtual bool DoExecuteImpl() = 0;
virtual bool DoExecute() override final;
@@ -37,7 +37,7 @@ public:
using TPtr = std::shared_ptr<ITask>;
virtual ~ITask() = default;
- bool Apply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const;
+ bool Apply(NOlap::IDataReader& indexedDataRead) const;
bool IsDataProcessed() const noexcept {
return DataProcessed;
@@ -99,7 +99,7 @@ public:
return Object;
}
- void Add(NOlap::NIndexedReader::TGranulesFillingContext& context, IDataTasksProcessor::ITask::TPtr task);
+ void Add(NOlap::IDataReader& reader, IDataTasksProcessor::ITask::TPtr task);
};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h
index e76f2f69937..04e6cd01e70 100644
--- a/ydb/core/tx/columnshard/engines/reader/filling_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h
@@ -32,6 +32,10 @@ private:
bool CheckBufferAvailable() const;
public:
+ TIndexedReadData& GetOwner() { // Returns the Owner member as a mutable TIndexedReadData reference.
+ return Owner;
+ }
+
std::shared_ptr<TGranulesLiveControl> GetGranulesLiveContext() const {
return GranulesLiveContext;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
index c278bac23f8..972046bffac 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -54,11 +54,12 @@ bool TAssembleFilter::DoExecuteImpl() {
return true;
}
-bool TAssembleFilter::DoApply(TGranulesFillingContext& owner) const {
+bool TAssembleFilter::DoApply(IDataReader& owner) const {
Y_VERIFY(OriginalCount);
- owner.GetCounters().OriginalRowsCount->Add(OriginalCount);
- owner.GetCounters().AssembleFilterCount->Add(1);
- TBatch* batch = owner.GetBatchInfo(BatchAddress);
+ auto& reader = owner.GetMeAs<TIndexedReadData>();
+ reader.GetCounters().OriginalRowsCount->Add(OriginalCount);
+ reader.GetCounters().AssembleFilterCount->Add(1);
+ TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress);
if (batch) {
batch->InitFilter(Filter, FilteredBatch, OriginalCount, EarlyFilter);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
index 64232c4be1b..0422a62051d 100644
--- a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -25,7 +25,7 @@ namespace NKikimr::NOlap::NIndexedReader {
std::set<ui32> FilterColumnIds;
IOrderPolicy::TPtr BatchesOrderPolicy;
protected:
- virtual bool DoApply(TGranulesFillingContext& owner) const override;
+ virtual bool DoApply(IDataReader& owner) const override;
virtual bool DoExecuteImpl() override;
public:
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
index d20b7405cc2..ed8f5c4053f 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -38,7 +38,7 @@ void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::Reco
CheckReady();
}
-TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) {
+TBatch& TGranule::RegisterBatchForFetching(TPortionInfo&& portionInfo) {
const ui64 batchSize = portionInfo.GetRawBytes(Owner->GetReadMetadata()->GetAllColumns());
RawDataSize += batchSize;
const ui64 filtersSize = portionInfo.NumRows() * (8 + 8);
@@ -50,7 +50,7 @@ TBatch& TGranule::RegisterBatchForFetching(const TPortionInfo& portionInfo) {
Y_VERIFY(!ReadyFlag);
ui32 batchGranuleIdx = Batches.size();
WaitBatches.emplace(batchGranuleIdx);
- Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, portionInfo, batchSize);
+ Batches.emplace_back(TBatchAddress(GranuleId, batchGranuleIdx), *this, std::move(portionInfo), batchSize);
Y_VERIFY(GranuleBatchNumbers.emplace(batchGranuleIdx).second);
Owner->OnNewBatch(Batches.back());
return Batches.back();
@@ -149,7 +149,7 @@ void TGranule::CheckReady() {
ACFL_DEBUG("event", "granule_preparation")("predicted_size", RawDataSize)("real_size", RawDataSizeReal);
std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = std::move(RecordBatches);
auto processor = Owner->GetTasksProcessor();
- processor.Add(*Owner, std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject()));
+ processor.Add(Owner->GetOwner(), std::make_shared<TTaskGranulePreparation>(std::move(inGranule), std::move(BatchesToDedup), GranuleId, Owner->GetReadMetadata(), processor.GetObject()));
}
}
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
index 89e0ae19879..41de01f2bed 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -151,7 +151,7 @@ public:
void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
void OnBlobReady(const TBlobRange& range) noexcept;
bool OnFilterReady(TBatch& batchInfo);
- TBatch& RegisterBatchForFetching(const TPortionInfo& portionInfo);
+ TBatch& RegisterBatchForFetching(TPortionInfo&& portionInfo);
void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) const;
};
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
index db05b35257d..14ad92c4362 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.cpp
@@ -1,4 +1,5 @@
#include "granule_preparation.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
namespace NKikimr::NOlap::NIndexedReader {
@@ -69,8 +70,9 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TTaskGranulePreparation::Specia
return out;
}
-bool TTaskGranulePreparation::DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const {
- indexedDataRead.GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule));
+bool TTaskGranulePreparation::DoApply(IDataReader& indexedDataRead) const {
+ auto& readData = indexedDataRead.GetMeAs<TIndexedReadData>();
+ readData.GetGranulesContext().GetGranuleVerified(GranuleId)->OnGranuleDataPrepared(std::move(BatchesInGranule));
return true;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
index 7b893f89fbd..a7ff39f854b 100644
--- a/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
+++ b/ydb/core/tx/columnshard/engines/reader/granule_preparation.h
@@ -24,7 +24,7 @@ private:
const THashSet<const void*>& batchesToDedup);
protected:
- virtual bool DoApply(NOlap::NIndexedReader::TGranulesFillingContext& indexedDataRead) const override;
+ virtual bool DoApply(IDataReader& indexedDataRead) const override;
virtual bool DoExecuteImpl() override;
public:
diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
index 22890630f81..8ae30314fa1 100644
--- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp
@@ -115,7 +115,7 @@ std::vector<TGranule::TPtr> TPKSortingWithLimit::DoDetachReadyGranules(TResultCo
TPKSortingWithLimit::TPKSortingWithLimit(TReadMetadata::TConstPtr readMetadata)
: TBase(readMetadata)
- , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), readMetadata->IsDescSorted())
+ , MergeStream(readMetadata->GetIndexInfo(readMetadata->GetSnapshot()).GetReplaceKey(), nullptr, readMetadata->IsDescSorted())
{
CurrentItemsLimit = ReadMetadata->Limit;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
index c48aa234633..b87bc406ef3 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -22,8 +22,9 @@ bool TAssembleBatch::DoExecuteImpl() {
return true;
}
-bool TAssembleBatch::DoApply(TGranulesFillingContext& owner) const {
- TBatch* batch = owner.GetBatchInfo(BatchAddress);
+bool TAssembleBatch::DoApply(IDataReader& owner) const {
+ auto& reader = owner.GetMeAs<TIndexedReadData>();
+ TBatch* batch = reader.GetGranulesContext().GetBatchInfo(BatchAddress);
if (batch) {
batch->InitBatch(FullBatch);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
index 4a2350c18a6..62e8d84bd41 100644
--- a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
@@ -22,7 +22,7 @@ private:
const TBatchAddress BatchAddress;
protected:
- virtual bool DoApply(TGranulesFillingContext& owner) const override;
+ virtual bool DoApply(IDataReader& owner) const override;
virtual bool DoExecuteImpl() override;
public:
virtual TString GetTaskClassIdentifier() const override {
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp
index fc857a54291..e71f26a9d4c 100644
--- a/ydb/core/tx/columnshard/engines/reader/queue.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp
@@ -2,19 +2,4 @@
namespace NKikimr::NOlap {
-NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() {
- if (!StoppedFlag && IteratorBlobsSequential.size()) {
- auto result = IteratorBlobsSequential.front();
- IteratorBlobsSequential.pop_front();
- return result.GetRange();
- } else {
- return TBlobRange();
- }
-}
-
-void TFetchBlobsQueue::emplace_back(const ui64 granuleId, const TBlobRange& range) {
- Y_VERIFY(!StoppedFlag);
- IteratorBlobsSequential.emplace_back(granuleId, range);
-}
-
}
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h
index 7c5415000af..c3f0ee8b13f 100644
--- a/ydb/core/tx/columnshard/engines/reader/queue.h
+++ b/ydb/core/tx/columnshard/engines/reader/queue.h
@@ -6,31 +6,32 @@ namespace NKikimr::NOlap {
class TBatchBlobRange {
private:
- const ui64 GranuleId;
+ const ui64 ObjectId;
const TBlobRange Range;
public:
- ui64 GetGranuleId() const {
- return GranuleId;
+ ui64 GetObjectId() const {
+ return ObjectId;
}
const TBlobRange& GetRange() const {
return Range;
}
- TBatchBlobRange(const ui64 granuleId, const TBlobRange range)
- : GranuleId(granuleId)
+ TBatchBlobRange(const ui64 objectId, const TBlobRange range)
+ : ObjectId(objectId)
, Range(range)
{
-
+ Y_VERIFY(range.BlobId.IsValid());
}
};
-class TFetchBlobsQueue {
+template <class TFetchTask>
+class TFetchBlobsQueueImpl {
private:
bool StoppedFlag = false;
- std::deque<TBatchBlobRange> IteratorBlobsSequential;
+ std::deque<TFetchTask> IteratorBlobsSequential;
public:
- const std::deque<TBatchBlobRange>& GetIteratorBlobsSequential() const noexcept {
+ const std::deque<TFetchTask>& GetIteratorBlobsSequential() const noexcept {
return IteratorBlobsSequential;
}
@@ -51,15 +52,30 @@ public:
return IteratorBlobsSequential.size();
}
- const TBatchBlobRange* front() const {
+ const TFetchTask* front() const {
if (!IteratorBlobsSequential.size()) {
return nullptr;
}
return &IteratorBlobsSequential.front();
}
- TBlobRange pop_front();
- void emplace_back(const ui64 granuleId, const TBlobRange& range);
+ std::optional<TBlobRange> pop_front() { // Removes the front task and returns its range; returns an empty optional when stopped or when the queue is empty.
+ if (!StoppedFlag && IteratorBlobsSequential.size()) {
+ auto result = IteratorBlobsSequential.front(); // Copy the task first, the element is destroyed by the pop below.
+ IteratorBlobsSequential.pop_front();
+ return result.GetRange();
+ } else {
+ return {};
+ }
+ }
+
+ void emplace_back(const ui64 objectId, const TBlobRange& range) { // Appends a TFetchTask built from (objectId, range) to the back of the queue.
+ Y_VERIFY(!StoppedFlag); // Asserts that the queue has not been stopped.
+ IteratorBlobsSequential.emplace_back(objectId, range);
+ }
+
};
+using TFetchBlobsQueue = TFetchBlobsQueueImpl<TBatchBlobRange>;
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/read_context.cpp
index 33f87457f19..a57a2f82a77 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.cpp
@@ -1,14 +1,16 @@
#include "read_context.h"
+#include "read_metadata.h"
#include <library/cpp/actors/core/events.h>
namespace NKikimr::NOlap {
TReadContext::TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor,
const NColumnShard::TConcreteScanCounters& counters,
- std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor)
+ std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor, const bool isInternalRead)
: Processor(processor)
, Counters(counters)
, MemoryAccessor(memoryAccessor)
+ , IsInternalRead(isInternalRead)
{
}
@@ -17,4 +19,13 @@ void TActorBasedMemoryAccesor::DoOnBufferReady() {
OwnerId.Send(OwnerId, new NActors::TEvents::TEvWakeup(1));
}
+
+IDataReader::IDataReader(const TReadContext& context, NOlap::TReadMetadata::TConstPtr readMetadata) // Stores the read context and the read metadata for the reader.
+ : Context(context)
+ , ReadMetadata(readMetadata)
+{
+ Y_VERIFY(ReadMetadata); // The metadata pointer must be set.
+ Y_VERIFY(ReadMetadata->SelectInfo); // The metadata must also carry SelectInfo.
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 5ab203f7981..360f2050a95 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -1,11 +1,15 @@
#pragma once
#include "conveyor_task.h"
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/columnshard__scan.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/resources/memory.h>
#include <library/cpp/actors/core/actor.h>
namespace NKikimr::NOlap {
+struct TReadMetadata;
+
class TActorBasedMemoryAccesor: public TScanMemoryLimiter::IMemoryAccessor {
private:
using TBase = TScanMemoryLimiter::IMemoryAccessor;
@@ -25,6 +29,7 @@ private:
YDB_ACCESSOR_DEF(NColumnShard::TDataTasksProcessorContainer, Processor);
const NColumnShard::TConcreteScanCounters Counters;
YDB_READONLY_DEF(std::shared_ptr<NOlap::TActorBasedMemoryAccesor>, MemoryAccessor);
+ YDB_READONLY(bool, IsInternalRead, false);
public:
const NColumnShard::TConcreteScanCounters& GetCounters() const {
return Counters;
@@ -32,14 +37,96 @@ public:
TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor,
const NColumnShard::TConcreteScanCounters& counters,
- std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor
+ std::shared_ptr<NOlap::TActorBasedMemoryAccesor> memoryAccessor, const bool isInternalRead
);
- TReadContext(const NColumnShard::TConcreteScanCounters& counters)
+ TReadContext(const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead)
: Counters(counters)
+ , IsInternalRead(isInternalRead)
{
}
};
+class IDataReader { // Abstract reader: each public method forwards to the matching protected Do* pure virtual.
+protected:
+ TReadContext Context; // Held by value.
+ std::shared_ptr<const TReadMetadata> ReadMetadata;
+ virtual void DoAddData(const TBlobRange& blobRange, const TString& data) = 0; // Backs AddData().
+ virtual std::optional<TBlobRange> DoExtractNextBlob(const bool hasReadyResults) = 0; // Backs ExtractNextBlob().
+ virtual TString DoDebugString() const = 0; // Appended to the common prefix built by DebugString().
+ virtual void DoAbort() = 0; // Backs Abort().
+ virtual bool DoIsFinished() const = 0; // Backs IsFinished().
+ virtual std::vector<TPartialReadResult> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0; // Backs ExtractReadyResults().
+public:
+ IDataReader(const TReadContext& context, std::shared_ptr<const TReadMetadata> readMetadata);
+ virtual ~IDataReader() = default;
+
+ const std::shared_ptr<const TReadMetadata>& GetReadMetadata() const {
+ return ReadMetadata;
+ }
+
+ const TReadContext& GetContext() const {
+ return Context;
+ }
+
+ TReadContext& GetContext() {
+ return Context;
+ }
+
+ const std::shared_ptr<TActorBasedMemoryAccesor>& GetMemoryAccessor() const { // Shortcut for Context.GetMemoryAccessor().
+ return Context.GetMemoryAccessor();
+ }
+
+ const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { // Shortcut for Context.GetCounters().
+ return Context.GetCounters();
+ }
+
+ const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept { // Shortcut for Context.GetProcessor().
+ return Context.GetProcessor();
+ }
+
+ void Abort() {
+ return DoAbort();
+ }
+
+ template <class T>
+ T& GetMeAs() { // Downcasts this reader to T via dynamic_cast.
+ auto result = dynamic_cast<T*>(this);
+ Y_VERIFY(result); // Trips when this object is not a T.
+ return *result;
+ }
+
+ template <class T>
+ const T& GetMeAs() const { // Const overload of the checked downcast.
+ auto result = dynamic_cast<const T*>(this);
+ Y_VERIFY(result); // Trips when this object is not a T.
+ return *result;
+ }
+
+ std::vector<TPartialReadResult> ExtractReadyResults(const int64_t maxRowsInBatch) {
+ return DoExtractReadyResults(maxRowsInBatch);
+ }
+
+ bool IsFinished() const {
+ return DoIsFinished();
+ }
+
+ TString DebugString() const { // Common prefix (internal-read flag, buffer state) followed by DoDebugString().
+ TStringBuilder sb;
+ sb << "internal:" << Context.GetIsInternalRead() << ";"
+ << "has_buffer:" << (GetMemoryAccessor() ? GetMemoryAccessor()->HasBuffer() : true) << ";" // Reported as true when no memory accessor is set.
+ ;
+ sb << DoDebugString();
+ return sb;
+ }
+
+ void AddData(const TBlobRange& blobRange, const TString& data) {
+ DoAddData(blobRange, data);
+ }
+ std::optional<TBlobRange> ExtractNextBlob(const bool hasReadyResults) {
+ return DoExtractNextBlob(hasReadyResults);
+ }
+};
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
index 9acaa5bf9a8..ef61d957d83 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp
@@ -3,32 +3,13 @@
namespace NKikimr::NOlap::NIndexedReader {
-bool TSortableBatchPosition::IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const {
- if (Fields.size() != (size_t)schema->num_fields()) {
- return false;
- }
- for (ui32 i = 0; i < Fields.size(); ++i) {
- if (Fields[i]->type() != schema->field(i)->type()) {
- return false;
- }
- if (Fields[i]->name() != schema->field(i)->name()) {
- return false;
- }
- }
- return true;
-}
-
NJson::TJsonValue TSortableBatchPosition::DebugJson() const {
NJson::TJsonValue result;
result["reverse"] = ReverseSort;
result["records_count"] = RecordsCount;
result["position"] = Position;
- Y_VERIFY(Columns.size() == Fields.size());
- for (ui32 i = 0; i < Columns.size(); ++i) {
- auto& jsonColumn = result["columns"].AppendValue(NJson::JSON_MAP);
- jsonColumn["name"] = Fields[i]->name();
- jsonColumn["value"] = NArrow::DebugString(Columns[i], Position);
- }
+ result["sorting"] = Sorting.DebugJson(Position);
+ result["data"] = Data.DebugJson(Position);
return result;
}
@@ -105,7 +86,8 @@ std::optional<ui64> TSortableBatchPosition::FindPosition(std::shared_ptr<arrow::
void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
Y_VERIFY(point);
- Y_VERIFY(point->IsSameSchema(SortSchema));
+ Y_VERIFY(point->IsSameSortingSchema(SortSchema));
+ Y_VERIFY(point->IsReverseSort() == Reverse);
Y_VERIFY(++ControlPoints == 1);
SortHeap.emplace_back(TBatchIterator(*point));
@@ -133,11 +115,11 @@ void TMergePartialStream::AddPoolSource(const std::optional<ui32> poolId, std::s
void TMergePartialStream::AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap) {
if (!filter || filter->IsTotalAllowFilter()) {
- SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), Reverse, poolId));
+ SortHeap.emplace_back(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
} else if (filter->IsTotalDenyFilter()) {
return;
} else {
- SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), Reverse, poolId));
+ SortHeap.emplace_back(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, poolId));
}
if (restoreHeap) {
std::push_heap(SortHeap.begin(), SortHeap.end());
@@ -152,11 +134,24 @@ void TMergePartialStream::RemoveControlPoint() {
SortHeap.pop_back();
}
-bool TMergePartialStream::DrainCurrent() {
+bool TMergePartialStream::DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder, const std::optional<TSortableBatchPosition>& readTo, const bool includeFinish) {
if (SortHeap.empty()) {
return false;
}
while (SortHeap.size()) {
+ if (readTo) {
+ auto position = TBatchIterator::TPosition(SortHeap.front());
+ if (includeFinish) {
+ if (position.GetKeyColumns().Compare(*readTo) == std::partial_ordering::greater) {
+ return true;
+ }
+ } else {
+ if (position.GetKeyColumns().Compare(*readTo) != std::partial_ordering::less) {
+ return true;
+ }
+ }
+ }
+
auto currentPosition = DrainCurrentPosition();
if (currentPosition.IsControlPoint()) {
return false;
@@ -165,10 +160,15 @@ bool TMergePartialStream::DrainCurrent() {
continue;
}
if (CurrentKeyColumns) {
- Y_VERIFY(CurrentKeyColumns->Compare(currentPosition.GetKeyColumns()) != std::partial_ordering::greater);
+ AFL_VERIFY(CurrentKeyColumns->Compare(currentPosition.GetKeyColumns()) == std::partial_ordering::less)("merge_debug", DebugJson());
}
CurrentKeyColumns = currentPosition.GetKeyColumns();
- return true;
+ if (builder) {
+ builder->AddRecord(*CurrentKeyColumns);
+ }
+ if (!readTo) {
+ return true;
+ }
}
return false;
}
@@ -185,4 +185,24 @@ NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
return result;
}
+TSortableScanData::TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns) { // Collects, in the order given by `columns`, the named arrays of `batch` and the matching schema fields.
+ for (auto&& i : columns) {
+ auto c = batch->GetColumnByName(i);
+ AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); // the column is verified to be non-null; the attached context names it and lists every requested column
+ Columns.emplace_back(c);
+ auto f = batch->schema()->GetFieldByName(i);
+ Fields.emplace_back(f); // appended in step with Columns, so Fields[k] is the field looked up for Columns[k]
+ }
+}
+
+void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) { // Appends one record taken from the data columns of `position` and bumps RecordsCount.
+ Y_VERIFY(position.GetData().GetColumns().size() == Builders.size()); // exactly one builder per data column is required
+ Y_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); // debug-only check that the data fields match the builder fields pairwise
+// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson());
+ for (ui32 i = 0; i < position.GetData().GetColumns().size(); ++i) {
+ NArrow::Append(*Builders[i], *position.GetData().GetColumns()[i], position.GetPosition()); // presumably copies the value at row GetPosition() of column i into builder i -- confirm against NArrow::Append
+ }
+ ++RecordsCount;
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
index c3d3f4f47c6..211beadf74f 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_filter_merger.h
@@ -3,35 +3,89 @@
#include <ydb/core/formats/arrow/arrow_filter.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/tx/columnshard/engines/index_info.h>
+#include <ydb/core/formats/arrow/switch/switch_type.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
#include <util/generic/hash.h>
+#include <util/string/join.h>
#include <set>
namespace NKikimr::NOlap::NIndexedReader {
+class TRecordBatchBuilder;
+
+class TSortableScanData { // A selection of column arrays from one record batch together with their schema fields.
+private:
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Array>>, Columns);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::Field>>, Fields); // Fields[k] is the schema field for Columns[k]
+public:
+ TSortableScanData() = default;
+ TSortableScanData(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<std::string>& columns);
+
+ bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const { // True when `schema` has the same number of fields with equal types and names, position by position.
+ if (Fields.size() != (size_t)schema->num_fields()) {
+ return false;
+ }
+ for (ui32 i = 0; i < Fields.size(); ++i) {
+ if (!Fields[i]->type()->Equals(schema->field(i)->type())) { // value equality of the types; comparing the shared_ptr handles would only test object identity
+ return false;
+ }
+ if (Fields[i]->name() != schema->field(i)->name()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ NJson::TJsonValue DebugJson(const i32 position) const { // Builds a JSON map with one "sorting_columns" entry per column: its name and, if `position` is in range, its value.
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ for (ui32 i = 0; i < Columns.size(); ++i) {
+ auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP);
+ jsonColumn["name"] = Fields[i]->name();
+ if (position >= 0 && position < Columns[i]->length()) { // "value" is omitted for an out-of-range position
+ jsonColumn["value"] = NArrow::DebugString(Columns[i], position);
+ }
+ }
+ return result;
+ }
+
+ std::vector<std::string> GetFieldNames() const { // Returns the field names in stored order.
+ std::vector<std::string> result;
+ for (auto&& i : Fields) {
+ result.emplace_back(i->name());
+ }
+ return result;
+ }
+};
+
class TSortableBatchPosition {
protected:
- i64 Position = 0;
+
+ YDB_READONLY(i64, Position, 0);
i64 RecordsCount = 0;
bool ReverseSort = false;
- std::vector<std::shared_ptr<arrow::Array>> Columns;
- std::vector<std::shared_ptr<arrow::Field>> Fields;
+ TSortableScanData Sorting;
+ TSortableScanData Data;
std::shared_ptr<arrow::RecordBatch> Batch;
static std::optional<ui64> FindPosition(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& forFound, const bool needGreater, const bool include);
public:
TSortableBatchPosition() = default;
- NJson::TJsonValue DebugJson() const;
+ const TSortableScanData& GetData() const { // Read-only access to the Data member (separate from the Sorting member).
+ return Data;
+ }
- bool IsSameSchema(const std::shared_ptr<arrow::Schema> schema) const;
+ bool IsReverseSort() const { // Returns the ReverseSort flag this position was constructed with.
+ return ReverseSort;
+ }
+ NJson::TJsonValue DebugJson() const;
TSortableBatchPosition BuildSame(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position) const {
- std::vector<std::string> fieldNames;
- for (auto&& i : Fields) {
- fieldNames.emplace_back(i->name());
- }
- return TSortableBatchPosition(batch, position, fieldNames, ReverseSort);
+ return TSortableBatchPosition(batch, position, Sorting.GetFieldNames(), Data.GetFieldNames(), ReverseSort);
+ }
+
+ bool IsSameSortingSchema(const std::shared_ptr<arrow::Schema>& schema) const { // True when the sorting columns match `schema`; const so it can be called through a const reference.
+ return Sorting.IsSameSchema(schema); // delegates to the const TSortableScanData::IsSameSchema
+ }
static std::shared_ptr<arrow::RecordBatch> SelectInterval(std::shared_ptr<arrow::RecordBatch> batch, const TSortableBatchPosition& from, const TSortableBatchPosition& to, const bool includeFrom, const bool includeTo) {
@@ -47,28 +101,23 @@ public:
return batch->Slice(*idxFrom, *idxTo - *idxFrom + 1);
}
- TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& columns, const bool reverseSort)
+ TSortableBatchPosition(std::shared_ptr<arrow::RecordBatch> batch, const ui32 position, const std::vector<std::string>& sortingColumns, const std::vector<std::string>& dataColumns, const bool reverseSort)
: Position(position)
, RecordsCount(batch->num_rows())
, ReverseSort(reverseSort)
+ , Sorting(batch, sortingColumns)
+ , Data(batch, dataColumns)
, Batch(batch)
{
Y_VERIFY(batch->num_rows());
Y_VERIFY_DEBUG(batch->ValidateFull().ok());
- for (auto&& i : columns) {
- auto c = batch->GetColumnByName(i);
- Y_VERIFY(c);
- Columns.emplace_back(c);
- auto f = batch->schema()->GetFieldByName(i);
- Fields.emplace_back(f);
- }
- Y_VERIFY(Columns.size());
+ Y_VERIFY(Sorting.GetColumns().size());
}
std::partial_ordering Compare(const TSortableBatchPosition& item) const {
Y_VERIFY(item.ReverseSort == ReverseSort);
- Y_VERIFY_DEBUG(item.Columns.size() == Columns.size());
- const auto directResult = NArrow::ColumnsCompare(Columns, Position, item.Columns, item.Position);
+ Y_VERIFY(item.Sorting.GetColumns().size() == Sorting.GetColumns().size());
+ const auto directResult = NArrow::ColumnsCompare(Sorting.GetColumns(), Position, item.Sorting.GetColumns(), item.Position);
if (ReverseSort) {
if (directResult == std::partial_ordering::less) {
return std::partial_ordering::greater;
@@ -86,6 +135,14 @@ public:
return Compare(item) == std::partial_ordering::less;
}
+ bool operator==(const TSortableBatchPosition& item) const { // Equal when Compare() reports equivalent; Compare() looks at the sorting columns only.
+ return Compare(item) == std::partial_ordering::equivalent;
+ }
+
+ bool operator!=(const TSortableBatchPosition& item) const { // True for any Compare() result other than equivalent.
+ return Compare(item) != std::partial_ordering::equivalent;
+ }
+
bool NextPosition(const i64 delta) {
return InitPosition(Position + delta);
}
@@ -144,10 +201,10 @@ private:
}
TBatchIterator(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
- const std::vector<std::string>& keyColumns, const bool reverseSort, const std::optional<ui32> poolId)
+ const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::optional<ui32> poolId)
: ControlPointFlag(false)
- , KeyColumns(batch, 0, keyColumns, reverseSort)
- , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), false)
+ , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
+ , VersionColumns(batch, 0, TIndexInfo::GetSpecialColumnNames(), {}, false)
, RecordsCount(batch->num_rows())
, ReverseSortKff(reverseSort ? -1 : 1)
, PoolId(poolId)
@@ -205,7 +262,7 @@ private:
if (!FilterIterator) {
return false;
}
- return FilterIterator->GetCurrentAcceptance();
+ return !FilterIterator->GetCurrentAcceptance();
}
bool Next() {
@@ -248,6 +305,17 @@ private:
}
};
+ NJson::TJsonValue DebugJson() const { // Dumps the merger state: "current" (only if CurrentKeyColumns is set) and one "heap" entry per SortHeap element.
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ if (CurrentKeyColumns) {
+ result["current"] = CurrentKeyColumns->DebugJson();
+ }
+ for (auto&& i : SortHeap) { // entries follow the vector's storage order, not sorted order
+ result["heap"].AppendValue(i.DebugJson());
+ }
+ return result;
+ }
+
bool NextInHeap(const bool needPop) {
if (SortHeap.empty()) {
return false;
@@ -280,6 +348,7 @@ private:
THashMap<ui32, std::deque<TIteratorData>> BatchPools;
std::vector<TBatchIterator> SortHeap;
std::shared_ptr<arrow::Schema> SortSchema;
+ std::shared_ptr<arrow::Schema> DataSchema;
const bool Reverse;
ui32 ControlPoints = 0;
@@ -303,10 +372,13 @@ private:
void AddNewToHeap(const std::optional<ui32> poolId, std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter, const bool restoreHeap);
public:
- TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, const bool reverse)
+ TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse)
: SortSchema(sortSchema)
+ , DataSchema(dataSchema)
, Reverse(reverse) {
+ Y_VERIFY(SortSchema);
Y_VERIFY(SortSchema->num_fields());
+ Y_VERIFY(!DataSchema || DataSchema->num_fields());
}
bool IsValid() const {
@@ -321,6 +393,10 @@ public:
return it->second.size();
}
+ const std::optional<TSortableBatchPosition>& GetCurrentKeyColumns() const { // Read-only access to CurrentKeyColumns; the optional may be empty.
+ return CurrentKeyColumns;
+ }
+
void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
void RemoveControlPoint();
@@ -335,8 +411,46 @@ public:
return SortHeap.empty();
}
- bool DrainCurrent();
+ bool DrainCurrent(std::shared_ptr<TRecordBatchBuilder> builder = nullptr, const std::optional<TSortableBatchPosition>& readTo = {}, const bool includeFinish = false);
};
+class TRecordBatchBuilder { // Accumulates records column by column and turns them into an arrow::RecordBatch.
+private:
+ std::vector<std::unique_ptr<arrow::ArrayBuilder>> Builders; // one builder per entry of Fields, in the same order
+ std::vector<std::shared_ptr<arrow::Field>> Fields;
+ YDB_READONLY(ui32, RecordsCount, 0); // incremented by AddRecord
+
+ bool IsSameFieldsSequence(const std::vector<std::shared_ptr<arrow::Field>>& f1, const std::vector<std::shared_ptr<arrow::Field>>& f2) { // True when both lists have the same length and every pair satisfies Field::Equals.
+ if (f1.size() != f2.size()) {
+ return false;
+ }
+ for (ui32 i = 0; i < f1.size(); ++i) {
+ if (!f1[i]->Equals(f2[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+public:
+ TRecordBatchBuilder(const std::vector<std::shared_ptr<arrow::Field>>& fields) // Creates one builder per field via NArrow::MakeBuilder.
+ : Fields(fields) {
+ Y_VERIFY(Fields.size()); // an empty field list is rejected
+ for (auto&& f : fields) {
+ Builders.emplace_back(NArrow::MakeBuilder(f));
+ }
+ }
+
+ std::shared_ptr<arrow::RecordBatch> Finalize() { // Finishes every builder and assembles the resulting arrays into a batch over Fields.
+ auto schema = std::make_shared<arrow::Schema>(Fields);
+ std::vector<std::shared_ptr<arrow::Array>> columns;
+ for (auto&& i : Builders) {
+ columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish())); // NOTE(review): GetValid presumably fails hard on a non-OK status -- confirm
+ }
+ return arrow::RecordBatch::Make(schema, columns.front()->length(), columns); // row count is taken from the first column; front() is safe because the constructor requires at least one field
+ }
+
+ void AddRecord(const TSortableBatchPosition& position);
+};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
index c8c8317dbee..332f960e4c3 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp
@@ -225,4 +225,16 @@ std::shared_ptr<NIndexedReader::IOrderPolicy> TReadMetadata::BuildSortingPolicy(
return result;
}
+std::shared_ptr<NKikimr::NOlap::IDataReader> TReadMetadata::BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const { // Creates the reader for this read behind the IDataReader interface; currently always a TIndexedReadData.
+// return std::make_shared<NPlainReader::TPlainReadData>(self, context);
+ auto result = std::make_shared<TIndexedReadData>(self, context); // `self` is passed through to the reader's constructor
+ result->InitRead(); // the reader is returned with InitRead() already called
+ return result;
+}
+
+NIndexedReader::TSortableBatchPosition TReadMetadata::BuildSortedPosition(const NArrow::TReplaceKey& key) const { // Wraps `key` as a sortable position at row 0 of the batch returned by key.ToBatch(GetReplaceKey()).
+ return NIndexedReader::TSortableBatchPosition(key.ToBatch(GetReplaceKey()), 0,
+ GetReplaceKey()->field_names(), {}, IsDescSorted()); // sorting columns are the replace-key field names, the data column list is empty, and the reverse flag is IsDescSorted()
+}
+
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
index cd582350cb8..1aa23d03664 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h
@@ -144,6 +144,9 @@ private:
public:
using TConstPtr = std::shared_ptr<const TReadMetadata>;
+ NIndexedReader::TSortableBatchPosition BuildSortedPosition(const NArrow::TReplaceKey& key) const;
+ std::shared_ptr<IDataReader> BuildReader(const NOlap::TReadContext& context, const TConstPtr& self) const;
+
const std::vector<ui32>& GetAllColumns() const {
return AllColumns;
}
@@ -225,7 +228,7 @@ public:
bool Empty() const {
Y_VERIFY(SelectInfo);
- return SelectInfo->Portions.empty() && CommittedBlobs.empty();
+ return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty();
}
std::shared_ptr<arrow::Schema> GetSortingKey() const {
@@ -253,9 +256,9 @@ public:
return ResultIndexSchema->GetIndexInfo().GetPrimaryKey();
}
- size_t NumIndexedRecords() const {
+ size_t NumIndexedChunks() const {
Y_VERIFY(SelectInfo);
- return SelectInfo->NumRecords();
+ return SelectInfo->NumChunks();
}
size_t NumIndexedBlobs() const {
@@ -267,7 +270,7 @@ public:
void Dump(IOutputStream& out) const override {
out << "columns: " << GetSchemaColumnsCount()
- << " index records: " << NumIndexedRecords()
+ << " index chunks: " << NumIndexedChunks()
<< " index blobs: " << NumIndexedBlobs()
<< " committed blobs: " << CommittedBlobs.size()
// << " with program steps: " << (Program ? Program->Steps.size() : 0)
diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
index edeb20565bc..8d0ebfccf8e 100644
--- a/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp
@@ -28,7 +28,7 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
auto& resultField = resultArrowSchema->fields()[i];
auto columnId = GetIndexInfo().GetColumnId(resultField->name());
auto oldColumnIndex = dataSchema.GetFieldIndex(columnId);
- if (oldColumnIndex >= 0) { // ClumnExists
+ if (oldColumnIndex >= 0) { // ColumnExists
auto oldColumnInfo = dataSchema.GetFieldByIndex(oldColumnIndex);
Y_VERIFY(oldColumnInfo);
auto columnData = batch->GetColumnByName(oldColumnInfo->name());
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.cpp b/ydb/core/tx/columnshard/engines/storage/granule.cpp
index 96efd36cf5d..e82dc955bb9 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/granule.cpp
@@ -45,7 +45,7 @@ void TGranuleMeta::UpsertPortion(const TPortionInfo& info) {
it = Portions.emplace(portionNew->GetPortion(), portionNew).first;
} else {
OnBeforeChangePortion(it->second);
- *it->second = info;
+ it->second = std::make_shared<TPortionInfo>(info);
}
OnAfterChangePortion(it->second);
}
diff --git a/ydb/core/tx/columnshard/engines/storage/granule.h b/ydb/core/tx/columnshard/engines/storage/granule.h
index d00e16b498d..8f7dae1afe4 100644
--- a/ydb/core/tx/columnshard/engines/storage/granule.h
+++ b/ydb/core/tx/columnshard/engines/storage/granule.h
@@ -300,6 +300,20 @@ private:
void OnAfterChangePortion(const std::shared_ptr<TPortionInfo> portionAfter);
void OnAdditiveSummaryChange() const;
public:
+ std::vector<std::shared_ptr<TPortionInfo>> GroupOrderedPortionsByPK(const TSnapshot& snapshot) const { // Returns the portions for which IsVisible(snapshot) holds, sorted ascending by IndexKeyStart().
+ std::vector<std::shared_ptr<TPortionInfo>> portions;
+ for (auto&& i : Portions) {
+ if (i.second->IsVisible(snapshot)) {
+ portions.emplace_back(i.second); // shares ownership of the portion with the granule; nothing is copied
+ }
+ }
+ const auto pred = [](const std::shared_ptr<TPortionInfo>& l, const std::shared_ptr<TPortionInfo>& r) {
+ return l->IndexKeyStart() < r->IndexKeyStart();
+ };
+ std::sort(portions.begin(), portions.end(), pred); // std::sort is not stable, so portions with equal IndexKeyStart() end up in unspecified relative order
+ return portions;
+ }
+
NOlap::TSerializationStats BuildSerializationStats(ISnapshotSchema::TPtr schema) const {
NOlap::TSerializationStats result;
for (auto&& i : GetHardSummary().GetColumnIdsSortedBySizeDescending()) {
diff --git a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
index c5284b9432f..b7492e49b36 100644
--- a/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
+++ b/ydb/core/tx/columnshard/engines/ut_logs_engine.cpp
@@ -447,33 +447,29 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
ui64 txId = 0;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
{ // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), columnIds, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions[0].NumRecords(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK[0]->NumChunks(), columnIds.size() + TIndexInfo::GetSpecialColumnNames().size());
}
{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 0);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
}
}
@@ -529,8 +525,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
- UNIT_ASSERT(ok);
+// bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
+// UNIT_ASSERT(ok);
// read
@@ -544,8 +540,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
// predicates
@@ -559,21 +554,19 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_VERIFY(pkFilter.Add(gt10k, nullptr, nullptr));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
{
ui64 txId = 1;
- std::shared_ptr<TPredicate> lt10k = MakePredicate(9999, NArrow::EOperation::Less); // TODO: better border checks
+ std::shared_ptr<TPredicate> lt10k = MakePredicate(8999, NArrow::EOperation::Less); // TODO: better border checks
if (key[0].second == TTypeInfo(NTypeIds::Utf8)) {
- lt10k = MakeStrPredicate("09999", NArrow::EOperation::Less);
+ lt10k = MakeStrPredicate("08999", NArrow::EOperation::Less);
}
NOlap::TPKRangesFilter pkFilter(false);
Y_VERIFY(pkFilter.Add(nullptr, lt10k, nullptr));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, pkFilter);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
}
}
@@ -710,8 +703,8 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
// compact
planStep = 2;
- bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
- UNIT_ASSERT(ok);
+// bool ok = Compact(engine, db, TSnapshot(planStep, 1), std::move(blobs), step, {20, 4, 4});
+// UNIT_ASSERT(ok);
// read
planStep = 3;
@@ -722,18 +715,16 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
// Cleanup
- Cleanup(engine, db, TSnapshot(planStep, 1), 20);
+ Cleanup(engine, db, TSnapshot(planStep, 1), 0);
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 4);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 4);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
}
// TTL
@@ -742,26 +733,24 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TTiering tiering;
tiering.Ttl = NOlap::TTierInfo::MakeTtl(TDuration::MicroSeconds(TInstant::Now().MicroSeconds() - 10000), "timestamp");
pathTtls.emplace(pathId, std::move(tiering));
- Ttl(engine, db, pathTtls, 2);
+ Ttl(engine, db, pathTtls, 10);
// read + load + read
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
// load
engine.Load(db, lostBlobs);
- UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 1);
+ UNIT_ASSERT_VALUES_EQUAL(engine.GetTotalStats().EmptyGranules, 0);
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), oneColumnId, NOlap::TPKRangesFilter(false));
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 2);
- UNIT_ASSERT_VALUES_EQUAL(selectInfo->Granules.size(), 2);
+ UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
}
}
}
diff --git a/ydb/core/tx/columnshard/inflight_request_tracker.h b/ydb/core/tx/columnshard/inflight_request_tracker.h
index 8d33977ca19..6edf647be93 100644
--- a/ydb/core/tx/columnshard/inflight_request_tracker.h
+++ b/ydb/core/tx/columnshard/inflight_request_tracker.h
@@ -53,8 +53,8 @@ public:
continue;
}
- for (const auto& portion : readMeta->SelectInfo->Portions) {
- const ui64 portionId = portion.GetPortion();
+ for (const auto& portion : readMeta->SelectInfo->PortionsOrderedPK) {
+ const ui64 portionId = portion->GetPortion();
auto it = PortionUseCount.find(portionId);
Y_VERIFY(it != PortionUseCount.end(), "Portion id %" PRIu64 " not found in request %" PRIu64, portionId, cookie);
if (it->second == 1) {
@@ -62,7 +62,7 @@ public:
} else {
it->second--;
}
- for (auto& rec : portion.Records) {
+ for (auto& rec : portion->Records) {
if (blobTracker.SetBlobInUse(rec.BlobRange.BlobId, false)) {
freedBlobs.emplace(rec.BlobRange.BlobId);
}
@@ -105,10 +105,10 @@ private:
Y_VERIFY(selectInfo);
SelectStatsDelta += selectInfo->Stats();
- for (const auto& portion : readMeta->SelectInfo->Portions) {
- const ui64 portionId = portion.GetPortion();
+ for (const auto& portion : readMeta->SelectInfo->PortionsOrderedPK) {
+ const ui64 portionId = portion->GetPortion();
PortionUseCount[portionId]++;
- for (auto& rec : portion.Records) {
+ for (auto& rec : portion->Records) {
blobTracker.SetBlobInUse(rec.BlobRange.BlobId, true);
}
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index fb5d78cacab..d4c88590ff0 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -9,13 +9,18 @@ namespace {
class TReadActor : public TActorBootstrapped<TReadActor> {
private:
void BuildResult(const TActorContext& ctx) {
- auto ready = IndexedData.GetReadyResults(Max<i64>());
+ auto ready = IndexedData->ExtractReadyResults(Max<i64>());
LOG_S_TRACE("Ready results with " << ready.size() << " batches at tablet " << TabletId << " (read)");
-
- size_t next = 1;
- for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
- const bool lastOne = IndexedData.IsFinished() && (next == ready.size());
- SendResult(ctx, it->GetResultBatch(), lastOne);
+ if (ready.empty()) {
+ if (IndexedData->IsFinished()) {
+ SendResult(ctx, nullptr, true);
+ }
+ } else {
+ size_t next = 1;
+ for (auto it = ready.begin(); it != ready.end(); ++it, ++next) {
+ const bool lastOne = IndexedData->IsFinished() && (next == ready.size());
+ SendResult(ctx, it->GetResultBatch(), lastOne);
+ }
}
}
public:
@@ -35,11 +40,11 @@ public:
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
, Result(std::move(event))
, ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata, true, NOlap::TReadContext(counters))
, Deadline(deadline)
, ColumnShardActorId(columnShardActorId)
, RequestCookie(requestCookie)
, ReturnedBatchNo(0)
+ , Counters(counters)
{}
void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev, const TActorContext& ctx) {
@@ -58,7 +63,7 @@ public:
Y_VERIFY(event.Data.size() == event.BlobRange.Size, "%zu, %d", event.Data.size(), event.BlobRange.Size);
- IndexedData.AddData(event.BlobRange, event.Data);
+ IndexedData->AddData(event.BlobRange, event.Data);
BuildResult(ctx);
DieFinished(ctx);
@@ -75,7 +80,7 @@ public:
void SendErrorResult(const TActorContext& ctx, NKikimrTxColumnShard::EResultStatus status) {
Y_VERIFY(status != NKikimrTxColumnShard::EResultStatus::SUCCESS);
SendResult(ctx, {}, true, status);
- IndexedData.Abort();
+ IndexedData->Abort();
}
void SendResult(const TActorContext& ctx, const std::shared_ptr<arrow::RecordBatch>& batch, bool finished = false,
@@ -84,16 +89,15 @@ public:
auto& proto = Proto(chunkEvent.get());
TString data;
- if (batch) {
+ if (batch && batch->num_rows()) {
data = NArrow::SerializeBatchNoCompression(batch);
auto metadata = proto.MutableMeta();
metadata->SetFormat(NKikimrTxColumnShard::FORMAT_ARROW);
metadata->SetSchema(GetSerializedSchema(batch));
- }
-
- if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
- Y_VERIFY(!data.empty());
+ if (status == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
+ Y_VERIFY(!data.empty());
+ }
}
proto.SetBatch(ReturnedBatchNo);
@@ -132,7 +136,7 @@ public:
}
void DieFinished(const TActorContext& ctx) {
- if (IndexedData.IsFinished()) {
+ if (IndexedData->IsFinished()) {
LOG_S_DEBUG("Finished read (with " << ReturnedBatchNo << " batches sent) at tablet " << TabletId);
Send(ColumnShardActorId, new TEvPrivate::TEvReadFinished(RequestCookie));
Die(ctx);
@@ -140,9 +144,8 @@ public:
}
void Bootstrap(const TActorContext& ctx) {
- IndexedData.InitRead();
-
- LOG_S_DEBUG("Starting read (" << IndexedData.DebugString() << ") at tablet " << TabletId);
+ IndexedData = ReadMetadata->BuildReader(NOlap::TReadContext(Counters, true), ReadMetadata);
+ LOG_S_DEBUG("Starting read (" << IndexedData->DebugString() << ") at tablet " << TabletId);
bool earlyExit = false;
if (Deadline != TInstant::Max()) {
@@ -159,9 +162,9 @@ public:
SendTimeouts(ctx);
ctx.Send(SelfId(), new TEvents::TEvPoisonPill());
} else {
- while (IndexedData.HasMoreBlobs()) {
- const auto blobRange = IndexedData.ExtractNextBlob(false);
- SendReadRequest(ctx, blobRange);
+ while (const auto blobRange = IndexedData->ExtractNextBlob(false)) {
+ Y_VERIFY(blobRange->BlobId.IsValid());
+ SendReadRequest(ctx, *blobRange);
}
BuildResult(ctx);
}
@@ -203,11 +206,12 @@ private:
TActorId BlobCacheActorId;
std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
- NOlap::TIndexedReadData IndexedData;
+ std::shared_ptr<NOlap::IDataReader> IndexedData;
TInstant Deadline;
TActorId ColumnShardActorId;
const ui64 RequestCookie;
ui32 ReturnedBatchNo;
+ const TConcreteScanCounters Counters;
mutable TString SerializedSchema;
TString GetSerializedSchema(const std::shared_ptr<arrow::RecordBatch>& batch) const {
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 39fdfc99760..91e28122c08 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -1491,7 +1491,6 @@ void TestReadWithProgram(const TestTableDescription& table = {})
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
- UNIT_ASSERT(resRead.GetData().size() > 0);
auto& meta = resRead.GetMeta();
auto& schema = meta.GetSchema();
@@ -1501,11 +1500,12 @@ void TestReadWithProgram(const TestTableDescription& table = {})
switch (i) {
case 1:
+ UNIT_ASSERT(resRead.GetData().size() > 0);
UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}));
UNIT_ASSERT(DataHas(readData, schema, {0, 100}, true));
break;
case 2:
- UNIT_ASSERT(CheckColumns(readData[0], meta, {"level", "timestamp"}, 0));
+ UNIT_ASSERT(resRead.GetData().size() == 0);
break;
default:
break;
@@ -2355,16 +2355,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
std::optional<TBorder> From;
std::optional<TBorder> To;
std::optional<ui32> ExpectedCount;
- bool DataReadOnEmpty = false;
- TTestCaseOptions()
- : DataReadOnEmpty(false)
- {}
+ TTestCaseOptions() = default;
TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; }
TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; }
TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; }
- TTestCaseOptions& SetDataReadOnEmpty(bool flag) { DataReadOnEmpty = flag; return *this; }
TSerializedTableRange MakeRange(const std::vector<std::pair<TString, TTypeInfo>>& pk) const {
std::vector<TString> mem;
@@ -2422,13 +2418,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT(event);
auto& resRead = Proto(event);
- Cerr << "[" << __LINE__ << "] " << Owner.YdbPk[0].second.GetTypeId() << " "
- << resRead.GetBatch() << " " << resRead.GetData().size() << "\n";
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("0_type_id", Owner.YdbPk[0].second.GetTypeId())("batch", resRead.GetBatch())("data_size", resRead.GetData().size());
UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard);
UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
- if (ExpectedCount && !*ExpectedCount && !DataReadOnEmpty) {
+ if (ExpectedCount && !*ExpectedCount) {
UNIT_ASSERT(!resRead.GetBatch());
UNIT_ASSERT(resRead.GetFinished());
UNIT_ASSERT(!resRead.GetData().size());
@@ -2485,13 +2480,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
~TTestCase() {
- try {
- Execute();
- Cerr << "TEST CASE " << TestCaseName << " FINISHED" << Endl;
- } catch (...) {
- Cerr << "TEST CASE " << TestCaseName << " FAILED" << Endl;
- throw;
- }
+ Execute();
+ Cerr << "TEST CASE " << TestCaseName << " FINISHED" << Endl;
}
};
@@ -2620,14 +2610,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
TTabletReadPredicateTest testAgent(runtime, planStep, txId, table.Pk);
testAgent.Test(":1)").SetTo(TBorder(val1, false)).SetExpectedCount(1);
testAgent.Test(":1]").SetTo(TBorder(val1, true)).SetExpectedCount(2);
- testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
+ testAgent.Test(":0)").SetTo(TBorder(val0, false)).SetExpectedCount(0);
testAgent.Test(":0]").SetTo(TBorder(val0, true)).SetExpectedCount(1);
testAgent.Test("[0:0]").SetFrom(TBorder(val0, true)).SetTo(TBorder(val0, true)).SetExpectedCount(1);
testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1);
- testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
- testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true))
- .SetExpectedCount(0).SetDataReadOnEmpty(isStrPk0);
+ testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0);
+ testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true)).SetExpectedCount(0);
// VERIFIED AS INCORRECT INTERVAL (its good)
// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0);
@@ -2642,7 +2631,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
} else {
testAgent.Test("(numRows:").SetFrom(TBorder(valNumRows, false)).SetExpectedCount(0);
- testAgent.Test("(numRows-1:").SetFrom(TBorder(valNumRows_1, false)).SetExpectedCount(0).SetDataReadOnEmpty(true);
+ testAgent.Test("(numRows-1:").SetFrom(TBorder(valNumRows_1, false)).SetExpectedCount(0);
testAgent.Test("(numRows-2:").SetFrom(TBorder(valNumRows_2, false)).SetExpectedCount(1);
testAgent.Test("[numRows-1:").SetFrom(TBorder(valNumRows_1, true)).SetExpectedCount(1);
}