diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-01 10:55:45 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-11-01 11:40:23 +0300 |
commit | 967a51911964e64bd631f640b95cdfa5012809e8 (patch) | |
tree | 58e17ccb0ed2c639d49d770a543296963507caf1 | |
parent | a932c9ba0041a678a2a52524d39510f8bca468c9 (diff) | |
download | ydb-967a51911964e64bd631f640b95cdfa5012809e8.tar.gz |
KIKIMR-19880: build assemblers in separated threads
23 files changed, 305 insertions, 99 deletions
diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt index 4be14a0cb0..386fb1cf80 100644 --- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.darwin-x86_64.txt @@ -19,4 +19,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp ) diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt index 8ac26f94f4..0f4ffad48a 100644 --- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-aarch64.txt @@ -20,4 +20,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp ) diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt index 8ac26f94f4..0f4ffad48a 100644 --- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.linux-x86_64.txt @@ -20,4 +20,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp + 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp ) diff --git a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt index 4be14a0cb0..386fb1cf80 100644 --- a/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/blobs_reader/CMakeLists.windows-x86_64.txt @@ -19,4 +19,5 @@ target_sources(tx-columnshard-blobs_reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/task.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/events.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp ) diff --git a/ydb/core/tx/columnshard/blobs_reader/events.h b/ydb/core/tx/columnshard/blobs_reader/events.h index ac020741eb..2d77bdc5d8 100644 --- a/ydb/core/tx/columnshard/blobs_reader/events.h +++ b/ydb/core/tx/columnshard/blobs_reader/events.h @@ -1,6 +1,24 @@ #pragma once +#include <ydb/core/tx/columnshard/columnshard_private_events.h> +#include <ydb/library/accessor/accessor.h> + +#include <library/cpp/actors/core/event_local.h> + namespace NKikimr::NOlap::NBlobOperations::NRead { +class ITask; + +class TEvStartReadTask: public NActors::TEventLocal<TEvStartReadTask, NColumnShard::TEvPrivate::EEv::EvStartReadTask> { +private: + YDB_READONLY_DEF(std::shared_ptr<ITask>, Task); +public: + + explicit TEvStartReadTask(std::shared_ptr<ITask> task) + : Task(task) { + } + +}; + } diff --git a/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp new file mode 100644 index 0000000000..5b6a255faf --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.cpp @@ -0,0 +1,63 @@ +#include "read_coordinator.h"
+
+namespace NKikimr::NOlap::NBlobOperations::NRead {
+
+// Counter shared by every TReadCoordinatorActor instance (static member): it is incremented
+// when a new blob range is registered in BlobTasks and decremented when the read result for
+// a registered range is handled.
+TAtomicCounter TReadCoordinatorActor::WaitingBlobsCount = 0;
+
+// Registers a read task with the coordinator.
+//
+// Every blob range requested by the task's agents is attached to the task in BlobTasks.
+// A range that is already present in BlobTasks is additionally collected and passed to
+// StartBlobsFetching() as "in progress"; a range seen for the first time creates a new
+// BlobTasks entry and increments WaitingBlobsCount.
+void TReadCoordinatorActor::Handle(TEvStartReadTask::TPtr& ev) {
+    const auto& task = ev->Get()->GetTask();
+    const auto& externalTaskId = task->GetExternalTaskId();
+    NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("external_task_id", externalTaskId);
+
+    THashSet<TBlobRange> alreadyRequested;
+    for (auto&& agent : task->GetAgents()) {
+        for (auto&& entry : agent->GetRangesForRead()) {
+            for (auto&& range : entry.second) {
+                auto waitersIt = BlobTasks.find(range);
+                if (waitersIt == BlobTasks.end()) {
+                    // First request for this range: start tracking it.
+                    ACFL_DEBUG("event", "TEvReadTask")("blob_id", range);
+                    waitersIt = BlobTasks.emplace(range, std::vector<std::shared_ptr<ITask>>()).first;
+                    WaitingBlobsCount.Inc();
+                } else {
+                    // NOTE(review): a range repeated inside the same task also lands here - confirm
+                    // that agents of one task never share a range.
+                    ACFL_DEBUG("event", "TEvReadTask")("enqueued_blob_id", range);
+                    alreadyRequested.emplace(range);
+                }
+                waitersIt->second.emplace_back(task);
+            }
+        }
+    }
+
+    // Order kept deliberately: start fetching, then log, then verify (presumably the total
+    // size is only meaningful once fetching has been started - TODO confirm).
+    task->StartBlobsFetching(alreadyRequested);
+    ACFL_DEBUG("task", task->DebugString());
+    AFL_VERIFY(task->GetAllRangesSize());
+}
+
+// Delivers the outcome of one blob-range read to every task registered for that range and
+// then stops tracking the range. A result for a range that is not in BlobTasks fails
+// the AFL_VERIFY check.
+void TReadCoordinatorActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev) {
+    auto& result = *ev->Get();
+    ACFL_TRACE("event", "TEvReadBlobRangeResult")("blob_id", result.BlobRange);
+
+    auto waitersIt = BlobTasks.find(result.BlobRange);
+    AFL_VERIFY(waitersIt != BlobTasks.end())("blob_id", result.BlobRange);
+
+    // The reply status is the same for every waiting task, so evaluate it once.
+    const bool readOk = (result.Status == NKikimrProto::EReplyStatus::OK);
+    for (auto&& waiter : waitersIt->second) {
+        if (readOk) {
+            waiter->AddData(result.BlobRange, result.Data);
+        } else {
+            waiter->AddError(result.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(result.Status, "cannot get blob"));
+        }
+    }
+
+    WaitingBlobsCount.Dec();
+    BlobTasks.erase(waitersIt);
+}
+
+// Remembers the tablet id and the parent actor id; no other work is done at construction time.
+TReadCoordinatorActor::TReadCoordinatorActor(ui64 tabletId, const TActorId& parent)
+    : TabletId{tabletId}
+    , Parent{parent}
+{
+}
+
+// Aborts every task that is still waiting for a blob range when the coordinator goes away
+// and gives back this actor's contribution to the shared WaitingBlobsCount.
+//
+// WaitingBlobsCount is static and is incremented once per BlobTasks entry; without the
+// decrement below, a coordinator destroyed with pending ranges would leave the counter
+// permanently inflated.
+TReadCoordinatorActor::~TReadCoordinatorActor() {
+    for (auto&& pending : BlobTasks) {
+        // NOTE(review): a task registered for several ranges is aborted once per range, as
+        // before - confirm that ITask::Abort() tolerates repeated calls.
+        for (auto&& task : pending.second) {
+            task->Abort();
+        }
+        // Undo the Inc() performed when this range was first registered.
+        WaitingBlobsCount.Dec();
+    }
+}
+
+} diff --git a/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h new file mode 100644 index 0000000000..2e0addcaf7 --- /dev/null +++ b/ydb/core/tx/columnshard/blobs_reader/read_coordinator.h @@ -0,0 +1,45 @@ +#pragma once
+
+#include "task.h"
+#include "events.h"
+#include <ydb/core/tx/columnshard/blob.h>
+#include <ydb/core/tx/columnshard/blob_cache.h>
+
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NOlap::NBlobOperations::NRead {
+
+// Actor that tracks, per blob range, which read tasks are waiting for that range and
+// forwards read results to them. It reacts to three events in its single state:
+// TEvStartReadTask, TEvReadBlobRangeResult and TEvPoison.
+class TReadCoordinatorActor: public NActors::TActorBootstrapped<TReadCoordinatorActor> {
+private:
+    // Used for the logging context in StateWait.
+    ui64 TabletId;
+    NActors::TActorId Parent;
+    // Blob range -> tasks registered for that range.
+    THashMap<TBlobRange, std::vector<std::shared_ptr<ITask>>> BlobTasks;
+
+public:
+    // Shared by all coordinator instances.
+    static TAtomicCounter WaitingBlobsCount;
+
+    TReadCoordinatorActor(ui64 tabletId, const TActorId& parent);
+    ~TReadCoordinatorActor();
+
+    // Switches the actor into its only state, StateWait.
+    void Bootstrap() {
+        Become(&TThis::StateWait);
+    }
+
+    void Handle(TEvStartReadTask::TPtr& ev);
+    void Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev);
+
+    // Event dispatch; event types not listed here are ignored.
+    STFUNC(StateWait) {
+        TLogContextGuard logScope(NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent", Parent));
+        switch (ev->GetTypeRewrite()) {
+            hFunc(TEvStartReadTask, Handle);
+            hFunc(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult, Handle);
+            cFunc(NActors::TEvents::TEvPoison::EventType, PassAway);
+            default:
+                break;
+        }
+    }
+};
+
+} diff --git a/ydb/core/tx/columnshard/blobs_reader/ya.make b/ydb/core/tx/columnshard/blobs_reader/ya.make index a4ff0dcb5e..f8f311b6b6 100644 --- a/ydb/core/tx/columnshard/blobs_reader/ya.make +++ b/ydb/core/tx/columnshard/blobs_reader/ya.make @@ -4,6 +4,7 @@ SRCS( actor.cpp task.cpp events.cpp + read_coordinator.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index 5ffa7e8295..d9e0cc8b13 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -1,5 +1,8 @@ #include "engines/reader/read_context.h" #include "blobs_reader/events.h" +#include "blobs_reader/read_coordinator.h" +#include "blobs_reader/actor.h" +#include "resource_subscriber/actor.h" #include <ydb/core/tx/columnshard/columnshard__scan.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> @@ -20,8 +23,6 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/services/metadata/request/common.h> #include <util/generic/noncopyable.h> -#include "blobs_reader/actor.h" -#include "resource_subscriber/actor.h" namespace NKikimr::NColumnShard { @@ -60,6 +61,7 @@ class TColumnShardScan : public TActorBootstrapped<TColumnShardScan>, NArrow::IR private: std::shared_ptr<NOlap::TActorBasedMemoryAccesor> MemoryAccessor; TActorId ResourceSubscribeActorId; + TActorId ReadCoordinatorActorId; const std::shared_ptr<NOlap::IStoragesManager> StoragesManager; public: static constexpr auto ActorActivityType() { @@ -68,7 +70,8 @@ public: public: virtual void PassAway() override { - Send(ResourceSubscribeActorId , new TEvents::TEvPoisonPill); + Send(ResourceSubscribeActorId, new TEvents::TEvPoisonPill); + Send(ReadCoordinatorActorId, new TEvents::TEvPoisonPill); IActor::PassAway(); } @@ -107,9 +110,10 @@ public: Y_ABORT_UNLESS(!ScanIterator); MemoryAccessor = std::make_shared<NOlap::TActorBasedMemoryAccesor>(SelfId(), "CSScan/Result"); ResourceSubscribeActorId = 
ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId())); + ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); std::shared_ptr<NOlap::TReadContext> context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, - ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId); + ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // propagate self actor id // TODO: FlagSubscribeOnSession ? @@ -365,7 +369,7 @@ private: return Finish(); } - auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId); + auto context = std::make_shared<NOlap::TReadContext>(StoragesManager, ScanCountersPool, false, ReadMetadataRanges[ReadMetadataIndex], SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId); ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // Used in TArrowToYdbConverter ResultYqlSchema.clear(); @@ -1070,7 +1074,8 @@ std::vector<NKikimr::NOlap::TPartialReadResult> TPartialReadResult::SplitResults std::vector<TPartialReadResult> result; for (auto&& i : resultBatches) { - i.FillResult(result, mergePartsToMax); + Y_UNUSED(mergePartsToMax); + i.FillResult(result, true); } return result; } diff --git a/ydb/core/tx/columnshard/engines/filter.cpp b/ydb/core/tx/columnshard/engines/filter.cpp index a8e43d52f9..307ee53692 100644 --- a/ydb/core/tx/columnshard/engines/filter.cpp +++ b/ydb/core/tx/columnshard/engines/filter.cpp @@ -45,10 +45,10 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc return result; } -NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata) { +NArrow::TColumnFilter 
FilterPortion(const std::shared_ptr<arrow::RecordBatch>& portion, const TReadMetadata& readMetadata, const bool useSnapshotFilter) { Y_ABORT_UNLESS(portion); NArrow::TColumnFilter result = readMetadata.GetPKRangesFilter().BuildFilter(portion); - if (readMetadata.GetSnapshot().GetPlanStep()) { + if (readMetadata.GetSnapshot().GetPlanStep() && useSnapshotFilter) { auto snapSchema = TIndexInfo::ArrowSchemaSnapshot(); result = result.And(MakeSnapshotFilter(portion, snapSchema, readMetadata.GetSnapshot())); } diff --git a/ydb/core/tx/columnshard/engines/filter.h b/ydb/core/tx/columnshard/engines/filter.h index 9d5c412d7e..7eaab200ff 100644 --- a/ydb/core/tx/columnshard/engines/filter.h +++ b/ydb/core/tx/columnshard/engines/filter.h @@ -11,7 +11,7 @@ NArrow::TColumnFilter MakeSnapshotFilter(const std::shared_ptr<arrow::RecordBatc const TSnapshot& snapshot); struct TReadMetadata; -NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); +NArrow::TColumnFilter FilterPortion(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata, const bool useSnapshotFilter); NArrow::TColumnFilter FilterNotIndexed(const std::shared_ptr<arrow::RecordBatch>& batch, const TReadMetadata& readMetadata); NArrow::TColumnFilter EarlyFilter(const std::shared_ptr<arrow::RecordBatch>& batch, std::shared_ptr<NSsa::TProgram> ssa); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index 8d3128fd8a..cb1cdb9070 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -219,7 +219,7 @@ std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() c return (*res)->column(0); } -std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { +std::shared_ptr<arrow::Table> 
TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const { std::vector<std::shared_ptr<arrow::ChunkedArray>> columns; std::vector<std::shared_ptr<arrow::Field>> fields; for (auto&& i : Columns) { @@ -230,7 +230,11 @@ std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(c fields.emplace_back(i.GetField()); } - auto table = arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns); + return arrow::Table::Make(std::make_shared<arrow::Schema>(fields), columns); +} + +std::shared_ptr<arrow::RecordBatch> TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { + auto table = AssembleTable(options); auto res = table->CombineChunks(); Y_ABORT_UNLESS(res.ok()); return NArrow::ToBatch(*res); diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index f215bee39f..f23b65ad6b 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -498,6 +498,7 @@ public: } std::shared_ptr<arrow::RecordBatch> Assemble(const TAssembleOptions& options = {}) const; + std::shared_ptr<arrow::Table> AssembleTable(const TAssembleOptions& options = {}) const; }; template <class TExternalBlobInfo> @@ -514,16 +515,24 @@ public: TMap<size_t, TMap<ui32, TBlobRange>> columnChunks; // position in schema -> ordered chunks TMap<size_t, size_t> positionsMap; - for (auto& rec : Records) { - auto resulPos = resultSchema.GetFieldIndex(rec.ColumnId); - if (resulPos < 0) { - continue; + { + int resulPos = -1; + int dataSchemaPos = -1; + std::optional<ui32> predColumnId; + for (auto& rec : Records) { + if (!predColumnId || rec.ColumnId != *predColumnId) { + resulPos = resultSchema.GetFieldIndex(rec.ColumnId); + dataSchemaPos = dataSchema.GetFieldIndex(rec.ColumnId); + } + predColumnId = rec.ColumnId; + if (resulPos < 0) { + continue; + } + Y_ASSERT(dataSchemaPos >= 0); + 
positionsMap[resulPos] = dataSchemaPos; + AFL_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second)("record", rec.DebugString()); + // AFL_VERIFY(rowsCount == NumRows(rec.ColumnId))("error", "Inconsistent rows")("portion", DebugString())("record", rec.DebugString())("column_records", NumRows(rec.ColumnId)); } - auto pos = dataSchema.GetFieldIndex(rec.ColumnId); - Y_ASSERT(pos >= 0); - positionsMap[resulPos] = pos; - AFL_VERIFY(columnChunks[resulPos].emplace(rec.Chunk, rec.BlobRange).second)("record", rec.DebugString()); -// AFL_VERIFY(rowsCount == NumRows(rec.ColumnId))("error", "Inconsistent rows")("portion", DebugString())("record", rec.DebugString())("column_records", NumRows(rec.ColumnId)); } // Make chunked arrays for columns diff --git a/ydb/core/tx/columnshard/engines/predicate/container.h b/ydb/core/tx/columnshard/engines/predicate/container.h index 8c43ea63ee..0606335cc9 100644 --- a/ydb/core/tx/columnshard/engines/predicate/container.h +++ b/ydb/core/tx/columnshard/engines/predicate/container.h @@ -63,7 +63,7 @@ public: static std::optional<TPredicateContainer> BuildPredicateTo(std::shared_ptr<NOlap::TPredicate> object, const TIndexInfo* indexInfo); - NKikimr::NArrow::TColumnFilter BuildFilter(std::shared_ptr<arrow::RecordBatch> data) const { + NKikimr::NArrow::TColumnFilter BuildFilter(const std::shared_ptr<arrow::RecordBatch>& data) const { if (!Object) { return NArrow::TColumnFilter::BuildAllowFilter(); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp index 28df6610fc..daf0dfaf5a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.cpp @@ -8,10 +8,12 @@ bool TAssembleBatch::DoExecute() { /// It's not OK to apply predicate before replacing key duplicates otherwise. 
/// Assumption: dup(A, B) <=> PK(A) = PK(B) => Predicate(A) = Predicate(B) => all or no dups for PK(A) here - Y_ABORT_UNLESS(BatchConstructor.GetColumnsCount()); + auto batchConstructor = BuildBatchConstructor(FetchColumnIds); + + Y_ABORT_UNLESS(batchConstructor.GetColumnsCount()); TPortionInfo::TPreparedBatchData::TAssembleOptions options; - auto addBatch = BatchConstructor.Assemble(options); + auto addBatch = batchConstructor.Assemble(options); Y_ABORT_UNLESS(addBatch); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN) ("columns_count", addBatch->num_columns())("num_rows", addBatch->num_rows()); @@ -26,13 +28,12 @@ bool TAssembleFFBatch::DoApply(IDataReader& /*owner*/) const { return true; } -TAssembleBatch::TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, - const std::shared_ptr<IDataSource>& source, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard) - : TBase(scanActorId) - , BatchConstructor(batchConstructor) +TAssembleBatch::TAssembleBatch(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, + const std::shared_ptr<IDataSource>& source, const std::set<ui32>& columnIds, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs, const std::shared_ptr<NArrow::TColumnFilter>& filter) + : TBase(context, portionInfo, source, std::move(blobs)) , Filter(filter) - , Source(source) - , TaskGuard(std::move(taskGuard)) + , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) + , FetchColumnIds(columnIds) { TBase::SetPriority(TBase::EPriority::High); } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h index 12ce46a872..a4aac716ae 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/column_assembler.h 
@@ -5,26 +5,26 @@ #include <ydb/core/tx/columnshard/counters/common/object_counter.h> #include <ydb/core/tx/columnshard/counters/scan.h> #include <ydb/core/formats/arrow/arrow_filter.h> +#include "filter_assembler.h" namespace NKikimr::NOlap::NPlainReader { class TBatch; -class TAssembleBatch: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> { +class TAssembleBatch: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleBatch, true, true> { private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TPortionInfo::TPreparedBatchData BatchConstructor; + using TBase = TAssemblerCommon; std::shared_ptr<NArrow::TColumnFilter> Filter; protected: - std::shared_ptr<IDataSource> Source; std::shared_ptr<arrow::RecordBatch> Result; const NColumnShard::TCounterGuard TaskGuard; + const std::set<ui32> FetchColumnIds; virtual bool DoExecute() override; public: virtual TString GetTaskClassIdentifier() const override { return "PlainReader::TAssembleBatch"; } - TAssembleBatch(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, - const std::shared_ptr<IDataSource>& source, const std::shared_ptr<NArrow::TColumnFilter>& filter, NColumnShard::TCounterGuard&& taskGuard); + TAssembleBatch(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, + const std::shared_ptr<IDataSource>& source, const std::set<ui32>& columnIds, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs, const std::shared_ptr<NArrow::TColumnFilter>& filter); }; class TAssembleFFBatch: public TAssembleBatch { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp index cb15d1f327..6c3100e51a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp +++ 
b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.cpp @@ -9,7 +9,7 @@ namespace NKikimr::NOlap::NPlainReader { -TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAssembler() { +THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> TAssembleColumnsTaskConstructor::BuildBatchAssembler() { auto blobs = ExtractBlobsData(); THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> blobsDataAssemble; for (auto&& i : blobs) { @@ -18,34 +18,29 @@ TPortionInfo::TPreparedBatchData TAssembleColumnsTaskConstructor::BuildBatchAsse for (auto&& i : NullBlocks) { AFL_VERIFY(blobsDataAssemble.emplace(i.first, i.second).second); } - auto blobSchema = Context->GetReadMetadata()->GetLoadSchema(PortionInfo->GetMinSnapshot()); - auto readSchema = Context->GetReadMetadata()->GetLoadSchema(Context->GetReadMetadata()->GetSnapshot()); - ISnapshotSchema::TPtr resultSchema; - if (ColumnIds.size()) { - resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, ColumnIds); - } else { - resultSchema = readSchema; - } - - return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, blobsDataAssemble); + return blobsDataAssemble; } void TEFTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { - NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFilter>(Context->GetCommonContext()->GetScanActorId(), BuildBatchAssembler(), - Context->GetReadMetadata(), Source, ColumnIds, UseEarlyFilter, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard(), PortionInfo->RecordSnapshotMax())); + auto task = std::make_shared<TAssembleFilter>(Context, PortionInfo, Source, ColumnIds, UseEarlyFilter, BuildBatchAssembler()); + task->SetPriority(NConveyor::ITask::EPriority::Normal); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); } void TFFColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& 
/*resourcesGuard*/) { - NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TAssembleFFBatch>(Context->GetCommonContext()->GetScanActorId(), BuildBatchAssembler(), - Source, AppliedFilter, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard())); + auto task = std::make_shared<TAssembleFFBatch>(Context, PortionInfo, Source, ColumnIds, BuildBatchAssembler(), AppliedFilter); + task->SetPriority(NConveyor::ITask::EPriority::High); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); } void TCommittedColumnsTaskConstructor::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) { auto blobs = ExtractBlobsData(); Y_ABORT_UNLESS(NullBlocks.size() == 0); Y_ABORT_UNLESS(blobs.size() == 1); - NConveyor::TScanServiceOperator::SendTaskToExecute(std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second, - Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard())); + auto task = std::make_shared<TCommittedAssembler>(Context->GetCommonContext()->GetScanActorId(), blobs.begin()->second, + Context->GetReadMetadata(), Source, CommittedBlob, Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()); + task->SetPriority(NConveyor::ITask::EPriority::High); + NConveyor::TScanServiceOperator::SendTaskToExecute(task); } bool IFetchTaskConstructor::DoOnError(const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h index 8cf0be0272..bd6ef28dcd 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/constructor.h @@ -51,7 +51,7 @@ private: protected: std::set<ui32> ColumnIds; std::shared_ptr<TPortionInfo> PortionInfo; - 
TPortionInfo::TPreparedBatchData BuildBatchAssembler(); + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> BuildBatchAssembler(); public: TAssembleColumnsTaskConstructor(const std::shared_ptr<TSpecialReadContext>& context, const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, THashMap<TBlobRange, ui32>&& nullBlocks, const std::set<ui32>& columnIds, const TPortionDataSource& portion, const std::shared_ptr<IDataSource>& sourcePtr, const TString& taskCustomer) diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp index a02a3b525f..1c88f99aa9 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.cpp @@ -12,11 +12,29 @@ bool TAssembleFilter::DoExecute() { TPortionInfo::TPreparedBatchData::TAssembleOptions options; options.IncludedColumnIds = FilterColumnIds; - auto batch = BatchConstructor.Assemble(options); + if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) { + for (auto&& i : TIndexInfo::GetSpecialColumnIds()) { + options.IncludedColumnIds->erase(i); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_special_columns"); + } + + auto batchConstructor = BuildBatchConstructor(FilterColumnIds); + + auto batch = batchConstructor.Assemble(options); Y_ABORT_UNLESS(batch); Y_ABORT_UNLESS(batch->num_rows()); + if (RecordsMaxSnapshot < ReadMetadata->GetSnapshot() && UseFilter) { + for (auto&& f : TIndexInfo::ArrowSchemaSnapshot()->fields()) { + auto c = NArrow::TStatusValidator::GetValid(arrow::MakeArrayOfNull(f->type(), batch->num_rows())); + batch = NArrow::TStatusValidator::GetValid(batch->AddColumn(batch->num_columns(), f, c)); + } + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "restore_fake_special_columns"); + } + OriginalCount = batch->num_rows(); - AppliedFilter = 
std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata)); + AppliedFilter = std::make_shared<NArrow::TColumnFilter>(NOlap::FilterPortion(batch, *ReadMetadata, ReadMetadata->GetSnapshot() <= RecordsMaxSnapshot)); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "first_filter_using"); if (!AppliedFilter->Apply(batch)) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "skip_data")("original_count", OriginalCount)("columns_count", FilterColumnIds.size()); return true; @@ -34,17 +52,17 @@ bool TAssembleFilter::DoExecute() { } } - if ((size_t)batch->schema()->num_fields() < BatchConstructor.GetColumnsCount()) { + if ((size_t)batch->schema()->num_fields() < batchConstructor.GetColumnsCount()) { TPortionInfo::TPreparedBatchData::TAssembleOptions options; options.ExcludedColumnIds = FilterColumnIds; - auto addBatch = BatchConstructor.Assemble(options); + auto addBatch = batchConstructor.Assemble(options); Y_ABORT_UNLESS(addBatch); Y_ABORT_UNLESS(AppliedFilter->Apply(addBatch)); - Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, BatchConstructor.GetSchemaColumnNames(), true)); + Y_ABORT_UNLESS(NArrow::MergeBatchColumns({ batch, addBatch }, batch, batchConstructor.GetSchemaColumnNames(), true)); } - AFL_VERIFY(AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size()); + AFL_VERIFY(AppliedFilter->IsTotalAllowFilter() || AppliedFilter->Size() == OriginalCount)("original", OriginalCount)("af_count", AppliedFilter->Size()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "not_skip_data") - ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", BatchConstructor.GetColumnsCount())("allow_early", AllowEarlyFilter) + ("original_count", OriginalCount)("filtered_count", batch->num_rows())("columns_count", batchConstructor.GetColumnsCount())("use_filter", UseFilter) ("filter_columns", FilterColumnIds.size())("af_count", 
AppliedFilter->Size())("ef_count", earlyFilter ? earlyFilter->Size() : 0); FilteredBatch = batch; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h index 927d792a4e..d0d9b3e197 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/filter_assembler.h @@ -8,44 +8,76 @@ namespace NKikimr::NOlap::NPlainReader { - class TAssembleFilter: public NColumnShard::IDataTasksProcessor::ITask, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> { - private: - using TBase = NColumnShard::IDataTasksProcessor::ITask; - TPortionInfo::TPreparedBatchData BatchConstructor; - std::shared_ptr<arrow::RecordBatch> FilteredBatch; - const std::shared_ptr<IDataSource> Source; - TReadMetadata::TConstPtr ReadMetadata; - std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; - std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; - const TSnapshot RecordsMaxSnapshot; - ui32 OriginalCount = 0; - bool AllowEarlyFilter = false; - std::set<ui32> FilterColumnIds; - const bool UseFilter = true; - const NColumnShard::TCounterGuard TaskGuard; - protected: - virtual bool DoApply(IDataReader& owner) const override; - virtual bool DoExecute() override; - public: - - virtual TString GetTaskClassIdentifier() const override { - return "PlainReading::TAssembleFilter"; - } +class TAssemblerCommon: public NColumnShard::IDataTasksProcessor::ITask { +private: + using TBase = NColumnShard::IDataTasksProcessor::ITask; +protected: + const std::shared_ptr<TSpecialReadContext> Context; + const std::shared_ptr<IDataSource> Source; + const std::shared_ptr<TPortionInfo> PortionInfo; + const TReadMetadata::TConstPtr ReadMetadata; + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Blobs; - TAssembleFilter(const NActors::TActorId& scanActorId, TPortionInfo::TPreparedBatchData&& batchConstructor, 
NOlap::TReadMetadata::TConstPtr readMetadata, - const std::shared_ptr<IDataSource>& source, const std::set<ui32>& filterColumnIds, const bool useFilter, NColumnShard::TCounterGuard&& taskGuard, const TSnapshot& recordsMaxSnapshot) - : TBase(scanActorId) - , BatchConstructor(batchConstructor) - , Source(source) - , ReadMetadata(readMetadata) - , RecordsMaxSnapshot(recordsMaxSnapshot) - , FilterColumnIds(filterColumnIds) - , UseFilter(useFilter) - , TaskGuard(std::move(taskGuard)) - { - Y_UNUSED(RecordsMaxSnapshot); - TBase::SetPriority(TBase::EPriority::Normal); + TPortionInfo::TPreparedBatchData BuildBatchConstructor(const std::set<ui32>& columnIds) const { + auto blobSchema = ReadMetadata->GetLoadSchema(PortionInfo->GetMinSnapshot()); + auto readSchema = ReadMetadata->GetLoadSchema(Context->GetReadMetadata()->GetSnapshot()); + ISnapshotSchema::TPtr resultSchema; + if (columnIds.size()) { + resultSchema = std::make_shared<TFilteredSnapshotSchema>(readSchema, columnIds); + } else { + resultSchema = readSchema; } - }; + + return PortionInfo->PrepareForAssemble(*blobSchema, *resultSchema, Blobs); + } + +public: + TAssemblerCommon(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, + const std::shared_ptr<IDataSource>& source, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs) + : TBase(context->GetCommonContext()->GetScanActorId()) + , Context(context) + , Source(source) + , PortionInfo(portionInfo) + , ReadMetadata(Context->GetReadMetadata()) + , Blobs(blobs) + { + + } +}; + +class TAssembleFilter: public TAssemblerCommon, public NColumnShard::TMonitoringObjectsCounter<TAssembleFilter, true, true> { +private: + using TBase = TAssemblerCommon; + + std::shared_ptr<arrow::RecordBatch> FilteredBatch; + std::shared_ptr<NArrow::TColumnFilter> AppliedFilter; + std::shared_ptr<NArrow::TColumnFilter> EarlyFilter; + const TSnapshot RecordsMaxSnapshot; + ui32 OriginalCount = 0; + std::set<ui32> FilterColumnIds; 
+ const bool UseFilter = true; + const NColumnShard::TCounterGuard TaskGuard; + THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo> Blobs; +protected: + virtual bool DoApply(IDataReader& owner) const override; + virtual bool DoExecute() override; +public: + + virtual TString GetTaskClassIdentifier() const override { + return "PlainReading::TAssembleFilter"; + } + + TAssembleFilter(const std::shared_ptr<TSpecialReadContext>& context, const std::shared_ptr<TPortionInfo>& portionInfo, + const std::shared_ptr<IDataSource>& source, const std::set<ui32>& filterColumnIds, const bool useFilter, const THashMap<TBlobRange, TPortionInfo::TAssembleBlobInfo>& blobs) + : TBase(context, portionInfo, source, std::move(blobs)) + , RecordsMaxSnapshot(PortionInfo->RecordSnapshotMax()) + , FilterColumnIds(filterColumnIds) + , UseFilter(useFilter) + , TaskGuard(Context->GetCommonContext()->GetCounters().GetAssembleTasksGuard()) + { + Y_UNUSED(RecordsMaxSnapshot); + TBase::SetPriority(TBase::EPriority::Normal); + } +}; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index c370b76c7b..d76f0ad0f5 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -5,6 +5,7 @@ #include "plain_read_data.h" #include <ydb/core/formats/arrow/serializer/full.h> #include <ydb/core/tx/columnshard/blobs_reader/actor.h> +#include <ydb/core/tx/columnshard/blobs_reader/events.h> namespace NKikimr::NOlap::NPlainReader { @@ -93,6 +94,7 @@ void TPortionDataSource::DoStartFilterStage(const std::shared_ptr<IDataSource>& std::vector<std::shared_ptr<IBlobsReadingAction>> actions = {readAction}; auto constructor = std::make_shared<TEFTaskConstructor>(GetContext(), actions, std::move(nullBlocks), columnIds, *this, sourcePtr, FetchingPlan->CanUseEarlyFilterImmediately(), "ReaderFilter"); +// 
NActors::TActivationContext::AsActorContext().Send(GetContext()->GetCommonContext()->GetReadCoordinatorActorId(), new NOlap::NBlobOperations::NRead::TEvStartReadTask(constructor)); NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor)); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h index 130876ae11..228e3cded4 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -34,6 +34,7 @@ private: NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext; const TActorId ScanActorId; const TActorId ResourceSubscribeActorId; + const TActorId ReadCoordinatorActorId; public: bool IsReverse() const { return ReadMetadata->IsDescSorted(); @@ -43,6 +44,10 @@ public: return ResourceSubscribeActorId; } + const TActorId& GetReadCoordinatorActorId() const { + return ReadCoordinatorActorId; + } + const TActorId& GetScanActorId() const { return ScanActorId; } @@ -60,7 +65,7 @@ public: } TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const bool isInternalRead, const TReadMetadataBase::TConstPtr& readMetadata, - const TActorId& scanActorId, const TActorId& resourceSubscribeActorId) + const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId) : StoragesManager(storagesManager) , Counters(counters) , IsInternalRead(isInternalRead) @@ -68,6 +73,7 @@ public: , ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters) , ScanActorId(scanActorId) , ResourceSubscribeActorId(resourceSubscribeActorId) + , ReadCoordinatorActorId(readCoordinatorActorId) { Y_ABORT_UNLESS(ReadMetadata); } diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 1165be0275..106a583c05 100644 --- 
a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -7,6 +7,7 @@ #include "blobs_reader/actor.h" #include "engines/reader/read_context.h" #include "resource_subscriber/actor.h" +#include "blobs_reader/read_coordinator.h" namespace NKikimr::NColumnShard { namespace { @@ -155,7 +156,8 @@ public: void Bootstrap(const TActorContext& ctx) { ResourceSubscribeActorId = ctx.Register(new NOlap::NResourceBroker::NSubscribe::TActor(TabletId, SelfId())); - IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId)); + ReadCoordinatorActorId = ctx.Register(new NOlap::NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); + IndexedData = ReadMetadata->BuildReader(std::make_shared<NOlap::TReadContext>(Storages, Counters, true, ReadMetadata, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId)); LOG_S_DEBUG("Starting read (" << IndexedData->DebugString(false) << ") at tablet " << TabletId); bool earlyExit = false; @@ -202,6 +204,7 @@ private: TActorId BlobCacheActorId; std::unique_ptr<TEvColumnShard::TEvReadResult> Result; TActorId ResourceSubscribeActorId; + TActorId ReadCoordinatorActorId; NOlap::TReadMetadata::TConstPtr ReadMetadata; std::shared_ptr<NOlap::IDataReader> IndexedData; TInstant Deadline; |