diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-14 08:06:00 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2023-03-14 08:06:00 +0300 |
commit | 9aa65dc165f24925a281f89c975cc5117823934f (patch) | |
tree | 4693538565fa3ffde7da0deb4579d25c8211e374 | |
parent | 317a84f0cfb9a5a203ac765e9571f2dd6c06b209 (diff) | |
download | ydb-9aa65dc165f24925a281f89c975cc5117823934f.tar.gz |
external indexes
78 files changed, 2655 insertions, 5 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index 7ee00f8332..4c1257368e 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -159,6 +159,7 @@ struct TKikimrEvents : TEvents { ES_TEST_LOAD, ES_GRPC_CANCELATION, ES_DISCOVERY, + ES_EXT_INDEX, }; }; diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index 411c78bad7..a08d5d9d87 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -130,6 +130,8 @@ target_link_libraries(run PUBLIC ydb-services-metadata services-bg_tasks-ds_table ydb-services-bg_tasks + services-ext_index-service + services-ext_index-metadata ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 0f62710fa1..15d3892351 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -131,6 +131,8 @@ target_link_libraries(run PUBLIC ydb-services-metadata services-bg_tasks-ds_table ydb-services-bg_tasks + services-ext_index-service + services-ext_index-metadata ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index 0f62710fa1..15d3892351 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -131,6 +131,8 @@ target_link_libraries(run PUBLIC ydb-services-metadata services-bg_tasks-ds_table ydb-services-bg_tasks + services-ext_index-service + services-ext_index-metadata ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index 411c78bad7..a08d5d9d87 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -130,6 +130,8 @@ target_link_libraries(run PUBLIC ydb-services-metadata services-bg_tasks-ds_table ydb-services-bg_tasks + services-ext_index-service + services-ext_index-metadata ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 1e859c1705..b584d31ea6 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -69,6 +69,7 @@ union TBasicKikimrServicesMask { bool EnableMetadataProvider:1; bool EnableReplicationService:1; bool EnableBackgroundTasks:1; + bool ExternalIndex:1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 0efe573019..ad8726f439 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -159,6 +159,8 @@ #include <ydb/services/bg_tasks/ds_table/executor.h> #include <ydb/services/bg_tasks/service.h> +#include <ydb/services/ext_index/common/config.h> +#include <ydb/services/ext_index/service/executor.h> #include <library/cpp/actors/protos/services_common.pb.h> @@ -2297,9 +2299,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } +TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) { +} + +void TExternalIndexInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NCSIndex::TConfig serviceConfig; + if (Config.HasExternalIndexConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetExternalIndexConfig())); + } + + if (serviceConfig.IsEnabled()) { + auto service = NCSIndex::CreateService(serviceConfig); + setup->LocalServices.push_back(std::make_pair( + NCSIndex::MakeServiceId(NodeId), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TMetadataProviderInitializer::TMetadataProviderInitializer(const TKikimrRunConfig& runConfig) - : IKikimrServicesInitializer(runConfig) -{ + : IKikimrServicesInitializer(runConfig) { } void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 0bb7ebddb0..1c36096e13 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -383,6 +383,12 @@ private: std::shared_ptr<TModuleFactories> Factories; }; +class TExternalIndexInitializer: public IKikimrServicesInitializer { +public: + TExternalIndexInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TMetadataProviderInitializer: public IKikimrServicesInitializer { public: TMetadataProviderInitializer(const TKikimrRunConfig& runConfig); diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 4b12f5fb51..1d5715c57f 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1485,6 +1485,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig)); } + if (serviceMask.ExternalIndex) { + sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig)); + } + if (serviceMask.EnableBackgroundTasks) { sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig)); } diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index 7b1e0c5eed..3a3ed2a3d9 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC api-protos public-lib-operation_id cpp-client-resources + services-ext_index-common ) target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 76a073859f..c2c39ae694 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -56,6 +56,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC api-protos public-lib-operation_id cpp-client-resources + services-ext_index-common ) target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index 76a073859f..c2c39ae694 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -56,6 +56,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC api-protos public-lib-operation_id cpp-client-resources + services-ext_index-common ) target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index 7b1e0c5eed..3a3ed2a3d9 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC api-protos public-lib-operation_id cpp-client-resources + services-ext_index-common ) target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 8b8eb20a36..a7369c1026 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -14,6 +14,7 @@ #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/columnshard/columnshard.h> #include <ydb/core/tx/long_tx_service/public/events.h> +#include <ydb/services/ext_index/common/service.h> #include <library/cpp/actors/wilson/wilson_profile_span.h> #include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h> @@ -467,6 +468,13 @@ protected: ui64 tableId = entry.TableId.PathId.LocalPathId; + if (NCSIndex::TServiceOperator::IsEnabled()) { + TBase::Send(NCSIndex::MakeServiceId(TBase::SelfId().NodeId()), + new NCSIndex::TEvAddData(GetDeserializedBatch(), Path, std::make_shared<NCSIndex::TNaiveDataUpsertController>(TBase::SelfId()))); + } else { + IndexReady = true; + } + if (sharding.HasRandomSharding()) { ui64 shard = sharding.GetColumnShards(0); SendWriteRequest(shard, tableId, DedupId, GetSerializedData()); @@ -537,6 +545,7 @@ private: hFunc(TEvColumnShard::TEvWriteResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(NCSIndex::TEvAddDataResult, Handle); } } @@ -614,6 +623,7 @@ private: Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { hFunc(TEvLongTxService::TEvAttachColumnShardWritesResult, Handle); + hFunc(NCSIndex::TEvAddDataResult, Handle); } } @@ -629,8 +639,27 @@ private: } return ReplyError(msg->Record.GetStatus()); } + if (IndexReady) { + ReplySuccess(); + } else { + ColumnShardReady = true; + } + } + + void Handle(NCSIndex::TEvAddDataResult::TPtr& ev) { + const auto* msg = ev->Get(); + if (msg->GetErrorMessage()) { + NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "NCSIndex::TEvAddDataResult"); + RaiseIssue(NYql::TIssue(msg->GetErrorMessage())); + return ReplyError(Ydb::StatusIds::GENERIC_ERROR, msg->GetErrorMessage()); + } else { + if (ColumnShardReady) { + ReplySuccess(); + } else { + IndexReady = true; + } + } - ReplySuccess(); } private: @@ -665,6 +694,8 @@ private: THashSet<ui64> ShardsToRetry; NWilson::TProfileSpan ActorSpan; TActorId TimeoutTimerActorId; + bool ColumnShardReady = false; + bool IndexReady = false; }; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e1618601c0..b8a8a78b3e 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -601,6 +601,12 @@ message TInternalRequestConfig { optional uint32 RetryPeriodFinishSeconds = 2 [default = 30]; } +message TExternalIndexConfig { + optional bool Enabled = 1 [default = true]; + optional TInternalRequestConfig RequestConfig = 2; + optional string InternalTablePath = 3; +} + message TMetadataProviderConfig { optional bool Enabled = 1 [default = true]; optional uint32 RefreshPeriodSeconds = 2 [default = 10]; @@ -1787,6 +1793,7 @@ message TAppConfig { optional TBackgroundTasksConfig BackgroundTasksConfig = 60; optional TAuditConfig AuditConfig = 61; optional TClientCertificateAuthorization ClientCertificateAuthorization = 62; + optional TExternalIndexConfig ExternalIndexConfig = 63; optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 7f385a7310..3992f2c43a 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -360,6 +360,8 @@ enum EServiceKikimr { // Discovery DISCOVERY = 1800; DISCOVERY_CACHE = 1801; + + EXT_INDEX = 1900; }; message TActivity { diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt index 699c067000..82e750dc08 100644 --- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt @@ -85,6 +85,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + services-ext_index-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt index 0439fe96da..2a6f45b5b4 100644 --- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt +++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt @@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + services-ext_index-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt index 0439fe96da..2a6f45b5b4 100644 --- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt @@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + services-ext_index-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt index 699c067000..82e750dc08 100644 --- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt @@ -85,6 +85,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + services-ext_index-service ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index de7607c7e8..145194530d 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -94,6 +94,9 @@ #include <ydb/services/metadata/service.h> #include <ydb/services/bg_tasks/ds_table/executor.h> #include <ydb/services/bg_tasks/service.h> +#include <ydb/services/ext_index/common/config.h> +#include <ydb/services/ext_index/common/service.h> +#include <ydb/services/ext_index/service/executor.h> #include <ydb/library/folder_service/mock/mock_folder_service.h> #include <ydb/core/client/server/msgbus_server_tracer.h> @@ -724,6 +727,11 @@ namespace Tests { const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); } + if (Settings->IsEnableExternalIndex()) { + auto* actor = NCSIndex::CreateService(NCSIndex::TConfig()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 21de04466e..a2e20023fc 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -212,6 +212,8 @@ namespace Tests { private: YDB_FLAG_ACCESSOR(EnableMetadataProvider, true); YDB_FLAG_ACCESSOR(EnableBackgroundTasks, false); + YDB_FLAG_ACCESSOR(EnableExternalIndex, false); + }; class TServer : public TThrRefBase, TMoveOnly { diff --git a/ydb/core/tx/tx_proxy/upload_rows.h b/ydb/core/tx/tx_proxy/upload_rows.h index 93d116b04e..63590b2bff 100644 --- a/ydb/core/tx/tx_proxy/upload_rows.h +++ b/ydb/core/tx/tx_proxy/upload_rows.h @@ -16,10 +16,13 @@ enum class EUploadRowsMode { WriteToTableShadow, }; +using TUploadTypes = TVector<std::pair<TString, Ydb::Type>>; +using TUploadRows = TVector<std::pair<TSerializedCellVec, TString>>; + IActor* CreateUploadRowsInternal(const TActorId& sender, const TString& table, - std::shared_ptr<TVector<std::pair<TString, Ydb::Type> > > types, - std::shared_ptr<TVector<std::pair<TSerializedCellVec, TString> > > rows, + std::shared_ptr<TUploadTypes> types, + std::shared_ptr<TUploadRows> rows, EUploadRowsMode mode = EUploadRowsMode::Normal, bool writeToPrivateTable = false); } // namespace NTxProxy diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt index 764b3c3ebf..4b697f067a 100644 --- a/ydb/services/CMakeLists.txt +++ b/ydb/services/CMakeLists.txt @@ -11,6 +11,7 @@ add_subdirectory(bg_tasks) add_subdirectory(cms) add_subdirectory(datastreams) add_subdirectory(discovery) +add_subdirectory(ext_index) add_subdirectory(fq) add_subdirectory(kesus) add_subdirectory(lib) diff --git a/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..2694d34fb3 --- /dev/null +++ b/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(common) +add_subdirectory(metadata) +add_subdirectory(service) +add_subdirectory(ut) + +add_library(ydb-services-ext_index INTERFACE) +target_link_libraries(ydb-services-ext_index INTERFACE + contrib-libs-cxxsupp + yutil + services-ext_index-metadata + services-ext_index-service + services-ext_index-common +) diff --git a/ydb/services/ext_index/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..6e822cd7ce --- /dev/null +++ b/ydb/services/ext_index/CMakeLists.linux-aarch64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(common) +add_subdirectory(metadata) +add_subdirectory(service) +add_subdirectory(ut) + +add_library(ydb-services-ext_index INTERFACE) +target_link_libraries(ydb-services-ext_index INTERFACE + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + services-ext_index-metadata + services-ext_index-service + services-ext_index-common +) diff --git a/ydb/services/ext_index/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..6e822cd7ce --- /dev/null +++ b/ydb/services/ext_index/CMakeLists.linux-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(common) +add_subdirectory(metadata) +add_subdirectory(service) +add_subdirectory(ut) + +add_library(ydb-services-ext_index INTERFACE) +target_link_libraries(ydb-services-ext_index INTERFACE + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + services-ext_index-metadata + services-ext_index-service + services-ext_index-common +) diff --git a/ydb/services/ext_index/CMakeLists.txt b/ydb/services/ext_index/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..2694d34fb3 --- /dev/null +++ b/ydb/services/ext_index/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(common) +add_subdirectory(metadata) +add_subdirectory(service) +add_subdirectory(ut) + +add_library(ydb-services-ext_index INTERFACE) +target_link_libraries(ydb-services-ext_index INTERFACE + contrib-libs-cxxsupp + yutil + services-ext_index-metadata + services-ext_index-service + services-ext_index-common +) diff --git a/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..d748daffc9 --- /dev/null +++ b/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-common) +target_link_libraries(services-ext_index-common PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + api-protos + ydb-core-protos + libs-apache-arrow +) +target_sources(services-ext_index-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp +) diff --git a/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..5b8fcd6cd8 --- /dev/null +++ b/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-common) +target_link_libraries(services-ext_index-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + api-protos + ydb-core-protos + libs-apache-arrow +) +target_sources(services-ext_index-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp +) diff --git a/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..5b8fcd6cd8 --- /dev/null +++ b/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt @@ -0,0 +1,24 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-common) +target_link_libraries(services-ext_index-common PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + api-protos + ydb-core-protos + libs-apache-arrow +) +target_sources(services-ext_index-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp +) diff --git a/ydb/services/ext_index/common/CMakeLists.txt b/ydb/services/ext_index/common/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/common/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..d748daffc9 --- /dev/null +++ b/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-common) +target_link_libraries(services-ext_index-common PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + api-protos + ydb-core-protos + libs-apache-arrow +) +target_sources(services-ext_index-common PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp +) diff --git a/ydb/services/ext_index/common/config.cpp b/ydb/services/ext_index/common/config.cpp new file mode 100644 index 0000000000..3245521881 --- /dev/null +++ b/ydb/services/ext_index/common/config.cpp @@ -0,0 +1,21 @@ +#include "config.h" +#include <ydb/core/base/appdata.h> + +namespace NKikimr::NCSIndex { + +bool TConfig::DeserializeFromProto(const NKikimrConfig::TExternalIndexConfig& config) { + EnabledFlag = config.GetEnabled(); + if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) { + return false; + } + if (config.HasInternalTablePath()) { + InternalTablePath = config.GetInternalTablePath(); + } + return true; +} + +TString TConfig::GetTablePath() const { + return AppData()->TenantName + "/" + InternalTablePath; +} + +} diff --git a/ydb/services/ext_index/common/config.h b/ydb/services/ext_index/common/config.h new file mode 100644 index 0000000000..be48feb29b --- /dev/null +++ b/ydb/services/ext_index/common/config.h @@ -0,0 +1,19 @@ +#pragma once +#include <ydb/core/protos/config.pb.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/request/config.h> + +#include <util/datetime/base.h> + +namespace NKikimr::NCSIndex { + +class TConfig { +private: + YDB_READONLY_DEF(NMetadata::NRequest::TConfig, RequestConfig); + YDB_READONLY(TString, InternalTablePath, ".ext_index/tasks"); + YDB_READONLY_FLAG(Enabled, true); +public: + bool DeserializeFromProto(const NKikimrConfig::TExternalIndexConfig& config); + TString GetTablePath() const; +}; +} diff --git a/ydb/services/ext_index/common/events.cpp b/ydb/services/ext_index/common/events.cpp new file mode 100644 index 0000000000..805374e568 --- /dev/null +++ b/ydb/services/ext_index/common/events.cpp @@ -0,0 +1,5 @@ +#include "events.h" + +namespace NKikimr::NBackgroundTasks { + +} diff --git a/ydb/services/ext_index/common/events.h b/ydb/services/ext_index/common/events.h new file mode 100644 index 0000000000..8b2bfa1ee8 --- /dev/null +++ b/ydb/services/ext_index/common/events.h @@ -0,0 +1,16 @@ +#pragma once +#include <ydb/core/base/events.h> + +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NCSIndex { + +enum EEvents { + EvAddData = EventSpaceBegin(TKikimrEvents::ES_EXT_INDEX), + EvAddDataResult, + EvEnd +}; + +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_EXT_INDEX), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_EXT_INDEX)"); + +} diff --git a/ydb/services/ext_index/common/service.cpp b/ydb/services/ext_index/common/service.cpp new file mode 100644 index 0000000000..afd9acee60 --- /dev/null +++ b/ydb/services/ext_index/common/service.cpp @@ -0,0 +1,23 @@ +#include "service.h" + +#include "config.h" + +namespace NKikimr::NCSIndex { + +NActors::TActorId MakeServiceId(const ui32 nodeId) { + return NActors::TActorId(nodeId, "SrvcExtIndex"); +} + +void TServiceOperator::Register(const TConfig& config) { + auto* service = Singleton<TServiceOperator>(); + std::unique_lock<std::shared_mutex> lock(service->Lock); + service->EnabledFlag = config.IsEnabled(); +} + +bool TServiceOperator::IsEnabled() { + auto* service = Singleton<TServiceOperator>(); + std::shared_lock<std::shared_mutex> lock(service->Lock); + return service->EnabledFlag; +} + +} diff --git a/ydb/services/ext_index/common/service.h b/ydb/services/ext_index/common/service.h new file mode 100644 index 0000000000..904025cad0 --- /dev/null +++ b/ydb/services/ext_index/common/service.h @@ -0,0 +1,76 @@ +#pragma once +#include "events.h" + +#include <ydb/library/accessor/accessor.h> +#include <library/cpp/actors/core/event_local.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <shared_mutex> + +namespace NKikimr::NCSIndex { + +class IDataUpsertController { +public: + using TPtr = std::shared_ptr<IDataUpsertController>; + virtual ~IDataUpsertController() = default; + virtual void OnAllIndexesUpserted() = 0; + virtual void OnAllIndexesUpsertionFailed(const TString& errorMessage) = 0; +}; + +class TEvAddData: public NActors::TEventLocal<TEvAddData, EEvents::EvAddData> { +private: + YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Data); + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_DEF(IDataUpsertController::TPtr, ExternalController); +public: + TEvAddData(std::shared_ptr<arrow::RecordBatch> data, const TString& tablePath, IDataUpsertController::TPtr externalController) + : Data(data) + , TablePath(tablePath) + , ExternalController(externalController) + { + + } +}; + +class TEvAddDataResult: public NActors::TEventLocal<TEvAddDataResult, EEvents::EvAddDataResult> { +private: + YDB_READONLY_DEF(TString, ErrorMessage); +public: + TEvAddDataResult() = default; + + TEvAddDataResult(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } +}; + +class TNaiveDataUpsertController: public IDataUpsertController { +private: + const TActorIdentity OwnerId; +public: + TNaiveDataUpsertController(const TActorIdentity& ownerId) + : OwnerId(ownerId) { + + } + virtual void OnAllIndexesUpserted() override { + OwnerId.Send(OwnerId, new TEvAddDataResult()); + } + virtual void OnAllIndexesUpsertionFailed(const TString& errorMessage) override { + OwnerId.Send(OwnerId, new TEvAddDataResult(errorMessage)); + } +}; + +NActors::TActorId MakeServiceId(const ui32 node); + +class TConfig; + +class TServiceOperator { +private: + friend class TExecutor; + std::shared_mutex Lock; + bool EnabledFlag = false; + static void Register(const TConfig& config); +public: + static bool IsEnabled(); +}; + +} diff --git a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..ab68de15b7 --- /dev/null +++ b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,50 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-metadata) +target_compile_options(services-ext_index-metadata PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp +) + +add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) +target_compile_options(services-ext_index-metadata.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata.global PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp +) diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..3d32b2799c --- /dev/null +++ b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt @@ -0,0 +1,52 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-metadata) +target_compile_options(services-ext_index-metadata PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp +) + +add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) +target_compile_options(services-ext_index-metadata.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp +) diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..3d32b2799c --- /dev/null +++ b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt @@ -0,0 +1,52 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-metadata) +target_compile_options(services-ext_index-metadata PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp +) + +add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) +target_compile_options(services-ext_index-metadata.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata.global PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp +) diff --git a/ydb/services/ext_index/metadata/CMakeLists.txt b/ydb/services/ext_index/metadata/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/metadata/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..ab68de15b7 --- /dev/null +++ b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt @@ -0,0 +1,50 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-metadata) +target_compile_options(services-ext_index-metadata PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp +) + +add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata) +target_compile_options(services-ext_index-metadata.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(services-ext_index-metadata.global PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request + core-tx-sharding +) +target_sources(services-ext_index-metadata.global PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp +) diff --git a/ydb/services/ext_index/metadata/behaviour.cpp b/ydb/services/ext_index/metadata/behaviour.cpp new file mode 100644 index 0000000000..7d50e968cf --- /dev/null +++ b/ydb/services/ext_index/metadata/behaviour.cpp @@ -0,0 +1,30 @@ +#include "behaviour.h" +#include "initializer.h" +#include "manager.h" + +namespace NKikimr::NMetadata::NCSIndex { + +TBehaviour::TFactory::TRegistrator<TBehaviour> TBehaviour::Registrator(TObject::GetTypeId()); + +TString TBehaviour::GetInternalStorageTablePath() const { + return "cs_index/external"; +} + +NModifications::IOperationsManager::TPtr TBehaviour::ConstructOperationsManager() const { + return std::make_shared<TManager>(); +} + +NInitializer::IInitializationBehaviour::TPtr TBehaviour::ConstructInitializer() const { + return std::make_shared<TInitializer>(); +} + +TString TBehaviour::GetTypeId() const { + return TObject::GetTypeId(); +} + +IClassBehaviour::TPtr TBehaviour::GetInstance() { + static std::shared_ptr<TBehaviour> result = std::make_shared<TBehaviour>(); + return result; +} + +} diff --git a/ydb/services/ext_index/metadata/behaviour.h b/ydb/services/ext_index/metadata/behaviour.h new file mode 100644 index 0000000000..1bfe358492 --- /dev/null +++ b/ydb/services/ext_index/metadata/behaviour.h @@ -0,0 +1,25 @@ +#pragma once +#include "object.h" + +#include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/metadata/abstract/initialization.h> +#include <ydb/services/metadata/manager/abstract.h> +#include <ydb/services/metadata/manager/common.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TBehaviour: public TClassBehaviour<TObject> { +private: + static TFactory::TRegistrator<TBehaviour> Registrator; +protected: + virtual NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override; + virtual NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override; + virtual TString GetInternalStorageTablePath() const override; + +public: + TBehaviour() = default; + virtual TString GetTypeId() const override; + static IClassBehaviour::TPtr GetInstance(); +}; + +} diff --git a/ydb/services/ext_index/metadata/fetcher.cpp b/ydb/services/ext_index/metadata/fetcher.cpp new file mode 100644 index 0000000000..52061b6129 --- /dev/null +++ b/ydb/services/ext_index/metadata/fetcher.cpp @@ -0,0 +1,12 @@ +#include "fetcher.h" +#include "behaviour.h" + +namespace NKikimr::NMetadata::NCSIndex { + +std::vector<IClassBehaviour::TPtr> TFetcher::DoGetManagers() const { + return { + TBehaviour::GetInstance() + }; +} + +} diff --git a/ydb/services/ext_index/metadata/fetcher.h b/ydb/services/ext_index/metadata/fetcher.h new file mode 100644 index 0000000000..d4e34a29c8 --- /dev/null +++ b/ydb/services/ext_index/metadata/fetcher.h @@ -0,0 +1,15 @@ +#pragma once + +#include "snapshot.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TFetcher: public NFetcher::TSnapshotsFetcher<TSnapshot> { +protected: + virtual std::vector<IClassBehaviour::TPtr> DoGetManagers() const override; +public: +}; + +} diff --git a/ydb/services/ext_index/metadata/initializer.cpp b/ydb/services/ext_index/metadata/initializer.cpp new file mode 100644 index 0000000000..d18a0e21ac --- /dev/null +++ b/ydb/services/ext_index/metadata/initializer.cpp @@ -0,0 +1,48 @@ +#include "initializer.h" +#include "object.h" + +namespace NKikimr::NMetadata::NCSIndex { + +void TInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const { + TVector<NInitializer::ITableModifier::TPtr> result; + { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TObject::GetBehaviour()->GetStorageTablePath()); + request.add_primary_key(TObject::TDecoder::IndexId); + request.add_primary_key(TObject::TDecoder::TablePath); + { + auto& column = *request.add_columns(); + column.set_name(TObject::TDecoder::IndexId); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8); + } + { + auto& column = *request.add_columns(); + column.set_name(TObject::TDecoder::TablePath); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8); + } + { + auto& column = *request.add_columns(); + column.set_name(TObject::TDecoder::Active); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL); + } + { + auto& column = *request.add_columns(); + column.set_name(TObject::TDecoder::Delete); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL); + } + { + auto& column = *request.add_columns(); + column.set_name(TObject::TDecoder::Extractor); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8); + } + result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create")); + auto hRequest = TObject::AddHistoryTableScheme(request); + result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(hRequest, "create_history")); + } + result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TObject::GetBehaviour()->GetStorageTablePath(), "acl")); + result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TObject::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history")); + controller->OnPreparationFinished(result); +} + +} diff --git a/ydb/services/ext_index/metadata/initializer.h b/ydb/services/ext_index/metadata/initializer.h new file mode 100644 index 0000000000..3fc41ccdc5 --- /dev/null +++ b/ydb/services/ext_index/metadata/initializer.h @@ -0,0 +1,15 @@ +#pragma once + +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/abstract/initialization.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TInitializer: public NInitializer::IInitializationBehaviour { +protected: + virtual void DoPrepare(NInitializer::IInitializerInput::TPtr controller) const override; +public: +}; + +} diff --git a/ydb/services/ext_index/metadata/manager.cpp b/ydb/services/ext_index/metadata/manager.cpp new file mode 100644 index 0000000000..cfdd7d681c --- /dev/null +++ b/ydb/services/ext_index/metadata/manager.cpp @@ -0,0 +1,90 @@ +#include "manager.h" +#include <ydb/services/metadata/manager/ydb_value_operator.h> +#include <ydb/services/metadata/ds_table/scheme_describe.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TPreparationController: public NProvider::ISchemeDescribeController { +private: + NModifications::IAlterPreparationController<TObject>::TPtr ExtController; + TObject Object; +public: + TPreparationController(NModifications::IAlterPreparationController<TObject>::TPtr extController, TObject&& object) + : ExtController(extController) + , Object(std::move(object)) + { + + } + + virtual void OnDescriptionFailed(const TString& errorMessage, const TString& /*requestId*/) override { + ExtController->OnPreparationProblem(errorMessage); + } + virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) override { + if (!result->ColumnTableInfo) { + ExtController->OnPreparationProblem("we cannot use this indexes for data-shard tables"); + } else if (!Object.TryProvideTtl(result->ColumnTableInfo->Description, nullptr)) { + ExtController->OnPreparationProblem("unavailable cs-table ttl type for index construction"); + } else { + ExtController->OnPreparationFinished({ std::move(Object) }); + } + } +}; + +void TManager::DoPrepareObjectsBeforeModification(std::vector<TObject>&& patchedObjects, + NModifications::IAlterPreparationController<TObject>::TPtr controller, + const TInternalModificationContext& /*context*/) const { + if (patchedObjects.size() != 1) { + controller->OnPreparationProblem("modification possible for one object only"); + return; + } + const TString path = patchedObjects.front().GetTablePath(); + std::shared_ptr<TPreparationController> pController = std::make_shared<TPreparationController>(controller, std::move(patchedObjects.front())); + TActivationContext::Register(new NProvider::TSchemeDescriptionActor(pController, "", path)); +} + +NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, + TInternalModificationContext& context) const { + if (context.GetActivityType() == IOperationsManager::EActivityType::Alter) { + return "index modification currently unsupported"; + } else { + NInternal::TTableRecord result; + TStringBuf sb(settings.GetObjectId().data(), settings.GetObjectId().size()); + TStringBuf l; + TStringBuf r; + if (!sb.TrySplit(':', l, r)) { + return "incorrect objectId format (path:index_id)"; + } + result.SetColumn(TObject::TDecoder::TablePath, NInternal::TYDBValue::Utf8(l)); + result.SetColumn(TObject::TDecoder::IndexId, NInternal::TYDBValue::Utf8(r)); + + if (context.GetActivityType() == IOperationsManager::EActivityType::Drop) { + context.SetActivityType(IOperationsManager::EActivityType::Alter); + result.SetColumn(TObject::TDecoder::Delete, NInternal::TYDBValue::Bool(true)); + } else if (context.GetActivityType() == IOperationsManager::EActivityType::Create) { + if (auto dValue = settings.GetFeature<bool>(TObject::TDecoder::Delete, false)) { + result.SetColumn(TObject::TDecoder::Delete, NInternal::TYDBValue::Bool(*dValue)); + } else { + return "'delete' flag is incorrect"; + } + + if (auto extractorStr = settings.GetFeature<TString>(TObject::TDecoder::Extractor)) { + TInterfaceContainer<IIndexExtractor> object; + if (!object.DeserializeFromJson(*extractorStr)) { + return "cannot parse extractor info"; + } + result.SetColumn(TObject::TDecoder::Extractor, NInternal::TYDBValue::Utf8(object.SerializeToJson().GetStringRobust())); + } else { + return "cannot found extractor info"; + } + if (auto aValue = settings.GetFeature<bool>(TObject::TDecoder::Active, false)) { + result.SetColumn(TObject::TDecoder::Active, NInternal::TYDBValue::Bool(*aValue)); + } else { + return "'active' flag is incorrect"; + } + } + + return result; + } +} + +} diff --git a/ydb/services/ext_index/metadata/manager.h b/ydb/services/ext_index/metadata/manager.h new file mode 100644 index 0000000000..8a97b4e561 --- /dev/null +++ b/ydb/services/ext_index/metadata/manager.h @@ -0,0 +1,21 @@ +#pragma once +#include "object.h" +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/manager/generic_manager.h> +#include <ydb/library/accessor/accessor.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class TManager: public NModifications::TGenericOperationsManager<TObject> { +protected: + virtual void DoPrepareObjectsBeforeModification(std::vector<TObject>&& patchedObjects, + NModifications::IAlterPreparationController<TObject>::TPtr controller, + const TInternalModificationContext& context) const override; + + virtual NModifications::TOperationParsingResult DoBuildPatchFromSettings( + const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; + +public: +}; + +} diff --git a/ydb/services/ext_index/metadata/object.cpp b/ydb/services/ext_index/metadata/object.cpp new file mode 100644 index 0000000000..85185cd5bc --- /dev/null +++ b/ydb/services/ext_index/metadata/object.cpp @@ -0,0 +1,92 @@ +#include "object.h" +#include "behaviour.h" +#include <ydb/core/tx/sharding/sharding.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +#include <util/folder/path.h> + +namespace NKikimr::NMetadata::NCSIndex { + +TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName); + +IClassBehaviour::TPtr TObject::GetBehaviour() { + return TBehaviour::GetInstance(); +} + +bool TObject::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) { + if (!decoder.Read(decoder.GetIndexIdIdx(), IndexId, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetActiveIdx(), ActiveFlag, rawValue)) { + return false; + } + if (!decoder.Read(decoder.GetDeleteIdx(), DeleteFlag, rawValue)) { + return false; + } + if (!decoder.ReadFromJson(decoder.GetExtractorIdx(), Extractor, rawValue)) { + return false; + } + return true; +} + +NKikimr::NMetadata::NInternal::TTableRecord TObject::SerializeToRecord() const { + NInternal::TTableRecord result; + result.SetColumn(TDecoder::IndexId, NInternal::TYDBValue::Utf8(IndexId)); + result.SetColumn(TDecoder::TablePath, NInternal::TYDBValue::Utf8(TablePath)); + result.SetColumn(TDecoder::Delete, NInternal::TYDBValue::Bool(DeleteFlag)); + result.SetColumn(TDecoder::Active, NInternal::TYDBValue::Bool(ActiveFlag)); + result.SetColumn(TDecoder::Extractor, NInternal::TYDBValue::Utf8(Extractor.SerializeToJson().GetStringRobust())); + return result; +} + +std::optional<NKikimr::NMetadata::NCSIndex::TObject> TObject::Build(const TString& tablePath, const TString& indexId, IIndexExtractor::TPtr extractor) { + if (!extractor) { + return {}; + } + if (!tablePath || !indexId) { + return {}; + } + TObject result; + result.TablePath = TFsPath(tablePath).Fix().GetPath(); + result.IndexId = indexId; + result.Extractor = TInterfaceContainer<IIndexExtractor>(extractor); + return result; +} + +TString TObject::GetIndexTablePath() const { + return GetBehaviour()->GetStorageTableDirectory() + "/" + GetUniqueId(); +} + +bool TObject::TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDescription, Ydb::Table::CreateTableRequest* cRequest) { + if (csDescription.HasTtlSettings() && csDescription.GetTtlSettings().HasEnabled()) { + auto& ttl = csDescription.GetTtlSettings().GetEnabled(); + if (!ttl.HasExpireAfterSeconds()) { + return false; + } + bool found = false; + for (auto&& i : csDescription.GetSchema().GetKeyColumnNames()) { + if (ttl.GetColumnName() == i) { + found = true; + } + } + if (!found) { + return false; + } + if (cRequest) { + auto& newTtl = *cRequest->mutable_ttl_settings()->mutable_date_type_column(); + newTtl.set_column_name("pk_" + ttl.GetColumnName()); + newTtl.set_expire_after_seconds(ttl.GetExpireAfterSeconds()); + } + } + return true; +} + +std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { + NSharding::THashSharding hashSharding(0, Fields); + return hashSharding.MakeHashes(batch); +} + +} diff --git a/ydb/services/ext_index/metadata/object.h b/ydb/services/ext_index/metadata/object.h new file mode 100644 index 0000000000..f188029d05 --- /dev/null +++ b/ydb/services/ext_index/metadata/object.h @@ -0,0 +1,203 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/abstract/decoder.h> +#include <ydb/services/metadata/abstract/kqp_common.h> +#include <ydb/services/metadata/manager/object.h> +#include <ydb/public/api/protos/ydb_value.pb.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <library/cpp/object_factory/object_factory.h> +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/json/json_reader.h> +#include <util/string/cast.h> + +namespace NKikimr::NMetadata::NCSIndex { + +class IIndexExtractor { +protected: + virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0; + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0; + virtual NJson::TJsonValue DoSerializeToJson() const = 0; +public: + using TPtr = std::shared_ptr<IIndexExtractor>; + using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>; + + virtual ~IIndexExtractor() = default; + + std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const { + return DoExtractIndex(batch); + } + + bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + return DoDeserializeFromJson(jsonInfo); + } + + NJson::TJsonValue SerializeToJson() const { + return DoSerializeToJson(); + } + + virtual TString GetClassName() const = 0; +}; + +class TExtractorCityHash64: public IIndexExtractor { +private: + YDB_READONLY_DEF(std::vector<TString>, Fields); + static TFactory::TRegistrator<TExtractorCityHash64> Registrator; +protected: + virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override; + virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override { + const NJson::TJsonValue::TArray* jsonFields; + if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) { + return false; + } + for (auto&& i : *jsonFields) { + TString fieldId; + if (!i["id"].GetString(&fieldId)) { + return false; + } + Fields.emplace_back(fieldId); + } + if (Fields.size() == 0) { + return false; + } + return true; + } + virtual NJson::TJsonValue DoSerializeToJson() const override { + NJson::TJsonValue result; + auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY); + for (auto&& i : Fields) { + auto& jsonField = jsonFields.AppendValue(NJson::JSON_MAP); + jsonField.InsertValue("id", i); + } + return result; + } +public: + static inline TString ClassName = "city64"; + + virtual TString GetClassName() const override { + return ClassName; + } +}; + +template <class TInterface> +class TInterfaceContainer { +private: + using TPtr = typename TInterface::TPtr; + using TFactory = typename TInterface::TFactory; + TPtr Object; +public: + TInterfaceContainer() = default; + + explicit TInterfaceContainer(TPtr object) + : Object(object) + { + + } + + const TInterface* operator->() const { + return Object.get(); + } + + TInterface* operator->() { + return Object.get(); + } + + TString DebugString() const { + return SerializeToJson().GetStringRobust(); + } + + bool DeserializeFromJson(const TString& jsonString) { + NJson::TJsonValue jsonInfo; + if (!NJson::ReadJsonFastTree(jsonString, &jsonInfo)) { + return false; + } + return DeserializeFromJson(jsonInfo); + } + + bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) { + TString className; + if (!jsonInfo["class_name"].GetString(&className)) { + return false; + } + TPtr result(TFactory::Construct(className)); + if (!result) { + return false; + } + if (!result->DeserializeFromJson(jsonInfo["object"])) { + return false; + } + Object = result; + return true; + } + + NJson::TJsonValue SerializeToJson() const { + if (!Object) { + return NJson::JSON_NULL; + } + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("class_name", Object->GetClassName()); + result.InsertValue("object", Object->SerializeToJson()); + return result; + } +}; + +class TObject: public NModifications::TObject<TObject> { +private: + YDB_READONLY_DEF(TString, IndexId); + + YDB_READONLY_DEF(TString, TablePath); + YDB_READONLY_FLAG(Active, false); + YDB_READONLY_FLAG(Delete, false); + YDB_READONLY_DEF(TInterfaceContainer<IIndexExtractor>, Extractor); +public: + + bool TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDescription, Ydb::Table::CreateTableRequest* cRequest); + + TString DebugString() const { + return TablePath + ";" + IndexId + ";" + Extractor.DebugString() + ";" + ::ToString(ActiveFlag) + ";" + ::ToString(DeleteFlag); + } + + TObject() = default; + static std::optional<TObject> Build(const TString& tablePath, const TString& indexId, IIndexExtractor::TPtr extractor); + + TString GetIndexTablePath() const; + + TString GetUniqueId() const { + return TablePath + "/" + IndexId; + } + + class TDecoder: public NInternal::TDecoderBase { + private: + YDB_ACCESSOR(i32, IndexIdIdx, -1); + YDB_ACCESSOR(i32, TablePathIdx, -1); + YDB_ACCESSOR(i32, ExtractorIdx, -1); + YDB_ACCESSOR(i32, ActiveIdx, -1); + YDB_ACCESSOR(i32, DeleteIdx, -1); + public: + static inline const TString IndexId = "indexId"; + static inline const TString TablePath = "tablePath"; + static inline const TString Extractor = "extractor"; + static inline const TString Active = "active"; + static inline const TString Delete = "delete"; + + TDecoder(const Ydb::ResultSet& rawData) { + IndexIdIdx = GetFieldIndex(rawData, IndexId); + TablePathIdx = GetFieldIndex(rawData, TablePath); + ExtractorIdx = GetFieldIndex(rawData, Extractor); + ActiveIdx = GetFieldIndex(rawData, Active); + DeleteIdx = GetFieldIndex(rawData, Delete); + } + }; + + static IClassBehaviour::TPtr GetBehaviour(); + + bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue); + + NInternal::TTableRecord SerializeToRecord() const; + + static TString GetTypeId() { + return "CS_EXT_INDEX"; + } +}; + +} diff --git a/ydb/services/ext_index/metadata/snapshot.cpp b/ydb/services/ext_index/metadata/snapshot.cpp new file mode 100644 index 0000000000..f0dd6feea4 --- /dev/null +++ b/ydb/services/ext_index/metadata/snapshot.cpp @@ -0,0 +1,46 @@ +#include "snapshot.h" + +namespace NKikimr::NMetadata::NCSIndex { + +bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) { + Y_VERIFY(rawDataResult.result_sets().size() == 1); + ParseSnapshotObjects<TObject>(rawDataResult.result_sets()[0], [this](TObject&& s) { + const TString tablePath = s.GetTablePath(); + Objects[tablePath].emplace_back(std::move(s)); + }); + return true; +} + +TString TSnapshot::DoSerializeToString() const { + TStringBuilder sb; + sb << "OBJECTS:"; + for (auto&& i : Objects) { + for (auto&& object : i.second) { + sb << object.DebugString() << ";"; + } + } + return sb; +} + +void TSnapshot::GetObjectsForActivity(std::vector<TObject>& activation, std::vector<TObject>& remove) const { + for (auto&& i : Objects) { + for (auto&& object : i.second) { + if (!object.IsActive() && !object.IsDelete()) { + activation.emplace_back(object); + } + if (object.IsDelete()) { + remove.emplace_back(object); + } + } + } +} + +std::vector<NKikimr::NMetadata::NCSIndex::TObject> TSnapshot::GetIndexes(const TString& tablePath) const { + auto it = Objects.find(TFsPath(tablePath).Fix().GetPath()); + if (it == Objects.end()) { + return {}; + } + return it->second; +} + +} diff --git a/ydb/services/ext_index/metadata/snapshot.h b/ydb/services/ext_index/metadata/snapshot.h new file mode 100644 index 0000000000..4df6f37c82 --- /dev/null +++ b/ydb/services/ext_index/metadata/snapshot.h @@ -0,0 +1,23 @@ +#pragma once +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/library/accessor/accessor.h> +#include "object.h" + +namespace NKikimr::NMetadata::NCSIndex { + +class TSnapshot: public NFetcher::ISnapshot { +private: + using TBase = NFetcher::ISnapshot; + using TObjects = std::map<TString, std::vector<TObject>>; + YDB_READONLY_DEF(TObjects, Objects); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; + virtual TString DoSerializeToString() const override; +public: + using TBase::TBase; + + std::vector<TObject> GetIndexes(const TString& tablePath) const; + void GetObjectsForActivity(std::vector<TObject>& activation, std::vector<TObject>& remove) const; +}; + +} diff --git a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..5f0e02728b --- /dev/null +++ b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-service) +target_link_libraries(services-ext_index-service PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-ext_index-metadata + services-ext_index-common + api-protos +) +target_sources(services-ext_index-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp +) diff --git a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..814de1a95c --- /dev/null +++ b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-service) +target_link_libraries(services-ext_index-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-ext_index-metadata + services-ext_index-common + api-protos +) +target_sources(services-ext_index-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp +) diff --git a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..814de1a95c --- /dev/null +++ b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt @@ -0,0 +1,26 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-service) +target_link_libraries(services-ext_index-service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-ext_index-metadata + services-ext_index-common + api-protos +) +target_sources(services-ext_index-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp +) diff --git a/ydb/services/ext_index/service/CMakeLists.txt b/ydb/services/ext_index/service/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/service/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..5f0e02728b --- /dev/null +++ b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt @@ -0,0 +1,25 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-ext_index-service) +target_link_libraries(services-ext_index-service PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-ext_index-metadata + services-ext_index-common + api-protos +) +target_sources(services-ext_index-service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp +) diff --git a/ydb/services/ext_index/service/activation.cpp b/ydb/services/ext_index/service/activation.cpp new file mode 100644 index 0000000000..3842b0505b --- /dev/null +++ b/ydb/services/ext_index/service/activation.cpp @@ -0,0 +1,97 @@ +#include "activation.h" + +namespace NKikimr::NCSIndex { + +NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest TActivation::BuildUpdateRequest() const { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $indexId AS Utf8;" << Endl; + sb << "DECLARE $tablePath AS Utf8;" << Endl; + sb << "UPDATE `" + NMetadata::NCSIndex::TObject::GetBehaviour()->GetStorageTablePath() + "`" << Endl; + sb << "SET active = true" << Endl; + sb << "WHERE indexId = $indexId" << Endl; + sb << "AND tablePath = $tablePath" << Endl; + request.mutable_query()->set_yql_text(sb); + + { + auto& param = (*request.mutable_parameters())["$indexId"]; + param.mutable_value()->set_text_value(Object.GetIndexId()); + param.mutable_type()->set_type_id(Ydb::Type::UTF8); + } + { + auto& param = (*request.mutable_parameters())["$tablePath"]; + param.mutable_value()->set_text_value(Object.GetTablePath()); + param.mutable_type()->set_type_id(Ydb::Type::UTF8); + } + + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TActivation::OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) { + if (!result->ColumnTableInfo) { + ExternalController->OnActivationFailed("incorrect column table info", RequestId); + SelfContainer = nullptr; + return; + } + Ydb::Table::CreateTableRequest request; + if (!Object.TryProvideTtl(result->ColumnTableInfo->Description, &request)) { + ExternalController->OnActivationFailed("cannot convert ttl method from column tables", RequestId); + SelfContainer = nullptr; + return; + } + + request.set_session_id(""); + request.set_path(Object.GetIndexTablePath()); + request.add_primary_key("index_hash"); + auto pkFields = result.GetPKFields(); + for (auto&& i : pkFields) { + request.add_primary_key("pk_" + i.Name); + } + { + auto& column = *request.add_columns(); + column.set_name("index_hash"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64); + } + for (auto&& i : pkFields) { + auto& column = *request.add_columns(); + column.set_name("pk_" + i.Name); + auto primitiveType = NMetadata::NInternal::TYDBType::ConvertYQLToYDB(i.PType.GetTypeId()); + if (!primitiveType) { + ExternalController->OnActivationFailed("cannot convert type yql -> ydb", RequestId); + SelfContainer = nullptr; + return; + } + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(*primitiveType); + } + NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable> modifier(request, "create"); + modifier.Execute(SelfContainer, Config.GetRequestConfig()); +} + +void TActivation::OnModificationFinished(const TString& modificationId) { + if (modificationId == "create") { + NMetadata::NInitializer::ITableModifier::TPtr modifier = + NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(Object.GetIndexTablePath(), "access"); + modifier->Execute(SelfContainer, Config.GetRequestConfig()); + } else if (modificationId == "access") { + TActivationContext::ActorSystem()->Register(new NMetadata::NRequest::TSessionedActorCallback<NMetadata::NRequest::TDialogYQLRequest>( + BuildUpdateRequest(), Config.GetRequestConfig(), NACLib::TSystemUsers::Metadata(), SelfContainer)); + } else { + Y_VERIFY(false); + } +} + +void TActivation::OnModificationFailed(const TString& errorMessage, const TString& /*modificationId*/) { + ExternalController->OnActivationFailed(errorMessage, RequestId); + SelfContainer = nullptr; +} + +void TActivation::Start(std::shared_ptr<TActivation> selfContainer) { + Y_VERIFY(!!selfContainer); + SelfContainer = selfContainer; + TActivationContext::ActorSystem()->Register(new NMetadata::NProvider::TSchemeDescriptionActor(SelfContainer, RequestId, Object.GetTablePath())); +} + +} diff --git a/ydb/services/ext_index/service/activation.h b/ydb/services/ext_index/service/activation.h new file mode 100644 index 0000000000..a7dcaf74ff --- /dev/null +++ b/ydb/services/ext_index/service/activation.h @@ -0,0 +1,68 @@ +#pragma once +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/services/ext_index/metadata/object.h> +#include <ydb/services/metadata/ds_table/scheme_describe.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/services/ext_index/common/config.h> + +namespace NKikimr::NCSIndex { + +class IActivationExternalController { +public: + using TPtr = std::shared_ptr<IActivationExternalController>; + virtual ~IActivationExternalController() = default; + virtual void OnActivationFailed(const TString& errorMessage, const TString& requestId) = 0; + virtual void OnActivationSuccess(const TString& requestId) = 0; +}; + +class TActivation: public NMetadata::NProvider::ISchemeDescribeController, + public NMetadata::NRequest::IExternalController<NMetadata::NRequest::TDialogYQLRequest>, + public NMetadata::NInitializer::IModifierExternalController +{ +private: + mutable std::shared_ptr<TActivation> SelfContainer; + NMetadata::NCSIndex::TObject Object; + IActivationExternalController::TPtr ExternalController; + const TString RequestId; + const TConfig Config; + + NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest BuildUpdateRequest() const; + +protected: + virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) override { + ExternalController->OnActivationFailed(errorMessage, requestId); + SelfContainer = nullptr; + } + virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& requestId) override; + + virtual void OnModificationFinished(const TString& modificationId) override; + + virtual void OnModificationFailed(const TString& errorMessage, const TString& modificationId) override; + + virtual void OnRequestResult(NMetadata::NRequest::TDialogYQLRequest::TResponse&& /*result*/) override { + ExternalController->OnActivationSuccess(RequestId); + SelfContainer = nullptr; + } + + virtual void OnRequestFailed(const TString& errorMessage) override { + ExternalController->OnActivationFailed(errorMessage, RequestId); + SelfContainer = nullptr; + } +public: + void Start(std::shared_ptr<TActivation> selfContainer); + + TActivation(const NMetadata::NCSIndex::TObject& object, + IActivationExternalController::TPtr externalController, + const TString& requestId, const TConfig& config) + : Object(object) + , ExternalController(externalController) + , RequestId(requestId) + , Config(config) + { + + } + +}; + +} diff --git a/ydb/services/ext_index/service/add_data.cpp b/ydb/services/ext_index/service/add_data.cpp new file mode 100644 index 0000000000..0322940f21 --- /dev/null +++ b/ydb/services/ext_index/service/add_data.cpp @@ -0,0 +1,38 @@ +#include "add_data.h" + +namespace NKikimr::NCSIndex { + +void TDataUpserter::OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) { + Y_VERIFY(SelfContainer); + const std::vector<TString> pkFields = result.GetPKFieldNames(); + AtomicCounter.Inc(); + for (auto&& i : Indexes) { + if (i.IsDelete()) { + ALS_WARN(NKikimrServices::EXT_INDEX) << "extractor is removing"; + } else if (!i.IsActive()) { + ALS_WARN(NKikimrServices::EXT_INDEX) << "extractor not active yet"; + } else { + ALS_DEBUG(NKikimrServices::EXT_INDEX) << "add data"; + AtomicCounter.Inc(); + TActivationContext::ActorSystem()->Register(new TIndexUpsertActor(Data, i, pkFields, i.GetIndexTablePath(), SelfContainer)); + } + } + OnIndexUpserted(); +} + +void TDataUpserter::Start(std::shared_ptr<TDataUpserter> selfContainer) { + if (Indexes.empty()) { + ExternalController->OnAllIndexesUpserted(); + } else { + for (auto&& i : Indexes) { + if (Indexes.front().GetTablePath() != i.GetTablePath()) { + ExternalController->OnAllIndexesUpsertionFailed("inconsistency tables list in indexes"); + return; + } + } + SelfContainer = selfContainer; + TActivationContext::ActorSystem()->Register(new NMetadata::NProvider::TSchemeDescriptionActor(SelfContainer, "", Indexes.front().GetTablePath())); + } +} + +} diff --git a/ydb/services/ext_index/service/add_data.h b/ydb/services/ext_index/service/add_data.h new file mode 100644 index 0000000000..3f9015320c --- /dev/null +++ b/ydb/services/ext_index/service/add_data.h @@ -0,0 +1,55 @@ +#pragma once +#include <ydb/services/ext_index/metadata/object.h> +#include <ydb/services/ext_index/common/service.h> +#include <ydb/services/metadata/ds_table/scheme_describe.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include "add_index.h" + +namespace NKikimr::NCSIndex { + +class TDataUpserter: + public IIndexUpsertController, + public NMetadata::NProvider::ISchemeDescribeController { +private: + mutable std::shared_ptr<TDataUpserter> SelfContainer; + std::vector<NMetadata::NCSIndex::TObject> Indexes; + mutable TAtomicCounter AtomicCounter = 0; + IDataUpsertController::TPtr ExternalController; + std::shared_ptr<arrow::RecordBatch> Data; +public: + TDataUpserter(std::vector<NMetadata::NCSIndex::TObject>&& indexes, + IDataUpsertController::TPtr externalController, std::shared_ptr<arrow::RecordBatch> data) + : Indexes(std::move(indexes)) + , ExternalController(externalController) + , Data(data) + { + + } + + virtual void OnIndexUpserted() override { + if (SelfContainer && AtomicCounter.Dec() == 0) { + ExternalController->OnAllIndexesUpserted(); + SelfContainer = nullptr; + } + } + virtual void OnIndexUpsertionFailed(const TString& errorMessage) override { + if (SelfContainer) { + ExternalController->OnAllIndexesUpsertionFailed("IndexUpsertion:" + errorMessage); + SelfContainer = nullptr; + } + } + + virtual void OnDescriptionFailed(const TString& errorMessage, const TString& /*requestId*/) override { + if (SelfContainer) { + ExternalController->OnAllIndexesUpsertionFailed("SchemeDescription:" + errorMessage); + SelfContainer = nullptr; + } + } + + virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& requestId) override; + + void Start(std::shared_ptr<TDataUpserter> selfContainer); +}; + +} diff --git a/ydb/services/ext_index/service/add_index.cpp b/ydb/services/ext_index/service/add_index.cpp new file mode 100644 index 0000000000..d8c2c1dd51 --- /dev/null +++ b/ydb/services/ext_index/service/add_index.cpp @@ -0,0 +1,127 @@ +#include "add_index.h" +#include <ydb/core/tx/tx_proxy/upload_rows.h> +#include <ydb/core/formats/arrow_helpers.h> +#include <ydb/core/scheme/scheme_tablecell.h> + +#include <ydb/public/api/protos/ydb_status_codes.pb.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> + +namespace NKikimr::NCSIndex { + +namespace { +class TCellsWriter: public NArrow::IRowWriter { +private: + std::shared_ptr<NTxProxy::TUploadRows> Rows = std::make_shared<NTxProxy::TUploadRows>(); + const std::vector<ui64>& Hashes; + ui32 Index = 0; + const ui32 PKColumnsCount; +public: + TCellsWriter(const ui32 rowsCount, const ui32 pkColumnsCount, const std::vector<ui64>& hashes) + : Hashes(hashes) + , PKColumnsCount(pkColumnsCount) + { + Rows->reserve(rowsCount); + } + + std::shared_ptr<NTxProxy::TUploadRows> GetRows() const { + return Rows; + } + + virtual void AddRow(const TConstArrayRef<TCell>& cells) override { + TVector<TCell> key; + key.reserve(1 + PKColumnsCount); + key.emplace_back(TCell::Make(Hashes[Index])); + Y_VERIFY(PKColumnsCount == cells.size()); + for (auto&& c : cells) { + key.emplace_back(c); + } + TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key)); + Rows->emplace_back(serializedKey, ""); + ++Index; + } +}; +} + +void TIndexUpsertActor::Bootstrap() { + std::shared_ptr<NTxProxy::TUploadTypes> types = std::make_shared<NTxProxy::TUploadTypes>(); + std::vector<std::pair<TString, NScheme::TTypeInfo>> yqlTypes; + + std::vector<std::shared_ptr<arrow::Array>> pkColumns; + for (auto&& i : PKFields) { + auto f = Data->schema()->GetFieldByName(i); + if (!f) { + ExternalController->OnIndexUpsertionFailed("incorrect field for pk"); + PassAway(); + return; + } + pkColumns.emplace_back(Data->GetColumnByName(i)); + switch (f->type()->id()) { + case arrow::Type::INT64: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::INT64)); + break; + case arrow::Type::INT32: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::INT32)); + break; + case arrow::Type::STRING: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UTF8)); + break; + case arrow::Type::BINARY: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::STRING)); + break; + case arrow::Type::UINT64: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT64)); + break; + case arrow::Type::UINT32: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT32)); + break; + case arrow::Type::TIMESTAMP: + types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::TIMESTAMP)); + break; + default: + ExternalController->OnIndexUpsertionFailed("incorrect type for pk column"); + PassAway(); + return; + } + } + + const std::vector<ui64> hashes = IndexInfo.GetExtractor()->ExtractIndex(Data); + if (hashes.size() != (size_t)Data->num_rows()) { + ExternalController->OnIndexUpsertionFailed("inconsistency hashes"); + PassAway(); + return; + } + TCellsWriter writer(Data->num_rows(), pkColumns.size(), hashes); + { + auto yqlTypes = NMetadata::NInternal::TYDBType::ConvertYDBToYQL(*types); + if (!yqlTypes) { + ExternalController->OnIndexUpsertionFailed("cannot convert types ydb -> yql"); + PassAway(); + return; + } + NArrow::TArrowToYdbConverter ydbConverter(*yqlTypes, writer); + TString errorMessage; + if (!ydbConverter.Process(*Data, errorMessage)) { + ExternalController->OnIndexUpsertionFailed(errorMessage); + PassAway(); + return; + } + } + for (auto&& i : *types) { + i.first = "pk_" + i.first; + } + types->insert(types->begin(), std::make_pair("index_hash", NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT64))); + auto actor = NTxProxy::CreateUploadRowsInternal(SelfId(), IndexTablePath, types, writer.GetRows()); + Become(&TIndexUpsertActor::StateMain); + Register(actor); +} + +void TIndexUpsertActor::Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev) { + auto g = PassAwayGuard(); + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + ExternalController->OnIndexUpserted(); + } else { + ExternalController->OnIndexUpsertionFailed("incorrect response code:" + ev->Get()->Issues.ToString()); + } +} + +} diff --git a/ydb/services/ext_index/service/add_index.h b/ydb/services/ext_index/service/add_index.h new file mode 100644 index 0000000000..cc3e23ce66 --- /dev/null +++ b/ydb/services/ext_index/service/add_index.h @@ -0,0 +1,51 @@ +#pragma once +#include <ydb/services/ext_index/metadata/object.h> +#include <ydb/services/metadata/ds_table/scheme_describe.h> +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/services/ext_index/common/service.h> + +namespace NKikimr::NCSIndex { + +class IIndexUpsertController { +public: + using TPtr = std::shared_ptr<IIndexUpsertController>; + virtual void OnIndexUpserted() = 0; + virtual void OnIndexUpsertionFailed(const TString& errorMessage) = 0; +}; + +class TIndexUpsertActor: public NActors::TActorBootstrapped<TIndexUpsertActor> { +private: + std::shared_ptr<arrow::RecordBatch> Data; + const NMetadata::NCSIndex::TObject IndexInfo; + std::vector<TString> PKFields; + TString IndexTablePath; + IIndexUpsertController::TPtr ExternalController; +protected: + void Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxUserProxy::TEvUploadRowsResponse, Handle); + default: + break; + } + } + +public: + void Bootstrap(); + + TIndexUpsertActor(const std::shared_ptr<arrow::RecordBatch>& data, const NMetadata::NCSIndex::TObject& indexInfo, + const std::vector<TString>& pKFields, const TString& indexTablePath, IIndexUpsertController::TPtr externalController) + : Data(data) + , IndexInfo(indexInfo) + , PKFields(pKFields) + , IndexTablePath(indexTablePath) + , ExternalController(externalController) + { + + } + +}; + +} diff --git a/ydb/services/ext_index/service/deleting.cpp b/ydb/services/ext_index/service/deleting.cpp new file mode 100644 index 0000000000..a357b0b146 --- /dev/null +++ b/ydb/services/ext_index/service/deleting.cpp @@ -0,0 +1,54 @@ +#include "deleting.h" +#include <ydb/services/metadata/initializer/common.h> + +namespace NKikimr::NCSIndex { + +NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest TDeleting::BuildDeleteRequest() const { + Ydb::Table::ExecuteDataQueryRequest request; + TStringBuilder sb; + sb << "DECLARE $indexId AS Utf8;" << Endl; + sb << "DECLARE $tablePath AS Utf8;" << Endl; + sb << "DELETE FROM `" + NMetadata::NCSIndex::TObject::GetBehaviour()->GetStorageTablePath() + "`" << Endl; + sb << "WHERE indexId = $indexId" << Endl; + sb << "AND tablePath = $tablePath" << Endl; + request.mutable_query()->set_yql_text(sb); + + { + auto& param = (*request.mutable_parameters())["$indexId"]; + param.mutable_value()->set_text_value(Object.GetIndexId()); + param.mutable_type()->set_type_id(Ydb::Type::UTF8); + } + { + auto& param = (*request.mutable_parameters())["$tablePath"]; + param.mutable_value()->set_text_value(Object.GetTablePath()); + param.mutable_type()->set_type_id(Ydb::Type::UTF8); + } + + request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); + request.mutable_tx_control()->set_commit_tx(true); + + return request; +} + +void TDeleting::Start(std::shared_ptr<TDeleting> selfContainer) { + Y_VERIFY(!!selfContainer); + SelfContainer = selfContainer; + + Ydb::Table::DropTableRequest request; + request.set_session_id(""); + request.set_path(Object.GetIndexTablePath()); + NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogDropTable> dropTable(request, "drop"); + dropTable.Execute(SelfContainer, Config.GetRequestConfig()); +} + +void TDeleting::OnModificationFinished(const TString& /*modificationId*/) { + TActivationContext::ActorSystem()->Register(new NMetadata::NRequest::TSessionedActorCallback<NMetadata::NRequest::TDialogYQLRequest>( + BuildDeleteRequest(), Config.GetRequestConfig(), NACLib::TSystemUsers::Metadata(), SelfContainer)); +} + +void TDeleting::OnModificationFailed(const TString& errorMessage, const TString& /*modificationId*/) { + ExternalController->OnDeletingFailed(errorMessage, RequestId); + SelfContainer = nullptr; +} + +} diff --git a/ydb/services/ext_index/service/deleting.h b/ydb/services/ext_index/service/deleting.h new file mode 100644 index 0000000000..61ac740c5a --- /dev/null +++ b/ydb/services/ext_index/service/deleting.h @@ -0,0 +1,62 @@ +#pragma once +#include <ydb/core/tx/tx_proxy/proxy.h> + +#include <ydb/services/metadata/request/request_actor.h> +#include <ydb/services/ext_index/metadata/object.h> + +#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h> +#include <ydb/services/ext_index/common/config.h> + +namespace NKikimr::NCSIndex { + +class IDeletingExternalController { +public: + using TPtr = std::shared_ptr<IDeletingExternalController>; + virtual ~IDeletingExternalController() = default; + virtual void OnDeletingFailed(const TString& errorMessage, const TString& requestId) = 0; + virtual void OnDeletingSuccess(const TString& requestId) = 0; +}; + +class TDeleting: + public NMetadata::NRequest::IExternalController<NMetadata::NRequest::TDialogYQLRequest>, + public NMetadata::NInitializer::IModifierExternalController +{ +private: + std::shared_ptr<TDeleting> SelfContainer; + NMetadata::NCSIndex::TObject Object; + IDeletingExternalController::TPtr ExternalController; + const TString RequestId; + const TConfig Config; + + NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest BuildDeleteRequest() const; + +protected: + virtual void OnModificationFinished(const TString& modificationId) override; + + virtual void OnModificationFailed(const TString& errorMessage, const TString& modificationId) override; + + virtual void OnRequestFailed(const TString& errorMessage) override { + ExternalController->OnDeletingFailed(errorMessage, RequestId); + SelfContainer = nullptr; + } + + virtual void OnRequestResult(NMetadata::NRequest::TDialogYQLRequest::TResponse&& /*result*/) override { + ExternalController->OnDeletingSuccess(RequestId); + SelfContainer = nullptr; + } +public: + void Start(std::shared_ptr<TDeleting> selfContainer); + + TDeleting(const NMetadata::NCSIndex::TObject& object, + IDeletingExternalController::TPtr externalController, const TString& requestId, const TConfig& config) + : Object(object) + , ExternalController(externalController) + , RequestId(requestId) + , Config(config) + { + + } + +}; + +} diff --git a/ydb/services/ext_index/service/executor.cpp b/ydb/services/ext_index/service/executor.cpp new file mode 100644 index 0000000000..8491b52c80 --- /dev/null +++ b/ydb/services/ext_index/service/executor.cpp @@ -0,0 +1,116 @@ +#include "executor.h" + +#include <ydb/services/ext_index/metadata/fetcher.h> +#include <ydb/services/metadata/initializer/fetcher.h> +#include <ydb/services/metadata/initializer/manager.h> +#include <ydb/services/metadata/service.h> +#include <ydb/services/ext_index/common/service.h> +#include "add_data.h" +#include "activation.h" +#include "deleting.h" + +namespace NKikimr::NCSIndex { + +void TExecutor::Handle(TEvAddData::TPtr& ev) { + if (CheckActivity()) { + std::vector<NMetadata::NCSIndex::TObject> indexes = IndexesSnapshot->GetIndexes(ev->Get()->GetTablePath()); + std::shared_ptr<TDataUpserter> upserter = std::make_shared<TDataUpserter>(std::move(indexes), ev->Get()->GetExternalController(), ev->Get()->GetData()); + upserter->Start(upserter); + } else { + if (!DeferredEventsOnIntialization.Add(*ev)) { + Send(ev->Sender, new TEvAddDataResult("too big queue for index construction")); + } + } +} + +void TExecutor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& /*ev*/) { + ActivityState = EActivity::Active; + if (IndexesSnapshot) { + DeferredEventsOnIntialization.ResendAll(SelfId()); + } +} + +class TIndexesController: public IActivationExternalController, public IDeletingExternalController { +public: + virtual void OnActivationFailed(const TString& errorMessage, const TString& requestId) override { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot activate index for " << requestId << ": " << errorMessage; + } + virtual void OnActivationSuccess(const TString& requestId) override { + ALS_NOTICE(NKikimrServices::EXT_INDEX) << "index activated " << requestId; + } + virtual void OnDeletingFailed(const TString& errorMessage, const TString& requestId) override { + ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot remove index for " << requestId << ": " << errorMessage; + } + virtual void OnDeletingSuccess(const TString& requestId) override { + ALS_NOTICE(NKikimrServices::EXT_INDEX) << "index deleted " << requestId; + } + +}; + +void TExecutor::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { + { + auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NInitializer::TSnapshot>(); + if (snapshot) { + if (snapshot->HasComponent("ext_index")) { + CheckActivity(); + } + return; + } + } + { + auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NCSIndex::TSnapshot>(); + if (snapshot) { + IndexesSnapshot = snapshot; + if (CheckActivity()) { + DeferredEventsOnIntialization.ResendAll(SelfId()); + } + std::vector<NMetadata::NCSIndex::TObject> inactiveIndexes; + std::vector<NMetadata::NCSIndex::TObject> deletingIndexes; + IndexesSnapshot->GetObjectsForActivity(inactiveIndexes, deletingIndexes); + auto controller = std::make_shared<TIndexesController>(); + for (auto&& i : inactiveIndexes) { + auto activation = std::make_shared<TActivation>(i, controller, i.GetUniqueId(), Config); + activation->Start(activation); + } + for (auto&& i : deletingIndexes) { + auto deleting = std::make_shared<TDeleting>(i, controller, i.GetUniqueId(), Config); + deleting->Start(deleting); + } + return; + } + } + Y_VERIFY(false, "unexpected snapshot"); +} + +void TExecutor::Bootstrap() { + Become(&TExecutor::StateMain); + auto managerInitializer = std::make_shared<NMetadata::NInitializer::TFetcher>(); + Y_VERIFY(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service not active"); + Sender<NMetadata::NProvider::TEvSubscribeExternal>(managerInitializer).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); + + auto managerIndexes = std::make_shared<NMetadata::NCSIndex::TFetcher>(); + Sender<NMetadata::NProvider::TEvSubscribeExternal>(managerIndexes).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); +} + +bool TExecutor::CheckActivity() { + switch (ActivityState) { + case EActivity::Created: + ActivityState = EActivity::Preparation; + Sender<NMetadata::NProvider::TEvPrepareManager>(NMetadata::NCSIndex::TObject::GetBehaviour()).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId())); + break; + case EActivity::Preparation: + break; + case EActivity::Active: + if (!IndexesSnapshot) { + return false; + } + return true; + } + return false; +} + +NActors::IActor* CreateService(const TConfig& config) { + return new TExecutor(config); +} + +} diff --git a/ydb/services/ext_index/service/executor.h b/ydb/services/ext_index/service/executor.h new file mode 100644 index 0000000000..bd93c53fc2 --- /dev/null +++ b/ydb/services/ext_index/service/executor.h @@ -0,0 +1,58 @@ +#pragma once +#include <ydb/services/ext_index/common/config.h> + +#include <ydb/services/metadata/initializer/accessor_init.h> +#include <ydb/services/metadata/ds_table/service.h> +#include <ydb/services/metadata/service.h> +#include <ydb/services/ext_index/metadata/snapshot.h> +#include <ydb/services/ext_index/common/service.h> + +namespace NKikimr::NCSIndex { + +class TExecutor: public NActors::TActorBootstrapped<TExecutor> { +private: + using TBase = NActors::TActorBootstrapped<TExecutor>; + TString TableName; + const TString ExecutorId = TGUID::CreateTimebased().AsUuidString(); + const TConfig Config; + std::set<TString> CurrentTaskIds; + NMetadata::NProvider::TEventsWaiter DeferredEventsOnIntialization; + std::shared_ptr<NMetadata::NCSIndex::TSnapshot> IndexesSnapshot; + + enum class EActivity { + Created, + Preparation, + Active + }; + + EActivity ActivityState = EActivity::Created; + + bool CheckActivity(); + +protected: + void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& ev); + void Handle(TEvAddData::TPtr& ev); + void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NMetadata::NProvider::TEvManagerPrepared, Handle); + hFunc(TEvAddData, Handle); + hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); + default: + break; + } + } + +public: + void Bootstrap(); + + TExecutor(const TConfig& config) + : Config(config) { + TServiceOperator::Register(Config); + } +}; + +IActor* CreateService(const TConfig& config); + +} diff --git a/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..e586329507 --- /dev/null +++ b/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ext_index-ut) +target_compile_options(ydb-services-ext_index-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index +) +target_link_libraries(ydb-services-ext_index-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ext_index + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-services-metadata + public-lib-yson_value +) +target_link_options(ydb-services-ext_index-ut PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp +) +set_property( + TARGET + ydb-services-ext_index-ut + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + ydb-services-ext_index-ut + TEST_TARGET + ydb-services-ext_index-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-services-ext_index-ut) diff --git a/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..0b356b7816 --- /dev/null +++ b/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt @@ -0,0 +1,84 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ext_index-ut) +target_compile_options(ydb-services-ext_index-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index +) +target_link_libraries(ydb-services-ext_index-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-jemalloc + cpp-testing-unittest_main + ydb-services-ext_index + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-services-metadata + public-lib-yson_value +) +target_link_options(ydb-services-ext_index-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp +) +set_property( + TARGET + ydb-services-ext_index-ut + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + ydb-services-ext_index-ut + TEST_TARGET + ydb-services-ext_index-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-services-ext_index-ut) diff --git a/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..37e0035862 --- /dev/null +++ b/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt @@ -0,0 +1,86 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ext_index-ut) +target_compile_options(ydb-services-ext_index-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index +) +target_link_libraries(ydb-services-ext_index-ut PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ext_index + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-services-metadata + public-lib-yson_value +) +target_link_options(ydb-services-ext_index-ut PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp +) +set_property( + TARGET + ydb-services-ext_index-ut + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + ydb-services-ext_index-ut + TEST_TARGET + ydb-services-ext_index-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-services-ext_index-ut) diff --git a/ydb/services/ext_index/ut/CMakeLists.txt b/ydb/services/ext_index/ut/CMakeLists.txt new file mode 100644 index 0000000000..d90657116d --- /dev/null +++ b/ydb/services/ext_index/ut/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..0b60ddd3d8 --- /dev/null +++ b/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt @@ -0,0 +1,74 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-ext_index-ut) +target_compile_options(ydb-services-ext_index-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index +) +target_link_libraries(ydb-services-ext_index-ut PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-system + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-ext_index + library-cpp-getopt + cpp-regex-pcre + library-cpp-svnversion + core-testlib-default + ydb-services-metadata + public-lib-yson_value +) +target_sources(ydb-services-ext_index-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp +) +set_property( + TARGET + ydb-services-ext_index-ut + PROPERTY + SPLIT_FACTOR + 60 +) +add_yunittest( + NAME + ydb-services-ext_index-ut + TEST_TARGET + ydb-services-ext_index-ut + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-services-ext_index-ut + PROPERTY + TIMEOUT + 600 +) +vcs_info(ydb-services-ext_index-ut) diff --git a/ydb/services/ext_index/ut/ut_ext_index.cpp b/ydb/services/ext_index/ut/ut_ext_index.cpp new file mode 100644 index 0000000000..74ffdf3259 --- /dev/null +++ b/ydb/services/ext_index/ut/ut_ext_index.cpp @@ -0,0 +1,146 @@ +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/testlib/cs_helper.h> +#include <ydb/core/tx/tiering/external_data.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> +#include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/core/wrappers/fake_storage.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/services/metadata/manager/alter.h> +#include <ydb/services/metadata/manager/common.h> +#include <ydb/services/metadata/manager/table_record.h> +#include <ydb/services/metadata/manager/ydb_value_operator.h> +#include <ydb/services/metadata/service.h> + +#include <library/cpp/actors/core/av_bootstrapped.h> +#include <library/cpp/protobuf/json/proto2json.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <util/system/hostname.h> + +namespace NKikimr { + +using namespace NColumnShard; + +class TLocalHelper: public Tests::NCS::THelper { +private: + using TBase = Tests::NCS::THelper; +public: + using TBase::TBase; + void CreateTestOlapTable(TString tableName = "olapTable", ui32 tableShardsCount = 3, + TString storeName = "olapStore", ui32 storeShardsCount = 4, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + %s + } + } + )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA)); + + TString shardingColumns = "[\"timestamp\", \"uid\"]"; + if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { + shardingColumns = "[\"uid\"]"; + } + + TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + TtlSettings: { + Enabled: { + ColumnName: "timestamp" + ExpireAfterSeconds : 60 + } + } + Sharding { + HashSharding { + Function: %s + Columns: %s + } + } + )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } +}; + + +Y_UNIT_TEST_SUITE(ExternalIndex) { + + Y_UNIT_TEST(Simple) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableExternalIndex(true) + .SetEnableBackgroundTasks(true) + .SetEnableOlapSchemaOperations(true); + ; + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG); + runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG); + // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG); + + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTable("olapTable", 2); + lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple` ( " + "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" : {\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"message\"}]}}`)"); + Cerr << "Wait tables" << Endl; + runtime.SimulateSleep(TDuration::Seconds(20)); + Cerr << "Initialization tables" << Endl; + const TInstant pkStart = Now() - TDuration::Days(15); + ui32 idx = 0; + + auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000); + auto batchSize = NArrow::GetBatchDataSize(batch); + Cerr << "Inserting " << batchSize << " bytes..." << Endl; + UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT + UNIT_ASSERT(batchSize < 8 * 1024 * 1024); + + for (ui32 i = 0; i < 4; ++i) { + lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch); + } + + { + TString resultData; + lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", true, &resultData); + UNIT_ASSERT_EQUAL(resultData, "[6000u]"); + } + lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple` (TYPE CS_EXT_INDEX)"); + for (ui32 i = 0; i < 10; ++i) { + server->GetRuntime()->SimulateSleep(TDuration::Seconds(10)); + } + lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", false); + { + TString resultData; + lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/external`", true, &resultData); + UNIT_ASSERT_EQUAL(resultData, "[0u]"); + } + } + +} +} |