| author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-25 16:04:37 +0300 |
| --- | --- | --- |
| committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-04-25 16:04:37 +0300 |
| commit | 4ef444a11e91cc3be7a395c0d212a2602f20cce6 (patch) | |
| tree | 7447c8580bc1905d892a0f158ce19a51b05f4823 | |
| parent | 00fe65a36352e32d68310fe30a4223c74391c6a3 (diff) | |
| download | ydb-4ef444a11e91cc3be7a395c0d212a2602f20cce6.tar.gz | |
don't read all projected/sorted/group columns before early filter (step-0 in program)
48 files changed, 1101 insertions, 502 deletions
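For orientation before the diff itself: the commit splits the scan into two fetch passes. Only the columns the step-0 (early) filter needs are read first; the remaining projected/sorted/group columns are fetched afterwards, and only for batches that survive the filter. The standalone C++ sketch below mirrors the column-split logic of the new `TIndexedReadData` constructor in `indexed_read_data.cpp` (see the hunk further down); the wrapper struct, `main()`, and the sample column ids are illustrative only, not part of the commit.

```cpp
// Minimal sketch of the EarlyFilterColumns / PostFilterColumns split
// introduced in TIndexedReadData's constructor. Only the member names
// mirror the diff; everything else here is a stand-in.
#include <cstdint>
#include <iostream>
#include <set>
#include <utility>

using ui32 = uint32_t;

struct TColumnSplit {
    std::set<ui32> EarlyFilterColumns; // fetched first, used by the step-0 filter
    std::set<ui32> PostFilterColumns;  // fetched only for batches passing the filter

    TColumnSplit(std::set<ui32> usedColumns, std::set<ui32> earlyFilterColumns,
                 const bool internalRead) {
        PostFilterColumns = std::move(usedColumns);
        EarlyFilterColumns = std::move(earlyFilterColumns);
        if (internalRead || EarlyFilterColumns.empty()) {
            // No usable early filter: fetch everything up front, as before.
            EarlyFilterColumns = PostFilterColumns;
            PostFilterColumns.clear();
        } else {
            // Never fetch a column twice: drop filter columns from the second pass.
            for (const ui32 id : EarlyFilterColumns) {
                PostFilterColumns.erase(id);
            }
        }
    }
};

int main() {
    // Query touches columns {1..5}; the early filter only needs {2, 5}.
    TColumnSplit split({1, 2, 3, 4, 5}, {2, 5}, /*internalRead=*/false);
    std::cout << "early: " << split.EarlyFilterColumns.size()  // 2
              << ", post: " << split.PostFilterColumns.size()  // 3
              << "\n";
}
```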
diff --git a/ydb/core/formats/arrow_filter.cpp b/ydb/core/formats/arrow_filter.cpp index 7eaea60dee..28fe9892ea 100644 --- a/ydb/core/formats/arrow_filter.cpp +++ b/ydb/core/formats/arrow_filter.cpp @@ -312,7 +312,7 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) { if (!batch || !batch->num_rows()) { return false; } - Y_VERIFY(Filter.empty() || Count == (size_t)batch->num_rows()); + Y_VERIFY_S(Filter.empty() || Count == (size_t)batch->num_rows(), Count << " != " << batch->num_rows()); if (IsTotalDenyFilter()) { batch = batch->Slice(0, 0); return false; diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index 8e03552359..db47cb93bc 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -266,6 +266,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow:: for (auto& field : dstSchema->fields()) { columns.push_back(srcBatch->GetColumnByName(field->name())); + Y_VERIFY(columns.back()); if (!columns.back()->type()->Equals(field->type())) { columns.back() = {}; } @@ -842,7 +843,8 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) { return true; } -bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder) { +bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, + const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) { if (batches.empty()) { result = nullptr; return true; @@ -855,6 +857,7 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b std::vector<std::shared_ptr<arrow::Array>> columns; std::map<std::string, ui32> fieldNames; for (auto&& i : batches) { + Y_VERIFY(i); for (auto&& f : i->schema()->fields()) { if (!fieldNames.emplace(f->name(), fields.size()).second) { return false; @@ -877,7 +880,11 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b std::vector<std::shared_ptr<arrow::Array>> columnsOrdered; for (auto&& i : columnsOrder) { auto it = fieldNames.find(i); - Y_VERIFY(it != fieldNames.end()); + if (orderFieldsAreNecessary) { + Y_VERIFY(it != fieldNames.end()); + } else if (it == fieldNames.end()) { + continue; + } fieldsOrdered.emplace_back(fields[it->second]); columnsOrdered.emplace_back(columns[it->second]); } diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index b4f55f0ebf..3d6f060171 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -109,7 +109,7 @@ std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse = fal TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema); size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset = 0); bool ReserveData(arrow::ArrayBuilder& builder, const size_t size); -bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}); +bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true); std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& 
batch, const std::shared_ptr<arrow::Schema>& sortingKey); diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 3760e8b8e0..4e5ea10006 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -35,8 +35,15 @@ message TReadStats { optional uint64 IndexPortions = 5; optional uint64 IndexBatches = 6; optional uint64 NotIndexedBatches = 7; - optional uint32 UsedColumns = 8; - optional uint64 DataBytes = 9; + optional uint32 SchemaColumns = 8; + optional uint64 PortionsBytes = 9; + optional uint64 DataFilterBytes = 10; + optional uint64 DataAdditionalBytes = 11; + + optional uint32 FilterColumns = 12; + optional uint32 AdditionalColumns = 13; + + optional uint32 SelectedRows = 14; } message TLogicalMetadata { diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 6f852e9ea2..434628715d 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -70,6 +70,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 1cf2e0e0f6..8921b5bc76 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -71,6 +71,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 1cf2e0e0f6..8921b5bc76 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -71,6 +71,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 6f852e9ea2..434628715d 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -70,6 +70,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 0838327fb7..065929dec5 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -1,18 +1,27 @@ #include "columnshard__index_scan.h" +#include <ydb/core/tx/conveyor/usage/service.h> +#include <ydb/core/tx/conveyor/usage/events.h> namespace NKikimr::NColumnShard { -TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata) +TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, + NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters) : ReadMetadata(readMetadata) - , IndexedData(ReadMetadata) + , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters) + , DataTasksProcessor(processor) + , ScanCounters(scanCounters) { ui32 batchNo = 0; for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) { const auto& cmtBlob = ReadMetadata->CommittedBlobs[i]; WaitCommitted.emplace(cmtBlob, batchNo); } - std::vector<TBlobRange> indexedBlobs = IndexedData.InitRead(batchNo, true); - + // Read all committed blobs + for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) { + auto& blobId = cmtBlob.BlobId; + FetchBlobsQueue.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize())); + } + IndexedData.InitRead(batchNo, true); // Add cached batches without read for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); @@ -23,25 +32,17 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId); } - // Read all committed blobs - for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) { - auto& blobId = cmtBlob.BlobId; - BlobsToRead.push_back(TBlobRange(blobId, 0, blobId.BlobSize())); - } - Y_VERIFY(ReadMetadata->IsSorted()); - for (auto&& blobRange : indexedBlobs) { - BlobsToRead.emplace_back(blobRange); + if (ReadMetadata->Empty()) { + FetchBlobsQueue.Stop(); } - - IsReadFinished = ReadMetadata->Empty(); } -void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) { +void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) { const auto& blobId = blobRange.BlobId; if (IndexedData.IsIndexedBlob(blobRange)) { - IndexedData.AddIndexed(blobRange, data, processor); + IndexedData.AddIndexed(blobRange, data, DataTasksProcessor); } else { auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 }); if (cmt.empty()) { @@ -67,12 +68,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() { } NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() { - if (IsReadFinished || NextBlobIdxToRead == BlobsToRead.size()) { - return TBlobRange(); - } - const auto& blob = BlobsToRead[NextBlobIdxToRead]; - ++NextBlobIdxToRead; - return blob; + return FetchBlobsQueue.pop_front(); } void 
TColumnShardScanIterator::FillReadyResults() { @@ -96,14 +92,31 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { + DataTasksProcessor.Stop(); WaitCommitted.clear(); IndexedData.Abort(); - IsReadFinished = true; + FetchBlobsQueue.Stop(); } - if (WaitCommitted.empty() && !IndexedData.IsInProgress() && NextBlobIdxToRead == BlobsToRead.size()) { - IsReadFinished = true; + if (WaitCommitted.empty() && !IndexedData.IsInProgress() && FetchBlobsQueue.empty()) { + DataTasksProcessor.Stop(); + FetchBlobsQueue.Stop(); + } +} + +bool TColumnShardScanIterator::HasWaitingTasks() const { + return DataTasksProcessor.InWaiting(); +} + +TColumnShardScanIterator::~TColumnShardScanIterator() { + ReadMetadata->ReadStats->PrintToLog(); +} + +void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { + if (!task->IsDataProcessed()) { + return; } + task->Apply(IndexedData); } } diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 247abe95ed..3450928dfa 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -28,27 +28,28 @@ using NOlap::TUnifiedBlobId; using NOlap::TBlobRange; class TColumnShardScanIterator : public TScanIteratorBase { +private: NOlap::TReadMetadata::TConstPtr ReadMetadata; + NOlap::TFetchBlobsQueue FetchBlobsQueue; NOlap::TIndexedReadData IndexedData; std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; - TVector<TBlobRange> BlobsToRead; - ui64 NextBlobIdxToRead = 0; TDeque<NOlap::TPartialReadResult> ReadyResults; - bool IsReadFinished = false; ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; - + NColumnShard::TDataTasksProcessorContainer DataTasksProcessor; + NColumnShard::TScanCounters ScanCounters; public: - TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata); + TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters); + ~TColumnShardScanIterator(); - virtual void Apply(IDataPreparationTask::TPtr task) override { - task->Apply(IndexedData); - } + virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override; + + virtual bool HasWaitingTasks() const override; - void AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) override; + void AddData(const TBlobRange& blobRange, TString data) override; bool Finished() const override { - return IsReadFinished && ReadyResults.empty(); + return FetchBlobsQueue.IsStopped() && ReadyResults.empty(); } NOlap::TPartialReadResult GetBatch() override; diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp index 3857498fe1..bcf66fbf87 100644 --- a/ydb/core/tx/columnshard/columnshard__read.cpp +++ b/ydb/core/tx/columnshard/columnshard__read.cpp @@ -130,7 +130,7 @@ void TTxRead::Complete(const TActorContext& ctx) { TInstant deadline = TInstant::Max(); // TODO ctx.Register(CreateReadActor(Self->TabletID(), Ev->Get()->GetSource(), - std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie)); + std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie, Self->ReadCounters)); } } diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 93da0790f8..b408adb99a 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ 
b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -54,14 +54,13 @@ class TLocalDataTasksProcessor: public IDataTasksProcessor { private: const TActorIdentity OwnerActorId; protected: - virtual bool DoAdd(IDataPreparationTask::TPtr task) override { + virtual bool DoAdd(IDataTasksProcessor::ITask::TPtr task) override { OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task)); return true; } public: TLocalDataTasksProcessor(const TActorIdentity& ownerActorId) - : OwnerActorId(ownerActorId) - { + : OwnerActorId(ownerActorId) { } }; @@ -75,7 +74,7 @@ public: TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId, ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie, ui64 tabletId, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList, - NKikimrTxDataShard::EScanDataFormat dataFormat) + NKikimrTxDataShard::EScanDataFormat dataFormat, const TScanCounters& scanCountersPool) : ColumnShardActorId(columnShardActorId) , ScanComputeActorId(scanComputeActorId) , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) @@ -88,6 +87,7 @@ public: , ReadMetadataRanges(std::move(readMetadataList)) , ReadMetadataIndex(0) , Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT)) + , ScanCountersPool(scanCountersPool) { KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema(); } @@ -100,18 +100,23 @@ public: new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup)); Y_VERIFY(!ScanIterator); - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(); + ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool); // propagate self actor id // TODO: FlagSubscribeOnSession ? 
Send(ScanComputeActorId, new TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen), IEventHandle::FlagTrackDelivery); - if (NConveyor::TServiceOperator::IsEnabled()) { - DataTasksProcessor = std::make_shared<TLocalDataTasksProcessor>(SelfId()); - } Become(&TColumnShardScan::StateScan); } private: + IDataTasksProcessor::TPtr MakeTasksProcessor() const { + if (NConveyor::TServiceOperator::IsEnabled()) { + return std::make_shared<TLocalDataTasksProcessor>(SelfId()); + } else { + return nullptr; + } + } + STATEFN(StateScan) { auto g = Stats.MakeGuard("processing"); switch (ev->GetTypeRewrite()) { @@ -161,19 +166,17 @@ private: } void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) { - TaskResultsCounter.Inc(); auto g = Stats.MakeGuard("task_result"); LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN, "Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId); if (ev->Get()->GetErrorMessage()) { ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN) << "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId; - DataTasksProcessor->Stop(); SendScanError(ev->Get()->GetErrorMessage()); Finish(); } else { - auto t = dynamic_pointer_cast<IDataPreparationTask>(ev->Get()->GetResult()); - Y_VERIFY(t); + auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()); + Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult())); ScanIterator->Apply(t); } ContinueProcessing(); @@ -233,7 +236,7 @@ private: if (ScanIterator) { { auto g = Stats.MakeGuard("AddData"); - ScanIterator->AddData(blobRange, event.Data, DataTasksProcessor); + ScanIterator->AddData(blobRange, event.Data); } ContinueProcessing(); } @@ -363,7 +366,7 @@ private: // * we have finished scanning ALL the ranges // * or there is an in-flight blob read or ScanData message for which // we will get a reply and will be able to proceed further - if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || (DataTasksProcessor && DataTasksProcessor->GetDataCounter() > TaskResultsCounter.Val())) { + if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks()) { return; } } @@ -443,8 +446,7 @@ private: return Finish(); } - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(); - + ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool); // Used in TArrowToYdbConverter ResultYqlSchema.clear(); } @@ -587,6 +589,7 @@ private: const TSerializedTableRange TableRange; const TSmallVec<bool> SkipNullKeys; const TInstant Deadline; + TScanCounters ScanCountersPool; TActorId TimeoutActorId; TMaybe<TString> AbortReason; @@ -597,9 +600,6 @@ private: i64 InFlightReadBytes = 0; bool Finished = false; - TAtomicCounter TaskResultsCounter = 0; - IDataTasksProcessor::TPtr DataTasksProcessor; - class TBlobStats { private: ui64 PartsCount = 0; @@ -1009,7 +1009,7 @@ void TTxScan::Complete(const TActorContext& ctx) { Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes); auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor, - scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat)); + scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), 
dataFormat, Self->ScanCounters)); LOG_S_DEBUG("TTxScan starting " << scanActor << " txId: " << txId diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h index f462aedc46..54fe14916f 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.h +++ b/ydb/core/tx/columnshard/columnshard__scan.h @@ -1,7 +1,8 @@ #pragma once #include "blob_cache.h" -#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> +#include "engines/reader/conveyor_task.h" +#include "engines/indexed_read_data.h" namespace NKikimr::NColumnShard { @@ -9,10 +10,11 @@ class TScanIteratorBase { public: virtual ~TScanIteratorBase() = default; - virtual void Apply(IDataPreparationTask::TPtr /*processor*/) { + virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) { } - virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/, IDataTasksProcessor::TPtr /*processor*/) {} + virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {} + virtual bool HasWaitingTasks() const = 0; virtual bool Finished() const = 0; virtual NOlap::TPartialReadResult GetBatch() = 0; virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); } diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h index 6a422f136d..bce22aa077 100644 --- a/ydb/core/tx/columnshard/columnshard__stats_scan.h +++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h @@ -43,6 +43,10 @@ public: { } + virtual bool HasWaitingTasks() const override { + return false; + } + bool Finished() const override { return IndexStats.empty(); } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index a504e6e127..592adb0618 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -113,6 +113,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , TTabletExecutedFlat(info, tablet, nullptr) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique<NOlap::TInsertTable>()) + , ReadCounters("Read") + , ScanCounters("Scan") { TabletCountersPtr.reset(new TProtobufTabletCounters< ESimpleCounters_descriptor, diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index b8e53757a9..9a5c0e8ccf 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -1,5 +1,6 @@ #pragma once #include "defs.h" +#include "counters.h" #include "columnshard.h" #include "columnshard_common.h" #include "columnshard_ttl.h" @@ -38,7 +39,7 @@ IActor* CreateReadActor(ui64 tabletId, NOlap::TReadMetadata::TConstPtr readMetadata, const TInstant& deadline, const TActorId& columnShardActorId, - ui64 requestCookie); + ui64 requestCookie, const TScanCounters& counters); IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId); IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev); @@ -372,7 +373,7 @@ private: TInstant LastStatsReport; TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices. - TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation. 
+ TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compaction. TActorId EvictionActor; TActorId StatsReportPipe; @@ -382,6 +383,8 @@ private: std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache; std::unique_ptr<NOlap::TInsertTable> InsertTable; TBatchCache BatchCache; + TScanCounters ReadCounters; + TScanCounters ScanCounters; THashMap<ui64, TBasicTxInfo> BasicTxInfo; TSet<TDeadlineQueueItem> DeadlineQueue; diff --git a/ydb/core/tx/columnshard/counters.cpp b/ydb/core/tx/columnshard/counters.cpp new file mode 100644 index 0000000000..dcf1476e52 --- /dev/null +++ b/ydb/core/tx/columnshard/counters.cpp @@ -0,0 +1,14 @@ +#include "counters.h" +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/counters.h> + +namespace NKikimr::NColumnShard { + +TScanCounters::TScanCounters(const TString& module) { + ::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard"); + PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true); + FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true); + PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true); +} + +} diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h new file mode 100644 index 0000000000..357def8e37 --- /dev/null +++ b/ydb/core/tx/columnshard/counters.h @@ -0,0 +1,17 @@ +#pragma once +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NColumnShard { + +class TScanCounters { +private: + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes); + YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes); +public: + TScanCounters(const TString& module = "Scan"); +}; + + +} diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt index 398cfd3d0c..a7adbff89e 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(reader) add_subdirectory(ut) add_library(tx-columnshard-engines) @@ -22,6 +23,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-scheme ydb-core-tablet ydb-core-tablet_flat + columnshard-engines-reader udf-service-exception_policy ) target_sources(tx-columnshard-engines PRIVATE diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt index ef42c70332..39ca9cc7e6 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. 
+add_subdirectory(reader) add_subdirectory(ut) add_library(tx-columnshard-engines) @@ -23,6 +24,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-scheme ydb-core-tablet ydb-core-tablet_flat + columnshard-engines-reader udf-service-exception_policy ) target_sources(tx-columnshard-engines PRIVATE diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt index ef42c70332..39ca9cc7e6 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(reader) add_subdirectory(ut) add_library(tx-columnshard-engines) @@ -23,6 +24,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-scheme ydb-core-tablet ydb-core-tablet_flat + columnshard-engines-reader udf-service-exception_policy ) target_sources(tx-columnshard-engines PRIVATE diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt index 398cfd3d0c..a7adbff89e 100644 --- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(reader) add_subdirectory(ut) add_library(tx-columnshard-engines) @@ -22,6 +23,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC ydb-core-scheme ydb-core-tablet ydb-core-tablet_flat + columnshard-engines-reader udf-service-exception_policy ) target_sources(tx-columnshard-engines PRIVATE diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp index ed031498d5..e2a041af03 100644 --- a/ydb/core/tx/columnshard/engines/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/index_info.cpp @@ -46,10 +46,11 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns( } std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() { - return std::make_shared<arrow::Schema>(arrow::FieldVector{ + static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{ arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()), arrow::field(SPEC_COL_TX_ID, arrow::uint64()) }); + return result; } bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) { @@ -59,7 +60,13 @@ bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) { } -ui32 TIndexInfo::GetColumnId(const TString& name) const { +ui32 TIndexInfo::GetColumnId(const std::string& name) const { + auto id = GetColumnIdOptional(name); + Y_VERIFY(!!id); + return *id; +} + +std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const { const auto ni = ColumnNames.find(name); if (ni == ColumnNames.end()) { @@ -68,7 +75,7 @@ ui32 TIndexInfo::GetColumnId(const TString& name) const { } else if (name == SPEC_COL_TX_ID) { return ui32(ESpecialColumn::TX_ID); } - Y_VERIFY(false); + return {}; } return ni->second; diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h index 293cb0cbaa..d9b96bc902 100644 --- a/ydb/core/tx/columnshard/engines/index_info.h +++ b/ydb/core/tx/columnshard/engines/index_info.h @@ -58,7 +58,8 @@ public: } /// Returns an id of the column located by name. The name should exists in the schema. 
- ui32 GetColumnId(const TString& name) const; + ui32 GetColumnId(const std::string& name) const; + std::optional<ui32> GetColumnIdOptional(const std::string& name) const; /// Returns a name of the column located by id. TString GetColumnName(ui32 id, bool required = true) const; @@ -80,7 +81,7 @@ public: return KeyColumns[0]; } - // Sorting key: colud be less or greater then traditional PK + // Sorting key: could be less or greater then traditional PK // It could be empty for append-only tables. It could be greater then PK for better columns compression. // If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE. const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 9af8f09c5e..ee8e9f68b2 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -113,145 +113,56 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v } -TIndexedReadData::TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo) - : BatchNo(batchNo) - , Portion(portionInfo.Records[0].Portion) - , Granule(owner.GetGranuleId()) - , Owner(&owner) - , PortionInfo(&portionInfo) -{ - for (const NOlap::TColumnRecord& rec : portionInfo.Records) { - Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second); - Y_VERIFY(rec.Portion == Portion); - Y_VERIFY(rec.Valid()); - Y_VERIFY(Granule == rec.Granule); - } - - if (portionInfo.CanIntersectOthers()) { - Owner->SetDuplicationsAvailable(true); - if (portionInfo.CanHaveDups()) { - SetDuplicationsAvailable(true); - } - } +std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const { + return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters); } -NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::TBatch::AssembleIndexedBatch(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) { - Y_VERIFY(PortionInfo->Produced()); - - auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data); - - for (auto& rec : PortionInfo->Records) { - auto& blobRange = rec.BlobRange; - Data.erase(blobRange); +std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) const { + std::set<ui32> result; + if (LessPredicate) { + for (auto&& i : LessPredicate->ColumnNames()) { + result.emplace(IndexInfo.GetColumnId(i)); + } } - - return std::make_shared<TAssembledNotFiltered>(std::move(batchConstructor), readMetadata, GetBatchNo(), PortionInfo->AllowEarlyFilter(), processor); -} - -bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() { - /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey. - /// It's not OK to apply predicate before replacing key duplicates otherwise. 
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - const std::set<std::string> columnNames = ReadMetadata->GetFilterColumns(true); - if (columnNames.empty()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl") - ("event", "skip_data_no_columns")("columns_count", BatchConstructor.GetColumnsCount()); - FilteredBatch = BatchConstructor.Assemble(); - return true; - } - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.SetIncludeColumnNames(columnNames); - auto batch = BatchConstructor.Assemble(options); - const size_t ignoredColumnsCount = BatchConstructor.GetColumnsCount() - batch->schema()->num_fields(); - Y_VERIFY(batch); - - NArrow::TColumnFilter globalFilter = NOlap::FilterPortion(batch, *ReadMetadata); - if (!globalFilter.Apply(batch)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl") - ("event", "skip_data")("columns_count", ignoredColumnsCount); - FilteredBatch = nullptr; - return true; + if (GreaterPredicate) { + for (auto&& i : GreaterPredicate->ColumnNames()) { + result.emplace(IndexInfo.GetColumnId(i)); + } } -#if 1 // optimization - if (ReadMetadata->Program && AllowEarlyFilter) { - auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program); - globalFilter.CombineSequential(filter); - if (!filter.Apply(batch)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl") - ("event", "skip_data")("columns_count", ignoredColumnsCount); - FilteredBatch = nullptr; - return true; + if (Program) { + for (auto&& i : Program->GetEarlyFilterColumns()) { + auto id = IndexInfo.GetColumnIdOptional(i); + if (id) { + result.emplace(*id); + } } } -#else - Y_UNUSED(AllowEarlyFilter); -#endif - - if (BatchConstructor.GetColumnsCount() > (size_t)batch->schema()->num_fields()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl") - ("event", "not_skip_data")("columns_count", ignoredColumnsCount)("num_rows", batch->num_rows()); - TPortionInfo::TPreparedBatchData::TAssembleOptions options; - options.SetExcludeColumnNames(columnNames); - - if (globalFilter.GetInactiveHeadSize() > globalFilter.GetInactiveTailSize()) { - options.SetRecordsCountLimit(globalFilter.Size() - globalFilter.GetInactiveHeadSize()) - .SetForwardAssemble(false); - globalFilter.CutInactiveHead(); - } else { - options.SetRecordsCountLimit(globalFilter.Size() - globalFilter.GetInactiveTailSize()); - globalFilter.CutInactiveTail(); + if (noTrivial && result.empty()) { + return result; + } + if (PlanStep) { + auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); + for (auto&& i : snapSchema->fields()) { + result.emplace(IndexInfo.GetColumnId(i->name())); } - std::shared_ptr<arrow::RecordBatch> fBatch = BatchConstructor.Assemble(options); - Y_VERIFY(globalFilter.Apply(fBatch)); - Y_VERIFY(NArrow::MergeBatchColumns({ batch, fBatch }, batch, BatchConstructor.GetColumnsOrder())); } - - FilteredBatch = batch; - return true; -} - -bool TIndexedReadData::TAssembledNotFiltered::DoApply(TIndexedReadData& owner) const { - owner.PortionFinished(BatchNo, FilteredBatch); - return true; -} - -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan() const { - return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this()); + return result; } -std::set<std::string> TReadMetadata::GetFilterColumns(const bool early) const { - 
std::set<std::string> result; +std::set<ui32> TReadMetadata::GetUsedColumnIds() const { + std::set<ui32> result; if (PlanStep) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); for (auto&& i : snapSchema->fields()) { - result.emplace(i->name()); + result.emplace(IndexInfo.GetColumnId(i->name())); } } - if (LessPredicate) { - for (auto&& i : LessPredicate->ColumnNames()) { - result.emplace(i); - } - } - if (GreaterPredicate) { - for (auto&& i : GreaterPredicate->ColumnNames()) { - result.emplace(i); - } - } - if (Program) { - if (!early) { - for (auto&& i : Program->SourceColumns) { - result.emplace(i.second); - } - } else { - for (auto&& i : Program->GetEarlyFilterColumns()) { - result.emplace(i); - } - } + for (auto&& f : LoadSchema->fields()) { + result.emplace(IndexInfo.GetColumnId(f->name())); } return result; } - TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const { return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds); } @@ -260,12 +171,25 @@ TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSch return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns); } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan() const { +std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const { return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this()); } +void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) { + Y_VERIFY(IndexedBlobs.emplace(range).second); + Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); + if (batch.GetFilter()) { + Counters.GetPostFilterBytes()->Add(range.Size); + ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; + FetchBlobsQueue.emplace_front(range); + } else { + Counters.GetFilterBytes()->Add(range.Size); + ReadMetadata->ReadStats->DataFilterBytes += range.Size; + FetchBlobsQueue.emplace_back(range); + } +} -std::vector<TBlobRange> TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { +void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) { Y_VERIFY(ReadMetadata->BlobSchema); Y_VERIFY(ReadMetadata->LoadSchema); Y_VERIFY(ReadMetadata->ResultSchema); @@ -277,54 +201,48 @@ std::vector<TBlobRange> TIndexedReadData::InitRead(ui32 inputBatch, bool inGranu NotIndexed.resize(inputBatch); ui32 batchNo = inputBatch; - BatchInfo.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr); + Batches.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr); - ui64 dataBytes = 0; + ui64 portionsBytes = 0; for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) { + portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size() > 0, "ReadMeatadata: " << *ReadMetadata); ui64 granule = portionInfo.Records[0].Granule; auto itGranule = Granules.find(granule); if (itGranule == Granules.end()) { - itGranule = Granules.emplace(granule, TGranule(granule, *this)).first; - } - - TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo); - BatchInfo[batchNo] = ¤tBatch; - - for (auto&& i : currentBatch.GetWaitingBlobs()) { - Y_VERIFY(IndexedBlobs.emplace(i).second); - Y_VERIFY(IndexedBlobSubscriber.emplace(i, ¤tBatch).second); - dataBytes += i.Size; + itGranule = Granules.emplace(granule, 
NIndexedReader::TGranule(granule, *this)).first; } + NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo); + currentBatch.Reset(&EarlyFilterColumns); + Batches[batchNo] = ¤tBatch; ++batchNo; } auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - std::vector<TBlobRange> out; for (ui64 granule : granulesOrder) { auto it = Granules.find(granule); Y_VERIFY(it != Granules.end()); - it->second.FillBlobsForFetch(out); if (inGranulesOrder) { GranulesOutOrder.emplace_back(&it->second); } } - + Counters.GetPortionBytes()->Add(portionsBytes); auto& stats = ReadMetadata->ReadStats; stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size(); stats->IndexBatches = ReadMetadata->NumIndexedBlobs(); stats->CommittedBatches = ReadMetadata->CommittedBlobs.size(); - stats->UsedColumns = ReadMetadata->LoadSchema->num_fields(); - stats->DataBytes = dataBytes; - return out; + stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields(); + stats->FilterColumns = EarlyFilterColumns.size(); + stats->AdditionalColumns = PostFilterColumns.size(); + stats->PortionsBytes = portionsBytes; } -void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::IDataTasksProcessor::TPtr processor) { - TBatch* portionBatch = nullptr; +void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::TDataTasksProcessorContainer processor) { + NIndexedReader::TBatch* portionBatch = nullptr; { auto it = IndexedBlobSubscriber.find(blobRange); Y_VERIFY_DEBUG(it != IndexedBlobSubscriber.end()); @@ -338,13 +256,8 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da return; } if (portionBatch->IsFetchingReady()) { - if (auto batch = portionBatch->AssembleIndexedBatch(processor, ReadMetadata)) { - if (processor) { - processor->Add(batch); - } else { - batch->Execute(); - batch->Apply(*this); - } + if (auto batch = portionBatch->AssembleTask(processor.GetObject(), ReadMetadata)) { + processor.Add(*this, batch); } } } @@ -413,8 +326,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR return out; } -std::vector<TIndexedReadData::TGranule*> TIndexedReadData::DetachReadyInOrder() { - std::vector<TIndexedReadData::TGranule*> out; +std::vector<NIndexedReader::TGranule*> TIndexedReadData::DetachReadyInOrder() { + std::vector<NIndexedReader::TGranule*> out; out.reserve(GranulesToOut.size()); if (GranulesOutOrder.empty()) { @@ -424,7 +337,7 @@ std::vector<TIndexedReadData::TGranule*> TIndexedReadData::DetachReadyInOrder() GranulesToOut.clear(); } else { while (GranulesOutOrder.size()) { - TGranule* granule = GranulesOutOrder.front(); + NIndexedReader::TGranule* granule = GranulesOutOrder.front(); if (!granule->IsReady()) { break; } @@ -451,7 +364,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData:: OutNotIndexed.erase(0); } - std::vector<TGranule*> ready = DetachReadyInOrder(); + std::vector<NIndexedReader::TGranule*> ready = DetachReadyInOrder(); for (auto&& granule : ready) { bool canHaveDups = granule->IsDuplicationsAvailable(); std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches(); @@ -638,20 +551,30 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco return out; } -void TIndexedReadData::PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch) { - 
Y_VERIFY(BatchInfo[batchNo]); - BatchInfo[batchNo]->InitBatch(batch); -} - +NIndexedReader::TBatch& TIndexedReadData::GetBatchInfo(const ui32 batchNo) { + Y_VERIFY(batchNo < Batches.size()); + auto ptr = Batches[batchNo]; + Y_VERIFY(ptr); + return *ptr; } -namespace NKikimr::NColumnShard { - -bool IDataPreparationTask::DoExecute() { - if (OwnerOperator && OwnerOperator->IsStopped()) { - return true; +TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, + const bool internalRead, const NColumnShard::TScanCounters& counters) + : Counters(counters) + , FetchBlobsQueue(fetchBlobsQueue) + , ReadMetadata(readMetadata) +{ + PostFilterColumns = ReadMetadata->GetUsedColumnIds(); + EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true); + if (internalRead || EarlyFilterColumns.empty()) { + EarlyFilterColumns = PostFilterColumns; + PostFilterColumns.clear(); } else { - return DoExecuteImpl(); + for (auto&& i : EarlyFilterColumns) { + PostFilterColumns.erase(i); + } } + Y_VERIFY(ReadMetadata->SelectInfo); } + } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 09c032c025..cccbe5801d 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -2,70 +2,15 @@ #include "defs.h" #include "column_engine.h" #include "predicate.h" -#include <ydb/core/tx/conveyor/usage/abstract.h> +#include "reader/queue.h" +#include "reader/granule.h" +#include "reader/batch.h" -namespace NKikimr::NColumnShard { -class TScanIteratorBase; -} - -namespace NKikimr::NOlap { -class TIndexedReadData; -} +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/tx/columnshard/counters.h> namespace NKikimr::NColumnShard { - -class IDataTasksProcessor; - -class IDataPreparationTask: public NConveyor::ITask { -private: - std::shared_ptr<IDataTasksProcessor> OwnerOperator; -protected: - virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0; - virtual bool DoExecuteImpl() = 0; - - virtual bool DoExecute() override final; -public: - IDataPreparationTask(std::shared_ptr<IDataTasksProcessor> ownerOperator) - : OwnerOperator(ownerOperator) - { - - } - using TPtr = std::shared_ptr<IDataPreparationTask>; - virtual ~IDataPreparationTask() = default; - bool Apply(NOlap::TIndexedReadData& indexedDataRead) const { - return DoApply(indexedDataRead); - } -}; - -class IDataTasksProcessor { -private: - TAtomicCounter DataProcessorAddDataCounter = 0; -protected: - virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0; - std::atomic<bool> Stopped = false; -public: - i64 GetDataCounter() const { - return DataProcessorAddDataCounter.Val(); - } - - void Stop() { - Stopped = true; - } - bool IsStopped() const { - return Stopped; - } - - using TPtr = std::shared_ptr<IDataTasksProcessor>; - virtual ~IDataTasksProcessor() = default; - bool Add(IDataPreparationTask::TPtr task) { - if (DoAdd(task)) { - DataProcessorAddDataCounter.Inc(); - return true; - } - return false; - - } -}; +class TScanIteratorBase; } namespace NKikimr::NOlap { @@ -77,20 +22,47 @@ struct TReadStats { ui64 IndexPortions{0}; ui64 IndexBatches{0}; ui64 CommittedBatches{0}; - ui32 UsedColumns{0}; - ui64 DataBytes{0}; + ui64 PortionsBytes{ 0 }; + ui64 DataFilterBytes{ 0 }; + ui64 DataAdditionalBytes{ 0 }; + + ui32 SchemaColumns = 0; + ui32 FilterColumns = 0; + ui32 AdditionalColumns = 0; + + ui32 SelectedRows = 0; TReadStats(ui32 indexNo) : 
BeginTimestamp(TInstant::Now()) , SelectedIndex(indexNo) {} + void PrintToLog() { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) + ("event", "statistic") + ("begin", BeginTimestamp) + ("selected", SelectedIndex) + ("index_granules", IndexGranules) + ("index_portions", IndexPortions) + ("index_batches", IndexBatches) + ("committed_batches", CommittedBatches) + ("schema_columns", SchemaColumns) + ("filter_columns", FilterColumns) + ("additional_columns", AdditionalColumns) + ("portions_bytes", PortionsBytes) + ("data_filter_bytes", DataFilterBytes) + ("data_additional_bytes", DataAdditionalBytes) + ("delta_bytes", PortionsBytes - DataFilterBytes - DataAdditionalBytes) + ("selected_rows", SelectedRows) + ; + } + TDuration Duration() { return TInstant::Now() - BeginTimestamp; } }; -// Holds all metedata that is needed to perform read/scan +// Holds all metadata that is needed to perform read/scan struct TReadMetadataBase { using TConstPtr = std::shared_ptr<const TReadMetadataBase>; @@ -119,7 +91,7 @@ struct TReadMetadataBase { virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0; virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0; - virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const = 0; + virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0; virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); }; // TODO: can this only be done for base class? @@ -146,7 +118,16 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_ , ReadStats(std::make_shared<TReadStats>(info.GetId())) {} - std::set<std::string> GetFilterColumns(const bool early) const; + std::vector<std::string> GetColumnsOrder() const { + std::vector<std::string> result; + for (auto&& i : LoadSchema->fields()) { + result.emplace_back(i->name()); + } + return result; + } + + std::set<ui32> GetEarlyFilterColumnIds(const bool noTrivial) const; + std::set<ui32> GetUsedColumnIds() const; bool Empty() const { Y_VERIFY(SelectInfo); @@ -185,7 +166,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_ return SelectInfo->Stats().Blobs; } - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; void Dump(IOutputStream& out) const override { out << "columns: " << (LoadSchema ? 
LoadSchema->num_fields() : 0) @@ -228,7 +209,7 @@ struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_ TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override; - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override; }; // Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation @@ -243,15 +224,22 @@ struct TPartialReadResult { }; class TIndexedReadData { +private: + std::set<ui32> EarlyFilterColumns; + YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns); + bool AbortedFlag = false; + NColumnShard::TScanCounters Counters; + std::vector<NIndexedReader::TBatch*> Batches; + TFetchBlobsQueue& FetchBlobsQueue; + friend class NIndexedReader::TBatch; + friend class NIndexedReader::TGranule; public: - TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata) - : ReadMetadata(readMetadata) - { - Y_VERIFY(ReadMetadata->SelectInfo); - } + TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters); + + NIndexedReader::TBatch& GetBatchInfo(const ui32 batchNo); - /// @returns blobId -> granule map. Granules could be read independently - std::vector<TBlobRange> InitRead(ui32 numNotIndexed, bool inGranulesOrder = false); + /// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently + void InitRead(ui32 numNotIndexed, bool inGranulesOrder = false); /// @returns batches and corresponding last keys in correct order (i.e. 
sorted by by PK) TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch); @@ -269,155 +257,34 @@ public: NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId); } - void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::IDataTasksProcessor::TPtr processor); + void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::TDataTasksProcessorContainer processor); bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); } bool IsIndexedBlob(const TBlobRange& blobRange) const { return IndexedBlobs.contains(blobRange); } void Abort() { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); for (auto&& i : Granules) { ReadyGranulesAccumulator.emplace(i.first); } + AbortedFlag = true; Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size()); Y_VERIFY(!IsInProgress()); } private: NOlap::TReadMetadata::TConstPtr ReadMetadata; - std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - - class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask { - private: - using TBase = NColumnShard::IDataPreparationTask; - TPortionInfo::TPreparedBatchData BatchConstructor; - std::shared_ptr<arrow::RecordBatch> FilteredBatch; - NOlap::TReadMetadata::TConstPtr ReadMetadata; - ui32 BatchNo = 0; - bool AllowEarlyFilter = false; - protected: - virtual bool DoApply(TIndexedReadData& owner) const override; - virtual bool DoExecuteImpl() override; - public: - TAssembledNotFiltered(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata, - const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor) - : TBase(processor) - , BatchConstructor(batchConstructor) - , ReadMetadata(readMetadata) - , BatchNo(batchNo) - , AllowEarlyFilter(allowEarlyFilter) - { - - } - }; - void PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch); - - class TGranule; - - class TBatch { - private: - YDB_READONLY(ui64, BatchNo, 0); - YDB_READONLY(ui64, Portion, 0); - YDB_READONLY(ui64, Granule, 0); - THashSet<TBlobRange> WaitIndexed; - YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch); - YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); - THashMap<TBlobRange, TString> Data; - TGranule* Owner = nullptr; - const TPortionInfo* PortionInfo = nullptr; - - friend class TGranule; - TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo & portionInfo); - void FillBlobsForFetch(std::vector<TBlobRange>& result) const { - for (auto&& i : WaitIndexed) { - result.emplace_back(i); - } - } - public: - NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata); - - const THashSet<TBlobRange>& GetWaitingBlobs() const { - return WaitIndexed; - } - - const TGranule& GetOwner() const { - return *Owner; - } - - bool IsFetchingReady() const { - return WaitIndexed.empty(); - } - - const TPortionInfo& GetPortionInfo() const { - return *PortionInfo; - } - - void InitBatch(std::shared_ptr<arrow::RecordBatch> batch) { - Y_VERIFY(!FilteredBatch); - FilteredBatch = batch; - Owner->OnBatchReady(*this, batch); - } - - bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData) { - if (!WaitIndexed.erase(bRange)) { - return false; - } - Data.emplace(bRange, blobData); - return true; - } - }; - - class TGranule { - private: - YDB_READONLY(ui64, GranuleId, 0); - 
YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches); - YDB_FLAG_ACCESSOR(DuplicationsAvailable, false); - YDB_READONLY_FLAG(Ready, false); - THashMap<ui32, TBatch> Batches; - std::set<ui32> WaitBatches; - TIndexedReadData* Owner = nullptr; - void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { - Y_VERIFY(!ReadyFlag); - Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo())); - if (batch && batch->num_rows()) { - ReadyBatches.emplace_back(batch); - } - Owner->OnBatchReady(batchInfo, batch); - if (WaitBatches.empty()) { - ReadyFlag = true; - Owner->OnGranuleReady(*this); - } - } - public: - friend class TIndexedReadData::TBatch; - TGranule(const ui64 granuleId, TIndexedReadData& owner) - : GranuleId(granuleId) - , Owner(&owner) - { - - } - - void FillBlobsForFetch(std::vector<TBlobRange>& result) const { - for (auto&& i : Batches) { - i.second.FillBlobsForFetch(result); - } - } - - TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) { - Y_VERIFY(!ReadyFlag); - WaitBatches.emplace(batchNo); - auto infoEmplace = Batches.emplace(batchNo, TBatch(batchNo, *this, portionInfo)); - Y_VERIFY(infoEmplace.second); - return infoEmplace.first->second; - } - }; + std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed; - void OnGranuleReady(TGranule& granule) { + void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch); + void OnGranuleReady(NIndexedReader::TGranule& granule) { Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second); - Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second); + Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag); } - void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { + void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) { if (batch && batch->num_rows()) { + ReadMetadata->ReadStats->SelectedRows += batch->num_rows(); if (batchInfo.IsDuplicationsAvailable()) { Y_VERIFY(batchInfo.GetOwner().IsDuplicationsAvailable()); BatchesToDedup.insert(batch.get()); @@ -428,18 +295,17 @@ private: } THashSet<const void*> BatchesToDedup; - THashMap<TBlobRange, TBatch*> IndexedBlobSubscriber; // blobId -> batch - THashMap<ui64, TGranule*> GranulesToOut; + THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch + THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; THashSet<TBlobRange> IndexedBlobs; ui32 ReadyNotIndexed{0}; THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append - TVector<TBatch*> BatchInfo; - THashMap<ui64, TGranule> Granules; - TDeque<TGranule*> GranulesOutOrder; + THashMap<ui64, NIndexedReader::TGranule> Granules; + TDeque<NIndexedReader::TGranule*> GranulesOutOrder; std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; - std::vector<TGranule*> DetachReadyInOrder(); + std::vector<NIndexedReader::TGranule*> DetachReadyInOrder(); const TIndexInfo& IndexInfo() const { return ReadMetadata->IndexInfo; @@ -448,7 +314,6 @@ private: std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch( const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const; - NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(const TBatch& batch, NColumnShard::IDataTasksProcessor::TPtr processor); std::shared_ptr<arrow::RecordBatch> MergeNotIndexed( 
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 4d4a94cb70..50bdb2ff6a 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -31,11 +31,23 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
 TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo,
     const std::shared_ptr<arrow::Schema>& schema,
-    const THashMap<TBlobRange, TString>& blobsData) const {
+    const THashMap<TBlobRange, TString>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
     // Correct records order
     TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+    std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+
+    for (auto&& i : schema->fields()) {
+        if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
+            continue;
+        }
+        schemaFields.emplace_back(i);
+    }
+
     for (auto& rec : Records) {
+        if (columnIds && !columnIds->contains(rec.ColumnId)) {
+            continue;
+        }
         ui32 columnId = rec.ColumnId;
         TString columnName = indexInfo.GetColumnName(columnId);
         std::string name(columnName.data(), columnName.size());
@@ -70,7 +82,7 @@ TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexIn
         columns.emplace_back(TPreparedColumn(field, std::move(blobs)));
     }

-    return TPreparedBatchData(std::move(columns), schema);
+    return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
 }

 void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
@@ -264,25 +276,12 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
 }

 std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
-    std::vector<std::shared_ptr<arrow::Field>> fields;
-    for (auto&& f : Schema->fields()) {
-        if (!options.CheckFieldAcceptance(f->name())) {
-            continue;
-        }
-        fields.emplace_back(f);
-    }
-    if (fields.empty()) {
-        return nullptr;
-    }
     std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
     for (auto&& i : Columns) {
-        if (!options.CheckFieldAcceptance(i.GetName())) {
-            continue;
-        }
         columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble()));
     }

-    auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
+    auto table = arrow::Table::Make(Schema, columns);
     auto res = table->CombineChunks();
     Y_VERIFY(res.ok());
     return NArrow::ToBatch(*res);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 1937c9be6c..6230256ac8 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -118,6 +118,14 @@ struct TPortionInfo {
         return {sum, max};
     }

+    ui64 BlobsBytes() const {
+        ui64 sum = 0;
+        for (auto& rec : Records) {
+            sum += rec.BlobRange.Size;
+        }
+        return sum;
+    }
+
     void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap, const TSnapshot& snapshot) {
         for (auto& rec : Records) {
             rec.Portion = portion;
@@ -222,30 +230,11 @@ struct TPortionInfo {
     class TAssembleOptions {
     private:
-        YDB_OPT(std::set<std::string>, IncludeColumnNames);
-        YDB_OPT(std::set<std::string>, ExcludeColumnNames);
         YDB_OPT(ui32, RecordsCountLimit);
         YDB_FLAG_ACCESSOR(ForwardAssemble, true);
     public:
-        bool CheckFieldAcceptance(const std::string& fieldName) const {
-            if (!!IncludeColumnNames && !IncludeColumnNames->contains(fieldName)) {
-                return false;
-            }
-            if (!!ExcludeColumnNames && ExcludeColumnNames->contains(fieldName)) {
-                return false;
-            }
-            return true;
-        }
     };

-    std::vector<std::string> GetColumnsOrder() const {
-        std::vector<std::string> result;
-        for (auto&& i : Schema->fields()) {
-            result.emplace_back(i->name());
-        }
-        return result;
-    }
-
     size_t GetColumnsCount() const {
         return Columns.size();
     }
@@ -262,11 +251,11 @@ struct TPortionInfo {

     TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
         const std::shared_ptr<arrow::Schema>& schema,
-        const THashMap<TBlobRange, TString>& data) const;
+        const THashMap<TBlobRange, TString>& data, const std::optional<std::set<ui32>>& columnIds) const;

     std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
                                                         const std::shared_ptr<arrow::Schema>& schema,
                                                         const THashMap<TBlobRange, TString>& data) const {
-        return PrepareForAssemble(indexInfo, schema, data).Assemble();
+        return PrepareForAssemble(indexInfo, schema, data, {}).Assemble();
     }

     static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
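Editor's note: the new optional columnIds argument is what lets the reader assemble the same portion twice with different column subsets, replacing the old include/exclude name sets on TAssembleOptions. A minimal standalone illustration of the gating rule (hypothetical Field type; the patch applies the same skip check to schema fields and to Records):

    #include <iostream>
    #include <optional>
    #include <set>
    #include <string>
    #include <vector>

    struct Field { unsigned Id; std::string Name; }; // illustrative stand-in for arrow::Field + column id

    // An unset optional means "assemble everything"; a set optional narrows the schema.
    std::vector<Field> SelectFields(const std::vector<Field>& schema,
                                    const std::optional<std::set<unsigned>>& columnIds) {
        std::vector<Field> out;
        for (const Field& f : schema) {
            if (columnIds && columnIds->count(f.Id) == 0) {
                continue; // same skip rule as in PrepareForAssemble above
            }
            out.push_back(f);
        }
        return out;
    }

    int main() {
        std::vector<Field> schema{{1, "timestamp"}, {2, "uid"}, {3, "message"}};
        for (const Field& f : SelectFields(schema, std::set<unsigned>{1, 2})) {
            std::cout << f.Name << "\n"; // timestamp, uid — "message" is deferred to the post-filter round
        }
    }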
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..44908d898b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  libs-apache-arrow
+  ydb-core-protos
+  ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ba709a0cff
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+  contrib-libs-linux-headers
+  contrib-libs-cxxsupp
+  yutil
+  libs-apache-arrow
+  ydb-core-protos
+  ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..ba709a0cff
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+  contrib-libs-linux-headers
+  contrib-libs-cxxsupp
+  yutil
+  libs-apache-arrow
+  ydb-core-protos
+  ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+  include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+  include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+  include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+  include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..44908d898b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  libs-apache-arrow
+  ydb-core-protos
+  ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
new file mode 100644
index 0000000000..a51115669d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -0,0 +1,102 @@
+#include "batch.h"
+#include "granule.h"
+#include "filter_assembler.h"
+#include "postfilter_assembler.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo)
+    : BatchNo(batchNo)
+    , Portion(portionInfo.Records[0].Portion)
+    , Granule(owner.GetGranuleId())
+    , Owner(&owner)
+    , PortionInfo(&portionInfo) {
+    Y_VERIFY(portionInfo.Records.size());
+
+    if (portionInfo.CanIntersectOthers()) {
+        Owner->SetDuplicationsAvailable(true);
+        if (portionInfo.CanHaveDups()) {
+            SetDuplicationsAvailable(true);
+        }
+    }
+}
+
+NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
+    Y_VERIFY(WaitIndexed.empty());
+    Y_VERIFY(PortionInfo->Produced());
+    Y_VERIFY(!FilteredBatch);
+    auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
+    Data.clear();
+    if (!Filter) {
+        return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), processor);
+    } else {
+        Y_VERIFY(FilterBatch);
+        return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
+    }
+}
+
+bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
+    if (AskedColumnIds.size() < columnIds.size()) {
+        return false;
+    }
+    for (auto&& i : columnIds) {
+        if (!AskedColumnIds.contains(i)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+void TBatch::Reset(const std::set<ui32>* columnIds) {
+    if (!columnIds) {
+        CurrentColumnIds.reset();
+    } else {
+        CurrentColumnIds = *columnIds;
+        Y_VERIFY(CurrentColumnIds->size());
+    }
+    if (CurrentColumnIds) {
+        for (auto&& i : *CurrentColumnIds) {
+            AskedColumnIds.emplace(i);
+        }
+    }
+    Y_VERIFY(WaitIndexed.empty());
+    Y_VERIFY(Data.empty());
+    WaitingBytes = 0;
+    for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+        if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
+            continue;
+        }
+        Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
+        Owner->Owner->AddBlobForFetch(rec.BlobRange, *this);
+        Y_VERIFY(rec.Portion == Portion);
+        Y_VERIFY(rec.Valid());
+        Y_VERIFY(Granule == rec.Granule);
+        WaitingBytes += rec.BlobRange.Size;
+    }
+}
+
+void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) {
+    Y_VERIFY(filter);
+    Y_VERIFY(!Filter);
+    Y_VERIFY(!FilterBatch);
+    Filter = filter;
+    FilterBatch = filterBatch;
+}
+
+void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
+    Y_VERIFY(!FilteredBatch);
+    FilteredBatch = batch;
+    Owner->OnBatchReady(*this, batch);
+}
+
+bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
+    if (!WaitIndexed.erase(bRange)) {
+        return false;
+    }
+    WaitingBytes -= bRange.Size;
+    Data.emplace(bRange, blobData);
+    return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
new file mode 100644
index 0000000000..7e30a7f26f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -0,0 +1,68 @@
+#pragma once
+#include "conveyor_task.h"
+
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap {
+struct TReadMetadata;
+}
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranule;
+
+class TBatch {
+private:
+    YDB_READONLY(ui64, BatchNo, 0);
+    YDB_READONLY(ui64, Portion, 0);
+    YDB_READONLY(ui64, Granule, 0);
+    YDB_READONLY(ui64, WaitingBytes, 0);
+
+    THashSet<TBlobRange> WaitIndexed;
+    YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
+    YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
+    YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+    YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
+    THashMap<TBlobRange, TString> Data;
+    TGranule* Owner = nullptr;
+    const TPortionInfo* PortionInfo = nullptr;
+
+    friend class TGranule;
+    TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
+
+    YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
+    std::set<ui32> AskedColumnIds;
+public:
+    bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
+    bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
+
+    void Reset(const std::set<ui32>* columnIds);
+
+    void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
+    void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
+
+    NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
+
+    const THashSet<TBlobRange>& GetWaitingBlobs() const {
+        return WaitIndexed;
+    }
+
+    const TGranule& GetOwner() const {
+        return *Owner;
+    }
+
+    bool IsFetchingReady() const {
+        return WaitIndexed.empty();
+    }
+
+    const TPortionInfo& GetPortionInfo() const {
+        return *PortionInfo;
+    }
+};
+}
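Editor's note: TBatch::Reset is what re-arms a batch for a second fetch round — the wait-set and byte counter are rebuilt from only the requested column records, while AskedColumnsAlready keeps a column from being requested twice. A simplified standalone model of that bookkeeping (toy record type; ids and sizes invented):

    #include <cassert>
    #include <cstdint>
    #include <optional>
    #include <set>
    #include <vector>

    struct Record { uint32_t ColumnId; uint64_t BlobSize; };

    struct BatchModel {
        std::vector<Record> Records;        // all column chunks of the portion
        std::set<uint32_t> AskedColumnIds;  // accumulated over all Reset calls
        std::vector<Record> WaitIndexed;    // chunks still to fetch this round
        uint64_t WaitingBytes = 0;

        // Mirror of TBatch::Reset: an unset optional means "all columns".
        void Reset(const std::optional<std::set<uint32_t>>& columnIds) {
            assert(WaitIndexed.empty());
            if (columnIds) {
                AskedColumnIds.insert(columnIds->begin(), columnIds->end());
            }
            WaitingBytes = 0;
            for (const Record& rec : Records) {
                if (columnIds && !columnIds->count(rec.ColumnId)) {
                    continue;
                }
                WaitIndexed.push_back(rec);
                WaitingBytes += rec.BlobSize;
            }
        }

        bool AskedColumnsAlready(const std::set<uint32_t>& columnIds) const {
            for (uint32_t id : columnIds) {
                if (!AskedColumnIds.count(id)) {
                    return false;
                }
            }
            return true;
        }
    };

    int main() {
        BatchModel b{{{1, 100}, {2, 4000}}};
        b.Reset(std::set<uint32_t>{1});  // round 1: cheap filter column only
        assert(b.WaitingBytes == 100);
        b.WaitIndexed.clear();           // pretend the fetch completed
        b.Reset(std::set<uint32_t>{2});  // round 2: heavy payload column
        assert(b.WaitingBytes == 4000);
        assert(b.AskedColumnsAlready({1, 2}));
    }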
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
new file mode 100644
index 0000000000..e41445afab
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -0,0 +1,43 @@
+#include "conveyor_task.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NColumnShard {
+
+bool IDataTasksProcessor::ITask::DoExecute() {
+    if (OwnerOperator && OwnerOperator->IsStopped()) {
+        return true;
+    } else {
+        DataProcessedFlag = true;
+        return DoExecuteImpl();
+    }
+}
+
+bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const {
+    if (OwnerOperator) {
+        OwnerOperator->ReplyReceived();
+    }
+    return DoApply(indexedDataRead);
+}
+
+bool IDataTasksProcessor::Add(ITask::TPtr task) {
+    if (IsStopped()) {
+        return false;
+    }
+    if (DoAdd(task)) {
+        DataProcessorAddDataCounter.Inc();
+        return true;
+    }
+    return false;
+}
+
+
+void TDataTasksProcessorContainer::Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task) {
+    if (Object) {
+        Object->Add(task);
+    } else {
+        task->Execute();
+        task->Apply(indexedDataRead);
+    }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
new file mode 100644
index 0000000000..f9d7adacca
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -0,0 +1,85 @@
+#pragma once
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+class TIndexedReadData;
+}
+
+namespace NKikimr::NColumnShard {
+
+class IDataTasksProcessor;
+
+class IDataTasksProcessor {
+private:
+    TAtomicCounter DataProcessorAddDataCounter = 0;
+    void ReplyReceived() {
+        Y_VERIFY(DataProcessorAddDataCounter.Dec() >= 0);
+    }
+public:
+    class ITask: public NConveyor::ITask {
+    private:
+        std::shared_ptr<IDataTasksProcessor> OwnerOperator;
+        YDB_READONLY_FLAG(DataProcessed, false);
+    protected:
+        virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
+        virtual bool DoExecuteImpl() = 0;
+
+        virtual bool DoExecute() override final;
+    public:
+        ITask(std::shared_ptr<IDataTasksProcessor> ownerOperator)
+            : OwnerOperator(ownerOperator) {
+
+        }
+        using TPtr = std::shared_ptr<ITask>;
+        virtual ~ITask() = default;
+        bool Apply(NOlap::TIndexedReadData& indexedDataRead) const;
+    };
+protected:
+    virtual bool DoAdd(ITask::TPtr task) = 0;
+    std::atomic<bool> Stopped = false;
+public:
+    i64 GetDataCounter() const {
+        return DataProcessorAddDataCounter.Val();
+    }
+
+    void Stop() {
+        Stopped = true;
+    }
+    bool IsStopped() const {
+        return Stopped;
+    }
+    bool InWaiting() const {
+        return !IsStopped() && DataProcessorAddDataCounter.Val();
+    }
+
+    using TPtr = std::shared_ptr<IDataTasksProcessor>;
+    virtual ~IDataTasksProcessor() = default;
+    bool Add(ITask::TPtr task);
+};
+
+class TDataTasksProcessorContainer {
+private:
+    YDB_READONLY_DEF(IDataTasksProcessor::TPtr, Object);
+public:
+    TDataTasksProcessorContainer() = default;
+    TDataTasksProcessorContainer(IDataTasksProcessor::TPtr object)
+        : Object(object)
+    {
+
+    }
+
+    void Stop() {
+        if (Object) {
+            Object->Stop();
+        }
+    }
+
+    bool InWaiting() const {
+        return Object && Object->InWaiting();
+    }
+
+    void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task);
+};
+
+}
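Editor's note: TDataTasksProcessorContainer gives call sites a single code path whether or not a background conveyor is attached — with an empty container (as the plain read actor passes to AddIndexed further down) the task degenerates to a synchronous Execute-then-Apply. A standalone sketch of that dispatch pattern (simplified task interface, names illustrative):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <vector>

    struct Task {
        std::function<void()> Execute; // heavy work (e.g. batch assembly)
        std::function<void()> Apply;   // cheap state mutation on the owner
    };

    class Processor { // stand-in for a background conveyor
    public:
        void Add(std::shared_ptr<Task> task) { Queue.push_back(task); }
        std::vector<std::shared_ptr<Task>> Queue;
    };

    class ProcessorContainer {
        std::shared_ptr<Processor> Object; // may be null
    public:
        explicit ProcessorContainer(std::shared_ptr<Processor> object = nullptr) : Object(object) {}
        void Add(std::shared_ptr<Task> task) {
            if (Object) {
                Object->Add(task); // asynchronous: the conveyor runs it later
            } else {
                task->Execute();   // synchronous fallback, same observable effect
                task->Apply();
            }
        }
    };

    int main() {
        auto task = std::make_shared<Task>(Task{
            [] { std::cout << "execute\n"; },
            [] { std::cout << "apply\n"; }});
        ProcessorContainer{}.Add(task); // no conveyor attached: runs immediately
    }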
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
new file mode 100644
index 0000000000..76448d7563
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -0,0 +1,56 @@
+#include "filter_assembler.h"
+#include <ydb/core/tx/columnshard/engines/filter.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+bool TAssembleFilter::DoExecuteImpl() {
+    /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
+    /// It's not OK to apply predicate before replacing key duplicates otherwise.
+    /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+    auto batch = BatchConstructor.Assemble();
+    Y_VERIFY(batch);
+    Y_VERIFY(batch->num_rows());
+    const ui32 originalCount = batch->num_rows();
+    Filter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata));
+    if (!Filter->Apply(batch)) {
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", originalCount);
+        FilteredBatch = nullptr;
+        return true;
+    }
+#if 1 // optimization
+    if (ReadMetadata->Program && AllowEarlyFilter) {
+        auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program);
+        Filter->CombineSequential(filter);
+        if (!filter.Apply(batch)) {
+            AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", originalCount);
+            FilteredBatch = nullptr;
+            return true;
+        }
+    }
+#else
+    Y_UNUSED(AllowEarlyFilter);
+#endif
+    AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")("original_count", originalCount)("filtered_count", batch->num_rows());
+
+    FilteredBatch = batch;
+    return true;
+}
+
+bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
+    TBatch& batch = owner.GetBatchInfo(BatchNo);
+    batch.InitFilter(Filter, FilteredBatch);
+    if (batch.AskedColumnsAlready(owner.GetPostFilterColumns()) || !FilteredBatch || FilteredBatch->num_rows() == 0) {
+        batch.InitBatch(FilteredBatch);
+    } else {
+        batch.Reset(&owner.GetPostFilterColumns());
+        AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
+            ("filtered_count", FilteredBatch->num_rows())
+            ("blobs_count", batch.GetWaitingBlobs().size())
+            ("columns_count", batch.GetCurrentColumnIds()->size())
+            ("fetch_size", batch.GetWaitingBytes())
+            ;
+    }
+    return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
new file mode 100644
index 0000000000..7f6b3f7c9c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -0,0 +1,37 @@
+#pragma once
+#include "conveyor_task.h"
+
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+    class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask {
+    private:
+        using TBase = NColumnShard::IDataTasksProcessor::ITask;
+        TPortionInfo::TPreparedBatchData BatchConstructor;
+        std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+        NOlap::TReadMetadata::TConstPtr ReadMetadata;
+        std::shared_ptr<NArrow::TColumnFilter> Filter;
+        const ui32 BatchNo;
+        bool AllowEarlyFilter = false;
+    protected:
+        virtual bool DoApply(TIndexedReadData& owner) const override;
+        virtual bool DoExecuteImpl() override;
+    public:
+        TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
+            TBatch& batch, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
+            : TBase(processor)
+            , BatchConstructor(batchConstructor)
+            , ReadMetadata(readMetadata)
+            , BatchNo(batch.GetBatchNo())
+            , AllowEarlyFilter(allowEarlyFilter)
+        {
+
+        }
+    };
+
+}
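Editor's note: the subtlety in TAssembleFilter is that the early-program filter is computed on rows that already passed the snapshot/predicate filter, so the two must be combined sequentially before the result can be reused against the raw portion. A standalone sketch of what such a combination has to do (my reading of the CombineSequential contract, not the YDB implementation):

    #include <cassert>
    #include <vector>

    // `next` is defined only on rows that survived `base`, so combining
    // expands it back onto the original row space.
    std::vector<bool> CombineSequential(const std::vector<bool>& base, const std::vector<bool>& next) {
        std::vector<bool> out(base.size(), false);
        size_t j = 0;
        for (size_t i = 0; i < base.size(); ++i) {
            if (base[i]) {
                out[i] = next[j++]; // consume one `next` bit per surviving row
            }
        }
        assert(j == next.size());
        return out;
    }

    int main() {
        // 5 rows; snapshot filter keeps rows 0, 2, 3; the early filter then
        // keeps the 1st and 3rd of those survivors.
        auto combined = CombineSequential({true, false, true, true, false},
                                          {true, false, true});
        assert((combined == std::vector<bool>{true, false, false, true, false}));
    }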
+#include "postfilter_assembler.h" +#include "batch.h" +#include <ydb/core/tx/columnshard/engines/indexed_read_data.h> + +namespace NKikimr::NOlap::NIndexedReader { + +bool TAssembleBatch::DoExecuteImpl() { + /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey. + /// It's not OK to apply predicate before replacing key duplicates otherwise. + /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here + + Y_VERIFY(BatchConstructor.GetColumnsCount()); + + TPortionInfo::TPreparedBatchData::TAssembleOptions options; + if (Filter->GetInactiveHeadSize() > Filter->GetInactiveTailSize()) { + options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveHeadSize()) + .SetForwardAssemble(false); + Filter->CutInactiveHead(); + } else { + options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveTailSize()); + Filter->CutInactiveTail(); + } + + auto addBatch = BatchConstructor.Assemble(options); + Y_VERIFY(addBatch); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) + ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows()); + Y_VERIFY(Filter->Apply(addBatch)); + Y_VERIFY(NArrow::MergeBatchColumns({ FilterBatch, addBatch }, FullBatch, FullColumnsOrder, true)); + + return true; +} + +bool TAssembleBatch::DoApply(TIndexedReadData& owner) const { + TBatch& batch = owner.GetBatchInfo(BatchNo); + batch.InitBatch(FullBatch); + return true; +} + +TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor, + TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, + NColumnShard::IDataTasksProcessor::TPtr processor) + : TBase(processor) + , BatchConstructor(batchConstructor) + , FullColumnsOrder(fullColumnsOrder) + , Filter(currentBatch.GetFilter()) + , FilterBatch(currentBatch.GetFilterBatch()) + , BatchNo(currentBatch.GetBatchNo()) +{ + Y_VERIFY(currentBatch.GetFilter()); + Y_VERIFY(currentBatch.GetFilterBatch()); + Y_VERIFY(!currentBatch.GetFilteredBatch()); +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h new file mode 100644 index 0000000000..616b079e01 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h @@ -0,0 +1,29 @@ +#pragma once +#include "conveyor_task.h" + +#include <ydb/core/tx/columnshard/engines/portion_info.h> +#include <ydb/core/formats/arrow_filter.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NOlap::NIndexedReader { +class TBatch; +class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask { +private: + using TBase = NColumnShard::IDataTasksProcessor::ITask; + TPortionInfo::TPreparedBatchData BatchConstructor; + std::shared_ptr<arrow::RecordBatch> FullBatch; + std::vector<std::string> FullColumnsOrder; + + std::shared_ptr<NArrow::TColumnFilter> Filter; + std::shared_ptr<arrow::RecordBatch> FilterBatch; + + const ui32 BatchNo; +protected: + virtual bool DoApply(TIndexedReadData& owner) const override; + virtual bool DoExecuteImpl() override; +public: + TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor, + TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, NColumnShard::IDataTasksProcessor::TPtr processor); +}; +} diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp new file mode 100644 index 0000000000..59185c12d5 --- /dev/null +++ 
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp
new file mode 100644
index 0000000000..59185c12d5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp
@@ -0,0 +1,25 @@
+#include "queue.h"
+
+namespace NKikimr::NOlap {
+
+NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() {
+    if (!StoppedFlag && IteratorBlobsSequential.size()) {
+        TBlobRange result = IteratorBlobsSequential.front();
+        IteratorBlobsSequential.pop_front();
+        return result;
+    } else {
+        return TBlobRange();
+    }
+}
+
+void TFetchBlobsQueue::emplace_front(const TBlobRange& range) {
+    Y_VERIFY(!StoppedFlag);
+    IteratorBlobsSequential.emplace_front(range);
+}
+
+void TFetchBlobsQueue::emplace_back(const TBlobRange& range) {
+    Y_VERIFY(!StoppedFlag);
+    IteratorBlobsSequential.emplace_back(range);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h
new file mode 100644
index 0000000000..f8a3da4159
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/queue.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/blob.h>
+
+namespace NKikimr::NOlap {
+
+class TFetchBlobsQueue {
+private:
+    bool StoppedFlag = false;
+    YDB_ACCESSOR_DEF(std::deque<TBlobRange>, IteratorBlobsSequential);
+public:
+    bool IsStopped() const {
+        return StoppedFlag;
+    }
+
+    void Stop() {
+        IteratorBlobsSequential.clear();
+        StoppedFlag = true;
+    }
+
+    bool empty() const {
+        return IteratorBlobsSequential.empty();
+    }
+
+    size_t size() const {
+        return IteratorBlobsSequential.size();
+    }
+
+    TBlobRange pop_front();
+
+    void emplace_front(const TBlobRange& range);
+    void emplace_back(const TBlobRange& range);
+};
+
+}
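Editor's note: TFetchBlobsQueue replaces the vector of blob ranges that InitRead used to return — producers can push on either end (emplace_front lets the post-filter round jump the line), the consumer drains with pop_front, and Stop() makes the queue refuse further traffic. A standalone sketch of the same contract (simplified range type):

    #include <cassert>
    #include <deque>
    #include <string>

    struct BlobRange { std::string Id; };

    class FetchQueue {
        bool Stopped = false;
        std::deque<BlobRange> Ranges;
    public:
        void Stop() { Ranges.clear(); Stopped = true; }
        void PushFront(BlobRange r) { assert(!Stopped); Ranges.push_front(std::move(r)); }
        void PushBack(BlobRange r) { assert(!Stopped); Ranges.push_back(std::move(r)); }
        BlobRange PopFront() { // an empty range signals "nothing left to fetch"
            if (Stopped || Ranges.empty()) {
                return {};
            }
            BlobRange r = Ranges.front();
            Ranges.pop_front();
            return r;
        }
    };

    int main() {
        FetchQueue q;
        q.PushBack({"filter-column-blob"});
        q.PushFront({"urgent-postfilter-blob"}); // the second round gets priority
        assert(q.PopFront().Id == "urgent-postfilter-blob");
        assert(q.PopFront().Id == "filter-column-blob");
        assert(q.PopFront().Id.empty());
    }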
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 07d539d9c1..358a52ac01 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -17,13 +17,13 @@ public:
               NOlap::TReadMetadata::TConstPtr readMetadata,
               const TInstant& deadline,
               const TActorId& columnShardActorId,
-              ui64 requestCookie)
+              ui64 requestCookie, const TScanCounters& counters)
         : TabletId(tabletId)
         , DstActor(dstActor)
         , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
         , Result(std::move(event))
         , ReadMetadata(readMetadata)
-        , IndexedData(ReadMetadata)
+        , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters)
         , Deadline(deadline)
         , ColumnShardActorId(columnShardActorId)
         , RequestCookie(requestCookie)
@@ -51,7 +51,7 @@ public:
                 return; // ignore duplicate parts
             }
             WaitIndexed.erase(event.BlobRange);
-            IndexedData.AddIndexed(event.BlobRange, event.Data, nullptr);
+            IndexedData.AddIndexed(event.BlobRange, event.Data, NColumnShard::TDataTasksProcessorContainer());
         } else if (CommittedBlobs.contains(blobId)) {
             auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
             if (cmt.empty()) {
@@ -128,8 +128,13 @@ public:
             protoStats->SetIndexPortions(stats->IndexPortions);
             protoStats->SetIndexBatches(stats->IndexBatches);
             protoStats->SetNotIndexedBatches(stats->CommittedBatches);
-            protoStats->SetUsedColumns(stats->UsedColumns);
-            protoStats->SetDataBytes(stats->DataBytes);
+            protoStats->SetSchemaColumns(stats->SchemaColumns);
+            protoStats->SetFilterColumns(stats->FilterColumns);
+            protoStats->SetAdditionalColumns(stats->AdditionalColumns);
+            protoStats->SetDataFilterBytes(stats->DataFilterBytes);
+            protoStats->SetDataAdditionalBytes(stats->DataAdditionalBytes);
+            protoStats->SetPortionsBytes(stats->PortionsBytes);
+            protoStats->SetSelectedRows(stats->SelectedRows);
         }

         if (Deadline != TInstant::Max()) {
@@ -163,8 +168,9 @@ public:
             WaitCommitted.emplace(cmtBlob, notIndexed);
         }

-        auto indexedBlobsForFetch = IndexedData.InitRead(notIndexed);
-        for (auto&& blobRange : indexedBlobsForFetch) {
+        IndexedData.InitRead(notIndexed);
+        while (IndexedBlobsForFetch.size()) {
+            const auto blobRange = IndexedBlobsForFetch.pop_front();
             WaitIndexed.insert(blobRange);
             IndexedBlobs.emplace(blobRange);
         }
@@ -245,6 +251,7 @@ private:
     TActorId BlobCacheActorId;
     std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
     NOlap::TReadMetadata::TConstPtr ReadMetadata;
+    NOlap::TFetchBlobsQueue IndexedBlobsForFetch;
     NOlap::TIndexedReadData IndexedData;
     TInstant Deadline;
     TActorId ColumnShardActorId;
@@ -278,10 +285,10 @@ IActor* CreateReadActor(ui64 tabletId,
                         NOlap::TReadMetadata::TConstPtr readMetadata,
                         const TInstant& deadline,
                         const TActorId& columnShardActorId,
-                        ui64 requestCookie)
+                        ui64 requestCookie, const TScanCounters& counters)
 {
     return new TReadActor(tabletId, dstActor, std::move(event), readMetadata,
-                          deadline, columnShardActorId, requestCookie);
+                          deadline, columnShardActorId, requestCookie, counters);
 }

 }
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index eb13dae181..75ef8ee71a 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -906,11 +906,11 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
     if (ydbSchema == TTestSchema::YdbSchema()) {
         if (codec == "" || codec == "lz4") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 50);
         } else if (codec == "none") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 75);
         } else if (codec == "zstd") {
-            UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
+            UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 26);
         } else {
             UNIT_ASSERT(false);
         }
@@ -1138,7 +1138,7 @@ void TestCompactionInGranuleImpl(bool reboots,
     UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
     UNIT_ASSERT(readStats.GetIndexBatches() > 0);
     UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0);
-    UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+    UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
     UNIT_ASSERT(readStats.GetIndexPortions() <= 2); // got compaction

     RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -2065,7 +2065,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
         UNIT_ASSERT(readStats.GetIndexBatches() > 0);
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
-        UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+        UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x);
     }
@@ -2126,7 +2126,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
         UNIT_ASSERT(readStats.GetIndexBatches() > 0);
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
-        UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+        UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 1); // TODO: min-max index optimization?
     }
@@ -2186,7 +2186,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
         UNIT_ASSERT(readStats.GetIndexBatches() > 0);
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
-        UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+        UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
         UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
         //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
     }
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index a5847ac677..feab71de25 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -635,7 +635,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
         if (resRead.GetFinished()) {
             UNIT_ASSERT(meta.HasReadStats());
             auto& readStats = meta.GetReadStats();
-            ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+            ui64 numBytes = readStats.GetPortionsBytes(); // compressed bytes in storage
             specRowsBytes.back().second += numBytes;
             break;
         }
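Editor's note: the test updates reflect the stats rework — the old UsedColumns/DataBytes aggregates are split so the cost of the filter round and the post-filter round show up separately, with PortionsBytes keeping the old "compressed bytes in storage" meaning. A sketch of the accounting relationship the new counters suggest (field names from the patch; values and arithmetic illustrative):

    #include <cstdint>
    #include <iostream>

    struct ReadStats { // mirrors the fields set on the protobuf above
        uint64_t SchemaColumns = 0;       // columns in the result schema
        uint64_t FilterColumns = 0;       // columns fetched for the early filter
        uint64_t AdditionalColumns = 0;   // columns fetched after the filter survived
        uint64_t DataFilterBytes = 0;
        uint64_t DataAdditionalBytes = 0;
        uint64_t PortionsBytes = 0;       // total compressed bytes of selected portions
        uint64_t SelectedRows = 0;
    };

    int main() {
        ReadStats s{7, 3, 4, 120000, 830000, 5000000, 9500};
        // Bytes actually read never exceed what the portions hold; a fully
        // filtered-out scan keeps DataAdditionalBytes at zero.
        std::cout << "read " << (s.DataFilterBytes + s.DataAdditionalBytes)
                  << " of " << s.PortionsBytes << " stored bytes for "
                  << s.SelectedRows << " rows\n";
    }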