diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 11:21:29 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-30 11:21:29 +0300 |
commit | 54947f36c3e859b9de760b04ca870e347433683b (patch) | |
tree | 31358ff3a303726e5931911c835d22008a6258bd | |
parent | 3e1899838408bbad47622007aa382bc8a2b01f87 (diff) | |
download | ydb-54947f36c3e859b9de760b04ca870e347433683b.tar.gz |
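Introduce a shared read context (TReadContext) that provides common objects — the data tasks processor and the scan counters — to the scan iterator and indexed read data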
15 files changed, 88 insertions, 38 deletions
diff --git a/ydb/core/tx/columnshard/columnshard__index_scan.cpp b/ydb/core/tx/columnshard/columnshard__index_scan.cpp index 56d73d17475..fd7e8d47815 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__index_scan.cpp @@ -4,12 +4,10 @@ namespace NKikimr::NColumnShard { -TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, - NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters) - : ReadMetadata(readMetadata) - , IndexedData(ReadMetadata, false, scanCounters, processor) - , DataTasksProcessor(processor) - , ScanCounters(scanCounters) +TColumnShardScanIterator::TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, const NOlap::TReadContext& context) + : Context(context) + , ReadMetadata(readMetadata) + , IndexedData(ReadMetadata, false, context) { ui32 batchNo = 0; for (size_t i = 0; i < ReadMetadata->CommittedBlobs.size(); ++i, ++batchNo) { @@ -89,26 +87,26 @@ void TColumnShardScanIterator::FillReadyResults() { } if (limitLeft == 0) { - DataTasksProcessor.Stop(); WaitCommitted.clear(); IndexedData.Abort(); } if (WaitCommitted.empty() && IndexedData.IsFinished()) { - DataTasksProcessor.Stop(); + Context.MutableProcessor().Stop(); } } bool TColumnShardScanIterator::HasWaitingTasks() const { - return DataTasksProcessor.InWaiting(); + return Context.GetProcessor().InWaiting(); } TColumnShardScanIterator::~TColumnShardScanIterator() { + IndexedData.Abort(); ReadMetadata->ReadStats->PrintToLog(); } void TColumnShardScanIterator::Apply(IDataTasksProcessor::ITask::TPtr task) { - if (!task->IsDataProcessed() || DataTasksProcessor.IsStopped() || !task->IsSameProcessor(DataTasksProcessor)) { + if (!task->IsDataProcessed() || Context.GetProcessor().IsStopped() || !task->IsSameProcessor(Context.GetProcessor())) { return; } Y_VERIFY(task->Apply(IndexedData.GetGranulesContext())); diff --git 
a/ydb/core/tx/columnshard/columnshard__index_scan.h b/ydb/core/tx/columnshard/columnshard__index_scan.h index 44b4fb85651..c41ecc88507 100644 --- a/ydb/core/tx/columnshard/columnshard__index_scan.h +++ b/ydb/core/tx/columnshard/columnshard__index_scan.h @@ -29,16 +29,15 @@ using NOlap::TBlobRange; class TColumnShardScanIterator: public TScanIteratorBase { private: + NOlap::TReadContext Context; NOlap::TReadMetadata::TConstPtr ReadMetadata; NOlap::TIndexedReadData IndexedData; std::unordered_map<NOlap::TCommittedBlob, ui32, THash<NOlap::TCommittedBlob>> WaitCommitted; TDeque<NOlap::TPartialReadResult> ReadyResults; ui64 ItemsRead = 0; const i64 MaxRowsInBatch = 5000; - NColumnShard::TDataTasksProcessorContainer DataTasksProcessor; - NColumnShard::TConcreteScanCounters ScanCounters; public: - TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, NColumnShard::TDataTasksProcessorContainer processor, const NColumnShard::TConcreteScanCounters& scanCounters); + TColumnShardScanIterator(NOlap::TReadMetadata::TConstPtr readMetadata, const NOlap::TReadContext& context); ~TColumnShardScanIterator(); virtual std::optional<ui32> GetAvailableResultsCount() const override { diff --git a/ydb/core/tx/columnshard/columnshard__scan.cpp b/ydb/core/tx/columnshard/columnshard__scan.cpp index b4747eb6aee..cbb1c75b5bd 100644 --- a/ydb/core/tx/columnshard/columnshard__scan.cpp +++ b/ydb/core/tx/columnshard/columnshard__scan.cpp @@ -1,3 +1,5 @@ +#include "engines/reader/read_context.h" + #include <ydb/core/tx/columnshard/columnshard__scan.h> #include <ydb/core/tx/columnshard/columnshard__index_scan.h> #include <ydb/core/tx/columnshard/columnshard__stats_scan.h> @@ -99,7 +101,8 @@ public: Schedule(Deadline, new TEvents::TEvWakeup); Y_VERIFY(!ScanIterator); - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool); + ScanIterator = 
ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // propagate self actor id // TODO: FlagSubscribeOnSession ? Send(ScanComputeActorId, new TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen), IEventHandle::FlagTrackDelivery); @@ -416,7 +419,8 @@ private: return Finish(); } - ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(MakeTasksProcessor(), ScanCountersPool); + NOlap::TReadContext context(MakeTasksProcessor(), ScanCountersPool); + ScanIterator = ReadMetadataRanges[ReadMetadataIndex]->StartScan(context); // Used in TArrowToYdbConverter ResultYqlSchema.clear(); } diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 770d55539f1..9ebde609884 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -107,11 +107,11 @@ void TIndexedReadData::AddBlobForFetch(const TBlobRange& range, NIndexedReader:: Y_VERIFY(IndexedBlobs.emplace(range).second); Y_VERIFY(IndexedBlobSubscriber.emplace(range, &batch).second); if (batch.GetFetchedInfo().GetFilter()) { - Counters.PostFilterBytes->Add(range.Size); + Context.GetCounters().PostFilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataAdditionalBytes += range.Size; PriorityBlobsQueue.emplace_back(batch.GetGranule(), range); } else { - Counters.FilterBytes->Add(range.Size); + Context.GetCounters().FilterBytes->Add(range.Size); ReadMetadata->ReadStats->DataFilterBytes += range.Size; FetchBlobsQueue.emplace_back(batch.GetGranule(), range); } @@ -144,7 +144,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { } GranulesContext->PrepareForStart(); - Counters.PortionBytes->Add(portionsBytes); + Context.GetCounters().PortionBytes->Add(portionsBytes); auto& stats = ReadMetadata->ReadStats; stats->IndexGranules = ReadMetadata->SelectInfo->Granules.size(); stats->IndexPortions = ReadMetadata->SelectInfo->Portions.size(); @@ -411,7 +411,7 @@ 
TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco // Extract the last row's PK auto keyBatch = NArrow::ExtractColumns(batch, indexInfo.GetReplaceKey()); - auto lastKey = keyBatch->Slice(keyBatch->num_rows()-1, 1); + auto lastKey = keyBatch->Slice(keyBatch->num_rows() - 1, 1); // Leave only requested columns auto resultBatch = NArrow::ExtractColumns(batch, ReadMetadata->GetResultSchema()); @@ -435,9 +435,8 @@ TIndexedReadData::MakeResult(std::vector<std::vector<std::shared_ptr<arrow::Reco } TIndexedReadData::TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, - const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor) - : Counters(counters) - , TasksProcessor(tasksProcessor) + const bool internalRead, const TReadContext& context) + : Context(context) , ReadMetadata(readMetadata) , OnePhaseReadMode(internalRead) { @@ -451,6 +450,7 @@ bool TIndexedReadData::IsFinished() const { void TIndexedReadData::Abort() { Y_VERIFY(GranulesContext); + Context.MutableProcessor().Stop(); FetchBlobsQueue.Stop(); PriorityBlobsQueue.Stop(); GranulesContext->Abort(); @@ -462,7 +462,7 @@ NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { auto* f = PriorityBlobsQueue.front(); if (f) { GranulesContext->ForceStartProcessGranule(f->GetGranuleId(), f->GetRange()); - Counters.OnPriorityFetch(f->GetRange().Size); + Context.GetCounters().OnPriorityFetch(f->GetRange().Size); return PriorityBlobsQueue.pop_front(); } } @@ -472,10 +472,10 @@ NKikimr::NOlap::TBlobRange TIndexedReadData::ExtractNextBlob() { return TBlobRange(); } if (GranulesContext->TryStartProcessGranule(f->GetGranuleId(), f->GetRange())) { - Counters.OnGeneralFetch(f->GetRange().Size); + Context.GetCounters().OnGeneralFetch(f->GetRange().Size); return FetchBlobsQueue.pop_front(); } else { - Counters.OnProcessingOverloaded(); + Context.GetCounters().OnProcessingOverloaded(); return TBlobRange(); } } 
diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.h b/ydb/core/tx/columnshard/engines/indexed_read_data.h index 3ea10230cf2..527b7d8a0cf 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.h +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.h @@ -19,10 +19,9 @@ namespace NKikimr::NOlap { class TIndexedReadData { private: + TReadContext Context; std::unique_ptr<NIndexedReader::TGranulesFillingContext> GranulesContext; - NColumnShard::TConcreteScanCounters Counters; - NColumnShard::TDataTasksProcessorContainer TasksProcessor; TFetchBlobsQueue FetchBlobsQueue; TFetchBlobsQueue PriorityBlobsQueue; NOlap::TReadMetadata::TConstPtr ReadMetadata; @@ -36,15 +35,14 @@ private: std::shared_ptr<NArrow::TSortDescription> SortReplaceDescription; public: - TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, - const bool internalRead, const NColumnShard::TConcreteScanCounters& counters, NColumnShard::TDataTasksProcessorContainer tasksProcessor); + TIndexedReadData(NOlap::TReadMetadata::TConstPtr readMetadata, const bool internalRead, const TReadContext& context); const NColumnShard::TConcreteScanCounters& GetCounters() const noexcept { - return Counters; + return Context.GetCounters(); } const NColumnShard::TDataTasksProcessorContainer& GetTasksProcessor() const noexcept { - return TasksProcessor; + return Context.GetProcessor(); } NIndexedReader::TGranulesFillingContext& GetGranulesContext() { diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt index 09b5f88568c..528f351f281 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.darwin-x86_64.txt @@ -44,6 +44,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp 
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt index 1cd668a0451..056b2971521 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-aarch64.txt @@ -45,6 +45,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt index 1cd668a0451..056b2971521 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.linux-x86_64.txt @@ -45,6 +45,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git 
a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt index 09b5f88568c..528f351f281 100644 --- a/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/engines/reader/CMakeLists.windows-x86_64.txt @@ -44,6 +44,7 @@ target_sources(columnshard-engines-reader PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/queue.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_filter_merger.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_context.cpp ) generate_enum_serilization(columnshard-engines-reader ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/engines/reader/read_metadata.h diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.cpp b/ydb/core/tx/columnshard/engines/reader/read_context.cpp new file mode 100644 index 00000000000..390bc57ae8e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/read_context.cpp @@ -0,0 +1,14 @@ +#include "read_context.h" +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NOlap { + +TReadContext::TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, + const NColumnShard::TConcreteScanCounters& counters) + : Processor(processor) + , Counters(counters) +{ + +} + +} diff --git a/ydb/core/tx/columnshard/engines/reader/read_context.h b/ydb/core/tx/columnshard/engines/reader/read_context.h new file mode 100644 index 00000000000..09ede34dc0e --- /dev/null +++ b/ydb/core/tx/columnshard/engines/reader/read_context.h @@ -0,0 +1,28 @@ +#pragma once +#include "conveyor_task.h" +#include <ydb/core/tx/columnshard/counters/scan.h> +#include <library/cpp/actors/core/actor.h> + +namespace NKikimr::NOlap { + +class TReadContext { +private: + YDB_ACCESSOR_DEF(NColumnShard::TDataTasksProcessorContainer, Processor); + const 
NColumnShard::TConcreteScanCounters Counters; +public: + const NColumnShard::TConcreteScanCounters& GetCounters() const { + return Counters; + } + + TReadContext(const NColumnShard::TDataTasksProcessorContainer& processor, + const NColumnShard::TConcreteScanCounters& counters + ); + + TReadContext(const NColumnShard::TConcreteScanCounters& counters) + : Counters(counters) + { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp index 1870d352856..3fd405fc311 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.cpp @@ -35,8 +35,8 @@ std::shared_ptr<arrow::RecordBatch> TDataStorageAccessor::GetCachedBatch(const T return BatchCache.Get(blobId); } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const { - return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), tasksProcessor, scanCounters); +std::unique_ptr<NColumnShard::TScanIteratorBase> TReadMetadata::StartScan(const NOlap::TReadContext& readContext) const { + return std::make_unique<NColumnShard::TColumnShardScanIterator>(this->shared_from_this(), readContext); } bool TReadMetadata::Init(const TReadDescription& readDescription, const TDataStorageAccessor& dataAccessor, std::string& error) { @@ -177,7 +177,7 @@ std::vector<std::pair<TString, NScheme::TTypeInfo>> TReadStatsMetadata::GetKeyYq return NOlap::GetColumns(NColumnShard::PrimaryIndexStatsSchema, NColumnShard::PrimaryIndexStatsSchema.KeyColumns); } -std::unique_ptr<NColumnShard::TScanIteratorBase> TReadStatsMetadata::StartScan(NColumnShard::TDataTasksProcessorContainer /*tasksProcessor*/, const NColumnShard::TConcreteScanCounters& /*scanCounters*/) const { +std::unique_ptr<NColumnShard::TScanIteratorBase> 
TReadStatsMetadata::StartScan(const NOlap::TReadContext& /*readContext*/) const { return std::make_unique<NColumnShard::TStatsIterator>(this->shared_from_this()); } diff --git a/ydb/core/tx/columnshard/engines/reader/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/read_metadata.h index 32a0eca89a4..af5debd2112 100644 --- a/ydb/core/tx/columnshard/engines/reader/read_metadata.h +++ b/ydb/core/tx/columnshard/engines/reader/read_metadata.h @@ -1,6 +1,7 @@ #pragma once #include "conveyor_task.h" #include "description.h" +#include "read_context.h" #include <ydb/library/accessor/accessor.h> #include <ydb/core/tx/columnshard/blob.h> #include <ydb/core/tx/columnshard/counters.h> @@ -117,7 +118,7 @@ public: virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetResultYqlSchema() const = 0; virtual std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const = 0; - virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const = 0; + virtual std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(const NOlap::TReadContext& readContext) const = 0; // TODO: can this only be done for base class? 
friend IOutputStream& operator << (IOutputStream& out, const TReadMetadataBase& meta) { @@ -145,6 +146,9 @@ private: public: using TConstPtr = std::shared_ptr<const TReadMetadata>; + const std::vector<ui32>& GetAllColumns() const { + return AllColumns; + } std::shared_ptr<TSelectInfo> SelectInfo; std::vector<TCommittedBlob> CommittedBlobs; THashMap<TUnifiedBlobId, std::shared_ptr<arrow::RecordBatch>> CommittedBatches; @@ -261,7 +265,7 @@ public: return SelectInfo->Stats().Blobs; } - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(const NOlap::TReadContext& readContext) const override; void Dump(IOutputStream& out) const override { out << "columns: " << GetSchemaColumnsCount() @@ -302,7 +306,7 @@ public: std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override; - std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(NColumnShard::TDataTasksProcessorContainer tasksProcessor, const NColumnShard::TConcreteScanCounters& scanCounters) const override; + std::unique_ptr<NColumnShard::TScanIteratorBase> StartScan(const NOlap::TReadContext& readContext) const override; }; } diff --git a/ydb/core/tx/columnshard/engines/reader/ya.make b/ydb/core/tx/columnshard/engines/reader/ya.make index f6b7761c890..b90f70893c4 100644 --- a/ydb/core/tx/columnshard/engines/reader/ya.make +++ b/ydb/core/tx/columnshard/engines/reader/ya.make @@ -13,6 +13,7 @@ SRCS( queue.cpp read_filter_merger.cpp read_metadata.cpp + read_context.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index ca48cf66d51..6d54d0618d2 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -35,7 +35,7 @@ public: , BlobCacheActorId(NBlobCache::MakeBlobCacheServiceId()) , 
Result(std::move(event)) , ReadMetadata(readMetadata) - , IndexedData(ReadMetadata, true, counters, TDataTasksProcessorContainer()) + , IndexedData(ReadMetadata, true, NOlap::TReadContext(counters)) , Deadline(deadline) , ColumnShardActorId(columnShardActorId) , RequestCookie(requestCookie) |