diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-16 17:53:49 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-16 17:53:49 +0300 |
commit | b40833909a29351baa2cbd151a23ebb10f9a601f (patch) | |
tree | d184915bd35cde21cee81d26c8b70f895323557a | |
parent | c3ac866fbfd9872ea4afc6b2c15fc27cf384d06c (diff) | |
download | ydb-b40833909a29351baa2cbd151a23ebb10f9a601f.tar.gz |
extract index from json
32 files changed, 637 insertions, 197 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index fd8a2c8ba8..c2fe9828f5 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -1112,7 +1112,6 @@ static bool ConvertColumn(std::shared_ptr<arrow::Array>& column, std::shared_ptr auto value = binaryArray.Value(i); const auto binaryJson = NBinaryJson::SerializeToBinaryJson(TStringBuf(value.data(), value.size())); if (!binaryJson.Defined() || !builder.Append(binaryJson->Data(), binaryJson->Size()).ok()) { - ALS_ERROR(0) << "NOT PARSED JSON: " << TStringBuf(value.data(), value.size()); return false; } } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 579dc8076d..ce71e56a89 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { %s } } - )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); + )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); TString shardingColumns = "[\"timestamp\", \"uid\"]"; if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { @@ -3799,10 +3799,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { //EnableDebugLogging(kikimr); - auto& server = kikimr.GetTestServer(); auto tableClient = kikimr.GetTableClient(); - Tests::NCommon::THelper lHelper(server); - auto session = tableClient.CreateSession().GetValueSync().GetSession(); auto query = TStringBuilder() << R"( @@ -3856,7 +3853,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST(OlapUpsert) { - TestOlapUpsert(2); // it should lead to planned tx + TestOlapUpsert(2); } Y_UNIT_TEST(OlapDeleteImmediate) { diff --git a/ydb/core/testlib/cs_helper.cpp b/ydb/core/testlib/cs_helper.cpp index 38723048be..0d9784e7a5 100644 --- a/ydb/core/testlib/cs_helper.cpp +++ b/ydb/core/testlib/cs_helper.cpp @@ -101,14 +101,16 @@ void THelperSchemaless::SendDataViaActorSystem(TString testTable, ui64 pathIdBeg // std::shared_ptr<arrow::Schema> THelper::GetArrowSchema() { - return std::make_shared<arrow::Schema>( - std::vector<std::shared_ptr<arrow::Field>>{ - arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO)), - arrow::field("resource_id", arrow::utf8()), - arrow::field("uid", arrow::utf8()), - arrow::field("level", arrow::int32()), - arrow::field("message", arrow::utf8()) - }); + std::vector<std::shared_ptr<arrow::Field>> fields; + fields.emplace_back(arrow::field("timestamp", arrow::timestamp(arrow::TimeUnit::TimeUnit::MICRO))); + fields.emplace_back(arrow::field("resource_id", arrow::utf8())); + fields.emplace_back(arrow::field("uid", arrow::utf8())); + fields.emplace_back(arrow::field("level", arrow::int32())); + fields.emplace_back(arrow::field("message", arrow::utf8())); + if (GetWithJsonDocument()) { + fields.emplace_back(arrow::field("json_payload", arrow::utf8())); + } + return std::make_shared<arrow::Schema>(fields); } std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) { @@ -119,6 +121,12 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui arrow::StringBuilder b3; arrow::Int32Builder b4; arrow::StringBuilder b5; + arrow::StringBuilder b6; + + NJson::TJsonValue jsonInfo; + jsonInfo["a"]["b"] = 1; + jsonInfo["a"]["c"] = "asds"; + jsonInfo["b"] = "asd"; for (size_t i = 0; i < rowCount; ++i) { std::string uid("uid_" + std::to_string(tsBegin + i)); @@ -128,6 +136,9 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui Y_VERIFY(b3.Append(uid).ok()); Y_VERIFY(b4.Append(i % 5).ok()); Y_VERIFY(b5.Append(message).ok()); + jsonInfo["a"]["b"] = i; + auto jsonStringBase = jsonInfo.GetStringRobust(); + Y_VERIFY(b6.Append(jsonStringBase.data(), jsonStringBase.size()).ok()); } std::shared_ptr<arrow::TimestampArray> a1; @@ -135,14 +146,38 @@ std::shared_ptr<arrow::RecordBatch> THelper::TestArrowBatch(ui64 pathIdBegin, ui std::shared_ptr<arrow::StringArray> a3; std::shared_ptr<arrow::Int32Array> a4; std::shared_ptr<arrow::StringArray> a5; + std::shared_ptr<arrow::StringArray> a6; Y_VERIFY(b1.Finish(&a1).ok()); Y_VERIFY(b2.Finish(&a2).ok()); Y_VERIFY(b3.Finish(&a3).ok()); Y_VERIFY(b4.Finish(&a4).ok()); Y_VERIFY(b5.Finish(&a5).ok()); + Y_VERIFY(b6.Finish(&a6).ok()); - return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 }); + if (GetWithJsonDocument()) { + return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5, a6 }); + } else { + return arrow::RecordBatch::Make(schema, rowCount, { a1, a2, a3, a4, a5 }); + } + +} + +TString THelper::GetTestTableSchema() const { + TStringBuilder sb; + sb << R"(Columns{ Name: "timestamp" Type : "Timestamp" NotNull : true })"; + sb << R"(Columns{ Name: "resource_id" Type : "Utf8" })"; + sb << R"(Columns{ Name: "uid" Type : "Utf8" })"; + sb << R"(Columns{ Name: "level" Type : "Int32" })"; + sb << R"(Columns{ Name: "message" Type : "Utf8" })"; + if (GetWithJsonDocument()) { + sb << R"(Columns{ Name: "json_payload" Type : "JsonDocument" })"; + } + sb << R"( + KeyColumnNames: "timestamp" + Engine : COLUMN_ENGINE_REPLACING_TIMESERIES + )"; + return sb; } // Clickbench table diff --git a/ydb/core/testlib/cs_helper.h b/ydb/core/testlib/cs_helper.h index 1e7738c9e1..b171f66051 100644 --- a/ydb/core/testlib/cs_helper.h +++ b/ydb/core/testlib/cs_helper.h @@ -25,6 +25,7 @@ private: using TBase = THelperSchemaless; std::shared_ptr<arrow::Schema> GetArrowSchema(); + YDB_FLAG_ACCESSOR(WithJsonDocument, false); public: using TBase::TBase; @@ -43,6 +44,8 @@ public: Engine: COLUMN_ENGINE_REPLACING_TIMESERIES )"; + TString GetTestTableSchema() const; + std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount) override; }; diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index e71c4ffb04..3a79d3b387 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -42,7 +42,7 @@ public: %s } } - )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); + )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); TString shardingColumns = "[\"timestamp\", \"uid\"]"; if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { diff --git a/ydb/library/accessor/accessor.h b/ydb/library/accessor/accessor.h index 28c78ef491..715f840f10 100644 --- a/ydb/library/accessor/accessor.h +++ b/ydb/library/accessor/accessor.h @@ -18,6 +18,9 @@ public:\ name ## Flag = value;\ return *this;\ }\ + bool Get##name() const noexcept {\ + return name ## Flag;\ + }\ private: namespace NYDBAccessor { diff --git a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt index ab68de15b7..e88ff51742 100644 --- a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt @@ -6,11 +6,9 @@ # original buildsystem will not be accepted. +add_subdirectory(extractor) add_library(services-ext_index-metadata) -target_compile_options(services-ext_index-metadata PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata PUBLIC contrib-libs-cxxsupp yutil @@ -20,9 +18,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp @@ -30,9 +29,6 @@ target_sources(services-ext_index-metadata PRIVATE ) add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) -target_compile_options(services-ext_index-metadata.global PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata.global PUBLIC contrib-libs-cxxsupp yutil @@ -42,9 +38,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata.global PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp ) diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt index 3d32b2799c..9001838817 100644 --- a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt @@ -6,11 +6,9 @@ # original buildsystem will not be accepted. +add_subdirectory(extractor) add_library(services-ext_index-metadata) -target_compile_options(services-ext_index-metadata PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp @@ -21,9 +19,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp @@ -31,9 +30,6 @@ target_sources(services-ext_index-metadata PRIVATE ) add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) -target_compile_options(services-ext_index-metadata.global PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata.global PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp @@ -44,9 +40,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata.global PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp ) diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt index 3d32b2799c..9001838817 100644 --- a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt @@ -6,11 +6,9 @@ # original buildsystem will not be accepted. +add_subdirectory(extractor) add_library(services-ext_index-metadata) -target_compile_options(services-ext_index-metadata PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp @@ -21,9 +19,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp @@ -31,9 +30,6 @@ target_sources(services-ext_index-metadata PRIVATE ) add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) -target_compile_options(services-ext_index-metadata.global PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata.global PUBLIC contrib-libs-linux-headers contrib-libs-cxxsupp @@ -44,9 +40,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata.global PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp ) diff --git a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt index ab68de15b7..e88ff51742 100644 --- a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt @@ -6,11 +6,9 @@ # original buildsystem will not be accepted. +add_subdirectory(extractor) add_library(services-ext_index-metadata) -target_compile_options(services-ext_index-metadata PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata PUBLIC contrib-libs-cxxsupp yutil @@ -20,9 +18,10 @@ target_link_libraries(services-ext_index-metadata PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp @@ -30,9 +29,6 @@ target_sources(services-ext_index-metadata PRIVATE ) add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) -target_compile_options(services-ext_index-metadata.global PRIVATE - -DUSE_CURRENT_UDF_ABI_VERSION -) target_link_libraries(services-ext_index-metadata.global PUBLIC contrib-libs-cxxsupp yutil @@ -42,9 +38,8 @@ target_link_libraries(services-ext_index-metadata.global PUBLIC core-grpc_services-base ydb-core-grpc_services services-metadata-request - core-tx-sharding + ext_index-metadata-extractor ) target_sources(services-ext_index-metadata.global PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp ) diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..213856d9fd --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,39 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp +) + +add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor.global PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp +) diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..c2abc92f8b --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-aarch64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp +) + +add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp +) diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..c2abc92f8b --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.linux-x86_64.txt @@ -0,0 +1,41 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp +) + +add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp +) diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..213856d9fd --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/CMakeLists.windows-x86_64.txt @@ -0,0 +1,39 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/abstract.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/container.cpp +) + +add_global_library_for(ext_index-metadata-extractor.global ext_index-metadata-extractor) +target_compile_options(ext_index-metadata-extractor.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(ext_index-metadata-extractor.global PUBLIC + contrib-libs-cxxsupp + yutil + libs-apache-arrow + ydb-core-protos + core-tx-sharding +) +target_sources(ext_index-metadata-extractor.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/extractor/city.cpp +) diff --git a/ydb/services/ext_index/metadata/extractor/abstract.cpp b/ydb/services/ext_index/metadata/extractor/abstract.cpp new file mode 100644 index 0000000000..60bfde925b --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/abstract.cpp @@ -0,0 +1,5 @@ +#include "abstract.h" + +namespace NKikimr::NMetadata::NCSIndex { + +} diff --git a/ydb/services/ext_index/metadata/extractor/abstract.h b/ydb/services/ext_index/metadata/extractor/abstract.h new file mode 100644 index 0000000000..05d21bde42 --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/abstract.h @@ -0,0 +1,34 @@ +#pragma once +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <library/cpp/object_factory/object_factory.h> +#include <library/cpp/json/writer/json_value.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class IIndexExtractor { +protected: + virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; + virtual NJson::TJsonValue DoSerializeToJson() const = 0; +public: + using TPtr = std::shared_ptr<IIndexExtractor>; + using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>; + + virtual ~IIndexExtractor() = default; + + std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return DoExtractIndex(batch); + } + + bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + return DoDeserializeFromJson(jsonInfo); + } + + NJson::TJsonValue SerializeToJson() const { + return DoSerializeToJson(); + } + + virtual TString GetClassName() const = 0; +}; + +} diff --git a/ydb/services/ext_index/metadata/extractor/city.cpp b/ydb/services/ext_index/metadata/extractor/city.cpp new file mode 100644 index 0000000000..d9b5150871 --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/city.cpp @@ -0,0 +1,161 @@ +#include "city.h" +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/tx/sharding/sharding.h> +#include <ydb/library/yql/utils/yql_panic.h> +#include <ydb/library/yql/minikql/jsonpath/jsonpath.h> +#include <ydb/library/yql/minikql/jsonpath/value.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/array/builder_binary.h> +#include <library/cpp/json/fast_sax/parser.h> +#include <library/cpp/actors/core/log.h> +#include <util/string/split.h> +#include <util/string/join.h> + +namespace NKikimr::NMetadata::NCSIndex { + +TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName); + +template <class TArrayBuilder> +class TArrayInserter { +protected: + TArrayBuilder ArrayBuilder; +public: + TArrayInserter(const ui32 rowsCount) { + YQL_ENSURE(ArrayBuilder.Reserve(rowsCount).ok()); + } + + std::shared_ptr<arrow::Array> BuildColumn() { + std::shared_ptr<arrow::StringArray> result; + if (!ArrayBuilder.Finish(&result).ok()) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot build array for hash calculation"; + return nullptr; + } + return result; + } +}; + +class TStringArrayInserter: public TArrayInserter<arrow::StringBuilder> { +private: + using TBase = TArrayInserter<arrow::StringBuilder>; +public: + using TBase::TBase; + void OnFound(const TStringBuf& value) { + YQL_ENSURE(ArrayBuilder.Append(value.data(), value.size()).ok()); + } +}; + + + +std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { + auto schema = batch->schema(); + std::vector<std::shared_ptr<arrow::Field>> fields; + std::vector<std::shared_ptr<arrow::Array>> columns; + std::vector<TString> fieldIds; + for (ui32 i = 0; i < Fields.size(); ++i) { + auto c = batch->GetColumnByName(Fields[i].GetFieldId()); + auto f = schema->GetFieldByName(Fields[i].GetFieldId()); + if (!c || !f) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "incorrect field name in batch: " << Fields[i].GetFieldId(); + return {}; + } + if (!Fields[i].GetJsonPath()) { + fields.emplace_back(f); + columns.emplace_back(c); + fieldIds.emplace_back(f->name()); + } else if (c->type()->id() == arrow::Type::STRING) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "json have not been simple string. it must be JsonDocument for " << Fields[i].GetFieldId(); + return {}; + } else if (c->type()->id() == arrow::Type::BINARY) { + auto typedColumn = std::static_pointer_cast<arrow::BinaryArray>(c); + + std::shared_ptr<TStringArrayInserter> fetcher = std::make_shared<TStringArrayInserter>(batch->num_rows()); + TVector<TString> values; + for (ui32 r = 0; r < batch->num_rows(); ++r) { + auto sv = typedColumn->Value(r); + TStringBuf sb(sv.data(), sv.size()); + auto reader = NBinaryJson::TBinaryJsonReader::Make(sb); + auto binaryJsonRoot = NYql::NJsonPath::TValue(reader->GetRootCursor()); + + NYql::TIssues issues; + const NYql::NJsonPath::TJsonPathPtr jsonPath = NYql::NJsonPath::ParseJsonPath(Fields[i].GetJsonPath(), issues, 100); + if (!issues.Empty()) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot parse path for json extraction: " << issues.ToString(); + return {}; + } + + const auto result = NYql::NJsonPath::ExecuteJsonPath(jsonPath, binaryJsonRoot, NYql::NJsonPath::TVariablesMap{}, nullptr); + if (result.IsError()) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "Runtime errors found on json path usage: " << result.GetError().ToString(); + return {}; + } + + const auto& nodes = result.GetNodes(); + for (size_t i = 0; i < nodes.size(); i++) { + switch (nodes[i].GetType()) { + case NYql::NJsonPath::EValueType::Bool: + values.emplace_back(::ToString(nodes[i].GetBool())); + break; + case NYql::NJsonPath::EValueType::Number: + values.emplace_back(::ToString(nodes[i].GetNumber())); + break; + case NYql::NJsonPath::EValueType::String: + values.emplace_back(nodes[i].GetString()); + break; + case NYql::NJsonPath::EValueType::Null: + values.emplace_back(""); + case NYql::NJsonPath::EValueType::Object: + case NYql::NJsonPath::EValueType::Array: + ALS_ERROR(NKikimrServices::EXT_INDEX) << "Cannot use object and array as hash param for index construction"; + return {}; + } + } + fetcher->OnFound(JoinSeq(",", values)); + values.clear(); + } + fieldIds.emplace_back(Fields[i].GetFullId()); + fields.emplace_back(std::make_shared<arrow::Field>(Fields[i].GetFullId(), std::make_shared<arrow::StringType>())); + columns.emplace_back(fetcher->BuildColumn()); + } else { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "incorrect column type for json extraction: " << Fields[i].GetFieldId(); + return {}; + } + } + auto sBuilder = std::make_shared<arrow::SchemaBuilder>(fields); + auto newSchema = sBuilder->Finish(); + if (!newSchema.ok()) { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot build new schema"; + return {}; + } + auto newBatch = arrow::RecordBatch::Make(*newSchema, batch->num_rows(), columns); + NSharding::THashSharding hashSharding(0, fieldIds); + return hashSharding.MakeHashes(newBatch); +} + +bool TExtractorCityHash64::DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + const NJson::TJsonValue::TArray* jsonFields; + if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) { + return false; + } + for (auto&& i : *jsonFields) { + TExtractorField field; + if (!field.DeserializeFromJson(i)) { + return false; + } + Fields.emplace_back(field); + } + if (Fields.size() == 0) { + return false; + } + return true; +} + +NJson::TJsonValue TExtractorCityHash64::DoSerializeToJson() const { + NJson::TJsonValue result; + auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY); + for (auto&& i : Fields) { + jsonFields.AppendValue(i.SerializeToJson()); + } + return result; +} + +} diff --git a/ydb/services/ext_index/metadata/extractor/city.h b/ydb/services/ext_index/metadata/extractor/city.h new file mode 100644 index 0000000000..99011b0a92 --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/city.h @@ -0,0 +1,54 @@ +#pragma once +#include "abstract.h" +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TExtractorField { +private: + YDB_ACCESSOR_DEF(TString, FieldId); + YDB_ACCESSOR_DEF(TString, JsonPath); +public: + TString GetFullId() const { + if (JsonPath) { + return "json:" + FieldId + ":" + JsonPath; + } else { + return FieldId; + } + } + NJson::TJsonValue SerializeToJson() const { + NJson::TJsonValue result(NJson::JSON_MAP); + result.InsertValue("id", FieldId); + result.InsertValue("path", JsonPath); + return result; + } + bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + if (!jsonInfo["id"].GetString(&FieldId)) { + return false; + } + if (jsonInfo.Has("path")) { + if (!jsonInfo["path"].GetString(&JsonPath)) { + return false; + } + } + return true; + } +}; + +class TExtractorCityHash64: public IIndexExtractor { +private: + YDB_READONLY_DEF(std::vector<TExtractorField>, Fields); + static TFactory::TRegistrator<TExtractorCityHash64> Registrator; +protected: + virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override; + virtual NJson::TJsonValue DoSerializeToJson() const override; +public: + static inline TString ClassName = "city64"; + + virtual TString GetClassName() const override { + return ClassName; + } +}; + +} diff --git a/ydb/services/ext_index/metadata/extractor/container.cpp b/ydb/services/ext_index/metadata/extractor/container.cpp new file mode 100644 index 0000000000..b31ddf8688 --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/container.cpp @@ -0,0 +1,5 @@ +#include "container.h" + +namespace NKikimr::NMetadata::NCSIndex { + +} diff --git a/ydb/services/ext_index/metadata/extractor/container.h b/ydb/services/ext_index/metadata/extractor/container.h new file mode 100644 index 0000000000..af3ae0216d --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/container.h @@ -0,0 +1,69 @@ +#pragma once +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/json/json_reader.h> + +namespace NKikimr::NMetadata::NCSIndex { + +template <class TInterface> +class TInterfaceContainer { +private: + using TPtr = typename TInterface::TPtr; + using TFactory = typename TInterface::TFactory; + TPtr Object; +public: + TInterfaceContainer() = default; + + explicit TInterfaceContainer(TPtr object) + : Object(object) + { + + } + + const TInterface* operator->() const { + return Object.get(); + } + + TInterface* operator->() { + return Object.get(); + } + + TString DebugString() const { + return SerializeToJson().GetStringRobust(); + } + + bool DeserializeFromJson(const TString& jsonString) { + NJson::TJsonValue jsonInfo; + if (!NJson::ReadJsonFastTree(jsonString, &jsonInfo)) { + return false; + } + return DeserializeFromJson(jsonInfo); + } + + bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + TString className; + if (!jsonInfo["class_name"].GetString(&className)) { + return false; + } + TPtr result(TFactory::Construct(className)); + if (!result) { + return false; + } + if (!result->DeserializeFromJson(jsonInfo["object"])) { + return false; + } + Object = result; + return true; + } + + NJson::TJsonValue SerializeToJson() const { + if (!Object) { + return NJson::JSON_NULL; + } + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("class_name", Object->GetClassName()); + result.InsertValue("object", Object->SerializeToJson()); + return result; + } +}; + +} diff --git a/ydb/services/ext_index/metadata/extractor/ya.make b/ydb/services/ext_index/metadata/extractor/ya.make new file mode 100644 index 0000000000..7d673a81f7 --- /dev/null +++ b/ydb/services/ext_index/metadata/extractor/ya.make @@ -0,0 +1,17 @@ +LIBRARY() + +SRCS( + abstract.cpp + GLOBAL city.cpp + container.cpp +) + +PEERDIR( + contrib/libs/apache/arrow + ydb/core/protos + ydb/core/tx/sharding +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/services/ext_index/metadata/object.cpp b/ydb/services/ext_index/metadata/object.cpp index 85185cd5bc..609e0357cc 100644 --- a/ydb/services/ext_index/metadata/object.cpp +++ b/ydb/services/ext_index/metadata/object.cpp @@ -1,14 +1,11 @@ #include "object.h" #include "behaviour.h" -#include <ydb/core/tx/sharding/sharding.h> #include <ydb/services/metadata/manager/ydb_value_operator.h> #include <util/folder/path.h> namespace NKikimr::NMetadata::NCSIndex { -TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName); - IClassBehaviour::TPtr TObject::GetBehaviour() { return TBehaviour::GetInstance(); } @@ -84,9 +81,4 @@ bool TObject::TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDe return true; } -std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { - NSharding::THashSharding hashSharding(0, Fields); - return hashSharding.MakeHashes(batch); -} - } diff --git a/ydb/services/ext_index/metadata/object.h b/ydb/services/ext_index/metadata/object.h index f188029d05..b56c8e67c3 100644 --- a/ydb/services/ext_index/metadata/object.h +++ b/ydb/services/ext_index/metadata/object.h @@ -10,137 +10,11 @@ #include <library/cpp/json/writer/json_value.h> #include <library/cpp/json/json_reader.h> #include <util/string/cast.h> +#include "extractor/abstract.h" +#include "extractor/container.h" namespace NKikimr::NMetadata::NCSIndex { -class IIndexExtractor { -protected: - virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; - virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; - virtual NJson::TJsonValue DoSerializeToJson() const = 0; -public: - using TPtr = std::shared_ptr<IIndexExtractor>; - using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>; - - virtual ~IIndexExtractor() = default; - - std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { - return DoExtractIndex(batch); - } - - bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { - return DoDeserializeFromJson(jsonInfo); - } - - NJson::TJsonValue SerializeToJson() const { - return DoSerializeToJson(); - } - - virtual TString GetClassName() const = 0; -}; - -class TExtractorCityHash64: public IIndexExtractor { -private: - YDB_READONLY_DEF(std::vector<TString>, Fields); - static TFactory::TRegistrator<TExtractorCityHash64> Registrator; -protected: - virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override; - virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override { - const NJson::TJsonValue::TArray* jsonFields; - if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) { - return false; - } - for (auto&& i : *jsonFields) { - TString fieldId; - if (!i["id"].GetString(&fieldId)) { - return false; - } - Fields.emplace_back(fieldId); - } - if (Fields.size() == 0) { - return false; - } - return true; - } - virtual NJson::TJsonValue DoSerializeToJson() const override { - NJson::TJsonValue result; - auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY); - for (auto&& i : Fields) { - auto& jsonField = jsonFields.AppendValue(NJson::JSON_MAP); - jsonField.InsertValue("id", i); - } - return result; - } -public: - static inline TString ClassName = "city64"; - - virtual TString GetClassName() const override { - return ClassName; - } -}; - -template <class TInterface> -class TInterfaceContainer { -private: - using TPtr = typename TInterface::TPtr; - using TFactory = typename TInterface::TFactory; - TPtr Object; -public: - TInterfaceContainer() = default; - - explicit TInterfaceContainer(TPtr object) - : Object(object) - { - - } - - const TInterface* operator->() const { - return Object.get(); - } - - TInterface* operator->() { - return Object.get(); - } - - TString DebugString() const { - return SerializeToJson().GetStringRobust(); - } - - bool DeserializeFromJson(const TString& jsonString) { - NJson::TJsonValue jsonInfo; - if (!NJson::ReadJsonFastTree(jsonString, &jsonInfo)) { - return false; - } - return DeserializeFromJson(jsonInfo); - } - - bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { - TString className; - if (!jsonInfo["class_name"].GetString(&className)) { - return false; - } - TPtr result(TFactory::Construct(className)); - if (!result) { - return false; - } - if (!result->DeserializeFromJson(jsonInfo["object"])) { - return false; - } - Object = result; - return true; - } - - NJson::TJsonValue SerializeToJson() const { - if (!Object) { - return NJson::JSON_NULL; - } - NJson::TJsonValue result = NJson::JSON_MAP; - result.InsertValue("class_name", Object->GetClassName()); - result.InsertValue("object", Object->SerializeToJson()); - return result; - } -}; - class TObject: public NModifications::TObject<TObject> { private: YDB_READONLY_DEF(TString, IndexId); diff --git a/ydb/services/ext_index/metadata/ya.make b/ydb/services/ext_index/metadata/ya.make index 3b6b097ff2..6284eac2d0 100644 --- a/ydb/services/ext_index/metadata/ya.make +++ b/ydb/services/ext_index/metadata/ya.make @@ -1,7 +1,7 @@ LIBRARY() SRCS( - GLOBAL object.cpp + object.cpp GLOBAL behaviour.cpp manager.cpp initializer.cpp @@ -16,9 +16,7 @@ PEERDIR( ydb/core/grpc_services/base ydb/core/grpc_services ydb/services/metadata/request - ydb/core/tx/sharding + ydb/services/ext_index/metadata/extractor ) -YQL_LAST_ABI_VERSION() - END() diff --git a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt index 5f0e02728b..ee55c1e81c 100644 --- a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt +++ b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(services-ext_index-service PUBLIC cpp-actors-core services-ext_index-metadata services-ext_index-common + yql-minikql-jsonpath api-protos ) target_sources(services-ext_index-service PRIVATE diff --git a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt index 814de1a95c..b3532697f8 100644 --- a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt +++ b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt @@ -15,6 +15,7 @@ target_link_libraries(services-ext_index-service PUBLIC cpp-actors-core services-ext_index-metadata services-ext_index-common + yql-minikql-jsonpath api-protos ) target_sources(services-ext_index-service PRIVATE diff --git a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt index 814de1a95c..b3532697f8 100644 --- a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt +++ b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt @@ -15,6 +15,7 @@ target_link_libraries(services-ext_index-service PUBLIC cpp-actors-core services-ext_index-metadata services-ext_index-common + yql-minikql-jsonpath api-protos ) target_sources(services-ext_index-service PRIVATE diff --git a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt index 5f0e02728b..ee55c1e81c 100644 --- a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt +++ b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt @@ -14,6 +14,7 @@ target_link_libraries(services-ext_index-service PUBLIC cpp-actors-core services-ext_index-metadata services-ext_index-common + yql-minikql-jsonpath api-protos ) target_sources(services-ext_index-service PRIVATE diff --git a/ydb/services/ext_index/service/add_index.cpp b/ydb/services/ext_index/service/add_index.cpp index c15eb50a73..0ead4ed16f 100644 --- a/ydb/services/ext_index/service/add_index.cpp +++ b/ydb/services/ext_index/service/add_index.cpp @@ -66,7 +66,7 @@ void TIndexUpsertActor::Bootstrap() { const std::vector<ui64> hashes = IndexInfo.GetExtractor()->ExtractIndex(Data); if (hashes.size() != (size_t)Data->num_rows()) { - ExternalController->OnIndexUpsertionFailed("inconsistency hashes"); + ExternalController->OnIndexUpsertionFailed("inconsistency hashes: " + ::ToString(hashes.size()) + " != " + ::ToString(Data->num_rows())); PassAway(); return; } diff --git a/ydb/services/ext_index/service/ya.make b/ydb/services/ext_index/service/ya.make index 18171d97b1..b3966eec80 100644 --- a/ydb/services/ext_index/service/ya.make +++ b/ydb/services/ext_index/service/ya.make @@ -12,6 +12,7 @@ PEERDIR( library/cpp/actors/core ydb/services/ext_index/metadata ydb/services/ext_index/common + ydb/library/yql/minikql/jsonpath ydb/public/api/protos ) diff --git a/ydb/services/ext_index/ut/ut_ext_index.cpp b/ydb/services/ext_index/ut/ut_ext_index.cpp index 74ffdf3259..d6f0c97b41 100644 --- a/ydb/services/ext_index/ut/ut_ext_index.cpp +++ b/ydb/services/ext_index/ut/ut_ext_index.cpp @@ -6,6 +6,7 @@ #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/wrappers/s3_wrapper.h> #include <ydb/core/wrappers/fake_storage.h> +#include <ydb/core/tx/sharding/xx_hash.h> #include <ydb/library/accessor/accessor.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/services/metadata/manager/alter.h> @@ -19,6 +20,7 @@ #include <library/cpp/testing/unittest/registar.h> #include <util/system/hostname.h> +#include <contrib/libs/xxhash/xxhash.h> namespace NKikimr { @@ -42,7 +44,7 @@ public: %s } } - )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); + )", storeName.c_str(), storeShardsCount, GetTestTableSchema().data())); TString shardingColumns = "[\"timestamp\", \"uid\"]"; if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { @@ -86,7 +88,6 @@ Y_UNIT_TEST_SUITE(ExternalIndex) { .SetEnableExternalIndex(true) .SetEnableBackgroundTasks(true) .SetEnableOlapSchemaOperations(true); - ; ; Tests::TServer::TPtr server = new Tests::TServer(serverSettings); @@ -106,16 +107,22 @@ Y_UNIT_TEST_SUITE(ExternalIndex) { // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); TLocalHelper lHelper(*server); + lHelper.SetWithJsonDocument(true); lHelper.CreateTestOlapTable("olapTable", 2); lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple` ( " - "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" : {\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"message\"}]}}`)"); + "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" :" + "{\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"level\"}, {\"id\":\"json_payload\", \"path\" : \"strict $.a.b\"}]}}`)"); + lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple1` ( " + "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" :" + "{\"fields\" : [{\"id\":\"uid\"}]}}`)"); Cerr << "Wait tables" << Endl; runtime.SimulateSleep(TDuration::Seconds(20)); Cerr << "Initialization tables" << Endl; const TInstant pkStart = Now() - TDuration::Days(15); ui32 idx = 0; - auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000); + const ui32 tsStart = (pkStart + TDuration::Seconds(2 * idx++)).GetValue(); + auto batch = lHelper.TestArrowBatch(0, tsStart, 6000); auto batchSize = NArrow::GetBatchDataSize(batch); Cerr << "Inserting " << batchSize << " bytes..." << Endl; UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT @@ -128,13 +135,38 @@ Y_UNIT_TEST_SUITE(ExternalIndex) { { TString resultData; lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", true, &resultData); + Cerr << resultData << Endl; + UNIT_ASSERT_EQUAL(resultData, "[6000u]"); + } + { + NSharding::TStreamStringHashCalcer calcer(0); + calcer.Start(); + TString resultData; + TString uid = "uid_" + ::ToString(tsStart + 2); + ui32 level = 2; + TString id = "2"; + calcer.Update((const ui8*)uid.data(), uid.size()); + calcer.Update((const ui8*)&level, sizeof(level)); + calcer.Update((const ui8*)id.data(), id.size()); + const ui64 hash = calcer.Finish(); + lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple` WHERE index_hash = " + + ::ToString(hash), true, &resultData); + Cerr << resultData << "/" << tsStart << Endl; + UNIT_ASSERT_EQUAL(resultData, "[[" + ::ToString(hash) + "u];[" + ::ToString(tsStart + 2) + "u]]"); + } + { + TString resultData; + lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple1`", true, &resultData); + Cerr << resultData << Endl; UNIT_ASSERT_EQUAL(resultData, "[6000u]"); } lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple` (TYPE CS_EXT_INDEX)"); + lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple1` (TYPE CS_EXT_INDEX)"); for (ui32 i = 0; i < 10; ++i) { server->GetRuntime()->SimulateSleep(TDuration::Seconds(10)); } lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", false); + lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple1`", false); { TString resultData; lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/external`", true, &resultData); |