aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-04-25 16:04:37 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-04-25 16:04:37 +0300
commit4ef444a11e91cc3be7a395c0d212a2602f20cce6 (patch)
tree7447c8580bc1905d892a0f158ce19a51b05f4823
parent00fe65a36352e32d68310fe30a4223c74391c6a3 (diff)
downloadydb-4ef444a11e91cc3be7a395c0d212a2602f20cce6.tar.gz
dont read all projected/sorted/group columns before early filter (step-0 in program)
-rw-r--r--ydb/core/formats/arrow_filter.cpp2
-rw-r--r--ydb/core/formats/arrow_helpers.cpp11
-rw-r--r--ydb/core/formats/arrow_helpers.h2
-rw-r--r--ydb/core/protos/tx_columnshard.proto11
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.cpp63
-rw-r--r--ydb/core/tx/columnshard/columnshard__index_scan.h21
-rw-r--r--ydb/core/tx/columnshard/columnshard__read.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.cpp40
-rw-r--r--ydb/core/tx/columnshard/columnshard__scan.h8
-rw-r--r--ydb/core/tx/columnshard/columnshard__stats_scan.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h7
-rw-r--r--ydb/core/tx/columnshard/counters.cpp14
-rw-r--r--ydb/core/tx/columnshard/counters.h17
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.cpp13
-rw-r--r--ydb/core/tx/columnshard/engines/index_info.h5
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.cpp251
-rw-r--r--ydb/core/tx/columnshard/engines/indexed_read_data.h289
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.cpp31
-rw-r--r--ydb/core/tx/columnshard/engines/portion_info.h31
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt29
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt29
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.txt17
-rw-r--r--ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.cpp102
-rw-r--r--ydb/core/tx/columnshard/engines/reader/batch.h68
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp43
-rw-r--r--ydb/core/tx/columnshard/engines/reader/conveyor_task.h85
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp56
-rw-r--r--ydb/core/tx/columnshard/engines/reader/filter_assembler.h37
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.cpp28
-rw-r--r--ydb/core/tx/columnshard/engines/reader/granule.h32
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp55
-rw-r--r--ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h29
-rw-r--r--ydb/core/tx/columnshard/engines/reader/queue.cpp25
-rw-r--r--ydb/core/tx/columnshard/engines/reader/queue.h35
-rw-r--r--ydb/core/tx/columnshard/read_actor.cpp25
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp14
-rw-r--r--ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp2
48 files changed, 1101 insertions, 502 deletions
diff --git a/ydb/core/formats/arrow_filter.cpp b/ydb/core/formats/arrow_filter.cpp
index 7eaea60dee..28fe9892ea 100644
--- a/ydb/core/formats/arrow_filter.cpp
+++ b/ydb/core/formats/arrow_filter.cpp
@@ -312,7 +312,7 @@ bool TColumnFilter::Apply(std::shared_ptr<arrow::RecordBatch>& batch) {
if (!batch || !batch->num_rows()) {
return false;
}
- Y_VERIFY(Filter.empty() || Count == (size_t)batch->num_rows());
+ Y_VERIFY_S(Filter.empty() || Count == (size_t)batch->num_rows(), Count << " != " << batch->num_rows());
if (IsTotalDenyFilter()) {
batch = batch->Slice(0, 0);
return false;
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index 8e03552359..db47cb93bc 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -266,6 +266,7 @@ std::shared_ptr<arrow::RecordBatch> ExtractColumns(const std::shared_ptr<arrow::
for (auto& field : dstSchema->fields()) {
columns.push_back(srcBatch->GetColumnByName(field->name()));
+ Y_VERIFY(columns.back());
if (!columns.back()->type()->Equals(field->type())) {
columns.back() = {};
}
@@ -842,7 +843,8 @@ bool ReserveData(arrow::ArrayBuilder& builder, const size_t size) {
return true;
}
-bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder) {
+bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result,
+ const std::vector<std::string>& columnsOrder, const bool orderFieldsAreNecessary) {
if (batches.empty()) {
result = nullptr;
return true;
@@ -855,6 +857,7 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
std::vector<std::shared_ptr<arrow::Array>> columns;
std::map<std::string, ui32> fieldNames;
for (auto&& i : batches) {
+ Y_VERIFY(i);
for (auto&& f : i->schema()->fields()) {
if (!fieldNames.emplace(f->name(), fields.size()).second) {
return false;
@@ -877,7 +880,11 @@ bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& b
std::vector<std::shared_ptr<arrow::Array>> columnsOrdered;
for (auto&& i : columnsOrder) {
auto it = fieldNames.find(i);
- Y_VERIFY(it != fieldNames.end());
+ if (orderFieldsAreNecessary) {
+ Y_VERIFY(it != fieldNames.end());
+ } else if (it == fieldNames.end()) {
+ continue;
+ }
fieldsOrdered.emplace_back(fields[it->second]);
columnsOrdered.emplace_back(columns[it->second]);
}
diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h
index b4f55f0ebf..3d6f060171 100644
--- a/ydb/core/formats/arrow_helpers.h
+++ b/ydb/core/formats/arrow_helpers.h
@@ -109,7 +109,7 @@ std::shared_ptr<arrow::UInt64Array> MakePermutation(int size, bool reverse = fal
TVector<TString> ColumnNames(const std::shared_ptr<arrow::Schema>& schema);
size_t LowerBound(const std::vector<TRawReplaceKey>& batchKeys, const TReplaceKey& key, size_t offset = 0);
bool ReserveData(arrow::ArrayBuilder& builder, const size_t size);
-bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {});
+bool MergeBatchColumns(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, std::shared_ptr<arrow::RecordBatch>& result, const std::vector<std::string>& columnsOrder = {}, const bool orderFieldsAreNecessary = true);
std::shared_ptr<arrow::UInt64Array> MakeSortPermutation(const std::shared_ptr<arrow::RecordBatch>& batch,
const std::shared_ptr<arrow::Schema>& sortingKey);
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 3760e8b8e0..4e5ea10006 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -35,8 +35,15 @@ message TReadStats {
optional uint64 IndexPortions = 5;
optional uint64 IndexBatches = 6;
optional uint64 NotIndexedBatches = 7;
- optional uint32 UsedColumns = 8;
- optional uint64 DataBytes = 9;
+ optional uint32 SchemaColumns = 8;
+ optional uint64 PortionsBytes = 9;
+ optional uint64 DataFilterBytes = 10;
+ optional uint64 DataAdditionalBytes = 11;
+
+ optional uint32 FilterColumns = 12;
+ optional uint32 AdditionalColumns = 13;
+
+ optional uint32 SelectedRows = 14;
}
message TLogicalMetadata {
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
index 6f852e9ea2..434628715d 100644
--- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt
@@ -70,6 +70,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
index 1cf2e0e0f6..8921b5bc76 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt
@@ -71,6 +71,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
index 1cf2e0e0f6..8921b5bc76 100644
--- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt
@@ -71,6 +71,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
index 6f852e9ea2..434628715d 100644
--- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt
@@ -70,6 +70,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_costs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_private_events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/counters.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/defs.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
index 0838327fb7..065929dec5 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp
@@ -1,18 +1,27 @@
#include "columnshard__index_scan.h"
+#include <ydb/core/tx/conveyor/usage/service.h>
+#include <ydb/core/tx/conveyor/usage/events.h>
namespace NKikimr::NColumnShard {
-TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata)
+TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata,
+ NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters)
: ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata)
+ , IndexedData(ReadMetadata, FetchBlobsQueue, false, scanCounters)
+ , DataTasksProcessor(processor)
+ , ScanCounters(scanCounters)
{
ui32 batchNo = 0;
for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) {
const auto& cmtBlob = ReadMetadata->CommittedBlobs[i];
WaitCommitted.emplace(cmtBlob, batchNo);
}
- std::vector<TBlobRange> indexedBlobs = IndexedData.InitRead(batchNo, true);
-
+ // Read all committed blobs
+ for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) {
+ auto& blobId = cmtBlob.BlobId;
+ FetchBlobsQueue.emplace_back(TBlobRange(blobId, 0, blobId.BlobSize()));
+ }
+ IndexedData.InitRead(batchNo, true);
// Add cached batches without read
for (auto& [blobId, batch] : ReadMetadata->CommittedBatches) {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
@@ -23,25 +32,17 @@ TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstP
IndexedData.AddNotIndexed(batchNo, batch, cmtBlob.PlanStep, cmtBlob.TxId);
}
- // Read all committed blobs
- for (const auto& cmtBlob : ReadMetadata->CommittedBlobs) {
- auto& blobId = cmtBlob.BlobId;
- BlobsToRead.push_back(TBlobRange(blobId, 0, blobId.BlobSize()));
- }
-
Y_VERIFY(ReadMetadata->IsSorted());
- for (auto&& blobRange : indexedBlobs) {
- BlobsToRead.emplace_back(blobRange);
+ if (ReadMetadata->Empty()) {
+ FetchBlobsQueue.Stop();
}
-
- IsReadFinished = ReadMetadata->Empty();
}
-void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) {
+void TColumnShardScanIterator::AddData(const TBlobRange& blobRange, TString data) {
const auto& blobId = blobRange.BlobId;
if (IndexedData.IsIndexedBlob(blobRange)) {
- IndexedData.AddIndexed(blobRange, data, processor);
+ IndexedData.AddIndexed(blobRange, data, DataTasksProcessor);
} else {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{ blobId, 0, 0 });
if (cmt.empty()) {
@@ -67,12 +68,7 @@ NKikimr::NOlap::TPartialReadResult TColumnShardScanIterator::GetBatch() {
}
NKikimr::NColumnShard::TBlobRange TColumnShardScanIterator::GetNextBlobToRead() {
- if (IsReadFinished || NextBlobIdxToRead == BlobsToRead.size()) {
- return TBlobRange();
- }
- const auto& blob = BlobsToRead[NextBlobIdxToRead];
- ++NextBlobIdxToRead;
- return blob;
+ return FetchBlobsQueue.pop_front();
}
void TColumnShardScanIterator::FillReadyResults() {
@@ -96,14 +92,31 @@ void TColumnShardScanIterator::FillReadyResults() {
}
if (limitLeft == 0) {
+ DataTasksProcessor.Stop();
WaitCommitted.clear();
IndexedData.Abort();
- IsReadFinished = true;
+ FetchBlobsQueue.Stop();
}
- if (WaitCommitted.empty() && !IndexedData.IsInProgress() && NextBlobIdxToRead == BlobsToRead.size()) {
- IsReadFinished = true;
+ if (WaitCommitted.empty() && !IndexedData.IsInProgress() && FetchBlobsQueue.empty()) {
+ DataTasksProcessor.Stop();
+ FetchBlobsQueue.Stop();
+ }
+}
+
+bool TColumnShardScanIterator::HasWaitingTasks() const {
+ return DataTasksProcessor.InWaiting();
+}
+
+TColumnShardScanIterator::~TColumnShardScanIterator() {
+ ReadMetadata->ReadStats->PrintToLog();
+}
+
+void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) {
+ if (!task->IsDataProcessed()) {
+ return;
}
+ task->Apply(IndexedData);
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h
index 247abe95ed..3450928dfa 100644
--- a/ydb/core/tx/columnshard/columnshard__index_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__index_scan.h
@@ -28,27 +28,28 @@ using NOlap::TUnifiedBlobId;
using NOlap::TBlobRange;
class TColumnShardScanIterator : public TScanIteratorBase {
+private:
NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ NOlap::TFetchBlobsQueue FetchBlobsQueue;
NOlap::TIndexedReadData IndexedData;
std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted;
- TVector<TBlobRange> BlobsToRead;
- ui64 NextBlobIdxToRead = 0;
TDeque<NOlap::TPartialReadResult> ReadyResults;
- bool IsReadFinished = false;
ui64 ItemsRead = 0;
const i64 MaxRowsInBatch = 5000;
-
+ NColumnShard::TDataTasksProcessorContainer DataTasksProcessor;
+ NColumnShard::TScanCounters ScanCounters;
public:
- TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata);
+ TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TScanCounters& scanCounters);
+ ~TColumnShardScanIterator();
- virtual void Apply(IDataPreparationTask::TPtr task) override {
- task->Apply(IndexedData);
- }
+ virtual void Apply(IDataTasksProcessor::ITask::TPtr task) override;
+
+ virtual bool HasWaitingTasks() const override;
- void AddData(const TBlobRange& blobRange, TString data, IDataTasksProcessor::TPtr processor) override;
+ void AddData(const TBlobRange& blobRange, TString data) override;
bool Finished() const override {
- return IsReadFinished && ReadyResults.empty();
+ return FetchBlobsQueue.IsStopped() && ReadyResults.empty();
}
NOlap::TPartialReadResult GetBatch() override;
diff --git a/ydb/core/tx/columnshard/columnshard__read.cpp b/ydb/core/tx/columnshard/columnshard__read.cpp
index 3857498fe1..bcf66fbf87 100644
--- a/ydb/core/tx/columnshard/columnshard__read.cpp
+++ b/ydb/core/tx/columnshard/columnshard__read.cpp
@@ -130,7 +130,7 @@ void TTxRead::Complete(const TActorContext& ctx) {
TInstant deadline = TInstant::Max(); // TODO
ctx.Register(CreateReadActor(Self->TabletID(), Ev->Get()->GetSource(),
- std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie));
+ std::move(Result), ReadMetadata, deadline, Self->SelfId(), requestCookie, Self->ReadCounters));
}
}
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 93da0790f8..b408adb99a 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -54,14 +54,13 @@ class TLocalDataTasksProcessor: public IDataTasksProcessor {
private:
const TActorIdentity OwnerActorId;
protected:
- virtual bool DoAdd(IDataPreparationTask::TPtr task) override {
+ virtual bool DoAdd(IDataTasksProcessor::ITask::TPtr task) override {
OwnerActorId.Send(NConveyor::MakeServiceId(OwnerActorId.NodeId()), new NConveyor::TEvExecution::TEvNewTask(task));
return true;
}
public:
TLocalDataTasksProcessor(const TActorIdentity& ownerActorId)
- : OwnerActorId(ownerActorId)
- {
+ : OwnerActorId(ownerActorId) {
}
};
@@ -75,7 +74,7 @@ public:
TColumnShardScan(const TActorId& columnShardActorId, const TActorId& scanComputeActorId,
ui32 scanId, ui64 txId, ui32 scanGen, ui64 requestCookie,
ui64 tabletId, TDuration timeout, TVector<TTxScan::TReadMetadataPtr>&& readMetadataList,
- NKikimrTxDataShard::EScanDataFormat dataFormat)
+ NKikimrTxDataShard::EScanDataFormat dataFormat, const TScanCounters& scanCountersPool)
: ColumnShardActorId(columnShardActorId)
, ScanComputeActorId(scanComputeActorId)
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
@@ -88,6 +87,7 @@ public:
, ReadMetadataRanges(std::move(readMetadataList))
, ReadMetadataIndex(0)
, Deadline(TInstant::Now() + (timeout ? timeout + SCAN_HARD_TIMEOUT_GAP : SCAN_HARD_TIMEOUT))
+ , ScanCountersPool(scanCountersPool)
{
KeyYqlSchema = ReadMetadataRanges[ReadMetadataIndex]->GetKeyYqlSchema();
}
@@ -100,18 +100,23 @@ public:
new IEventHandle(SelfId(), SelfId(), new TEvents::TEvWakeup));
Y_VERIFY(!ScanIterator);
- ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan();
+ ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
Send(ScanComputeActorId, new TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen), IEventHandle::FlagTrackDelivery);
- if (NConveyor::TServiceOperator::IsEnabled()) {
- DataTasksProcessor = std::make_shared<TLocalDataTasksProcessor>(SelfId());
- }
Become(&TColumnShardScan::StateScan);
}
private:
+ IDataTasksProcessor::TPtr MakeTasksProcessor() const {
+ if (NConveyor::TServiceOperator::IsEnabled()) {
+ return std::make_shared<TLocalDataTasksProcessor>(SelfId());
+ } else {
+ return nullptr;
+ }
+ }
+
STATEFN(StateScan) {
auto g = Stats.MakeGuard("processing");
switch (ev->GetTypeRewrite()) {
@@ -161,19 +166,17 @@ private:
}
void HandleScan(NConveyor::TEvExecution::TEvTaskProcessedResult::TPtr& ev) {
- TaskResultsCounter.Inc();
auto g = Stats.MakeGuard("task_result");
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_COLUMNSHARD_SCAN,
"Scan " << ScanActorId << " got ScanDataAck" << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId);
if (ev->Get()->GetErrorMessage()) {
ALS_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)
<< "Scan " << ScanActorId << " got finished error " << ev->Get()->GetErrorMessage() << " txId: " << TxId << " scanId: " << ScanId << " gen: " << ScanGen << " tablet: " << TabletId;
- DataTasksProcessor->Stop();
SendScanError(ev->Get()->GetErrorMessage());
Finish();
} else {
- auto t = dynamic_pointer_cast<IDataPreparationTask>(ev->Get()->GetResult());
- Y_VERIFY(t);
+ auto t = static_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult());
+ Y_VERIFY_DEBUG(dynamic_pointer_cast<IDataTasksProcessor::ITask>(ev->Get()->GetResult()));
ScanIterator->Apply(t);
}
ContinueProcessing();
@@ -233,7 +236,7 @@ private:
if (ScanIterator) {
{
auto g = Stats.MakeGuard("AddData");
- ScanIterator->AddData(blobRange, event.Data, DataTasksProcessor);
+ ScanIterator->AddData(blobRange, event.Data);
}
ContinueProcessing();
}
@@ -363,7 +366,7 @@ private:
// * we have finished scanning ALL the ranges
// * or there is an in-flight blob read or ScanData message for which
// we will get a reply and will be able to proceed further
- if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || (DataTasksProcessor && DataTasksProcessor->GetDataCounter() > TaskResultsCounter.Val())) {
+ if (!ScanIterator || !ChunksLimiter.HasMore() || InFlightReads != 0 || ScanIterator->HasWaitingTasks()) {
return;
}
}
@@ -443,8 +446,7 @@ private:
return Finish();
}
- ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan();
-
+ ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool);
// Used in TArrowToYdbConverter
ResultYqlSchema.clear();
}
@@ -587,6 +589,7 @@ private:
const TSerializedTableRange TableRange;
const TSmallVec<bool> SkipNullKeys;
const TInstant Deadline;
+ TScanCounters ScanCountersPool;
TActorId TimeoutActorId;
TMaybe<TString> AbortReason;
@@ -597,9 +600,6 @@ private:
i64 InFlightReadBytes = 0;
bool Finished = false;
- TAtomicCounter TaskResultsCounter = 0;
- IDataTasksProcessor::TPtr DataTasksProcessor;
-
class TBlobStats {
private:
ui64 PartsCount = 0;
@@ -1009,7 +1009,7 @@ void TTxScan::Complete(const TActorContext& ctx) {
Self->IncCounter(COUNTER_READ_INDEX_BYTES, statsDelta.Bytes);
auto scanActor = ctx.Register(new TColumnShardScan(Self->SelfId(), scanComputeActor,
- scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat));
+ scanId, txId, scanGen, requestCookie, Self->TabletID(), timeout, std::move(ReadMetadataRanges), dataFormat, Self->ScanCounters));
LOG_S_DEBUG("TTxScan starting " << scanActor
<< " txId: " << txId
diff --git a/ydb/core/tx/columnshard/columnshard__scan.h b/ydb/core/tx/columnshard/columnshard__scan.h
index f462aedc46..54fe14916f 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.h
+++ b/ydb/core/tx/columnshard/columnshard__scan.h
@@ -1,7 +1,8 @@
#pragma once
#include "blob_cache.h"
-#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+#include "engines/reader/conveyor_task.h"
+#include "engines/indexed_read_data.h"
namespace NKikimr::NColumnShard {
@@ -9,10 +10,11 @@ class TScanIteratorBase {
public:
virtual ~TScanIteratorBase() = default;
- virtual void Apply(IDataPreparationTask::TPtr /*processor*/) {
+ virtual void Apply(IDataTasksProcessor::ITask::TPtr /*processor*/) {
}
- virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/, IDataTasksProcessor::TPtr /*processor*/) {}
+ virtual void AddData(const NBlobCache::TBlobRange& /*blobRange*/, TString /*data*/) {}
+ virtual bool HasWaitingTasks() const = 0;
virtual bool Finished() const = 0;
virtual NOlap::TPartialReadResult GetBatch() = 0;
virtual NBlobCache::TBlobRange GetNextBlobToRead() { return NBlobCache::TBlobRange(); }
diff --git a/ydb/core/tx/columnshard/columnshard__stats_scan.h b/ydb/core/tx/columnshard/columnshard__stats_scan.h
index 6a422f136d..bce22aa077 100644
--- a/ydb/core/tx/columnshard/columnshard__stats_scan.h
+++ b/ydb/core/tx/columnshard/columnshard__stats_scan.h
@@ -43,6 +43,10 @@ public:
{
}
+ virtual bool HasWaitingTasks() const override {
+ return false;
+ }
+
bool Finished() const override {
return IndexStats.empty();
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index a504e6e127..592adb0618 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -113,6 +113,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, TTabletExecutedFlat(info, tablet, nullptr)
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
+ , ReadCounters("Read")
+ , ScanCounters("Scan")
{
TabletCountersPtr.reset(new TProtobufTabletCounters<
ESimpleCounters_descriptor,
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index b8e53757a9..9a5c0e8ccf 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -1,5 +1,6 @@
#pragma once
#include "defs.h"
+#include "counters.h"
#include "columnshard.h"
#include "columnshard_common.h"
#include "columnshard_ttl.h"
@@ -38,7 +39,7 @@ IActor* CreateReadActor(ui64 tabletId,
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie);
+ ui64 requestCookie, const TScanCounters& counters);
IActor* CreateColumnShardScan(const TActorId& scanComputeActor, ui32 scanId, ui64 txId);
IActor* CreateExportActor(const ui64 tabletId, const TActorId& dstActor, TAutoPtr<TEvPrivate::TEvExport> ev);
@@ -372,7 +373,7 @@ private:
TInstant LastStatsReport;
TActorId IndexingActor; // It's logically bounded to 1: we move each portion of data to multiple indices.
- TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compation.
+ TActorId CompactionActor; // It's memory bounded to 1: we have no memory for parallel compaction.
TActorId EvictionActor;
TActorId StatsReportPipe;
@@ -382,6 +383,8 @@ private:
std::unique_ptr<NTabletPipe::IClientCache> PipeClientCache;
std::unique_ptr<NOlap::TInsertTable> InsertTable;
TBatchCache BatchCache;
+ TScanCounters ReadCounters;
+ TScanCounters ScanCounters;
THashMap<ui64, TBasicTxInfo> BasicTxInfo;
TSet<TDeadlineQueueItem> DeadlineQueue;
diff --git a/ydb/core/tx/columnshard/counters.cpp b/ydb/core/tx/columnshard/counters.cpp
new file mode 100644
index 0000000000..dcf1476e52
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters.cpp
@@ -0,0 +1,14 @@
+#include "counters.h"
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/counters.h>
+
+namespace NKikimr::NColumnShard {
+
+TScanCounters::TScanCounters(const TString& module) {
+ ::NMonitoring::TDynamicCounterPtr subGroup = GetServiceCounters(AppData()->Counters, "tablets")->GetSubgroup("subsystem", "columnshard");
+ PortionBytes = subGroup->GetCounter(module + "/PortionBytes", true);
+ FilterBytes = subGroup->GetCounter(module + "/FilterBytes", true);
+ PostFilterBytes = subGroup->GetCounter(module + "/PostFilterBytes", true);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/counters.h b/ydb/core/tx/columnshard/counters.h
new file mode 100644
index 0000000000..357def8e37
--- /dev/null
+++ b/ydb/core/tx/columnshard/counters.h
@@ -0,0 +1,17 @@
+#pragma once
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NColumnShard {
+
+class TScanCounters {
+private:
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PortionBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, FilterBytes);
+ YDB_READONLY_DEF(NMonitoring::TDynamicCounters::TCounterPtr, PostFilterBytes);
+public:
+ TScanCounters(const TString& module = "Scan");
+};
+
+
+}
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
index 398cfd3d0c..a7adbff89e 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(reader)
add_subdirectory(ut)
add_library(tx-columnshard-engines)
@@ -22,6 +23,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-scheme
ydb-core-tablet
ydb-core-tablet_flat
+ columnshard-engines-reader
udf-service-exception_policy
)
target_sources(tx-columnshard-engines PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
index ef42c70332..39ca9cc7e6 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(reader)
add_subdirectory(ut)
add_library(tx-columnshard-engines)
@@ -23,6 +24,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-scheme
ydb-core-tablet
ydb-core-tablet_flat
+ columnshard-engines-reader
udf-service-exception_policy
)
target_sources(tx-columnshard-engines PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
index ef42c70332..39ca9cc7e6 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(reader)
add_subdirectory(ut)
add_library(tx-columnshard-engines)
@@ -23,6 +24,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-scheme
ydb-core-tablet
ydb-core-tablet_flat
+ columnshard-engines-reader
udf-service-exception_policy
)
target_sources(tx-columnshard-engines PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
index 398cfd3d0c..a7adbff89e 100644
--- a/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/engines/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,7 @@
# original buildsystem will not be accepted.
+add_subdirectory(reader)
add_subdirectory(ut)
add_library(tx-columnshard-engines)
@@ -22,6 +23,7 @@ target_link_libraries(tx-columnshard-engines PUBLIC
ydb-core-scheme
ydb-core-tablet
ydb-core-tablet_flat
+ columnshard-engines-reader
udf-service-exception_policy
)
target_sources(tx-columnshard-engines PRIVATE
diff --git a/ydb/core/tx/columnshard/engines/index_info.cpp b/ydb/core/tx/columnshard/engines/index_info.cpp
index ed031498d5..e2a041af03 100644
--- a/ydb/core/tx/columnshard/engines/index_info.cpp
+++ b/ydb/core/tx/columnshard/engines/index_info.cpp
@@ -46,10 +46,11 @@ std::shared_ptr<arrow::RecordBatch> TIndexInfo::AddSpecialColumns(
}
std::shared_ptr<arrow::Schema> TIndexInfo::ArrowSchemaSnapshot() {
- return std::make_shared<arrow::Schema>(arrow::FieldVector{
+ static std::shared_ptr<arrow::Schema> result = std::make_shared<arrow::Schema>(arrow::FieldVector{
arrow::field(SPEC_COL_PLAN_STEP, arrow::uint64()),
arrow::field(SPEC_COL_TX_ID, arrow::uint64())
});
+ return result;
}
bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
@@ -59,7 +60,13 @@ bool TIndexInfo::IsSpecialColumn(const arrow::Field& field) {
}
-ui32 TIndexInfo::GetColumnId(const TString& name) const {
+ui32 TIndexInfo::GetColumnId(const std::string& name) const {
+ auto id = GetColumnIdOptional(name);
+ Y_VERIFY(!!id);
+ return *id;
+}
+
+std::optional<ui32> TIndexInfo::GetColumnIdOptional(const std::string& name) const {
const auto ni = ColumnNames.find(name);
if (ni == ColumnNames.end()) {
@@ -68,7 +75,7 @@ ui32 TIndexInfo::GetColumnId(const TString& name) const {
} else if (name == SPEC_COL_TX_ID) {
return ui32(ESpecialColumn::TX_ID);
}
- Y_VERIFY(false);
+ return {};
}
return ni->second;
diff --git a/ydb/core/tx/columnshard/engines/index_info.h b/ydb/core/tx/columnshard/engines/index_info.h
index 293cb0cbaa..d9b96bc902 100644
--- a/ydb/core/tx/columnshard/engines/index_info.h
+++ b/ydb/core/tx/columnshard/engines/index_info.h
@@ -58,7 +58,8 @@ public:
}
/// Returns an id of the column located by name. The name should exists in the schema.
- ui32 GetColumnId(const TString& name) const;
+ ui32 GetColumnId(const std::string& name) const;
+ std::optional<ui32> GetColumnIdOptional(const std::string& name) const;
/// Returns a name of the column located by id.
TString GetColumnName(ui32 id, bool required = true) const;
@@ -80,7 +81,7 @@ public:
return KeyColumns[0];
}
- // Sorting key: colud be less or greater then traditional PK
+ // Sorting key: could be less or greater then traditional PK
// It could be empty for append-only tables. It could be greater then PK for better columns compression.
// If sorting key includes uniqueness key as a prefix we are able to use MergeSort for REPLACE.
const std::shared_ptr<arrow::Schema>& GetSortingKey() const { return SortingKey; }
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
index 9af8f09c5e..ee8e9f68b2 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp
@@ -113,145 +113,56 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v
}
-TIndexedReadData::TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo)
- : BatchNo(batchNo)
- , Portion(portionInfo.Records[0].Portion)
- , Granule(owner.GetGranuleId())
- , Owner(&owner)
- , PortionInfo(&portionInfo)
-{
- for (const NOlap::TColumnRecord& rec : portionInfo.Records) {
- Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
- Y_VERIFY(rec.Portion == Portion);
- Y_VERIFY(rec.Valid());
- Y_VERIFY(Granule == rec.Granule);
- }
-
- if (portionInfo.CanIntersectOthers()) {
- Owner->SetDuplicationsAvailable(true);
- if (portionInfo.CanHaveDups()) {
- SetDuplicationsAvailable(true);
- }
- }
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const {
+ return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters);
}
-NColumnShard::IDataPreparationTask::TPtr TIndexedReadData::TBatch::AssembleIndexedBatch(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
- Y_VERIFY(PortionInfo->Produced());
-
- auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data);
-
- for (auto& rec : PortionInfo->Records) {
- auto& blobRange = rec.BlobRange;
- Data.erase(blobRange);
+std::set<ui32> TReadMetadata::GetEarlyFilterColumnIds(const bool noTrivial) const {
+ std::set<ui32> result;
+ if (LessPredicate) {
+ for (auto&& i : LessPredicate->ColumnNames()) {
+ result.emplace(IndexInfo.GetColumnId(i));
+ }
}
-
- return std::make_shared<TAssembledNotFiltered>(std::move(batchConstructor), readMetadata, GetBatchNo(), PortionInfo->AllowEarlyFilter(), processor);
-}
-
-bool TIndexedReadData::TAssembledNotFiltered::DoExecuteImpl() {
- /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
- /// It's not OK to apply predicate before replacing key duplicates otherwise.
- /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- const std::set<std::string> columnNames = ReadMetadata->GetFilterColumns(true);
- if (columnNames.empty()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl")
- ("event", "skip_data_no_columns")("columns_count", BatchConstructor.GetColumnsCount());
- FilteredBatch = BatchConstructor.Assemble();
- return true;
- }
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.SetIncludeColumnNames(columnNames);
- auto batch = BatchConstructor.Assemble(options);
- const size_t ignoredColumnsCount = BatchConstructor.GetColumnsCount() - batch->schema()->num_fields();
- Y_VERIFY(batch);
-
- NArrow::TColumnFilter globalFilter = NOlap::FilterPortion(batch, *ReadMetadata);
- if (!globalFilter.Apply(batch)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl")
- ("event", "skip_data")("columns_count", ignoredColumnsCount);
- FilteredBatch = nullptr;
- return true;
+ if (GreaterPredicate) {
+ for (auto&& i : GreaterPredicate->ColumnNames()) {
+ result.emplace(IndexInfo.GetColumnId(i));
+ }
}
-#if 1 // optimization
- if (ReadMetadata->Program && AllowEarlyFilter) {
- auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program);
- globalFilter.CombineSequential(filter);
- if (!filter.Apply(batch)) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl")
- ("event", "skip_data")("columns_count", ignoredColumnsCount);
- FilteredBatch = nullptr;
- return true;
+ if (Program) {
+ for (auto&& i : Program->GetEarlyFilterColumns()) {
+ auto id = IndexInfo.GetColumnIdOptional(i);
+ if (id) {
+ result.emplace(*id);
+ }
}
}
-#else
- Y_UNUSED(AllowEarlyFilter);
-#endif
-
- if (BatchConstructor.GetColumnsCount() > (size_t)batch->schema()->num_fields()) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("method", "TAssembledNotFiltered::DoExecuteImpl")
- ("event", "not_skip_data")("columns_count", ignoredColumnsCount)("num_rows", batch->num_rows());
- TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- options.SetExcludeColumnNames(columnNames);
-
- if (globalFilter.GetInactiveHeadSize() > globalFilter.GetInactiveTailSize()) {
- options.SetRecordsCountLimit(globalFilter.Size() - globalFilter.GetInactiveHeadSize())
- .SetForwardAssemble(false);
- globalFilter.CutInactiveHead();
- } else {
- options.SetRecordsCountLimit(globalFilter.Size() - globalFilter.GetInactiveTailSize());
- globalFilter.CutInactiveTail();
+ if (noTrivial && result.empty()) {
+ return result;
+ }
+ if (PlanStep) {
+ auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
+ for (auto&& i : snapSchema->fields()) {
+ result.emplace(IndexInfo.GetColumnId(i->name()));
}
- std::shared_ptr<arrow::RecordBatch> fBatch = BatchConstructor.Assemble(options);
- Y_VERIFY(globalFilter.Apply(fBatch));
- Y_VERIFY(NArrow::MergeBatchColumns({ batch, fBatch }, batch, BatchConstructor.GetColumnsOrder()));
}
-
- FilteredBatch = batch;
- return true;
-}
-
-bool TIndexedReadData::TAssembledNotFiltered::DoApply(TIndexedReadData& owner) const {
- owner.PortionFinished(BatchNo, FilteredBatch);
- return true;
-}
-
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan() const {
- return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this());
+ return result;
}
-std::set<std::string> TReadMetadata::GetFilterColumns(const bool early) const {
- std::set<std::string> result;
+std::set<ui32> TReadMetadata::GetUsedColumnIds() const {
+ std::set<ui32> result;
if (PlanStep) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
for (auto&& i : snapSchema->fields()) {
- result.emplace(i->name());
+ result.emplace(IndexInfo.GetColumnId(i->name()));
}
}
- if (LessPredicate) {
- for (auto&& i : LessPredicate->ColumnNames()) {
- result.emplace(i);
- }
- }
- if (GreaterPredicate) {
- for (auto&& i : GreaterPredicate->ColumnNames()) {
- result.emplace(i);
- }
- }
- if (Program) {
- if (!early) {
- for (auto&& i : Program->SourceColumns) {
- result.emplace(i.second);
- }
- } else {
- for (auto&& i : Program->GetEarlyFilterColumns()) {
- result.emplace(i);
- }
- }
+ for (auto&& f : LoadSchema->fields()) {
+ result.emplace(IndexInfo.GetColumnId(f->name()));
}
return result;
}
-
TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetResultYqlSchema() const {
return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, ResultColumnIds);
}
@@ -260,12 +171,25 @@ TVector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYqlSch
return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns);
}
-std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan() const {
+std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TScanCounters& /*scanCounters*/) const {
return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this());
}
+void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch) {
+ Y_VERIFY(IndexedBlobs.emplace(range).second);
+ Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second);
+ if (batch.GetFilter()) {
+ Counters.GetPostFilterBytes()->Add(range.Size);
+ ReadMetadata->ReadStats->DataAdditionalBytes += range.Size;
+ FetchBlobsQueue.emplace_front(range);
+ } else {
+ Counters.GetFilterBytes()->Add(range.Size);
+ ReadMetadata->ReadStats->DataFilterBytes += range.Size;
+ FetchBlobsQueue.emplace_back(range);
+ }
+}
-std::vector<TBlobRange> TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
+void TIndexedReadData::InitRead(ui32 inputBatch, bool inGranulesOrder) {
Y_VERIFY(ReadMetadata->BlobSchema);
Y_VERIFY(ReadMetadata->LoadSchema);
Y_VERIFY(ReadMetadata->ResultSchema);
@@ -277,54 +201,48 @@ std::vector<TBlobRange> TIndexedReadData::InitRead(ui32 inputBatch, bool inGranu
NotIndexed.resize(inputBatch);
ui32 batchNo = inputBatch;
- BatchInfo.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr);
+ Batches.resize(inputBatch + ReadMetadata->SelectInfo->Portions.size(), nullptr);
- ui64 dataBytes = 0;
+ ui64 portionsBytes = 0;
for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) {
+ portionsBytes += portionInfo.BlobsBytes();
Y_VERIFY_S(portionInfo.Records.size() > 0, "ReadMeatadata: " << *ReadMetadata);
ui64 granule = portionInfo.Records[0].Granule;
auto itGranule = Granules.find(granule);
if (itGranule == Granules.end()) {
- itGranule = Granules.emplace(granule, TGranule(granule, *this)).first;
- }
-
- TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
- BatchInfo[batchNo] = &currentBatch;
-
- for (auto&& i : currentBatch.GetWaitingBlobs()) {
- Y_VERIFY(IndexedBlobs.emplace(i).second);
- Y_VERIFY(IndexedBlobSubscriber.emplace(i, &currentBatch).second);
- dataBytes += i.Size;
+ itGranule = Granules.emplace(granule, NIndexedReader::TGranule(granule, *this)).first;
}
+ NIndexedReader::TBatch& currentBatch = itGranule->second.AddBatch(batchNo, portionInfo);
+ currentBatch.Reset(&EarlyFilterColumns);
+ Batches[batchNo] = &currentBatch;
++batchNo;
}
auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted());
- std::vector<TBlobRange> out;
for (ui64 granule : granulesOrder) {
auto it = Granules.find(granule);
Y_VERIFY(it != Granules.end());
- it->second.FillBlobsForFetch(out);
if (inGranulesOrder) {
GranulesOutOrder.emplace_back(&it->second);
}
}
-
+ Counters.GetPortionBytes()->Add(portionsBytes);
auto& stats = ReadMetadata->ReadStats;
stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size();
stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size();
stats->IndexBatches = ReadMetadata->NumIndexedBlobs();
stats->CommittedBatches = ReadMetadata->CommittedBlobs.size();
- stats->UsedColumns = ReadMetadata->LoadSchema->num_fields();
- stats->DataBytes = dataBytes;
- return out;
+ stats->SchemaColumns = ReadMetadata->LoadSchema->num_fields();
+ stats->FilterColumns = EarlyFilterColumns.size();
+ stats->AdditionalColumns = PostFilterColumns.size();
+ stats->PortionsBytes = portionsBytes;
}
-void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::IDataTasksProcessor::TPtr processor) {
- TBatch* portionBatch = nullptr;
+void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& data, NColumnShard::TDataTasksProcessorContainer processor) {
+ NIndexedReader::TBatch* portionBatch = nullptr;
{
auto it = IndexedBlobSubscriber.find(blobRange);
Y_VERIFY_DEBUG(it != IndexedBlobSubscriber.end());
@@ -338,13 +256,8 @@ void TIndexedReadData::AddIndexed(const TBlobRange& blobRange, const TString& da
return;
}
if (portionBatch->IsFetchingReady()) {
- if (auto batch = portionBatch->AssembleIndexedBatch(processor, ReadMetadata)) {
- if (processor) {
- processor->Add(batch);
- } else {
- batch->Execute();
- batch->Apply(*this);
- }
+ if (auto batch = portionBatch->AssembleTask(processor.GetObject(), ReadMetadata)) {
+ processor.Add(*this, batch);
}
}
}
@@ -413,8 +326,8 @@ TVector<TPartialReadResult> TIndexedReadData::GetReadyResults(const int64_t maxR
return out;
}
-std::vector<TIndexedReadData::TGranule*> TIndexedReadData::DetachReadyInOrder() {
- std::vector<TIndexedReadData::TGranule*> out;
+std::vector<NIndexedReader::TGranule*> TIndexedReadData::DetachReadyInOrder() {
+ std::vector<NIndexedReader::TGranule*> out;
out.reserve(GranulesToOut.size());
if (GranulesOutOrder.empty()) {
@@ -424,7 +337,7 @@ std::vector<TIndexedReadData::TGranule*> TIndexedReadData::DetachReadyInOrder()
GranulesToOut.clear();
} else {
while (GranulesOutOrder.size()) {
- TGranule* granule = GranulesOutOrder.front();
+ NIndexedReader::TGranule* granule = GranulesOutOrder.front();
if (!granule->IsReady()) {
break;
}
@@ -451,7 +364,7 @@ std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> TIndexedReadData::
OutNotIndexed.erase(0);
}
- std::vector<TGranule*> ready = DetachReadyInOrder();
+ std::vector<NIndexedReader::TGranule*> ready = DetachReadyInOrder();
for (auto&& granule : ready) {
bool canHaveDups = granule->IsDuplicationsAvailable();
std::vector<std::shared_ptr<arrow::RecordBatch>> inGranule = granule->GetReadyBatches();
@@ -638,20 +551,30 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco
return out;
}
-void TIndexedReadData::PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch) {
- Y_VERIFY(BatchInfo[batchNo]);
- BatchInfo[batchNo]->InitBatch(batch);
-}
-
+NIndexedReader::TBatch& TIndexedReadData::GetBatchInfo(const ui32 batchNo) {
+ Y_VERIFY(batchNo < Batches.size());
+ auto ptr = Batches[batchNo];
+ Y_VERIFY(ptr);
+ return *ptr;
}
-namespace NKikimr::NColumnShard {
-
-bool IDataPreparationTask::DoExecute() {
- if (OwnerOperator && OwnerOperator->IsStopped()) {
- return true;
+TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue,
+ const bool internalRead, const NColumnShard::TScanCounters& counters)
+ : Counters(counters)
+ , FetchBlobsQueue(fetchBlobsQueue)
+ , ReadMetadata(readMetadata)
+{
+ PostFilterColumns = ReadMetadata->GetUsedColumnIds();
+ EarlyFilterColumns = ReadMetadata->GetEarlyFilterColumnIds(true);
+ if (internalRead || EarlyFilterColumns.empty()) {
+ EarlyFilterColumns = PostFilterColumns;
+ PostFilterColumns.clear();
} else {
- return DoExecuteImpl();
+ for (auto&& i : EarlyFilterColumns) {
+ PostFilterColumns.erase(i);
+ }
}
+ Y_VERIFY(ReadMetadata->SelectInfo);
}
+
}
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h
index 09c032c025..cccbe5801d 100644
--- a/ydb/core/tx/columnshard/engines/indexed_read_data.h
+++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h
@@ -2,70 +2,15 @@
#include "defs.h"
#include "column_engine.h"
#include "predicate.h"
-#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include "reader/queue.h"
+#include "reader/granule.h"
+#include "reader/batch.h"
-namespace NKikimr::NColumnShard {
-class TScanIteratorBase;
-}
-
-namespace NKikimr::NOlap {
-class TIndexedReadData;
-}
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/counters.h>
namespace NKikimr::NColumnShard {
-
-class IDataTasksProcessor;
-
-class IDataPreparationTask: public NConveyor::ITask {
-private:
- std::shared_ptr<IDataTasksProcessor> OwnerOperator;
-protected:
- virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
- virtual bool DoExecuteImpl() = 0;
-
- virtual bool DoExecute() override final;
-public:
- IDataPreparationTask(std::shared_ptr<IDataTasksProcessor> ownerOperator)
- : OwnerOperator(ownerOperator)
- {
-
- }
- using TPtr = std::shared_ptr<IDataPreparationTask>;
- virtual ~IDataPreparationTask() = default;
- bool Apply(NOlap::TIndexedReadData& indexedDataRead) const {
- return DoApply(indexedDataRead);
- }
-};
-
-class IDataTasksProcessor {
-private:
- TAtomicCounter DataProcessorAddDataCounter = 0;
-protected:
- virtual bool DoAdd(IDataPreparationTask::TPtr task) = 0;
- std::atomic<bool> Stopped = false;
-public:
- i64 GetDataCounter() const {
- return DataProcessorAddDataCounter.Val();
- }
-
- void Stop() {
- Stopped = true;
- }
- bool IsStopped() const {
- return Stopped;
- }
-
- using TPtr = std::shared_ptr<IDataTasksProcessor>;
- virtual ~IDataTasksProcessor() = default;
- bool Add(IDataPreparationTask::TPtr task) {
- if (DoAdd(task)) {
- DataProcessorAddDataCounter.Inc();
- return true;
- }
- return false;
-
- }
-};
+class TScanIteratorBase;
}
namespace NKikimr::NOlap {
@@ -77,20 +22,47 @@ struct TReadStats {
ui64 IndexPortions{0};
ui64 IndexBatches{0};
ui64 CommittedBatches{0};
- ui32 UsedColumns{0};
- ui64 DataBytes{0};
+ ui64 PortionsBytes{ 0 };
+ ui64 DataFilterBytes{ 0 };
+ ui64 DataAdditionalBytes{ 0 };
+
+ ui32 SchemaColumns = 0;
+ ui32 FilterColumns = 0;
+ ui32 AdditionalColumns = 0;
+
+ ui32 SelectedRows = 0;
TReadStats(ui32 indexNo)
: BeginTimestamp(TInstant::Now())
, SelectedIndex(indexNo)
{}
+ void PrintToLog() {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
+ ("event", "statistic")
+ ("begin", BeginTimestamp)
+ ("selected", SelectedIndex)
+ ("index_granules", IndexGranules)
+ ("index_portions", IndexPortions)
+ ("index_batches", IndexBatches)
+ ("committed_batches", CommittedBatches)
+ ("schema_columns", SchemaColumns)
+ ("filter_columns", FilterColumns)
+ ("additional_columns", AdditionalColumns)
+ ("portions_bytes", PortionsBytes)
+ ("data_filter_bytes", DataFilterBytes)
+ ("data_additional_bytes", DataAdditionalBytes)
+ ("delta_bytes", PortionsBytes - DataFilterBytes - DataAdditionalBytes)
+ ("selected_rows", SelectedRows)
+ ;
+ }
+
TDuration Duration() {
return TInstant::Now() - BeginTimestamp;
}
};
-// Holds all metedata that is needed to perform read/scan
+// Holds all metadata that is needed to perform read/scan
struct TReadMetadataBase {
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
@@ -119,7 +91,7 @@ struct TReadMetadataBase {
virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0;
virtual TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0;
- virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const = 0;
+ virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const = 0;
virtual void Dump(IOutputStream& out) const { Y_UNUSED(out); };
// TODO: can this only be done for base class?
@@ -146,7 +118,16 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
, ReadStats(std::make_shared<TReadStats>(info.GetId()))
{}
- std::set<std::string> GetFilterColumns(const bool early) const;
+ std::vector<std::string> GetColumnsOrder() const {
+ std::vector<std::string> result;
+ for (auto&& i : LoadSchema->fields()) {
+ result.emplace_back(i->name());
+ }
+ return result;
+ }
+
+ std::set<ui32> GetEarlyFilterColumnIds(const bool noTrivial) const;
+ std::set<ui32> GetUsedColumnIds() const;
bool Empty() const {
Y_VERIFY(SelectInfo);
@@ -185,7 +166,7 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
return SelectInfo->Stats().Blobs;
}
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const override;
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
void Dump(IOutputStream& out) const override {
out << "columns: " << (LoadSchema ? LoadSchema->num_fields() : 0)
@@ -228,7 +209,7 @@ struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_
TVector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;
- std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan() const override;
+ std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TScanCounters& scanCounters) const override;
};
// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
@@ -243,15 +224,22 @@ struct TPartialReadResult {
};
class TIndexedReadData {
+private:
+ std::set<ui32> EarlyFilterColumns;
+ YDB_READONLY_DEF(std::set<ui32>, PostFilterColumns);
+ bool AbortedFlag = false;
+ NColumnShard::TScanCounters Counters;
+ std::vector<NIndexedReader::TBatch*> Batches;
+ TFetchBlobsQueue& FetchBlobsQueue;
+ friend class NIndexedReader::TBatch;
+ friend class NIndexedReader::TGranule;
public:
- TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata)
- : ReadMetadata(readMetadata)
- {
- Y_VERIFY(ReadMetadata->SelectInfo);
- }
+ TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, TFetchBlobsQueue& fetchBlobsQueue, const bool internalRead, const NColumnShard::TScanCounters& counters);
+
+ NIndexedReader::TBatch& GetBatchInfo(const ui32 batchNo);
- /// @returns blobId -> granule map. Granules could be read independently
- std::vector<TBlobRange> InitRead(ui32 numNotIndexed, bool inGranulesOrder = false);
+ /// Initial FetchBlobsQueue filling (queue from external scan iterator). Granules could be read independently
+ void InitRead(ui32 numNotIndexed, bool inGranulesOrder = false);
/// @returns batches and corresponding last keys in correct order (i.e. sorted by by PK)
TVector<TPartialReadResult> GetReadyResults(const int64_t maxRowsInBatch);
@@ -269,155 +257,34 @@ public:
NotIndexed[batchNo] = MakeNotIndexedBatch(batch, planStep, txId);
}
- void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::IDataTasksProcessor::TPtr processor);
+ void AddIndexed(const TBlobRange& blobRange, const TString& column, NColumnShard::TDataTasksProcessorContainer processor);
bool IsInProgress() const { return Granules.size() > ReadyGranulesAccumulator.size(); }
bool IsIndexedBlob(const TBlobRange& blobRange) const {
return IndexedBlobs.contains(blobRange);
}
void Abort() {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort");
for (auto&& i : Granules) {
ReadyGranulesAccumulator.emplace(i.first);
}
+ AbortedFlag = true;
Y_VERIFY(ReadyGranulesAccumulator.size() == Granules.size());
Y_VERIFY(!IsInProgress());
}
private:
NOlap::TReadMetadata::TConstPtr ReadMetadata;
- std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
-
- class TAssembledNotFiltered: public NColumnShard::IDataPreparationTask {
- private:
- using TBase = NColumnShard::IDataPreparationTask;
- TPortionInfo::TPreparedBatchData BatchConstructor;
- std::shared_ptr<arrow::RecordBatch> FilteredBatch;
- NOlap::TReadMetadata::TConstPtr ReadMetadata;
- ui32 BatchNo = 0;
- bool AllowEarlyFilter = false;
- protected:
- virtual bool DoApply(TIndexedReadData& owner) const override;
- virtual bool DoExecuteImpl() override;
- public:
- TAssembledNotFiltered(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- const ui32 batchNo, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
- : TBase(processor)
- , BatchConstructor(batchConstructor)
- , ReadMetadata(readMetadata)
- , BatchNo(batchNo)
- , AllowEarlyFilter(allowEarlyFilter)
- {
-
- }
- };
- void PortionFinished(const ui32 batchNo, std::shared_ptr<arrow::RecordBatch> batch);
-
- class TGranule;
-
- class TBatch {
- private:
- YDB_READONLY(ui64, BatchNo, 0);
- YDB_READONLY(ui64, Portion, 0);
- YDB_READONLY(ui64, Granule, 0);
- THashSet<TBlobRange> WaitIndexed;
- YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
- YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
- THashMap<TBlobRange, TString> Data;
- TGranule* Owner = nullptr;
- const TPortionInfo* PortionInfo = nullptr;
-
- friend class TGranule;
- TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo & portionInfo);
- void FillBlobsForFetch(std::vector<TBlobRange>& result) const {
- for (auto&& i : WaitIndexed) {
- result.emplace_back(i);
- }
- }
- public:
- NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata);
-
- const THashSet<TBlobRange>& GetWaitingBlobs() const {
- return WaitIndexed;
- }
-
- const TGranule& GetOwner() const {
- return *Owner;
- }
-
- bool IsFetchingReady() const {
- return WaitIndexed.empty();
- }
-
- const TPortionInfo& GetPortionInfo() const {
- return *PortionInfo;
- }
-
- void InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
- Y_VERIFY(!FilteredBatch);
- FilteredBatch = batch;
- Owner->OnBatchReady(*this, batch);
- }
-
- bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
- if (!WaitIndexed.erase(bRange)) {
- return false;
- }
- Data.emplace(bRange, blobData);
- return true;
- }
- };
-
- class TGranule {
- private:
- YDB_READONLY(ui64, GranuleId, 0);
- YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches);
- YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
- YDB_READONLY_FLAG(Ready, false);
- THashMap<ui32, TBatch> Batches;
- std::set<ui32> WaitBatches;
- TIndexedReadData* Owner = nullptr;
- void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
- Y_VERIFY(!ReadyFlag);
- Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
- if (batch && batch->num_rows()) {
- ReadyBatches.emplace_back(batch);
- }
- Owner->OnBatchReady(batchInfo, batch);
- if (WaitBatches.empty()) {
- ReadyFlag = true;
- Owner->OnGranuleReady(*this);
- }
- }
- public:
- friend class TIndexedReadData::TBatch;
- TGranule(const ui64 granuleId, TIndexedReadData& owner)
- : GranuleId(granuleId)
- , Owner(&owner)
- {
-
- }
-
- void FillBlobsForFetch(std::vector<TBlobRange>& result) const {
- for (auto&& i : Batches) {
- i.second.FillBlobsForFetch(result);
- }
- }
-
- TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
- Y_VERIFY(!ReadyFlag);
- WaitBatches.emplace(batchNo);
- auto infoEmplace = Batches.emplace(batchNo, TBatch(batchNo, *this, portionInfo));
- Y_VERIFY(infoEmplace.second);
- return infoEmplace.first->second;
- }
- };
+ std::vector<std::shared_ptr<arrow::RecordBatch>> NotIndexed;
- void OnGranuleReady(TGranule& granule) {
+ void AddBlobForFetch(const TBlobRange& range, NIndexedReader::TBatch& batch);
+ void OnGranuleReady(NIndexedReader::TGranule& granule) {
Y_VERIFY(GranulesToOut.emplace(granule.GetGranuleId(), &granule).second);
- Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second);
+ Y_VERIFY(ReadyGranulesAccumulator.emplace(granule.GetGranuleId()).second || AbortedFlag);
}
- void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+ void OnBatchReady(const NIndexedReader::TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
if (batch && batch->num_rows()) {
+ ReadMetadata->ReadStats->SelectedRows += batch->num_rows();
if (batchInfo.IsDuplicationsAvailable()) {
Y_VERIFY(batchInfo.GetOwner().IsDuplicationsAvailable());
BatchesToDedup.insert(batch.get());
@@ -428,18 +295,17 @@ private:
}
THashSet<const void*> BatchesToDedup;
- THashMap<TBlobRange, TBatch*> IndexedBlobSubscriber; // blobId -> batch
- THashMap<ui64, TGranule*> GranulesToOut;
+ THashMap<TBlobRange, NIndexedReader::TBatch*> IndexedBlobSubscriber; // blobId -> batch
+ THashMap<ui64, NIndexedReader::TGranule*> GranulesToOut;
std::set<ui64> ReadyGranulesAccumulator;
THashSet<TBlobRange> IndexedBlobs;
ui32 ReadyNotIndexed{0};
THashMap<ui64, std::shared_ptr<arrow::RecordBatch>> OutNotIndexed; // granule -> not indexed to append
- TVector<TBatch*> BatchInfo;
- THashMap<ui64, TGranule> Granules;
- TDeque<TGranule*> GranulesOutOrder;
+ THashMap<ui64, NIndexedReader::TGranule> Granules;
+ TDeque<NIndexedReader::TGranule*> GranulesOutOrder;
std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription;
- std::vector<TGranule*> DetachReadyInOrder();
+ std::vector<NIndexedReader::TGranule*> DetachReadyInOrder();
const TIndexInfo& IndexInfo() const {
return ReadMetadata->IndexInfo;
@@ -448,7 +314,6 @@ private:
std::shared_ptr<arrow::RecordBatch> MakeNotIndexedBatch(
const std::shared_ptr<arrow::RecordBatch>& batch, ui64 planStep, ui64 txId) const;
- NColumnShard::IDataPreparationTask::TPtr AssembleIndexedBatch(const TBatch& batch, NColumnShard::IDataTasksProcessor::TPtr processor);
std::shared_ptr<arrow::RecordBatch> MergeNotIndexed(
std::vector<std::shared_ptr<arrow::RecordBatch>>&& batches) const;
std::vector<std::vector<std::shared_ptr<arrow::RecordBatch>>> ReadyToOut();
diff --git a/ydb/core/tx/columnshard/engines/portion_info.cpp b/ydb/core/tx/columnshard/engines/portion_info.cpp
index 4d4a94cb70..50bdb2ff6a 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portion_info.cpp
@@ -31,11 +31,23 @@ TString TPortionInfo::AddOneChunkColumn(const std::shared_ptr<arrow::Array>& arr
TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& blobsData) const {
+ const THashMap<TBlobRange, TString>& blobsData, const std::optional<std::set<ui32>>& columnIds) const {
// Correct records order
TMap<int, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
+ std::vector<std::shared_ptr<arrow::Field>> schemaFields;
+
+ for (auto&& i : schema->fields()) {
+ if (columnIds && !columnIds->contains(indexInfo.GetColumnId(i->name()))) {
+ continue;
+ }
+ schemaFields.emplace_back(i);
+ }
+
for (auto& rec : Records) {
+ if (columnIds && !columnIds->contains(rec.ColumnId)) {
+ continue;
+ }
ui32 columnId = rec.ColumnId;
TString columnName = indexInfo.GetColumnName(columnId);
std::string name(columnName.data(), columnName.size());
@@ -70,7 +82,7 @@ TPortionInfo::TPreparedBatchData TPortionInfo::PrepareForAssemble(const TIndexIn
columns.emplace_back(TPreparedColumn(field, std::move(blobs)));
}
- return TPreparedBatchData(std::move(columns), schema);
+ return TPreparedBatchData(std::move(columns), std::make_shared<arrow::Schema>(schemaFields));
}
void TPortionInfo::AddMinMax(ui32 columnId, const std::shared_ptr<arrow::Array>& column, bool sorted) {
@@ -264,25 +276,12 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble(con
}
std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
- std::vector<std::shared_ptr<arrow::Field>> fields;
- for (auto&& f : Schema->fields()) {
- if (!options.CheckFieldAcceptance(f->name())) {
- continue;
- }
- fields.emplace_back(f);
- }
- if (fields.empty()) {
- return nullptr;
- }
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
for (auto&& i : Columns) {
- if (!options.CheckFieldAcceptance(i.GetName())) {
- continue;
- }
columns.emplace_back(i.Assemble(options.GetRecordsCountLimitDef(Max<ui32>()), !options.IsForwardAssemble()));
}
- auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
+ auto table = arrow::Table::Make(Schema, columns);
auto res = table->CombineChunks();
Y_VERIFY(res.ok());
return NArrow::ToBatch(*res);
diff --git a/ydb/core/tx/columnshard/engines/portion_info.h b/ydb/core/tx/columnshard/engines/portion_info.h
index 1937c9be6c..6230256ac8 100644
--- a/ydb/core/tx/columnshard/engines/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portion_info.h
@@ -118,6 +118,14 @@ struct TPortionInfo {
return {sum, max};
}
+ ui64 BlobsBytes() const {
+ ui32 sum = 0;
+ for (auto& rec : Records) {
+ sum += rec.BlobRange.Size;
+ }
+ return sum;
+ }
+
void UpdateRecords(ui64 portion, const THashMap<ui64, ui64>& granuleRemap, const TSnapshot& snapshot) {
for (auto& rec : Records) {
rec.Portion = portion;
@@ -222,30 +230,11 @@ struct TPortionInfo {
class TAssembleOptions {
private:
- YDB_OPT(std::set<std::string>, IncludeColumnNames);
- YDB_OPT(std::set<std::string>, ExcludeColumnNames);
YDB_OPT(ui32, RecordsCountLimit);
YDB_FLAG_ACCESSOR(ForwardAssemble, true);
public:
- bool CheckFieldAcceptance(const std::string& fieldName) const {
- if (!!IncludeColumnNames && !IncludeColumnNames->contains(fieldName)) {
- return false;
- }
- if (!!ExcludeColumnNames && ExcludeColumnNames->contains(fieldName)) {
- return false;
- }
- return true;
- }
};
- std::vector<std::string> GetColumnsOrder() const {
- std::vector<std::string> result;
- for (auto&& i : Schema->fields()) {
- result.emplace_back(i->name());
- }
- return result;
- }
-
size_t GetColumnsCount() const {
return Columns.size();
}
@@ -262,11 +251,11 @@ struct TPortionInfo {
TPreparedBatchData PrepareForAssemble(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
- const THashMap<TBlobRange, TString>& data) const;
+ const THashMap<TBlobRange, TString>& data, const std::optional<std::set<ui32>>& columnIds) const;
std::shared_ptr<arrow::RecordBatch> AssembleInBatch(const TIndexInfo& indexInfo,
const std::shared_ptr<arrow::Schema>& schema,
const THashMap<TBlobRange, TString>& data) const {
- return PrepareForAssemble(indexInfo, schema, data).Assemble();
+ return PrepareForAssemble(indexInfo, schema, data, {}).Assemble();
}
static TString SerializeColumn(const std::shared_ptr<arrow::Array>& array,
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..44908d898b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..ba709a0cff
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..ba709a0cff
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,29 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..44908d898b
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(columnshard-engines-reader)
+target_compile_options(columnshard-engines-reader PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(columnshard-engines-reader PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ ydb-core-formats
+)
+target_sources(columnshard-engines-reader PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/granule.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/batch.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp
+)
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.cpp b/ydb/core/tx/columnshard/engines/reader/batch.cpp
new file mode 100644
index 0000000000..a51115669d
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/batch.cpp
@@ -0,0 +1,102 @@
+#include "batch.h"
+#include "granule.h"
+#include "filter_assembler.h"
+#include "postfilter_assembler.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+TBatch::TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo)
+ : BatchNo(batchNo)
+ , Portion(portionInfo.Records[0].Portion)
+ , Granule(owner.GetGranuleId())
+ , Owner(&owner)
+ , PortionInfo(&portionInfo) {
+ Y_VERIFY(portionInfo.Records.size());
+
+ if (portionInfo.CanIntersectOthers()) {
+ Owner->SetDuplicationsAvailable(true);
+ if (portionInfo.CanHaveDups()) {
+ SetDuplicationsAvailable(true);
+ }
+ }
+}
+
+NColumnShard::IDataTasksProcessor::ITask::TPtr TBatch::AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, NOlap::TReadMetadata::TConstPtr readMetadata) {
+ Y_VERIFY(WaitIndexed.empty());
+ Y_VERIFY(PortionInfo->Produced());
+ Y_VERIFY(!FilteredBatch);
+ auto batchConstructor = PortionInfo->PrepareForAssemble(readMetadata->IndexInfo, readMetadata->LoadSchema, Data, CurrentColumnIds);
+ Data.clear();
+ if (!Filter) {
+ return std::make_shared<TAssembleFilter>(std::move(batchConstructor), readMetadata, *this, PortionInfo->AllowEarlyFilter(), processor);
+ } else {
+ Y_VERIFY(FilterBatch);
+ return std::make_shared<TAssembleBatch>(std::move(batchConstructor), *this, readMetadata->GetColumnsOrder(), processor);
+ }
+}
+
+bool TBatch::AskedColumnsAlready(const std::set<ui32>& columnIds) const {
+ if (AskedColumnIds < columnIds) {
+ return false;
+ }
+ for (auto&& i : columnIds) {
+ if (!AskedColumnIds.contains(i)) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void TBatch::Reset(const std::set<ui32>* columnIds) {
+ if (!columnIds) {
+ CurrentColumnIds.reset();
+ } else {
+ CurrentColumnIds = *columnIds;
+ Y_VERIFY(CurrentColumnIds->size());
+ }
+ if (CurrentColumnIds) {
+ for (auto&& i : *CurrentColumnIds) {
+ AskedColumnIds.emplace(i);
+ }
+ }
+ Y_VERIFY(WaitIndexed.empty());
+ Y_VERIFY(Data.empty());
+ WaitingBytes = 0;
+ for (const NOlap::TColumnRecord& rec : PortionInfo->Records) {
+ if (CurrentColumnIds && !CurrentColumnIds->contains(rec.ColumnId)) {
+ continue;
+ }
+ Y_VERIFY(WaitIndexed.emplace(rec.BlobRange).second);
+ Owner->Owner->AddBlobForFetch(rec.BlobRange, *this);
+ Y_VERIFY(rec.Portion == Portion);
+ Y_VERIFY(rec.Valid());
+ Y_VERIFY(Granule == rec.Granule);
+ WaitingBytes += rec.BlobRange.Size;
+ }
+}
+
+void TBatch::InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch) {
+ Y_VERIFY(filter);
+ Y_VERIFY(!Filter);
+ Y_VERIFY(!FilterBatch);
+ Filter = filter;
+ FilterBatch = filterBatch;
+}
+
+void TBatch::InitBatch(std::shared_ptr<arrow::RecordBatch> batch) {
+ Y_VERIFY(!FilteredBatch);
+ FilteredBatch = batch;
+ Owner->OnBatchReady(*this, batch);
+}
+
+bool TBatch::AddIndexedReady(const TBlobRange& bRange, const TString& blobData) {
+ if (!WaitIndexed.erase(bRange)) {
+ return false;
+ }
+ WaitingBytes -= bRange.Size;
+ Data.emplace(bRange, blobData);
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/batch.h b/ydb/core/tx/columnshard/engines/reader/batch.h
new file mode 100644
index 0000000000..7e30a7f26f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/batch.h
@@ -0,0 +1,68 @@
+#pragma once
+#include "conveyor_task.h"
+
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+
+#include <ydb/library/accessor/accessor.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap {
+struct TReadMetadata;
+}
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranule;
+
+class TBatch {
+private:
+ YDB_READONLY(ui64, BatchNo, 0);
+ YDB_READONLY(ui64, Portion, 0);
+ YDB_READONLY(ui64, Granule, 0);
+ YDB_READONLY(ui64, WaitingBytes, 0);
+
+ THashSet<TBlobRange> WaitIndexed;
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilteredBatch);
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, FilterBatch);
+ YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
+ YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
+ THashMap<TBlobRange, TString> Data;
+ TGranule* Owner = nullptr;
+ const TPortionInfo* PortionInfo = nullptr;
+
+ friend class TGranule;
+ TBatch(const ui32 batchNo, TGranule& owner, const TPortionInfo& portionInfo);
+
+ YDB_READONLY_DEF(std::optional<std::set<ui32>>, CurrentColumnIds);
+ std::set<ui32> AskedColumnIds;
+public:
+ bool AddIndexedReady(const TBlobRange& bRange, const TString& blobData);
+ bool AskedColumnsAlready(const std::set<ui32>& columnIds) const;
+
+ void Reset(const std::set<ui32>* columnIds);
+
+ void InitFilter(std::shared_ptr<NArrow::TColumnFilter> filter, std::shared_ptr<arrow::RecordBatch> filterBatch);
+ void InitBatch(std::shared_ptr<arrow::RecordBatch> batch);
+
+ NColumnShard::IDataTasksProcessor::ITask::TPtr AssembleTask(NColumnShard::IDataTasksProcessor::TPtr processor, std::shared_ptr<const NOlap::TReadMetadata> readMetadata);
+
+ const THashSet<TBlobRange>& GetWaitingBlobs() const {
+ return WaitIndexed;
+ }
+
+ const TGranule& GetOwner() const {
+ return *Owner;
+ }
+
+ bool IsFetchingReady() const {
+ return WaitIndexed.empty();
+ }
+
+ const TPortionInfo& GetPortionInfo() const {
+ return *PortionInfo;
+ }
+};
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
new file mode 100644
index 0000000000..e41445afab
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.cpp
@@ -0,0 +1,43 @@
+#include "conveyor_task.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NColumnShard {
+
+bool IDataTasksProcessor::ITask::DoExecute() {
+ if (OwnerOperator && OwnerOperator->IsStopped()) {
+ return true;
+ } else {
+ DataProcessedFlag = true;
+ return DoExecuteImpl();
+ }
+}
+
+bool IDataTasksProcessor::ITask::Apply(NOlap::TIndexedReadData& indexedDataRead) const {
+ if (OwnerOperator) {
+ OwnerOperator->ReplyReceived();
+ }
+ return DoApply(indexedDataRead);
+}
+
+bool IDataTasksProcessor::Add(ITask::TPtr task) {
+ if (IsStopped()) {
+ return false;
+ }
+ if (DoAdd(task)) {
+ DataProcessorAddDataCounter.Inc();
+ return true;
+ }
+ return false;
+}
+
+
+void TDataTasksProcessorContainer::Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task) {
+ if (Object) {
+ Object->Add(task);
+ } else {
+ task->Execute();
+ task->Apply(indexedDataRead);
+ }
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/conveyor_task.h b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
new file mode 100644
index 0000000000..f9d7adacca
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/conveyor_task.h
@@ -0,0 +1,85 @@
+#pragma once
+#include <ydb/core/tx/conveyor/usage/abstract.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NOlap {
+class TIndexedReadData;
+}
+
+namespace NKikimr::NColumnShard {
+
+class IDataTasksProcessor;
+
+class IDataTasksProcessor {
+private:
+ TAtomicCounter DataProcessorAddDataCounter = 0;
+ void ReplyReceived() {
+ Y_VERIFY(DataProcessorAddDataCounter.Dec() >= 0);
+ }
+public:
+ class ITask: public NConveyor::ITask {
+ private:
+ std::shared_ptr<IDataTasksProcessor> OwnerOperator;
+ YDB_READONLY_FLAG(DataProcessed, false);
+ protected:
+ virtual bool DoApply(NOlap::TIndexedReadData& indexedDataRead) const = 0;
+ virtual bool DoExecuteImpl() = 0;
+
+ virtual bool DoExecute() override final;
+ public:
+ ITask(std::shared_ptr<IDataTasksProcessor> ownerOperator)
+ : OwnerOperator(ownerOperator) {
+
+ }
+ using TPtr = std::shared_ptr<ITask>;
+ virtual ~ITask() = default;
+ bool Apply(NOlap::TIndexedReadData& indexedDataRead) const;
+ };
+protected:
+ virtual bool DoAdd(ITask::TPtr task) = 0;
+ std::atomic<bool> Stopped = false;
+public:
+ i64 GetDataCounter() const {
+ return DataProcessorAddDataCounter.Val();
+ }
+
+ void Stop() {
+ Stopped = true;
+ }
+ bool IsStopped() const {
+ return Stopped;
+ }
+ bool InWaiting() const {
+ return !IsStopped() && DataProcessorAddDataCounter.Val();
+ }
+
+ using TPtr = std::shared_ptr<IDataTasksProcessor>;
+ virtual ~IDataTasksProcessor() = default;
+ bool Add(ITask::TPtr task);
+};
+
+class TDataTasksProcessorContainer {
+private:
+ YDB_READONLY_DEF(IDataTasksProcessor::TPtr, Object);
+public:
+ TDataTasksProcessorContainer() = default;
+ TDataTasksProcessorContainer(IDataTasksProcessor::TPtr object)
+ : Object(object)
+ {
+
+ }
+
+ void Stop() {
+ if (Object) {
+ Object->Stop();
+ }
+ }
+
+ bool InWaiting() const {
+ return Object && Object->InWaiting();
+ }
+
+ void Add(NOlap::TIndexedReadData& indexedDataRead, IDataTasksProcessor::ITask::TPtr task);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
new file mode 100644
index 0000000000..76448d7563
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.cpp
@@ -0,0 +1,56 @@
+#include "filter_assembler.h"
+#include <ydb/core/tx/columnshard/engines/filter.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+bool TAssembleFilter::DoExecuteImpl() {
+ /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
+ /// It's not OK to apply predicate before replacing key duplicates otherwise.
+ /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+ auto batch = BatchConstructor.Assemble();
+ Y_VERIFY(batch);
+ Y_VERIFY(batch->num_rows());
+ const ui32 originalCount = batch->num_rows();
+ Filter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata));
+ if (!Filter->Apply(batch)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", originalCount);
+ FilteredBatch = nullptr;
+ return true;
+ }
+#if 1 // optimization
+ if (ReadMetadata->Program && AllowEarlyFilter) {
+ auto filter = NOlap::EarlyFilter(batch, ReadMetadata->Program);
+ Filter->CombineSequential(filter);
+ if (!filter.Apply(batch)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", originalCount);
+ FilteredBatch = nullptr;
+ return true;
+ }
+ }
+#else
+ Y_UNUSED(AllowEarlyFilter);
+#endif
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")("original_count", originalCount)("filtered_count", batch->num_rows());
+
+ FilteredBatch = batch;
+ return true;
+}
+
+bool TAssembleFilter::DoApply(TIndexedReadData& owner) const {
+ TBatch& batch = owner.GetBatchInfo(BatchNo);
+ batch.InitFilter(Filter, FilteredBatch);
+ if (batch.AskedColumnsAlready(owner.GetPostFilterColumns()) || !FilteredBatch || FilteredBatch->num_rows() == 0) {
+ batch.InitBatch(FilteredBatch);
+ } else {
+ batch.Reset(&owner.GetPostFilterColumns());
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "additional_data")
+ ("filtered_count", FilteredBatch->num_rows())
+ ("blobs_count", batch.GetWaitingBlobs().size())
+ ("columns_count", batch.GetCurrentColumnIds()->size())
+ ("fetch_size", batch.GetWaitingBytes())
+ ;
+ }
+ return true;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
new file mode 100644
index 0000000000..7f6b3f7c9c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/filter_assembler.h
@@ -0,0 +1,37 @@
+#pragma once
+#include "conveyor_task.h"
+
+#include <ydb/core/formats/arrow_filter.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+ class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask {
+ private:
+ using TBase = NColumnShard::IDataTasksProcessor::ITask;
+ TPortionInfo::TPreparedBatchData BatchConstructor;
+ std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+ NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ std::shared_ptr<NArrow::TColumnFilter> Filter;
+ const ui32 BatchNo;
+ bool AllowEarlyFilter = false;
+ protected:
+ virtual bool DoApply(TIndexedReadData& owner) const override;
+ virtual bool DoExecuteImpl() override;
+ public:
+ TAssembleFilter(TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
+ TBatch& batch, const bool allowEarlyFilter, NColumnShard::IDataTasksProcessor::TPtr processor)
+ : TBase(processor)
+ , BatchConstructor(batchConstructor)
+ , ReadMetadata(readMetadata)
+ , BatchNo(batch.GetBatchNo())
+ , AllowEarlyFilter(allowEarlyFilter)
+ {
+
+ }
+ };
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.cpp b/ydb/core/tx/columnshard/engines/reader/granule.cpp
new file mode 100644
index 0000000000..f92bf1b482
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/granule.cpp
@@ -0,0 +1,28 @@
+#include "granule.h"
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+void TGranule::OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch) {
+ Y_VERIFY(!ReadyFlag);
+ Y_VERIFY(WaitBatches.erase(batchInfo.GetBatchNo()));
+ if (batch && batch->num_rows()) {
+ ReadyBatches.emplace_back(batch);
+ }
+ Owner->OnBatchReady(batchInfo, batch);
+ if (WaitBatches.empty()) {
+ ReadyFlag = true;
+ Owner->OnGranuleReady(*this);
+ }
+}
+
+NKikimr::NOlap::NIndexedReader::TBatch& TGranule::AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo) {
+ Y_VERIFY(!ReadyFlag);
+ WaitBatches.emplace(batchNo);
+ auto infoEmplace = Batches.emplace(batchNo, TBatch(batchNo, *this, portionInfo));
+ Y_VERIFY(infoEmplace.second);
+ return infoEmplace.first->second;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/granule.h b/ydb/core/tx/columnshard/engines/reader/granule.h
new file mode 100644
index 0000000000..cad0a9318e
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/granule.h
@@ -0,0 +1,32 @@
+#pragma once
+#include "batch.h"
+
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+class TGranule {
+private:
+ YDB_READONLY(ui64, GranuleId, 0);
+ YDB_READONLY_DEF(std::vector<std::shared_ptr<arrow::RecordBatch>>, ReadyBatches);
+ YDB_FLAG_ACCESSOR(DuplicationsAvailable, false);
+ YDB_READONLY_FLAG(Ready, false);
+ THashMap<ui32, TBatch> Batches;
+ std::set<ui32> WaitBatches;
+ TIndexedReadData* Owner = nullptr;
+ void OnBatchReady(const TBatch& batchInfo, std::shared_ptr<arrow::RecordBatch> batch);
+ friend class NIndexedReader::TBatch;
+public:
+ TGranule(const ui64 granuleId, TIndexedReadData& owner)
+ : GranuleId(granuleId)
+ , Owner(&owner) {
+
+ }
+
+ TBatch& AddBatch(const ui32 batchNo, const TPortionInfo& portionInfo);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
new file mode 100644
index 0000000000..2e9ff7520c
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.cpp
@@ -0,0 +1,55 @@
+#include "postfilter_assembler.h"
+#include "batch.h"
+#include <ydb/core/tx/columnshard/engines/indexed_read_data.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+
+bool TAssembleBatch::DoExecuteImpl() {
+ /// @warning The replace logic is correct only in assumption that predicate is applied over a part of ReplaceKey.
+ /// It's not OK to apply predicate before replacing key duplicates otherwise.
+ /// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
+
+ Y_VERIFY(BatchConstructor.GetColumnsCount());
+
+ TPortionInfo::TPreparedBatchData::TAssembleOptions options;
+ if (Filter->GetInactiveHeadSize() > Filter->GetInactiveTailSize()) {
+ options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveHeadSize())
+ .SetForwardAssemble(false);
+ Filter->CutInactiveHead();
+ } else {
+ options.SetRecordsCountLimit(Filter->Size() - Filter->GetInactiveTailSize());
+ Filter->CutInactiveTail();
+ }
+
+ auto addBatch = BatchConstructor.Assemble(options);
+ Y_VERIFY(addBatch);
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
+ ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows());
+ Y_VERIFY(Filter->Apply(addBatch));
+ Y_VERIFY(NArrow::MergeBatchColumns({ FilterBatch, addBatch }, FullBatch, FullColumnsOrder, true));
+
+ return true;
+}
+
+bool TAssembleBatch::DoApply(TIndexedReadData& owner) const {
+ TBatch& batch = owner.GetBatchInfo(BatchNo);
+ batch.InitBatch(FullBatch);
+ return true;
+}
+
+TAssembleBatch::TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
+ TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder,
+ NColumnShard::IDataTasksProcessor::TPtr processor)
+ : TBase(processor)
+ , BatchConstructor(batchConstructor)
+ , FullColumnsOrder(fullColumnsOrder)
+ , Filter(currentBatch.GetFilter())
+ , FilterBatch(currentBatch.GetFilterBatch())
+ , BatchNo(currentBatch.GetBatchNo())
+{
+ Y_VERIFY(currentBatch.GetFilter());
+ Y_VERIFY(currentBatch.GetFilterBatch());
+ Y_VERIFY(!currentBatch.GetFilteredBatch());
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
new file mode 100644
index 0000000000..616b079e01
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/postfilter_assembler.h
@@ -0,0 +1,29 @@
+#pragma once
+#include "conveyor_task.h"
+
+#include <ydb/core/tx/columnshard/engines/portion_info.h>
+#include <ydb/core/formats/arrow_filter.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NOlap::NIndexedReader {
+class TBatch;
+class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask {
+private:
+ using TBase = NColumnShard::IDataTasksProcessor::ITask;
+ TPortionInfo::TPreparedBatchData BatchConstructor;
+ std::shared_ptr<arrow::RecordBatch> FullBatch;
+ std::vector<std::string> FullColumnsOrder;
+
+ std::shared_ptr<NArrow::TColumnFilter> Filter;
+ std::shared_ptr<arrow::RecordBatch> FilterBatch;
+
+ const ui32 BatchNo;
+protected:
+ virtual bool DoApply(TIndexedReadData& owner) const override;
+ virtual bool DoExecuteImpl() override;
+public:
+ TAssembleBatch(TPortionInfo::TPreparedBatchData&& batchConstructor,
+ TBatch& currentBatch, const std::vector<std::string>& fullColumnsOrder, NColumnShard::IDataTasksProcessor::TPtr processor);
+};
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.cpp b/ydb/core/tx/columnshard/engines/reader/queue.cpp
new file mode 100644
index 0000000000..59185c12d5
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/queue.cpp
@@ -0,0 +1,25 @@
+#include "queue.h"
+
+namespace NKikimr::NOlap {
+
+NKikimr::NOlap::TBlobRange TFetchBlobsQueue::pop_front() {
+ if (!StoppedFlag && IteratorBlobsSequential.size()) {
+ TBlobRange result = IteratorBlobsSequential.front();
+ IteratorBlobsSequential.pop_front();
+ return result;
+ } else {
+ return TBlobRange();
+ }
+}
+
+void TFetchBlobsQueue::emplace_front(const TBlobRange& range) {
+ Y_VERIFY(!StoppedFlag);
+ IteratorBlobsSequential.emplace_front(range);
+}
+
+void TFetchBlobsQueue::emplace_back(const TBlobRange& range) {
+ Y_VERIFY(!StoppedFlag);
+ IteratorBlobsSequential.emplace_back(range);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/engines/reader/queue.h b/ydb/core/tx/columnshard/engines/reader/queue.h
new file mode 100644
index 0000000000..f8a3da4159
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/queue.h
@@ -0,0 +1,35 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/tx/columnshard/blob.h>
+
+namespace NKikimr::NOlap {
+
+class TFetchBlobsQueue {
+private:
+ bool StoppedFlag = false;
+ YDB_ACCESSOR_DEF(std::deque<TBlobRange>, IteratorBlobsSequential);
+public:
+ bool IsStopped() const {
+ return StoppedFlag;
+ }
+
+ void Stop() {
+ IteratorBlobsSequential.clear();
+ StoppedFlag = true;
+ }
+
+ bool empty() const {
+ return IteratorBlobsSequential.empty();
+ }
+
+ size_t size() const {
+ return IteratorBlobsSequential.size();
+ }
+
+ TBlobRange pop_front();
+
+ void emplace_front(const TBlobRange& range);
+ void emplace_back(const TBlobRange& range);
+};
+
+}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 07d539d9c1..358a52ac01 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -17,13 +17,13 @@ public:
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie)
+ ui64 requestCookie, const TScanCounters& counters)
: TabletId(tabletId)
, DstActor(dstActor)
, BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId())
, Result(std::move(event))
, ReadMetadata(readMetadata)
- , IndexedData(ReadMetadata)
+ , IndexedData(ReadMetadata, IndexedBlobsForFetch, true, counters)
, Deadline(deadline)
, ColumnShardActorId(columnShardActorId)
, RequestCookie(requestCookie)
@@ -51,7 +51,7 @@ public:
return; // ignore duplicate parts
}
WaitIndexed.erase(event.BlobRange);
- IndexedData.AddIndexed(event.BlobRange, event.Data, nullptr);
+ IndexedData.AddIndexed(event.BlobRange, event.Data, NColumnShard::TDataTasksProcessorContainer());
} else if (CommittedBlobs.contains(blobId)) {
auto cmt = WaitCommitted.extract(NOlap::TCommittedBlob{blobId, 0, 0});
if (cmt.empty()) {
@@ -128,8 +128,13 @@ public:
protoStats->SetIndexPortions(stats->IndexPortions);
protoStats->SetIndexBatches(stats->IndexBatches);
protoStats->SetNotIndexedBatches(stats->CommittedBatches);
- protoStats->SetUsedColumns(stats->UsedColumns);
- protoStats->SetDataBytes(stats->DataBytes);
+ protoStats->SetSchemaColumns(stats->SchemaColumns);
+ protoStats->SetFilterColumns(stats->FilterColumns);
+ protoStats->SetAdditionalColumns(stats->AdditionalColumns);
+ protoStats->SetDataFilterBytes(stats->DataFilterBytes);
+ protoStats->SetDataAdditionalBytes(stats->DataAdditionalBytes);
+ protoStats->SetPortionsBytes(stats->PortionsBytes);
+ protoStats->SetSelectedRows(stats->SelectedRows);
}
if (Deadline != TInstant::Max()) {
@@ -163,8 +168,9 @@ public:
WaitCommitted.emplace(cmtBlob, notIndexed);
}
- auto indexedBlobsForFetch = IndexedData.InitRead(notIndexed);
- for (auto&& blobRange : indexedBlobsForFetch) {
+ IndexedData.InitRead(notIndexed);
+ while (IndexedBlobsForFetch.size()) {
+ const auto blobRange = IndexedBlobsForFetch.pop_front();
WaitIndexed.insert(blobRange);
IndexedBlobs.emplace(blobRange);
}
@@ -245,6 +251,7 @@ private:
TActorId BlobCacheActorId;
std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
+ NOlap::TFetchBlobsQueue IndexedBlobsForFetch;
NOlap::TIndexedReadData IndexedData;
TInstant Deadline;
TActorId ColumnShardActorId;
@@ -278,10 +285,10 @@ IActor* CreateReadActor(ui64 tabletId,
NOlap::TReadMetadata::TConstPtr readMetadata,
const TInstant& deadline,
const TActorId& columnShardActorId,
- ui64 requestCookie)
+ ui64 requestCookie, const TScanCounters& counters)
{
return new TReadActor(tabletId, dstActor, std::move(event), readMetadata,
- deadline, columnShardActorId, requestCookie);
+ deadline, columnShardActorId, requestCookie, counters);
}
}
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index eb13dae181..75ef8ee71a 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -906,11 +906,11 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString
if (ydbSchema == TTestSchema::YdbSchema()) {
if (codec == "" || codec == "lz4") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 50);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 50);
} else if (codec == "none") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 75);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 75);
} else if (codec == "zstd") {
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetDataBytes() / 100000, 26);
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetPortionsBytes() / 100000, 26);
} else {
UNIT_ASSERT(false);
}
@@ -1138,7 +1138,7 @@ void TestCompactionInGranuleImpl(bool reboots,
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0);
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT(readStats.GetIndexPortions() <= 2); // got compaction
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
@@ -2065,7 +2065,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x);
}
@@ -2126,7 +2126,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 1); // TODO: min-max index optimization?
}
@@ -2186,7 +2186,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0);
UNIT_ASSERT(readStats.GetIndexBatches() > 0);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO
- UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message"
+ UNIT_ASSERT_VALUES_EQUAL(readStats.GetSchemaColumns(), 7); // planStep, txId + 4 PK columns + "message"
UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 1);
//UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), 0); // TODO: min-max index optimization?
}
diff --git a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
index a5847ac677..feab71de25 100644
--- a/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
@@ -635,7 +635,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
if (resRead.GetFinished()) {
UNIT_ASSERT(meta.HasReadStats());
auto& readStats = meta.GetReadStats();
- ui64 numBytes = readStats.GetDataBytes(); // compressed bytes in storage
+ ui64 numBytes = readStats.GetPortionsBytes(); // compressed bytes in storage
specRowsBytes.back().second += numBytes;
break;
}