aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-16 17:53:49 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-16 17:53:49 +0300
commitb40833909a29351baa2cbd151a23ebb10f9a601f (patch)
treed184915bd35cde21cee81d26c8b70f895323557a
parentc3ac866fbfd9872ea4afc6b2c15fc27cf384d06c (diff)
downloadydb-b40833909a29351baa2cbd151a23ebb10f9a601f.tar.gz
extract index from json
-rw-r--r--ydb/core/formats/arrow_helpers.cpp1
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp7
-rw-r--r--ydb/core/testlib/cs_helper.cpp53
-rw-r--r--ydb/core/testlib/cs_helper.h3
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp2
-rw-r--r--ydb/library/accessor/accessor.h3
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt13
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt13
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt13
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt13
-rw-r--r--ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt39
-rw-r--r--ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt41
-rw-r--r--ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt41
-rw-r--r--ydb/services/ext_index/metadata/extractor/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt39
-rw-r--r--ydb/services/ext_index/metadata/extractor/abstract.cpp5
-rw-r--r--ydb/services/ext_index/metadata/extractor/abstract.h34
-rw-r--r--ydb/services/ext_index/metadata/extractor/city.cpp161
-rw-r--r--ydb/services/ext_index/metadata/extractor/city.h54
-rw-r--r--ydb/services/ext_index/metadata/extractor/container.cpp5
-rw-r--r--ydb/services/ext_index/metadata/extractor/container.h69
-rw-r--r--ydb/services/ext_index/metadata/extractor/ya.make17
-rw-r--r--ydb/services/ext_index/metadata/object.cpp8
-rw-r--r--ydb/services/ext_index/metadata/object.h130
-rw-r--r--ydb/services/ext_index/metadata/ya.make6
-rw-r--r--ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/services/ext_index/service/add_index.cpp2
-rw-r--r--ydb/services/ext_index/service/ya.make1
-rw-r--r--ydb/services/ext_index/ut/ut_ext_index.cpp40
32 files changed, 637 insertions, 197 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp
index fd8a2c8ba8..c2fe9828f5 100644
--- a/ydb/core/formats/arrow_helpers.cpp
+++ b/ydb/core/formats/arrow_helpers.cpp
@@ -1112,7 +1112,6 @@ static bool ConvertColumn(std::shared_ptr<arrow::Array>& column, std::shared_ptr
auto value = binaryArray.Value(i);
const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size()));
if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) {
- ALS_ERROR(0) << "NOT PARSED JSON: " << TStringBuf(value.data(), value.size());
return false;
}
}
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index 579dc8076d..ce71e56a89 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
%s
}
}
- )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
+ )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
TString shardingColumns = "[\"timestamp\", \"uid\"]";
if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
@@ -3799,10 +3799,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
//EnableDebugLogging(kikimr);
- auto& server = kikimr.GetTestServer();
auto tableClient = kikimr.GetTableClient();
- Tests::NCommon::THelper lHelper(server);
-
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto query = TStringBuilder() << R"(
@@ -3856,7 +3853,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST(OlapUpsert) {
- TestOlapUpsert(2); // it should lead to planned tx
+ TestOlapUpsert(2);
}
Y_UNIT_TEST(OlapDeleteImmediate) {
diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp
index 38723048be..0d9784e7a5 100644
--- a/ydb/core/testlib/cs_helper.cpp
+++ b/ydb/core/testlib/cs_helper.cpp
@@ -101,14 +101,16 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBeg
//
std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() {
- return std::make_shared<arrow::Schema>(
- std::vector<std::shared_ptr<arrow::Field>>{
- arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)),
- arrow::field("resource_id", arrow::utf8()),
- arrow::field("uid", arrow::utf8()),
- arrow::field("level", arrow::int32()),
- arrow::field("message", arrow::utf8())
- });
+ std::vector<std::shared_ptr<arrow::Field>> fields; // the five base columns are always present, in this order
+ fields.emplace_back(arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)));
+ fields.emplace_back(arrow::field("resource_id", arrow::utf8()));
+ fields.emplace_back(arrow::field("uid", arrow::utf8()));
+ fields.emplace_back(arrow::field("level", arrow::int32()));
+ fields.emplace_back(arrow::field("message", arrow::utf8()));
+ if (GetWithJsonDocument()) { // optional sixth column, added only when the WithJsonDocument flag is set
+ fields.emplace_back(arrow::field("json_payload", arrow::utf8())); // declared as utf8 on the arrow side
+ }
+ return std::make_shared<arrow::Schema>(fields);
}
std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) {
@@ -119,6 +121,12 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui
arrow::StringBuilder b3;
arrow::Int32Builder b4;
arrow::StringBuilder b5;
+ arrow::StringBuilder b6;
+
+ NJson::TJsonValue jsonInfo;
+ jsonInfo["a"]["b"] = 1;
+ jsonInfo["a"]["c"] = "asds";
+ jsonInfo["b"] = "asd";
for (size_t i = 0; i < rowCount; ++i) {
std::string uid("uid_" + std::to_string(tsBegin + i));
@@ -128,6 +136,9 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui
Y_VERIFY(b3.Append(uid).ok());
Y_VERIFY(b4.Append(i % 5).ok());
Y_VERIFY(b5.Append(message).ok());
+ jsonInfo["a"]["b"] = i;
+ auto jsonStringBase = jsonInfo.GetStringRobust();
+ Y_VERIFY(b6.Append(jsonStringBase.data(), jsonStringBase.size()).ok());
}
std::shared_ptr<arrow::TimestampArray> a1;
@@ -135,14 +146,38 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui
std::shared_ptr<arrow::StringArray> a3;
std::shared_ptr<arrow::Int32Array> a4;
std::shared_ptr<arrow::StringArray> a5;
+ std::shared_ptr<arrow::StringArray> a6;
Y_VERIFY(b1.Finish(&a1).ok());
Y_VERIFY(b2.Finish(&a2).ok());
Y_VERIFY(b3.Finish(&a3).ok());
Y_VERIFY(b4.Finish(&a4).ok());
Y_VERIFY(b5.Finish(&a5).ok());
+ Y_VERIFY(b6.Finish(&a6).ok());
- return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 });
+ if (GetWithJsonDocument()) {
+ return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5, a6 });
+ } else {
+ return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 });
+ }
+
+}
+
+TString THelper::GetTestTableSchema() const { // assembles the text schema description for the test table
+ TStringBuilder sb;
+ sb << R"(Columns{ Name: "timestamp" Type : "Timestamp" NotNull : true })"; // the only column declared NotNull; also the key column below
+ sb << R"(Columns{ Name: "resource_id" Type : "Utf8" })";
+ sb << R"(Columns{ Name: "uid" Type : "Utf8" })";
+ sb << R"(Columns{ Name: "level" Type : "Int32" })";
+ sb << R"(Columns{ Name: "message" Type : "Utf8" })";
+ if (GetWithJsonDocument()) { // same flag that adds "json_payload" to the arrow schema
+ sb << R"(Columns{ Name: "json_payload" Type : "JsonDocument" })"; // table-side type is JsonDocument
+ }
+ sb << R"(
+ KeyColumnNames: "timestamp"
+ Engine : COLUMN_ENGINE_REPLACING_TIMESERIES
+ )";
+ return sb; // TStringBuilder is returned as TString
}
// Clickbench table
diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h
index 1e7738c9e1..b171f66051 100644
--- a/ydb/core/testlib/cs_helper.h
+++ b/ydb/core/testlib/cs_helper.h
@@ -25,6 +25,7 @@ private:
using TBase = THelperSchemaless;
std::shared_ptr<arrow::Schema> GetArrowSchema();
+ YDB_FLAG_ACCESSOR(WithJsonDocument, false);
public:
using TBase::TBase;
@@ -43,6 +44,8 @@ public:
Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
)";
+ TString GetTestTableSchema() const;
+
std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override;
};
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index e71c4ffb04..3a79d3b387 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -42,7 +42,7 @@ public:
%s
}
}
- )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
+ )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
TString shardingColumns = "[\"timestamp\", \"uid\"]";
if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
diff --git a/ydb/library/accessor/accessor.h b/ydb/library/accessor/accessor.h
index 28c78ef491..715f840f10 100644
--- a/ydb/library/accessor/accessor.h
+++ b/ydb/library/accessor/accessor.h
@@ -18,6 +18,9 @@ public:\
name ## Flag = value;\
return *this;\
}\
+ bool Get##name() const noexcept {\
+ return name ## Flag;\
+ }\
private:
namespace NYDBAccessor {
diff --git a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt
index ab68de15b7..e88ff51742 100644
--- a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt
@@ -6,11 +6,9 @@
# original buildsystem will not be accepted.
+add_subdirectory(extractor)
add_library(services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata PUBLIC
contrib-libs-cxxsupp
yutil
@@ -20,9 +18,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
@@ -30,9 +29,6 @@ target_sources(services-ext_index-metadata PRIVATE
)
add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata.global PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata.global PUBLIC
contrib-libs-cxxsupp
yutil
@@ -42,9 +38,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata.global PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt
index 3d32b2799c..9001838817 100644
--- a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt
@@ -6,11 +6,9 @@
# original buildsystem will not be accepted.
+add_subdirectory(extractor)
add_library(services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
@@ -21,9 +19,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
@@ -31,9 +30,6 @@ target_sources(services-ext_index-metadata PRIVATE
)
add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata.global PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata.global PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
@@ -44,9 +40,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata.global PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt
index 3d32b2799c..9001838817 100644
--- a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt
@@ -6,11 +6,9 @@
# original buildsystem will not be accepted.
+add_subdirectory(extractor)
add_library(services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
@@ -21,9 +19,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
@@ -31,9 +30,6 @@ target_sources(services-ext_index-metadata PRIVATE
)
add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata.global PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata.global PUBLIC
contrib-libs-linux-headers
contrib-libs-cxxsupp
@@ -44,9 +40,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata.global PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt
index ab68de15b7..e88ff51742 100644
--- a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt
@@ -6,11 +6,9 @@
# original buildsystem will not be accepted.
+add_subdirectory(extractor)
add_library(services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata PUBLIC
contrib-libs-cxxsupp
yutil
@@ -20,9 +18,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
@@ -30,9 +29,6 @@ target_sources(services-ext_index-metadata PRIVATE
)
add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
-target_compile_options(services-ext_index-metadata.global PRIVATE
- -DUSE_CURRENT_UDF_ABI_VERSION
-)
target_link_libraries(services-ext_index-metadata.global PUBLIC
contrib-libs-cxxsupp
yutil
@@ -42,9 +38,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC
core-grpc_services-base
ydb-core-grpc_services
services-metadata-request
- core-tx-sharding
+ ext_index-metadata-extractor
)
target_sources(services-ext_index-metadata.global PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
)
diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..213856d9fd
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,39 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp
+)
+
+add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp
+)
diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..c2abc92f8b
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp
+)
+
+add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp
+)
diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..c2abc92f8b
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,41 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp
+)
+
+add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp
+)
diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..213856d9fd
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,39 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp
+)
+
+add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor)
+target_compile_options(ext_index-metadata-extractor.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(ext_index-metadata-extractor.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ libs-apache-arrow
+ ydb-core-protos
+ core-tx-sharding
+)
+target_sources(ext_index-metadata-extractor.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp
+)
diff --git a/ydb/services/ext_index/metadata/extractor/abstract.cpp b/ydb/services/ext_index/metadata/extractor/abstract.cpp
new file mode 100644
index 0000000000..60bfde925b
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/abstract.cpp
@@ -0,0 +1,5 @@
+#include "abstract.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/abstract.h b/ydb/services/ext_index/metadata/extractor/abstract.h
new file mode 100644
index 0000000000..05d21bde42
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/abstract.h
@@ -0,0 +1,34 @@
+#pragma once
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <library/cpp/json/writer/json_value.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class IIndexExtractor { // interface of an index extractor: turns a record batch into a vector of ui64 values and (de)serializes its settings as JSON
+protected: // customization points; callers use the public non-virtual wrappers below
+ virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
+ virtual NJson::TJsonValue DoSerializeToJson() const = 0;
+public:
+ using TPtr = std::shared_ptr<IIndexExtractor>;
+ using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>; // object factory keyed by a TString
+
+ virtual ~IIndexExtractor() = default;
+
+ std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { // forwards to DoExtractIndex
+ return DoExtractIndex(batch);
+ }
+
+ bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { // forwards to DoDeserializeFromJson; returns its result unchanged
+ return DoDeserializeFromJson(jsonInfo);
+ }
+
+ NJson::TJsonValue SerializeToJson() const { // forwards to DoSerializeToJson
+ return DoSerializeToJson();
+ }
+
+ virtual TString GetClassName() const = 0; // name of the concrete extractor class, supplied by each implementation
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/city.cpp b/ydb/services/ext_index/metadata/extractor/city.cpp
new file mode 100644
index 0000000000..d9b5150871
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/city.cpp
@@ -0,0 +1,161 @@
+#include "city.h"
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/tx/sharding/sharding.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <ydb/library/yql/minikql/jsonpath/jsonpath.h>
+#include <ydb/library/yql/minikql/jsonpath/value.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h>
+#include <library/cpp/json/fast_sax/parser.h>
+#include <library/cpp/actors/core/log.h>
+#include <util/string/split.h>
+#include <util/string/join.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName); // registers this extractor in IIndexExtractor::TFactory under ClassName
+
+// Owns an arrow array builder with capacity reserved for a known number of
+// rows and turns it into a finished column on request.
+// TArrayBuilder is any arrow builder type; derived classes append values
+// through the protected ArrayBuilder member.
+template <class TArrayBuilder>
+class TArrayInserter {
+protected:
+    TArrayBuilder ArrayBuilder;
+public:
+    // Reserves space for rowsCount values; a failed reservation raises via YQL_ENSURE.
+    // explicit: an integer must never silently convert into an inserter.
+    explicit TArrayInserter(const ui32 rowsCount) {
+        YQL_ENSURE(ArrayBuilder.Reserve(rowsCount).ok());
+    }
+
+    // Finishes the builder and returns the resulting column,
+    // or nullptr (after logging) when arrow reports a failure.
+    std::shared_ptr<arrow::Array> BuildColumn() {
+        // Finish into the generic array pointer so the helper works for every
+        // builder type, not only for builders that produce arrow::StringArray.
+        std::shared_ptr<arrow::Array> result;
+        if (!ArrayBuilder.Finish(&result).ok()) {
+            ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot build array for hash calculation";
+            return nullptr;
+        }
+        return result;
+    }
+};
+
+class TStringArrayInserter: public TArrayInserter<arrow::StringBuilder> { // inserter specialised for arrow::StringBuilder
+private:
+ using TBase = TArrayInserter<arrow::StringBuilder>;
+public:
+ using TBase::TBase; // inherits the row-count constructor
+ void OnFound(const TStringBuf& value) { // appends one string value to the column being built
+ YQL_ENSURE(ArrayBuilder.Append(value.data(), value.size()).ok()); // a failed append raises via YQL_ENSURE
+ }
+};
+
+
+
+// Computes one hash per row of `batch` over the configured Fields.
+//
+// For every configured field:
+//   * without a json path the batch column is used as is;
+//   * with a json path the column must be an arrow BINARY column holding
+//     binary json; the path is evaluated for every row and the scalar results
+//     are joined with "," into a temporary string column named GetFullId().
+// The selected/derived columns form a new record batch that is handed to
+// NSharding::THashSharding::MakeHashes.
+//
+// Returns an empty vector (after logging to EXT_INDEX) when a field is missing
+// from the batch, the column type does not fit, the json path cannot be parsed
+// or executed, the path yields an object/array, or arrow fails to build the
+// intermediate column or schema.
+std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+    auto schema = batch->schema();
+    std::vector<std::shared_ptr<arrow::Field>> fields;
+    std::vector<std::shared_ptr<arrow::Array>> columns;
+    std::vector<TString> fieldIds;
+    for (ui32 i = 0; i < Fields.size(); ++i) {
+        auto c = batch->GetColumnByName(Fields[i].GetFieldId());
+        auto f = schema->GetFieldByName(Fields[i].GetFieldId());
+        if (!c || !f) {
+            ALS_ERROR(NKikimrServices::EXT_INDEX) << "incorrect field name in batch: " << Fields[i].GetFieldId();
+            return {};
+        }
+        if (!Fields[i].GetJsonPath()) {
+            // Plain column: hash it directly.
+            fields.emplace_back(f);
+            columns.emplace_back(c);
+            fieldIds.emplace_back(f->name());
+        } else if (c->type()->id() == arrow::Type::STRING) {
+            ALS_ERROR(NKikimrServices::EXT_INDEX) << "json have not been simple string. it must be JsonDocument for " << Fields[i].GetFieldId();
+            return {};
+        } else if (c->type()->id() == arrow::Type::BINARY) {
+            // The path depends only on the field, so parse it once instead of once per row.
+            NYql::TIssues issues;
+            const NYql::NJsonPath::TJsonPathPtr jsonPath = NYql::NJsonPath::ParseJsonPath(Fields[i].GetJsonPath(), issues, 100);
+            if (!issues.Empty()) {
+                ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot parse path for json extraction: " << issues.ToString();
+                return {};
+            }
+
+            auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(c);
+            std::shared_ptr<TStringArrayInserter> fetcher = std::make_shared<TStringArrayInserter>(batch->num_rows());
+            TVector<TString> values;
+            // NOTE(review): null cells are not checked before being handed to the binary json reader - confirm inputs are non-null.
+            for (ui32 r = 0; r < batch->num_rows(); ++r) {
+                auto sv = typedColumn->Value(r);
+                TStringBuf sb(sv.data(), sv.size());
+                auto reader = NBinaryJson::TBinaryJsonReader::Make(sb);
+                auto binaryJsonRoot = NYql::NJsonPath::TValue(reader->GetRootCursor());
+
+                const auto result = NYql::NJsonPath::ExecuteJsonPath(jsonPath, binaryJsonRoot, NYql::NJsonPath::TVariablesMap{}, nullptr);
+                if (result.IsError()) {
+                    ALS_ERROR(NKikimrServices::EXT_INDEX) << "Runtime errors found on json path usage: " << result.GetError().ToString();
+                    return {};
+                }
+
+                const auto& nodes = result.GetNodes();
+                // Separate index name: the outer `i` still addresses the current field.
+                for (size_t nodeIdx = 0; nodeIdx < nodes.size(); ++nodeIdx) {
+                    const auto& node = nodes[nodeIdx];
+                    switch (node.GetType()) {
+                        case NYql::NJsonPath::EValueType::Bool:
+                            values.emplace_back(::ToString(node.GetBool()));
+                            break;
+                        case NYql::NJsonPath::EValueType::Number:
+                            values.emplace_back(::ToString(node.GetNumber()));
+                            break;
+                        case NYql::NJsonPath::EValueType::String:
+                            values.emplace_back(node.GetString());
+                            break;
+                        case NYql::NJsonPath::EValueType::Null:
+                            // A json null contributes an empty string; without this
+                            // break it fell through into the object/array error below.
+                            values.emplace_back("");
+                            break;
+                        case NYql::NJsonPath::EValueType::Object:
+                        case NYql::NJsonPath::EValueType::Array:
+                            ALS_ERROR(NKikimrServices::EXT_INDEX) << "Cannot use object and array as hash param for index construction";
+                            return {};
+                    }
+                }
+                fetcher->OnFound(JoinSeq(",", values));
+                values.clear();
+            }
+
+            auto extractedColumn = fetcher->BuildColumn();
+            if (!extractedColumn) {
+                // BuildColumn has already logged the failure; a null column must not reach RecordBatch::Make.
+                return {};
+            }
+            fieldIds.emplace_back(Fields[i].GetFullId());
+            fields.emplace_back(std::make_shared<arrow::Field>(Fields[i].GetFullId(), std::make_shared<arrow::StringType>()));
+            columns.emplace_back(extractedColumn);
+        } else {
+            ALS_ERROR(NKikimrServices::EXT_INDEX) << "incorrect column type for json extraction: " << Fields[i].GetFieldId();
+            return {};
+        }
+    }
+    auto sBuilder = std::make_shared<arrow::SchemaBuilder>(fields);
+    auto newSchema = sBuilder->Finish();
+    if (!newSchema.ok()) {
+        ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot build new schema";
+        return {};
+    }
+    auto newBatch = arrow::RecordBatch::Make(*newSchema, batch->num_rows(), columns);
+    NSharding::THashSharding hashSharding(0, fieldIds);
+    return hashSharding.MakeHashes(newBatch);
+}
+
+// Reads the extractor configuration from json of the form
+//   {"fields": [ <field>, ... ]}
+// where every element is parsed by TExtractorField::DeserializeFromJson.
+// Returns false when "fields" is not an array, any element fails to parse, or
+// the array is empty. Fields is replaced only on success, so a failed or
+// repeated call neither leaves a partial list nor accumulates duplicates.
+bool TExtractorCityHash64::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+    const NJson::TJsonValue::TArray* jsonFields = nullptr;
+    if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) {
+        return false;
+    }
+    // Parse into a scratch list first; commit only when everything is valid.
+    std::vector<TExtractorField> parsedFields;
+    parsedFields.reserve(jsonFields->size());
+    for (const auto& jsonField : *jsonFields) {
+        TExtractorField field;
+        if (!field.DeserializeFromJson(jsonField)) {
+            return false;
+        }
+        parsedFields.emplace_back(field);
+    }
+    if (parsedFields.empty()) {
+        // An extractor without fields has nothing to hash.
+        return false;
+    }
+    Fields.swap(parsedFields);
+    return true;
+}
+
+// Writes the configuration as {"fields": [ ... ]}, one element per configured
+// field, each produced by TExtractorField::SerializeToJson.
+NJson::TJsonValue TExtractorCityHash64::DoSerializeToJson() const {
+    NJson::TJsonValue json;
+    NJson::TJsonValue& fieldsArray = json.InsertValue("fields", NJson::JSON_ARRAY);
+    for (const TExtractorField& field : Fields) {
+        fieldsArray.AppendValue(field.SerializeToJson());
+    }
+    return json;
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/city.h b/ydb/services/ext_index/metadata/extractor/city.h
new file mode 100644
index 0000000000..99011b0a92
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/city.h
@@ -0,0 +1,54 @@
+#pragma once
+#include "abstract.h"
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+// One extractor input: a field id plus an optional JSON path used for JSON extraction from that field.
+class TExtractorField {
+private:
+    YDB_ACCESSOR_DEF(TString, FieldId);
+    YDB_ACCESSOR_DEF(TString, JsonPath);
+public:
+    // The bare field id, or "json:<id>:<path>" when a JSON path is set.
+    TString GetFullId() const {
+        if (!JsonPath) {
+            return FieldId;
+        }
+        return "json:" + FieldId + ":" + JsonPath;
+    }
+
+    // Always writes both "id" and "path"; "path" may be an empty string.
+    NJson::TJsonValue SerializeToJson() const {
+        NJson::TJsonValue fieldJson(NJson::JSON_MAP);
+        fieldJson.InsertValue("id", FieldId);
+        fieldJson.InsertValue("path", JsonPath);
+        return fieldJson;
+    }
+
+    // "id" must be a string; "path" is optional but must be a string when present.
+    bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+        if (!jsonInfo["id"].GetString(&FieldId)) {
+            return false;
+        }
+        return !jsonInfo.Has("path") || jsonInfo["path"].GetString(&JsonPath);
+    }
+};
+
+class TExtractorCityHash64: public IIndexExtractor { // IIndexExtractor implementation that reports the class name "city64"
+private:
+ YDB_READONLY_DEF(std::vector<TExtractorField>, Fields); // inputs to hash; filled by DoDeserializeFromJson
+ static TFactory::TRegistrator<TExtractorCityHash64> Registrator; // NOTE(review): definition presumably lives in city.cpp - confirm
+protected:
+ virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override; // hashes of the batch
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; // reads the "fields" array
+ virtual NJson::TJsonValue DoSerializeToJson() const override; // writes the "fields" array
+public:
+ static inline TString ClassName = "city64";
+ // Returns ClassName; TInterfaceContainer stores this value under "class_name" when serializing.
+ virtual TString GetClassName() const override {
+ return ClassName;
+ }
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/container.cpp b/ydb/services/ext_index/metadata/extractor/container.cpp
new file mode 100644
index 0000000000..b31ddf8688
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/container.cpp
@@ -0,0 +1,5 @@
+#include "container.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/container.h b/ydb/services/ext_index/metadata/extractor/container.h
new file mode 100644
index 0000000000..af3ae0216d
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/container.h
@@ -0,0 +1,69 @@
+#pragma once
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/json/json_reader.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+// Holds a TInterface instance and (de)serializes it as {"class_name": ..., "object": ...}.
+template <class TInterface>
+class TInterfaceContainer {
+private:
+    using TPtr = typename TInterface::TPtr;
+    using TFactory = typename TInterface::TFactory;
+    TPtr Object;
+public:
+    TInterfaceContainer() = default;
+
+    explicit TInterfaceContainer(TPtr object)
+        : Object(object) {
+    }
+
+    // Raw access to the held object; no emptiness check is made here.
+    const TInterface* operator->() const {
+        return Object.get();
+    }
+
+    TInterface* operator->() {
+        return Object.get();
+    }
+
+    // Text form of the JSON produced by SerializeToJson().
+    TString DebugString() const {
+        const NJson::TJsonValue json = SerializeToJson();
+        return json.GetStringRobust();
+    }
+
+    // Parses the text as JSON, then delegates to the TJsonValue overload.
+    bool DeserializeFromJson(const TString& jsonString) {
+        NJson::TJsonValue parsed;
+        return NJson::ReadJsonFastTree(jsonString, &parsed) && DeserializeFromJson(parsed);
+    }
+
+    // Constructs the object named by "class_name" through the factory and feeds it "object".
+    // The held object is replaced only when every step succeeds.
+    bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+        TString className;
+        if (!jsonInfo["class_name"].GetString(&className)) {
+            return false;
+        }
+        TPtr candidate(TFactory::Construct(className));
+        if (!candidate || !candidate->DeserializeFromJson(jsonInfo["object"])) {
+            return false;
+        }
+        Object = candidate;
+        return true;
+    }
+
+    // JSON null when nothing is held; otherwise a map with the class name and the object's own JSON.
+    NJson::TJsonValue SerializeToJson() const {
+        if (Object) {
+            NJson::TJsonValue result = NJson::JSON_MAP;
+            result.InsertValue("class_name", Object->GetClassName());
+            result.InsertValue("object", Object->SerializeToJson());
+            return result;
+        }
+        return NJson::JSON_NULL;
+    }
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/extractor/ya.make b/ydb/services/ext_index/metadata/extractor/ya.make
new file mode 100644
index 0000000000..7d673a81f7
--- /dev/null
+++ b/ydb/services/ext_index/metadata/extractor/ya.make
@@ -0,0 +1,17 @@
+LIBRARY()
+
+SRCS(
+ abstract.cpp
+ GLOBAL city.cpp
+ container.cpp
+)
+
+PEERDIR(
+ contrib/libs/apache/arrow
+ ydb/core/protos
+ ydb/core/tx/sharding
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/services/ext_index/metadata/object.cpp b/ydb/services/ext_index/metadata/object.cpp
index 85185cd5bc..609e0357cc 100644
--- a/ydb/services/ext_index/metadata/object.cpp
+++ b/ydb/services/ext_index/metadata/object.cpp
@@ -1,14 +1,11 @@
#include "object.h"
#include "behaviour.h"
-#include <ydb/core/tx/sharding/sharding.h>
#include <ydb/services/metadata/manager/ydb_value_operator.h>
#include <util/folder/path.h>
namespace NKikimr::NMetadata::NCSIndex {
-TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName);
-
IClassBehaviour::TPtr TObject::GetBehaviour() {
return TBehaviour::GetInstance();
}
@@ -84,9 +81,4 @@ bool TObject::TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDe
return true;
}
-std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- NSharding::THashSharding hashSharding(0, Fields);
- return hashSharding.MakeHashes(batch);
-}
-
}
diff --git a/ydb/services/ext_index/metadata/object.h b/ydb/services/ext_index/metadata/object.h
index f188029d05..b56c8e67c3 100644
--- a/ydb/services/ext_index/metadata/object.h
+++ b/ydb/services/ext_index/metadata/object.h
@@ -10,137 +10,11 @@
#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/json/json_reader.h>
#include <util/string/cast.h>
+#include "extractor/abstract.h"
+#include "extractor/container.h"
namespace NKikimr::NMetadata::NCSIndex {
-class IIndexExtractor {
-protected:
- virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
- virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
- virtual NJson::TJsonValue DoSerializeToJson() const = 0;
-public:
- using TPtr = std::shared_ptr<IIndexExtractor>;
- using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>;
-
- virtual ~IIndexExtractor() = default;
-
- std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const {
- return DoExtractIndex(batch);
- }
-
- bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
- return DoDeserializeFromJson(jsonInfo);
- }
-
- NJson::TJsonValue SerializeToJson() const {
- return DoSerializeToJson();
- }
-
- virtual TString GetClassName() const = 0;
-};
-
-class TExtractorCityHash64: public IIndexExtractor {
-private:
- YDB_READONLY_DEF(std::vector<TString>, Fields);
- static TFactory::TRegistrator<TExtractorCityHash64> Registrator;
-protected:
- virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
- virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override {
- const NJson::TJsonValue::TArray* jsonFields;
- if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) {
- return false;
- }
- for (auto&& i : *jsonFields) {
- TString fieldId;
- if (!i["id"].GetString(&fieldId)) {
- return false;
- }
- Fields.emplace_back(fieldId);
- }
- if (Fields.size() == 0) {
- return false;
- }
- return true;
- }
- virtual NJson::TJsonValue DoSerializeToJson() const override {
- NJson::TJsonValue result;
- auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY);
- for (auto&& i : Fields) {
- auto& jsonField = jsonFields.AppendValue(NJson::JSON_MAP);
- jsonField.InsertValue("id", i);
- }
- return result;
- }
-public:
- static inline TString ClassName = "city64";
-
- virtual TString GetClassName() const override {
- return ClassName;
- }
-};
-
-template <class TInterface>
-class TInterfaceContainer {
-private:
- using TPtr = typename TInterface::TPtr;
- using TFactory = typename TInterface::TFactory;
- TPtr Object;
-public:
- TInterfaceContainer() = default;
-
- explicit TInterfaceContainer(TPtr object)
- : Object(object)
- {
-
- }
-
- const TInterface* operator->() const {
- return Object.get();
- }
-
- TInterface* operator->() {
- return Object.get();
- }
-
- TString DebugString() const {
- return SerializeToJson().GetStringRobust();
- }
-
- bool DeserializeFromJson(const TString& jsonString) {
- NJson::TJsonValue jsonInfo;
- if (!NJson::ReadJsonFastTree(jsonString, &jsonInfo)) {
- return false;
- }
- return DeserializeFromJson(jsonInfo);
- }
-
- bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
- TString className;
- if (!jsonInfo["class_name"].GetString(&className)) {
- return false;
- }
- TPtr result(TFactory::Construct(className));
- if (!result) {
- return false;
- }
- if (!result->DeserializeFromJson(jsonInfo["object"])) {
- return false;
- }
- Object = result;
- return true;
- }
-
- NJson::TJsonValue SerializeToJson() const {
- if (!Object) {
- return NJson::JSON_NULL;
- }
- NJson::TJsonValue result = NJson::JSON_MAP;
- result.InsertValue("class_name", Object->GetClassName());
- result.InsertValue("object", Object->SerializeToJson());
- return result;
- }
-};
-
class TObject: public NModifications::TObject<TObject> {
private:
YDB_READONLY_DEF(TString, IndexId);
diff --git a/ydb/services/ext_index/metadata/ya.make b/ydb/services/ext_index/metadata/ya.make
index 3b6b097ff2..6284eac2d0 100644
--- a/ydb/services/ext_index/metadata/ya.make
+++ b/ydb/services/ext_index/metadata/ya.make
@@ -1,7 +1,7 @@
LIBRARY()
SRCS(
- GLOBAL object.cpp
+ object.cpp
GLOBAL behaviour.cpp
manager.cpp
initializer.cpp
@@ -16,9 +16,7 @@ PEERDIR(
ydb/core/grpc_services/base
ydb/core/grpc_services
ydb/services/metadata/request
- ydb/core/tx/sharding
+ ydb/services/ext_index/metadata/extractor
)
-YQL_LAST_ABI_VERSION()
-
END()
diff --git a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt
index 5f0e02728b..ee55c1e81c 100644
--- a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt
+++ b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(services-ext_index-service PUBLIC
cpp-actors-core
services-ext_index-metadata
services-ext_index-common
+ yql-minikql-jsonpath
api-protos
)
target_sources(services-ext_index-service PRIVATE
diff --git a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt
index 814de1a95c..b3532697f8 100644
--- a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt
@@ -15,6 +15,7 @@ target_link_libraries(services-ext_index-service PUBLIC
cpp-actors-core
services-ext_index-metadata
services-ext_index-common
+ yql-minikql-jsonpath
api-protos
)
target_sources(services-ext_index-service PRIVATE
diff --git a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt
index 814de1a95c..b3532697f8 100644
--- a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt
+++ b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt
@@ -15,6 +15,7 @@ target_link_libraries(services-ext_index-service PUBLIC
cpp-actors-core
services-ext_index-metadata
services-ext_index-common
+ yql-minikql-jsonpath
api-protos
)
target_sources(services-ext_index-service PRIVATE
diff --git a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt
index 5f0e02728b..ee55c1e81c 100644
--- a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt
+++ b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt
@@ -14,6 +14,7 @@ target_link_libraries(services-ext_index-service PUBLIC
cpp-actors-core
services-ext_index-metadata
services-ext_index-common
+ yql-minikql-jsonpath
api-protos
)
target_sources(services-ext_index-service PRIVATE
diff --git a/ydb/services/ext_index/service/add_index.cpp b/ydb/services/ext_index/service/add_index.cpp
index c15eb50a73..0ead4ed16f 100644
--- a/ydb/services/ext_index/service/add_index.cpp
+++ b/ydb/services/ext_index/service/add_index.cpp
@@ -66,7 +66,7 @@ void TIndexUpsertActor::Bootstrap() {
const std::vector<ui64> hashes = IndexInfo.GetExtractor()->ExtractIndex(Data);
if (hashes.size() != (size_t)Data->num_rows()) {
- ExternalController->OnIndexUpsertionFailed("inconsistency hashes");
+ ExternalController->OnIndexUpsertionFailed("inconsistency hashes: " + ::ToString(hashes.size()) + " != " + ::ToString(Data->num_rows()));
PassAway();
return;
}
diff --git a/ydb/services/ext_index/service/ya.make b/ydb/services/ext_index/service/ya.make
index 18171d97b1..b3966eec80 100644
--- a/ydb/services/ext_index/service/ya.make
+++ b/ydb/services/ext_index/service/ya.make
@@ -12,6 +12,7 @@ PEERDIR(
library/cpp/actors/core
ydb/services/ext_index/metadata
ydb/services/ext_index/common
+ ydb/library/yql/minikql/jsonpath
ydb/public/api/protos
)
diff --git a/ydb/services/ext_index/ut/ut_ext_index.cpp b/ydb/services/ext_index/ut/ut_ext_index.cpp
index 74ffdf3259..d6f0c97b41 100644
--- a/ydb/services/ext_index/ut/ut_ext_index.cpp
+++ b/ydb/services/ext_index/ut/ut_ext_index.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/wrappers/fake_storage.h>
+#include <ydb/core/tx/sharding/xx_hash.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/services/metadata/manager/alter.h>
@@ -19,6 +20,7 @@
#include <library/cpp/testing/unittest/registar.h>
#include <util/system/hostname.h>
+#include <contrib/libs/xxhash/xxhash.h>
namespace NKikimr {
@@ -42,7 +44,7 @@ public:
%s
}
}
- )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
+ )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data()));
TString shardingColumns = "[\"timestamp\", \"uid\"]";
if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
@@ -86,7 +88,6 @@ Y_UNIT_TEST_SUITE(ExternalIndex) {
.SetEnableExternalIndex(true)
.SetEnableBackgroundTasks(true)
.SetEnableOlapSchemaOperations(true);
- ;
;
Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
@@ -106,16 +107,22 @@ Y_UNIT_TEST_SUITE(ExternalIndex) {
// runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
TLocalHelper lHelper(*server);
+ lHelper.SetWithJsonDocument(true);
lHelper.CreateTestOlapTable("olapTable", 2);
lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple` ( "
- "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" : {\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"message\"}]}}`)");
+ "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" :"
+ "{\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"level\"}, {\"id\":\"json_payload\", \"path\" : \"strict $.a.b\"}]}}`)");
+ lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple1` ( "
+ "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" :"
+ "{\"fields\" : [{\"id\":\"uid\"}]}}`)");
Cerr << "Wait tables" << Endl;
runtime.SimulateSleep(TDuration::Seconds(20));
Cerr << "Initialization tables" << Endl;
const TInstant pkStart = Now() - TDuration::Days(15);
ui32 idx = 0;
- auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000);
+ const ui32 tsStart = (pkStart + TDuration::Seconds(2 * idx++)).GetValue();
+ auto batch = lHelper.TestArrowBatch(0, tsStart, 6000);
auto batchSize = NArrow::GetBatchDataSize(batch);
Cerr << "Inserting " << batchSize << " bytes..." << Endl;
UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT
@@ -128,13 +135,38 @@ Y_UNIT_TEST_SUITE(ExternalIndex) {
{
TString resultData;
lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", true, &resultData);
+ Cerr << resultData << Endl;
+ UNIT_ASSERT_EQUAL(resultData, "[6000u]");
+ }
+ {
+ NSharding::TStreamStringHashCalcer calcer(0);
+ calcer.Start();
+ TString resultData;
+ TString uid = "uid_" + ::ToString(tsStart + 2);
+ ui32 level = 2;
+ TString id = "2";
+ calcer.Update((const ui8*)uid.data(), uid.size());
+ calcer.Update((const ui8*)&level, sizeof(level));
+ calcer.Update((const ui8*)id.data(), id.size());
+ const ui64 hash = calcer.Finish();
+ lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple` WHERE index_hash = " +
+ ::ToString(hash), true, &resultData);
+ Cerr << resultData << "/" << tsStart << Endl;
+ UNIT_ASSERT_EQUAL(resultData, "[[" + ::ToString(hash) + "u];[" + ::ToString(tsStart + 2) + "u]]");
+ }
+ {
+ TString resultData;
+ lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple1`", true, &resultData);
+ Cerr << resultData << Endl;
UNIT_ASSERT_EQUAL(resultData, "[6000u]");
}
lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple` (TYPE CS_EXT_INDEX)");
+ lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple1` (TYPE CS_EXT_INDEX)");
for (ui32 i = 0; i < 10; ++i) {
server->GetRuntime()->SimulateSleep(TDuration::Seconds(10));
}
lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", false);
+ lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple1`", false);
{
TString resultData;
lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/external`", true, &resultData);