aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-01 10:55:45 +0300
committer ivanmorozov <ivanmorozov@yandex-team.com> 2023-11-01 11:40:23 +0300
commit 967a51911964e64bd631f640b95cdfa5012809e8 (patch)
tree 58e17ccb0ed2c639d49d770a543296963507caf1
parent a932c9ba0041a678a2a52524d39510f8bca468c9 (diff)
download ydb-967a51911964e64bd631f640b95cdfa5012809e8.tar.gz
KIKIMR-19880: build assemblers in separated threads
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/events.h 18
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp 63
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/read_coordinator.h 45
-rw-r--r-- ydb/core/tx/columnshard/blobs_reader/ya.make 1
-rw-r--r-- ydb/core/tx/columnshard/columnshard__scan.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/filter.cpp 4
-rw-r--r-- ydb/core/tx/columnshard/engines/filter.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.cpp 8
-rw-r--r-- ydb/core/tx/columnshard/engines/portions/portion_info.h 27
-rw-r--r-- ydb/core/tx/columnshard/engines/predicate/container.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h 12
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp 29
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp 32
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h 106
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/read_context.h 8
-rw-r--r-- ydb/core/tx/columnshard/read_actor.cpp 5
23 files changed, 305 insertions, 99 deletions
diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt
index 4be14a0cb0..386fb1cf80 100644
--- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt
@@ -19,4 +19,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
)
diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt
index 8ac26f94f4..0f4ffad48a 100644
--- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt
@@ -20,4 +20,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
)
diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt
index 8ac26f94f4..0f4ffad48a 100644
--- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt
@@ -20,4 +20,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
)
diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt
index 4be14a0cb0..386fb1cf80 100644
--- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt
@@ -19,4 +19,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
)
diff --git a/ydb/core/tx/columnshard/blobs_reader/events.h b/ydb/core/tx/columnshard/blobs_reader/events.h
index ac020741eb..2d77bdc5d8 100644
--- a/ydb/core/tx/columnshard/blobs_reader/events.h
+++ b/ydb/core/tx/columnshard/blobs_reader/events.h
@@ -1,6 +1,24 @@
#pragma once
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <library/cpp/actors/core/event_local.h>
+
namespace NKikimr::NOlap::NBlobOperations::NRead {
+class ITask;
+
+class TEvStartReadTask: public NActors::TEventLocal<TEvStartReadTask, NColumnShard::TEvPrivate::EEv::EvStartReadTask> { // Actor-local event carrying one blob-read task; handled by TReadCoordinatorActor::Handle.
+private:
+ YDB_READONLY_DEF(std::shared_ptr<ITask>, Task); // The task to start. NOTE(review): the macro presumably also declares the GetTask() accessor that handlers call - confirm in accessor.h.
+public:
+
+ explicit TEvStartReadTask(std::shared_ptr<ITask> task) // Takes the task handle by value.
+ : Task(task) { // Copies the shared_ptr into the member (no std::move is applied).
+ }
+
+};
+
}
diff --git a/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
new file mode 100644
index 0000000000..5b6a255faf
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp
@@ -0,0 +1,63 @@
+#include "read_coordinator.h"
+
+namespace NKikimr::NOlap::NBlobOperations::NRead {
+
+TAtomicCounter TReadCoordinatorActor::WaitingBlobsCount = 0; // Static counter shared by all coordinator instances: incremented when a blob range gets its first waiting task, decremented when its read result is handled.
+
+void TReadCoordinatorActor::Handle(TEvStartReadTask::TPtr& ev) { // Attaches the incoming task to every blob range it reads, then starts the task's blob fetching.
+ const auto& externalTaskId = ev->Get()->GetTask()->GetExternalTaskId();
+ NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("external_task_id", externalTaskId); // Tags log records emitted below with the external task id.
+ THashSet<TBlobRange> rangesInProgress; // Ranges that already had an entry in BlobTasks when this task arrived.
+ for (auto&& agent : ev->Get()->GetTask()->GetAgents()) {
+ for (auto&& b : agent->GetRangesForRead()) {
+ for (auto&& r : b.second) { // r is a single blob range; b.second is the collection of ranges of one entry.
+ auto it = BlobTasks.find(r);
+ if (it != BlobTasks.end()) { // Range is already tracked for some earlier task.
+ ACFL_DEBUG("event", "TEvReadTask")("enqueued_blob_id", r);
+ rangesInProgress.emplace(r);
+ } else { // First task for this range: create its waiter list and count one more waiting blob.
+ ACFL_DEBUG("event", "TEvReadTask")("blob_id", r);
+ it = BlobTasks.emplace(r, std::vector<std::shared_ptr<ITask>>()).first;
+ WaitingBlobsCount.Inc();
+ }
+ it->second.emplace_back(ev->Get()->GetTask()); // In both cases the task is appended to the range's waiter list.
+ }
+ }
+ }
+ ev->Get()->GetTask()->StartBlobsFetching(rangesInProgress); // NOTE(review): presumably the task skips re-requesting the ranges passed here - confirm in ITask.
+ ACFL_DEBUG("task", ev->Get()->GetTask()->DebugString());
+ AFL_VERIFY(ev->Get()->GetTask()->GetAllRangesSize()); // Checked only after fetching has been started: the task must report a non-zero GetAllRangesSize().
+}
+
+void TReadCoordinatorActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) { // Delivers one blob-range read result to every task waiting on that range.
+ ACFL_TRACE("event", "TEvReadBlobRangeResult")("blob_id", ev->Get()->BlobRange);
+
+ auto& event = *ev->Get();
+ auto it = BlobTasks.find(event.BlobRange); // Waiter list registered for this range by Handle(TEvStartReadTask).
+ AFL_VERIFY(it != BlobTasks.end())("blob_id", event.BlobRange); // A result for a range that is not tracked fails this verification.
+ for (auto&& i : it->second) { // i is one waiting task.
+ if (event.Status != NKikimrProto::EReplyStatus::OK) { // Any non-OK status is reported to the task as a failure.
+ i->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"));
+ } else {
+ i->AddData(event.BlobRange, event.Data);
+ }
+ }
+ WaitingBlobsCount.Dec(); // The range is no longer awaited.
+ BlobTasks.erase(it); // Drop the waiter list for the answered range.
+}
+
+TReadCoordinatorActor::TReadCoordinatorActor(ui64 tabletId, const TActorId& parent) // Stores the tablet id and the parent actor id; no other work is done here.
+ : TabletId(tabletId)
+ , Parent(parent) {
+
+}
+
+TReadCoordinatorActor::~TReadCoordinatorActor() { // Aborts every task still attached to a blob range that has not received its result.
+ for (auto&& i : BlobTasks) { // i.second is the waiter list of one unanswered range.
+ for (auto&& t : i.second) {
+ t->Abort(); // NOTE(review): a task attached to several pending ranges gets Abort() once per range - confirm Abort() tolerates repeated calls.
+ }
+ } // NOTE(review): WaitingBlobsCount is not decremented for the ranges left in BlobTasks here.
+}
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h
new file mode 100644
index 0000000000..2e0addcaf7
--- /dev/null
+++ b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include "task.h"
+#include "events.h"
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/blob_cache.h>
+
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NOlap::NBlobOperations::NRead {
+
+class TReadCoordinatorActor: public NActors::TActorBootstrapped<TReadCoordinatorActor> { // Actor that maps blob ranges to the read tasks waiting on them and dispatches read results to those tasks.
+private:
+ ui64 TabletId; // Used in the log context built in StateWait.
+ NActors::TActorId Parent; // Used in the log context built in StateWait.
+ THashMap<TBlobRange, std::vector<std::shared_ptr<ITask>>> BlobTasks; // Blob range -> tasks waiting for that range.
+public:
+ static TAtomicCounter WaitingBlobsCount; // Static, so shared by all instances; updated by the Handle methods.
+ TReadCoordinatorActor(ui64 tabletId, const TActorId& parent);
+
+ void Handle(TEvStartReadTask::TPtr& ev); // Registers a task's ranges and starts its fetching.
+ void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev); // Delivers a range result to the waiting tasks.
+
+ void Bootstrap() { // Switches the actor to its only state function.
+ Become(&TThis::StateWait);
+ }
+
+ STFUNC(StateWait) { // Dispatches incoming events by type.
+ TLogContextGuard gLogging(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent)); // Tags log records of the handlers with tablet id and parent.
+ switch (ev->GetTypeRewrite()) {
+ cFunc(NActors::TEvents::TEvPoison::EventType, PassAway); // NOTE(review): the scan actor sends TEvents::TEvPoisonPill - confirm it has the same event type as TEvPoison.
+ hFunc(TEvStartReadTask, Handle);
+ hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+ default: // Any other event type is ignored without logging.
+ break;
+ }
+ }
+
+ ~TReadCoordinatorActor(); // Defined in read_coordinator.cpp; aborts tasks that are still waiting.
+};
+
+}
diff --git a/ydb/core/tx/columnshard/blobs_reader/ya.make b/ydb/core/tx/columnshard/blobs_reader/ya.make
index a4ff0dcb5e..f8f311b6b6 100644
--- a/ydb/core/tx/columnshard/blobs_reader/ya.make
+++ b/ydb/core/tx/columnshard/blobs_reader/ya.make
@@ -4,6 +4,7 @@ SRCS(
actor.cpp
task.cpp
events.cpp
+ read_coordinator.cpp
)
PEERDIR(
diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp
index 5ffa7e8295..d9e0cc8b13 100644
--- a/ydb/core/tx/columnshard/columnshard__scan.cpp
+++ b/ydb/core/tx/columnshard/columnshard__scan.cpp
@@ -1,5 +1,8 @@
#include "engines/reader/read_context.h"
#include "blobs_reader/events.h"
+#include "blobs_reader/read_coordinator.h"
+#include "blobs_reader/actor.h"
+#include "resource_subscriber/actor.h"
#include <ydb/core/tx/columnshard/columnshard__scan.h>
#include <ydb/core/tx/columnshard/columnshard__index_scan.h>
@@ -20,8 +23,6 @@
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/services/metadata/request/common.h>
#include <util/generic/noncopyable.h>
-#include "blobs_reader/actor.h"
-#include "resource_subscriber/actor.h"
namespace NKikimr::NColumnShard {
@@ -60,6 +61,7 @@ class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IR
private:
std::shared_ptr<NOlap::TActorBasedMemoryAccesor> MemoryAccessor;
TActorId ResourceSubscribeActorId;
+ TActorId ReadCoordinatorActorId;
const std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
public:
static constexpr auto ActorActivityType() {
@@ -68,7 +70,8 @@ public:
public:
virtual void PassAway() override { // Stops the helper actors owned by this scan before the scan actor itself passes away.
- Send(ResourceSubscribeActorId , new TEvents::TEvPoisonPill);
+ Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill);
+ Send(ReadCoordinatorActorId, new TEvents::TEvPoisonPill); // Also poison the read coordinator actor whose id is stored in ReadCoordinatorActorId.
IActor::PassAway();
}
@@ -107,9 +110,10 @@ public:
Y_ABORT_UNLESS(!ScanIterator);
MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result");
ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
+ ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false,
- ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId);
+ ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
@@ -365,7 +369,7 @@ private:
return Finish();
}
- auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId);
+ auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId);
ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context);
// Used in TArrowToYdbConverter
ResultYqlSchema.clear();
@@ -1070,7 +1074,8 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults
std::vector<TPartialReadResult> result;
for (auto&& i : resultBatches) {
- i.FillResult(result, mergePartsToMax);
+ Y_UNUSED(mergePartsToMax);
+ i.FillResult(result, true);
}
return result;
}
diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp
index a8e43d52f9..307ee53692 100644
--- a/ydb/core/tx/columnshard/engines/filter.cpp
+++ b/ydb/core/tx/columnshard/engines/filter.cpp
@@ -45,10 +45,10 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc
return result;
}
-NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) {
+NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) {
Y_ABORT_UNLESS(portion);
NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion);
- if (readMetadata.GetSnapshot().GetPlanStep()) {
+ if (readMetadata.GetSnapshot().GetPlanStep() && useSnapshotFilter) {
auto snapSchema = TIndexInfo::ArrowSchemaSnapshot();
result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot()));
}
diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h
index 9d5c412d7e..7eaab200ff 100644
--- a/ydb/core/tx/columnshard/engines/filter.h
+++ b/ydb/core/tx/columnshard/engines/filter.h
@@ -11,7 +11,7 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc
const TSnapshot& snapshot);
struct TReadMetadata;
-NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
+NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter);
NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata);
NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 8d3128fd8a..cb1cdb9070 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -219,7 +219,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c
return (*res)->column(0);
}
-std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
+std::shared_ptr<arrow::Table> TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const {
std::vector<std::shared_ptr<arrow::ChunkedArray>> columns;
std::vector<std::shared_ptr<arrow::Field>> fields;
for (auto&& i : Columns) {
@@ -230,7 +230,11 @@ std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(c
fields.emplace_back(i.GetField());
}
- auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
+ return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns);
+}
+
+std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const {
+ auto table = AssembleTable(options);
auto res = table->CombineChunks();
Y_ABORT_UNLESS(res.ok());
return NArrow::ToBatch(*res);
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index f215bee39f..f23b65ad6b 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -498,6 +498,7 @@ public:
}
std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const;
+ std::shared_ptr<arrow::Table> AssembleTable(const TAssembleOptions& options = {}) const;
};
template <class TExternalBlobInfo>
@@ -514,16 +515,24 @@ public:
TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks
TMap<size_t, size_t> positionsMap;
- for (auto& rec : Records) {
- auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId);
- if (resulPos < 0) {
- continue;
+ {
+ int resulPos = -1;
+ int dataSchemaPos = -1;
+ std::optional<ui32> predColumnId;
+ for (auto& rec : Records) {
+ if (!predColumnId || rec.ColumnId != *predColumnId) {
+ resulPos = resultSchema.GetFieldIndex(rec.ColumnId);
+ dataSchemaPos = dataSchema.GetFieldIndex(rec.ColumnId);
+ }
+ predColumnId = rec.ColumnId;
+ if (resulPos < 0) {
+ continue;
+ }
+ Y_ASSERT(dataSchemaPos >= 0);
+ positionsMap[resulPos] = dataSchemaPos;
+ AFL_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second)("record", rec.DebugString());
+ // AFL_VERIFY(rowsCount == NumRows(rec.ColumnId))("error", "Inconsistent rows")("portion", DebugString())("record", rec.DebugString())("column_records", NumRows(rec.ColumnId));
}
- auto pos = dataSchema.GetFieldIndex(rec.ColumnId);
- Y_ASSERT(pos >= 0);
- positionsMap[resulPos] = pos;
- AFL_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second)("record", rec.DebugString());
-// AFL_VERIFY(rowsCount == NumRows(rec.ColumnId))("error", "Inconsistent rows")("portion", DebugString())("record", rec.DebugString())("column_records", NumRows(rec.ColumnId));
}
// Make chunked arrays for columns
diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h
index 8c43ea63ee..0606335cc9 100644
--- a/ydb/core/tx/columnshard/engines/predicate/container.h
+++ b/ydb/core/tx/columnshard/engines/predicate/container.h
@@ -63,7 +63,7 @@ public:
static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo);
- NKikimr::NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const {
+ NKikimr::NArrow::TColumnFilter BuildFilter(const std::shared_ptr<arrow::RecordBatch>& data) const {
if (!Object) {
return NArrow::TColumnFilter::BuildAllowFilter();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
index 28df6610fc..daf0dfaf5a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp
@@ -8,10 +8,12 @@ bool TAssembleBatch::DoExecute() {
/// It's not OK to apply predicate before replacing key duplicates otherwise.
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here
- Y_ABORT_UNLESS(BatchConstructor.GetColumnsCount());
+ auto batchConstructor = BuildBatchConstructor(FetchColumnIds);
+
+ Y_ABORT_UNLESS(batchConstructor.GetColumnsCount());
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
- auto addBatch = BatchConstructor.Assemble(options);
+ auto addBatch = batchConstructor.Assemble(options);
Y_ABORT_UNLESS(addBatch);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)
("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows());
@@ -26,13 +28,12 @@ bool TAssembleFFBatch::DoApply(IDataReader& /*owner*/) const {
return true;
}
-TAssembleBatch::TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor,
- const std::shared_ptr<IDataSource>& source, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard)
- : TBase(scanActorId)
- , BatchConstructor(batchConstructor)
+TAssembleBatch::TAssembleBatch(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, // New signature: takes the read context, portion, column ids and raw blobs instead of a prepared batch constructor.
+ const std::shared_ptr<IDataSource>& source, const std::set<ui32>& columnIds, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs, const std::shared_ptr<NArrow::TColumnFilter>& filter)
+ : TBase(context, portionInfo, source, std::move(blobs)) // NOTE(review): blobs is a const reference, so std::move yields a const rvalue and the map cannot actually be moved from.
, Filter(filter)
- , Source(source)
- , TaskGuard(std::move(taskGuard))
+ , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) // Guard is now taken from the counters reached through the base-class Context member.
+ , FetchColumnIds(columnIds) // Column ids later passed to BuildBatchConstructor() in DoExecute().
{
TBase::SetPriority(TBase::EPriority::High);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
index 12ce46a872..a4aac716ae 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h
@@ -5,26 +5,26 @@
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/formats/arrow/arrow_filter.h>
+#include "filter_assembler.h"
namespace NKikimr::NOlap::NPlainReader {
class TBatch;
-class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> {
+class TAssembleBatch: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> { // Now derives from TAssemblerCommon (filter_assembler.h) instead of directly from ITask.
private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- TPortionInfo::TPreparedBatchData BatchConstructor;
+ using TBase = TAssemblerCommon;
std::shared_ptr<NArrow::TColumnFilter> Filter;
protected:
- std::shared_ptr<IDataSource> Source;
std::shared_ptr<arrow::RecordBatch> Result;
const NColumnShard::TCounterGuard TaskGuard;
+ const std::set<ui32> FetchColumnIds; // Column ids passed to BuildBatchConstructor() in DoExecute().
virtual bool DoExecute() override;
public:
virtual TString GetTaskClassIdentifier() const override {
return "PlainReader::TAssembleBatch";
}
- TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor,
- const std::shared_ptr<IDataSource>& source, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard);
+ TAssembleBatch(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, // Takes raw blobs and column ids; the batch constructor is built inside DoExecute().
+ const std::shared_ptr<IDataSource>& source, const std::set<ui32>& columnIds, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs, const std::shared_ptr<NArrow::TColumnFilter>& filter);
};
class TAssembleFFBatch: public TAssembleBatch {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
index cb15d1f327..6c3100e51a 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp
@@ -9,7 +9,7 @@
namespace NKikimr::NOlap::NPlainReader {
-TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAssembler() {
+THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> TAssembleColumnsTaskConstructor::BuildBatchAssembler() {
auto blobs = ExtractBlobsData();
THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble;
for (auto&& i : blobs) {
@@ -18,34 +18,29 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse
for (auto&& i : NullBlocks) {
AFL_VERIFY(blobsDataAssemble.emplace(i.first, i.second).second);
}
- auto blobSchema = Context->GetReadMetadata()->GetLoadSchema(PortionInfo->GetMinSnapshot());
- auto readSchema = Context->GetReadMetadata()->GetLoadSchema(Context->GetReadMetadata()->GetSnapshot());
- ISnapshotSchema::TPtr resultSchema;
- if (ColumnIds.size()) {
- resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, ColumnIds);
- } else {
- resultSchema = readSchema;
- }
-
- return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, blobsDataAssemble);
+ return blobsDataAssemble;
}
void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { // Builds a filter-assembling task from the fetched blobs and submits it for execution.
- NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(Context->GetCommonContext()->GetScanActorId(), BuildBatchAssembler(),
- Context->GetReadMetadata(), Source, ColumnIds, UseEarlyFilter, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard(), PortionInfo->RecordSnapshotMax()));
+ auto task = std::make_shared<TAssembleFilter>(Context, PortionInfo, Source, ColumnIds, UseEarlyFilter, BuildBatchAssembler()); // BuildBatchAssembler() now returns the blob map, which is handed to the task.
+ task->SetPriority(NConveyor::ITask::EPriority::Normal); // Filter assembly is submitted with Normal priority.
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { // Builds a column-assembling task from the fetched blobs and submits it for execution.
- NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(Context->GetCommonContext()->GetScanActorId(), BuildBatchAssembler(),
- Source, AppliedFilter, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()));
+ auto task = std::make_shared<TAssembleFFBatch>(Context, PortionInfo, Source, ColumnIds, BuildBatchAssembler(), AppliedFilter); // The blob map from BuildBatchAssembler() and the already applied filter are handed to the task.
+ task->SetPriority(NConveyor::ITask::EPriority::High); // The TAssembleBatch constructor also sets High priority.
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { // Builds an assembling task for a committed blob and submits it for execution.
auto blobs = ExtractBlobsData();
Y_ABORT_UNLESS(NullBlocks.size() == 0);
Y_ABORT_UNLESS(blobs.size() == 1);
- NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second,
- Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()));
+ auto task = std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second, // Same constructor arguments as before; only the single extracted blob is passed.
+ Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard());
+ task->SetPriority(NConveyor::ITask::EPriority::High); // Priority is now set explicitly before submission.
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task);
}
bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
index 8cf0be0272..bd6ef28dcd 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h
@@ -51,7 +51,7 @@ private:
protected:
std::set<ui32> ColumnIds;
std::shared_ptr<TPortionInfo> PortionInfo;
- TPortionInfo::TPreparedBatchData BuildBatchAssembler();
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> BuildBatchAssembler();
public:
TAssembleColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks,
const std::set<ui32>& columnIds, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer)
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
index a02a3b525f..1c88f99aa9 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp
@@ -12,11 +12,29 @@ bool TAssembleFilter::DoExecute() {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
options.IncludedColumnIds = FilterColumnIds;
- auto batch = BatchConstructor.Assemble(options);
+ if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) {
+ for (auto&& i : TIndexInfo::GetSpecialColumnIds()) {
+ options.IncludedColumnIds->erase(i);
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_special_columns");
+ }
+
+ auto batchConstructor = BuildBatchConstructor(FilterColumnIds);
+
+ auto batch = batchConstructor.Assemble(options);
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_rows());
+ if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) {
+ for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) {
+ auto c = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(f->type(), batch->num_rows()));
+ batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, c));
+ }
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "restore_fake_special_columns");
+ }
+
OriginalCount = batch->num_rows();
- AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata));
+ AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot));
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "first_filter_using");
if (!AppliedFilter->Apply(batch)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size());
return true;
@@ -34,17 +52,17 @@ bool TAssembleFilter::DoExecute() {
}
}
- if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) {
+ if ((size_t)batch->schema()->num_fields() < batchConstructor.GetColumnsCount()) {
TPortionInfo::TPreparedBatchData::TAssembleOptions options;
options.ExcludedColumnIds = FilterColumnIds;
- auto addBatch = BatchConstructor.Assemble(options);
+ auto addBatch = batchConstructor.Assemble(options);
Y_ABORT_UNLESS(addBatch);
Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch));
- Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true));
+ Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, batchConstructor.GetSchemaColumnNames(), true));
}
- AFL_VERIFY(AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size());
+ AFL_VERIFY(AppliedFilter->IsTotalAllowFilter() || AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data")
- ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter)
+ ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", batchConstructor.GetColumnsCount())("use_filter", UseFilter)
("filter_columns", FilterColumnIds.size())("af_count", AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0);
FilteredBatch = batch;
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
index 927d792a4e..d0d9b3e197 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h
@@ -8,44 +8,76 @@
namespace NKikimr::NOlap::NPlainReader {
- class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> {
- private:
- using TBase = NColumnShard::IDataTasksProcessor::ITask;
- TPortionInfo::TPreparedBatchData BatchConstructor;
- std::shared_ptr<arrow::RecordBatch> FilteredBatch;
- const std::shared_ptr<IDataSource> Source;
- TReadMetadata::TConstPtr ReadMetadata;
- std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
- std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
- const TSnapshot RecordsMaxSnapshot;
- ui32 OriginalCount = 0;
- bool AllowEarlyFilter = false;
- std::set<ui32> FilterColumnIds;
- const bool UseFilter = true;
- const NColumnShard::TCounterGuard TaskGuard;
- protected:
- virtual bool DoApply(IDataReader& owner) const override;
- virtual bool DoExecute() override;
- public:
-
- virtual TString GetTaskClassIdentifier() const override {
- return "PlainReading::TAssembleFilter";
- }
+class TAssemblerCommon: public NColumnShard::IDataTasksProcessor::ITask { // base task holding the context, source, portion and blobs needed to prepare batch data
+private:
+ using TBase = NColumnShard::IDataTasksProcessor::ITask;
+protected:
+ const std::shared_ptr<TSpecialReadContext> Context; // also the origin of ReadMetadata and of the scan actor id given to the base task
+ const std::shared_ptr<IDataSource> Source;
+ const std::shared_ptr<TPortionInfo> PortionInfo;
+ const TReadMetadata::TConstPtr ReadMetadata; // set from Context->GetReadMetadata() in the constructor
+ THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Blobs; // copy of the constructor's blobs argument; handed to PrepareForAssemble
- TAssembleFilter(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, NOlap::TReadMetadata::TConstPtr readMetadata,
- const std::shared_ptr<IDataSource>& source, const std::set<ui32>& filterColumnIds, const bool useFilter, NColumnShard::TCounterGuard&& taskGuard, const TSnapshot& recordsMaxSnapshot)
- : TBase(scanActorId)
- , BatchConstructor(batchConstructor)
- , Source(source)
- , ReadMetadata(readMetadata)
- , RecordsMaxSnapshot(recordsMaxSnapshot)
- , FilterColumnIds(filterColumnIds)
- , UseFilter(useFilter)
- , TaskGuard(std::move(taskGuard))
- {
- Y_UNUSED(RecordsMaxSnapshot);
- TBase::SetPriority(TBase::EPriority::Normal);
+ TPortionInfo::TPreparedBatchData BuildBatchConstructor(const std::set<ui32>& columnIds) const { // prepares batch data for PortionInfo; an empty columnIds set means the unfiltered read schema is used
+ auto blobSchema = ReadMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); // schema loaded for the portion's min snapshot
+ auto readSchema = ReadMetadata->GetLoadSchema(Context->GetReadMetadata()->GetSnapshot()); // schema loaded for the snapshot of the context's read metadata
+ ISnapshotSchema::TPtr resultSchema;
+ if (columnIds.size()) {
+ resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, columnIds); // presumably narrows readSchema to columnIds - confirm against TFilteredSnapshotSchema
+ } else {
+ resultSchema = readSchema;
+ }
- };
+
+ return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Blobs);
+ }
+
+public:
+ TAssemblerCommon(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo,
+ const std::shared_ptr<IDataSource>& source, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs)
+ : TBase(context->GetCommonContext()->GetScanActorId()) // context is dereferenced here without a null check
+ , Context(context)
+ , Source(source)
+ , PortionInfo(portionInfo)
+ , ReadMetadata(Context->GetReadMetadata()) // reads the Context member, which is declared (and therefore initialized) before ReadMetadata
+ , Blobs(blobs)
+ {
+
+ }
+};
+
+class TAssembleFilter: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> {
+private:
+    using TBase = TAssemblerCommon;
+    // Per-task state. The blob data itself is stored once, in the protected TAssemblerCommon::Blobs member.
+    std::shared_ptr<arrow::RecordBatch> FilteredBatch;
+    std::shared_ptr<NArrow::TColumnFilter> AppliedFilter;
+    std::shared_ptr<NArrow::TColumnFilter> EarlyFilter;
+    const TSnapshot RecordsMaxSnapshot; // taken from PortionInfo->RecordSnapshotMax() in the constructor
+    ui32 OriginalCount = 0;
+    std::set<ui32> FilterColumnIds;
+    const bool UseFilter = true;
+    const NColumnShard::TCounterGuard TaskGuard;
+    // Intentionally no Blobs member here: redeclaring it would shadow the base member with a map that is never filled.
+protected:
+    virtual bool DoApply(IDataReader& owner) const override;
+    virtual bool DoExecute() override;
+public:
+    // Returns the fixed identifier string of this task class.
+    virtual TString GetTaskClassIdentifier() const override {
+        return "PlainReading::TAssembleFilter";
+    }
+    // Copies blobs into the base class, takes the assemble-tasks counter guard from the common context and sets Normal priority.
+    TAssembleFilter(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo,
+        const std::shared_ptr<IDataSource>& source, const std::set<ui32>& filterColumnIds, const bool useFilter, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs)
+        : TBase(context, portionInfo, source, blobs) // blobs is a const reference, so it is always copied; std::move could not move from it
+        , RecordsMaxSnapshot(PortionInfo->RecordSnapshotMax())
+        , FilterColumnIds(filterColumnIds)
+        , UseFilter(useFilter)
+        , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard())
+    {
+        Y_UNUSED(RecordsMaxSnapshot);
+        TBase::SetPriority(TBase::EPriority::Normal);
+    }
+};
}
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
index c370b76c7b..d76f0ad0f5 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp
@@ -5,6 +5,7 @@
#include "plain_read_data.h"
#include <ydb/core/formats/arrow/serializer/full.h>
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
+#include <ydb/core/tx/columnshard/blobs_reader/events.h>
namespace NKikimr::NOlap::NPlainReader {
@@ -93,6 +94,7 @@ void TPortionDataSource::DoStartFilterStage(const std::shared_ptr<IDataSource>&
std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction};
auto constructor = std::make_shared<TEFTaskConstructor>(GetContext(), actions, std::move(nullBlocks), columnIds, *this, sourcePtr, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter");
+// NActors::TActivationContext::AsActorContext().Send(GetContext()->GetCommonContext()->GetReadCoordinatorActorId(), new NOlap::NBlobOperations::NRead::TEvStartReadTask(constructor));
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
}
diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h
index 130876ae11..228e3cded4 100644
--- a/ydb/core/tx/columnshard/engines/reader/read_context.h
+++ b/ydb/core/tx/columnshard/engines/reader/read_context.h
@@ -34,6 +34,7 @@ private:
NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext;
const TActorId ScanActorId;
const TActorId ResourceSubscribeActorId;
+ const TActorId ReadCoordinatorActorId;
public:
bool IsReverse() const {
return ReadMetadata->IsDescSorted();
@@ -43,6 +44,10 @@ public:
return ResourceSubscribeActorId;
}
+ const TActorId& GetReadCoordinatorActorId() const { // returns the read coordinator actor id that was passed to the TReadContext constructor
+ return ReadCoordinatorActorId;
+ }
+
const TActorId& GetScanActorId() const {
return ScanActorId;
}
@@ -60,7 +65,7 @@ public:
}
TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead, const TReadMetadataBase::TConstPtr& readMetadata,
- const TActorId& scanActorId, const TActorId& resourceSubscribeActorId)
+ const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId)
: StoragesManager(storagesManager)
, Counters(counters)
, IsInternalRead(isInternalRead)
@@ -68,6 +73,7 @@ public:
, ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters)
, ScanActorId(scanActorId)
, ResourceSubscribeActorId(resourceSubscribeActorId)
+ , ReadCoordinatorActorId(readCoordinatorActorId)
{
Y_ABORT_UNLESS(ReadMetadata);
}
diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp
index 1165be0275..106a583c05 100644
--- a/ydb/core/tx/columnshard/read_actor.cpp
+++ b/ydb/core/tx/columnshard/read_actor.cpp
@@ -7,6 +7,7 @@
#include "blobs_reader/actor.h"
#include "engines/reader/read_context.h"
#include "resource_subscriber/actor.h"
+#include "blobs_reader/read_coordinator.h"
namespace NKikimr::NColumnShard {
namespace {
@@ -155,7 +156,8 @@ public:
void Bootstrap(const TActorContext& ctx) {
ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId()));
- IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId));
+ ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));
+ IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId));
LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId);
bool earlyExit = false;
@@ -202,6 +204,7 @@ private:
TActorId BlobCacheActorId;
std::unique_ptr<TEvColumnShard::TEvReadResult> Result;
TActorId ResourceSubscribeActorId;
+ TActorId ReadCoordinatorActorId;
NOlap::TReadMetadata::TConstPtr ReadMetadata;
std::shared_ptr<NOlap::IDataReader> IndexedData;
TInstant Deadline;