diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-16 16:49:36 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-06-16 16:49:36 +0300 |
commit | d4153d39978fe17e3370d3d70999939b57709884 (patch) | |
tree | 9eb04b02c1b0409c7283216428b90cefa3d98bd7 | |
parent | 135a41cd35053f49e5423def5bc862ee440fd80c (diff) | |
download | ydb-d4153d39978fe17e3370d3d70999939b57709884.tar.gz |
speed up in case reverse sorting - fix out of memory
19 files changed, 209 insertions, 24 deletions
diff --git a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt index 7914110dfd..0c8e6922f2 100644 --- a/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(testlib) @@ -38,6 +39,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines tx-columnshard-counters + tx-columnshard-common core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt index 99b3dea111..e6d1912b29 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-aarch64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(testlib) @@ -39,6 +40,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines tx-columnshard-counters + tx-columnshard-common core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt index 99b3dea111..e6d1912b29 100644 --- a/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.linux-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(testlib) @@ -39,6 +40,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines tx-columnshard-counters + tx-columnshard-common core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt index 7914110dfd..0c8e6922f2 100644 --- a/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt +++ b/ydb/core/tx/columnshard/CMakeLists.windows-x86_64.txt @@ -6,6 +6,7 @@ # original buildsystem will not be accepted. +add_subdirectory(common) add_subdirectory(counters) add_subdirectory(engines) add_subdirectory(testlib) @@ -38,6 +39,7 @@ target_link_libraries(core-tx-columnshard PUBLIC ydb-core-tablet_flat tx-columnshard-engines tx-columnshard-counters + tx-columnshard-common core-tx-tiering tx-conveyor-usage tx-long_tx_service-public diff --git a/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..228521a024 --- /dev/null +++ b/ydb/core/tx/columnshard/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-common) +target_link_libraries(tx-columnshard-common PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(tx-columnshard-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp +) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..9f38a9ba79 --- /dev/null +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-common) +target_link_libraries(tx-columnshard-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(tx-columnshard-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp +) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..9f38a9ba79 --- /dev/null +++ b/ydb/core/tx/columnshard/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,18 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-common) +target_link_libraries(tx-columnshard-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil +) +target_sources(tx-columnshard-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp +) diff --git a/ydb/core/tx/columnshard/common/CMakeLists.txt b/ydb/core/tx/columnshard/common/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/tx/columnshard/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..228521a024 --- /dev/null +++ b/ydb/core/tx/columnshard/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(tx-columnshard-common) +target_link_libraries(tx-columnshard-common PUBLIC + contrib-libs-cxxsupp + yutil +) +target_sources(tx-columnshard-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/common/reverse_accessor.cpp +) diff --git a/ydb/core/tx/columnshard/common/reverse_accessor.cpp b/ydb/core/tx/columnshard/common/reverse_accessor.cpp new file mode 100644 index 0000000000..a56add4228 --- /dev/null +++ b/ydb/core/tx/columnshard/common/reverse_accessor.cpp @@ -0,0 +1,5 @@ +#include "reverse_accessor.h" + +namespace NKikimr::NColumnShard { + +} diff --git a/ydb/core/tx/columnshard/common/reverse_accessor.h b/ydb/core/tx/columnshard/common/reverse_accessor.h new file mode 100644 index 0000000000..5e86931741 --- /dev/null +++ b/ydb/core/tx/columnshard/common/reverse_accessor.h @@ -0,0 +1,80 @@ +#pragma once +#include <optional> + +namespace NKikimr::NColumnShard { + +template <class TContainer> +class TContainerAccessorWithDirection { +private: + const TContainer& Container = nullptr; + const bool Reverse = false; +public: + TContainerAccessorWithDirection(const TContainer& c, const bool reverse) + : Container(c) + , Reverse(reverse) { + + } + + class TIterator { + private: + std::optional<typename TContainer::const_iterator> ForwardIterator; + std::optional<typename TContainer::const_reverse_iterator> ReverseIterator; + public: + TIterator(typename TContainer::const_iterator it) + : ForwardIterator(it) { + + } + + TIterator(typename TContainer::const_reverse_iterator it) + : ReverseIterator(it) { + + } + + TIterator operator++() { + if (ForwardIterator) { + return ++(*ForwardIterator); + } else { + Y_VERIFY(ReverseIterator); + return ++(*ReverseIterator); + } + } + + bool operator==(const TIterator& item) const { + if (ForwardIterator) { + Y_VERIFY(item.ForwardIterator); + return *ForwardIterator == *item.ForwardIterator; + } else { + Y_VERIFY(ReverseIterator); + Y_VERIFY(item.ReverseIterator); + return *ReverseIterator == *item.ReverseIterator; + } + } + + auto& operator*() { + if (ForwardIterator) { + return **ForwardIterator; + } else { + Y_VERIFY(ReverseIterator); + return **ReverseIterator; + } + } + }; + + TIterator begin() { + if (!Reverse) { + return Container.begin(); + } else { + return Container.rbegin(); + } + } + + TIterator end() { + if (!Reverse) { + return Container.end(); + } else { + return Container.rend(); + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make new file mode 100644 index 0000000000..91a0d3a545 --- /dev/null +++ b/ydb/core/tx/columnshard/common/ya.make @@ -0,0 +1,10 @@ +LIBRARY() + +SRCS( + reverse_accessor.cpp +) + +PEERDIR( +) + +END() diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index fbc893ae5a..eb18bce1aa 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -12,6 +12,7 @@ #include <ydb/core/formats/arrow/replace_key.h> #include <ydb/core/tx/columnshard/blob.h> +#include <ydb/core/tx/columnshard/common/reverse_accessor.h> namespace NKikimr::NOlap { @@ -253,20 +254,12 @@ struct TSelectInfo { std::vector<TGranuleRecord> Granules; // ordered by key (ascending) std::vector<TPortionInfo> Portions; - std::vector<ui64> GranulesOrder(bool rev = false) const { - size_t size = Granules.size(); - std::vector<ui64> order(size); - if (rev) { - size_t pos = size - 1; - for (size_t i = 0; i < size; ++i, --pos) { - order[i] = Granules[pos].Granule; - } - } else { - for (size_t i = 0; i < size; ++i) { - order[i] = Granules[i].Granule; - } - } - return order; + NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>> GetPortionsOrdered(const bool reverse) const { + return NColumnShard::TContainerAccessorWithDirection<std::vector<TPortionInfo>>(Portions, reverse); + } + + NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>> GetGranulesOrdered(bool reverse = false) const { + return NColumnShard::TContainerAccessorWithDirection<std::vector<TGranuleRecord>>(Granules, reverse); } size_t NumRecords() const { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index b1b6e6c905..096a28471a 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -139,7 +139,7 @@ void TIndexedReadData::InitRead(ui32 inputBatch) { Y_VERIFY(!GranulesContext); GranulesContext = std::make_unique<NIndexedReader::TGranulesFillingContext>(ReadMetadata, *this, OnePhaseReadMode); ui64 portionsBytes = 0; - for (auto& portionInfo : ReadMetadata->SelectInfo->Portions) { + for (auto& portionInfo : ReadMetadata->SelectInfo->GetPortionsOrdered(ReadMetadata->IsDescSorted())) { portionsBytes += portionInfo.BlobsBytes(); Y_VERIFY_S(portionInfo.Records.size(), "ReadMeatadata: " << *ReadMetadata); diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp index 372efaaaf8..9a498b7966 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.cpp @@ -59,6 +59,10 @@ NKikimr::NColumnShard::TDataTasksProcessorContainer TGranulesFillingContext::Get } void TGranulesFillingContext::DrainNotIndexedBatches(THashMap<ui64, std::shared_ptr<arrow::RecordBatch>>* batches) { + if (NotIndexedBatchesInitialized) { + return; + } + NotIndexedBatchesInitialized = true; for (auto&& [_, gPtr] : GranulesWaiting) { if (!batches) { gPtr->AddNotIndexedBatch(nullptr); diff --git a/ydb/core/tx/columnshard/engines/reader/filling_context.h b/ydb/core/tx/columnshard/engines/reader/filling_context.h index 5465ac6727..f0edd8d1f6 100644 --- a/ydb/core/tx/columnshard/engines/reader/filling_context.h +++ b/ydb/core/tx/columnshard/engines/reader/filling_context.h @@ -13,9 +13,9 @@ namespace NKikimr::NOlap::NIndexedReader { class TGranulesFillingContext { private: YDB_READONLY_DEF(std::vector<std::string>, PKColumnNames); - bool AbortedFlag = false; TReadMetadata::TConstPtr ReadMetadata; const bool InternalReading = false; + bool NotIndexedBatchesInitialized = false; TIndexedReadData& Owner; THashMap<ui64, NIndexedReader::TGranule::TPtr> GranulesToOut; std::set<ui64> ReadyGranulesAccumulator; @@ -95,7 +95,6 @@ public: void Abort() { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "abort"); GranulesWaiting.clear(); - AbortedFlag = true; Y_VERIFY(!IsInProgress()); } @@ -110,7 +109,7 @@ public: void OnGranuleReady(const ui64 granuleId) { auto granule = GetGranuleVerified(granuleId); Y_VERIFY(GranulesToOut.emplace(granule->GetGranuleId(), granule).second); - Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second || AbortedFlag); + Y_VERIFY(ReadyGranulesAccumulator.emplace(granule->GetGranuleId()).second); Y_VERIFY(GranulesWaiting.erase(granuleId)); GranulesInProcessing.erase(granule->GetGranuleId()); BlobsSizeInProcessing -= granule->GetBlobsDataSize(); diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp index 95642e73a1..d081209318 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/default.cpp @@ -4,9 +4,8 @@ namespace NKikimr::NOlap::NIndexedReader { void TAnySorting::DoFill(TGranulesFillingContext& context) { - auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - for (ui64 granule : granulesOrder) { - TGranule::TPtr g = context.GetGranuleVerified(granule); + for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) { + TGranule::TPtr g = context.GetGranuleVerified(granule.Granule); GranulesOutOrder.emplace_back(g); } } diff --git a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp index b38708eb4a..09b30f1838 100644 --- a/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp +++ b/ydb/core/tx/columnshard/engines/reader/order_control/pk_with_limit.cpp @@ -92,9 +92,8 @@ bool TPKSortingWithLimit::DoOnFilterReady(TBatch& /*batchInfo*/, const TGranule& } void TPKSortingWithLimit::DoFill(TGranulesFillingContext& context) { - auto granulesOrder = ReadMetadata->SelectInfo->GranulesOrder(ReadMetadata->IsDescSorted()); - for (ui64 granule : granulesOrder) { - TGranule::TPtr g = context.GetGranuleVerified(granule); + for (auto&& granule : ReadMetadata->SelectInfo->GetGranulesOrdered(ReadMetadata->IsDescSorted())) { + TGranule::TPtr g = context.GetGranuleVerified(granule.Granule); GranulesOutOrder.emplace_back(g); GranulesOutOrderForPortions.emplace_back(g->SortBatchesByPK(ReadMetadata->IsDescSorted(), ReadMetadata), g); } diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index 52cc9e0293..f9a61b5240 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -52,6 +52,7 @@ PEERDIR( ydb/core/tablet_flat ydb/core/tx/columnshard/engines ydb/core/tx/columnshard/counters + ydb/core/tx/columnshard/common ydb/core/tx/tiering ydb/core/tx/conveyor/usage ydb/core/tx/long_tx_service/public |