diff options
| author | ivanmorozov <[email protected]> | 2023-08-31 12:52:36 +0300 |
|---|---|---|
| committer | ivanmorozov <[email protected]> | 2023-08-31 13:26:05 +0300 |
| commit | 6339731c5f32baf347685cf790959e4c609dc97f (patch) | |
| tree | 51e7bba05009db04502f1ff8ec2b359c86a5aedf | |
| parent | d790e7c230d95322bb52471a02802f56feb459ae (diff) | |
KIKIMR-19218: TSpecialKeys for unify store and reuse important batch records
| -rw-r--r-- | ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt | 1 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt | 1 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt | 1 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt | 1 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/special_keys.cpp | 51 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/special_keys.h | 47 | ||||
| -rw-r--r-- | ydb/core/formats/arrow/ya.make | 1 |
7 files changed, 103 insertions, 0 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt index a2b306eb306..793eeea34be 100644 --- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt @@ -49,4 +49,5 @@ target_sources(core-formats-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp ) diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt index 9db7b6a92ae..57ebad2a1ca 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt @@ -50,4 +50,5 @@ target_sources(core-formats-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp ) diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt index 9db7b6a92ae..57ebad2a1ca 100644 --- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt @@ -50,4 +50,5 @@ target_sources(core-formats-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp ) diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt index ce82e9eac7e..ae91024fc63 100644 --- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt +++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt @@ -49,4 +49,5 @@ target_sources(core-formats-arrow PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp ) diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp new file mode 100644 index 00000000000..4ea5149800e --- /dev/null +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -0,0 +1,51 @@ +#include "special_keys.h" +#include <ydb/core/formats/arrow/serializer/full.h> +#include <ydb/core/formats/arrow/arrow_helpers.h> +#include <ydb/core/formats/arrow/arrow_filter.h> + +namespace NKikimr::NArrow { + +bool TSpecialKeys::DeserializeFromString(const TString& data) { + if (!data) { + return false; + } + Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(data)); + return !!Data; +} + +std::optional<NKikimr::NArrow::TReplaceKey> TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const { + Y_VERIFY(position < Data->num_rows()); + if (schema) { + return NArrow::TReplaceKey::FromBatch(Data, schema, position); + } else { + return NArrow::TReplaceKey::FromBatch(Data, position); + } +} + +TString TSpecialKeys::SerializeToString() const { + return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data); +} + +TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) { + Y_VERIFY(batch); + Y_VERIFY(batch->num_rows()); + std::shared_ptr<arrow::RecordBatch> keyBatch = batch; + if (columnNames.size()) { + keyBatch = NArrow::ExtractColumns(batch, columnNames); + } + std::vector<bool> bits(batch->num_rows(), false); + bits[0] = true; + bits[batch->num_rows() - 1] = true; + + auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows()); + Data = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(keyBatch, filter)).record_batch(); + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); +} + +TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data): TBase(data) { + Y_VERIFY(Data); + Y_VERIFY_DEBUG(Data->ValidateFull().ok()); + Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2); +} + +} diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h new file mode 100644 index 00000000000..38451fdd7b6 --- /dev/null +++ b/ydb/core/formats/arrow/special_keys.h @@ -0,0 +1,47 @@ +#pragma once +#include <ydb/core/formats/arrow/replace_key.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> + +namespace NKikimr::NArrow { + +class TSpecialKeys { +protected: + std::shared_ptr<arrow::RecordBatch> Data; + + bool DeserializeFromString(const TString& data); + + std::optional<TReplaceKey> GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const; + + TSpecialKeys() = default; + TSpecialKeys(std::shared_ptr<arrow::RecordBatch> data) + : Data(data) { + Y_VERIFY(Data); + Y_VERIFY(Data->num_rows()); + } + +public: + + TSpecialKeys(const TString& data) { + Y_VERIFY(DeserializeFromString(data)); + } + + TString SerializeToString() const; +}; + +class TFirstLastSpecialKeys: public TSpecialKeys { +private: + using TBase = TSpecialKeys; +public: + std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const { + return GetKeyByIndex(0, schema); + } + std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const { + return GetKeyByIndex(Data->num_rows() - 1, schema); + } + + explicit TFirstLastSpecialKeys(const TString& data); + + explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {}); +}; + +} diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make index 37000a29807..4dbdc011d65 100644 --- a/ydb/core/formats/arrow/ya.make +++ b/ydb/core/formats/arrow/ya.make @@ -50,6 +50,7 @@ SRCS( size_calcer.cpp sort_cursor.h ssa_program_optimizer.cpp + special_keys.cpp ) END() |
