summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <[email protected]>2023-08-31 12:52:36 +0300
committerivanmorozov <[email protected]>2023-08-31 13:26:05 +0300
commit6339731c5f32baf347685cf790959e4c609dc97f (patch)
tree51e7bba05009db04502f1ff8ec2b359c86a5aedf
parentd790e7c230d95322bb52471a02802f56feb459ae (diff)
KIKIMR-19218: TSpecialKeys for unify store and reuse important batch records
-rw-r--r--ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/formats/arrow/special_keys.cpp51
-rw-r--r--ydb/core/formats/arrow/special_keys.h47
-rw-r--r--ydb/core/formats/arrow/ya.make1
7 files changed, 103 insertions, 0 deletions
diff --git a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
index a2b306eb306..793eeea34be 100644
--- a/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.darwin-x86_64.txt
@@ -49,4 +49,5 @@ target_sources(core-formats-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp
)
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
index 9db7b6a92ae..57ebad2a1ca 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-aarch64.txt
@@ -50,4 +50,5 @@ target_sources(core-formats-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp
)
diff --git a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
index 9db7b6a92ae..57ebad2a1ca 100644
--- a/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.linux-x86_64.txt
@@ -50,4 +50,5 @@ target_sources(core-formats-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp
)
diff --git a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
index ce82e9eac7e..ae91024fc63 100644
--- a/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/formats/arrow/CMakeLists.windows-x86_64.txt
@@ -49,4 +49,5 @@ target_sources(core-formats-arrow PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/replace_key.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/size_calcer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/ssa_program_optimizer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/special_keys.cpp
)
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
new file mode 100644
index 00000000000..4ea5149800e
--- /dev/null
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -0,0 +1,51 @@
+#include "special_keys.h"
+#include <ydb/core/formats/arrow/serializer/full.h>
+#include <ydb/core/formats/arrow/arrow_helpers.h>
+#include <ydb/core/formats/arrow/arrow_filter.h>
+
+namespace NKikimr::NArrow {
+
+bool TSpecialKeys::DeserializeFromString(const TString& data) {
+ if (!data) {
+ return false;
+ }
+ Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(data));
+ return !!Data;
+}
+
+std::optional<NKikimr::NArrow::TReplaceKey> TSpecialKeys::GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const {
+ Y_VERIFY(position < Data->num_rows());
+ if (schema) {
+ return NArrow::TReplaceKey::FromBatch(Data, schema, position);
+ } else {
+ return NArrow::TReplaceKey::FromBatch(Data, position);
+ }
+}
+
+TString TSpecialKeys::SerializeToString() const {
+ return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data);
+}
+
+TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) {
+ Y_VERIFY(batch);
+ Y_VERIFY(batch->num_rows());
+ std::shared_ptr<arrow::RecordBatch> keyBatch = batch;
+ if (columnNames.size()) {
+ keyBatch = NArrow::ExtractColumns(batch, columnNames);
+ }
+ std::vector<bool> bits(batch->num_rows(), false);
+ bits[0] = true;
+ bits[batch->num_rows() - 1] = true;
+
+ auto filter = NArrow::TColumnFilter(std::move(bits)).BuildArrowFilter(batch->num_rows());
+ Data = NArrow::TStatusValidator::GetValid(arrow::compute::Filter(keyBatch, filter)).record_batch();
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+}
+
+TFirstLastSpecialKeys::TFirstLastSpecialKeys(const TString& data): TBase(data) {
+ Y_VERIFY(Data);
+ Y_VERIFY_DEBUG(Data->ValidateFull().ok());
+ Y_VERIFY(Data->num_rows() == 1 || Data->num_rows() == 2);
+}
+
+}
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
new file mode 100644
index 00000000000..38451fdd7b6
--- /dev/null
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -0,0 +1,47 @@
+#pragma once
+#include <ydb/core/formats/arrow/replace_key.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+
+namespace NKikimr::NArrow {
+
+class TSpecialKeys {
+protected:
+ std::shared_ptr<arrow::RecordBatch> Data;
+
+ bool DeserializeFromString(const TString& data);
+
+ std::optional<TReplaceKey> GetKeyByIndex(const ui32 position, const std::shared_ptr<arrow::Schema>& schema) const;
+
+ TSpecialKeys() = default;
+ TSpecialKeys(std::shared_ptr<arrow::RecordBatch> data)
+ : Data(data) {
+ Y_VERIFY(Data);
+ Y_VERIFY(Data->num_rows());
+ }
+
+public:
+
+ TSpecialKeys(const TString& data) {
+ Y_VERIFY(DeserializeFromString(data));
+ }
+
+ TString SerializeToString() const;
+};
+
+class TFirstLastSpecialKeys: public TSpecialKeys {
+private:
+ using TBase = TSpecialKeys;
+public:
+ std::optional<TReplaceKey> GetMin(const std::shared_ptr<arrow::Schema>& schema) const {
+ return GetKeyByIndex(0, schema);
+ }
+ std::optional<TReplaceKey> GetMax(const std::shared_ptr<arrow::Schema>& schema) const {
+ return GetKeyByIndex(Data->num_rows() - 1, schema);
+ }
+
+ explicit TFirstLastSpecialKeys(const TString& data);
+
+ explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {});
+};
+
+}
diff --git a/ydb/core/formats/arrow/ya.make b/ydb/core/formats/arrow/ya.make
index 37000a29807..4dbdc011d65 100644
--- a/ydb/core/formats/arrow/ya.make
+++ b/ydb/core/formats/arrow/ya.make
@@ -50,6 +50,7 @@ SRCS(
size_calcer.cpp
sort_cursor.h
ssa_program_optimizer.cpp
+ special_keys.cpp
)
END()