aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2023-03-14 08:06:00 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2023-03-14 08:06:00 +0300
commit9aa65dc165f24925a281f89c975cc5117823934f (patch)
tree4693538565fa3ffde7da0deb4579d25c8211e374
parent317a84f0cfb9a5a203ac765e9571f2dd6c06b209 (diff)
downloadydb-9aa65dc165f24925a281f89c975cc5117823934f.tar.gz
external indexes
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt2
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt2
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt2
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp23
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h6
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp33
-rw-r--r--ydb/core/protos/config.proto7
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/core/testlib/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp8
-rw-r--r--ydb/core/testlib/test_client.h2
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows.h7
-rw-r--r--ydb/services/CMakeLists.txt1
-rw-r--r--ydb/services/ext_index/CMakeLists.darwin-x86_64.txt21
-rw-r--r--ydb/services/ext_index/CMakeLists.linux-aarch64.txt22
-rw-r--r--ydb/services/ext_index/CMakeLists.linux-x86_64.txt22
-rw-r--r--ydb/services/ext_index/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/CMakeLists.windows-x86_64.txt21
-rw-r--r--ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt23
-rw-r--r--ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt24
-rw-r--r--ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt24
-rw-r--r--ydb/services/ext_index/common/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/services/ext_index/common/config.cpp21
-rw-r--r--ydb/services/ext_index/common/config.h19
-rw-r--r--ydb/services/ext_index/common/events.cpp5
-rw-r--r--ydb/services/ext_index/common/events.h16
-rw-r--r--ydb/services/ext_index/common/service.cpp23
-rw-r--r--ydb/services/ext_index/common/service.h76
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt50
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt52
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt52
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt50
-rw-r--r--ydb/services/ext_index/metadata/behaviour.cpp30
-rw-r--r--ydb/services/ext_index/metadata/behaviour.h25
-rw-r--r--ydb/services/ext_index/metadata/fetcher.cpp12
-rw-r--r--ydb/services/ext_index/metadata/fetcher.h15
-rw-r--r--ydb/services/ext_index/metadata/initializer.cpp48
-rw-r--r--ydb/services/ext_index/metadata/initializer.h15
-rw-r--r--ydb/services/ext_index/metadata/manager.cpp90
-rw-r--r--ydb/services/ext_index/metadata/manager.h21
-rw-r--r--ydb/services/ext_index/metadata/object.cpp92
-rw-r--r--ydb/services/ext_index/metadata/object.h203
-rw-r--r--ydb/services/ext_index/metadata/snapshot.cpp46
-rw-r--r--ydb/services/ext_index/metadata/snapshot.h23
-rw-r--r--ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt25
-rw-r--r--ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt26
-rw-r--r--ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt26
-rw-r--r--ydb/services/ext_index/service/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt25
-rw-r--r--ydb/services/ext_index/service/activation.cpp97
-rw-r--r--ydb/services/ext_index/service/activation.h68
-rw-r--r--ydb/services/ext_index/service/add_data.cpp38
-rw-r--r--ydb/services/ext_index/service/add_data.h55
-rw-r--r--ydb/services/ext_index/service/add_index.cpp127
-rw-r--r--ydb/services/ext_index/service/add_index.h51
-rw-r--r--ydb/services/ext_index/service/deleting.cpp54
-rw-r--r--ydb/services/ext_index/service/deleting.h62
-rw-r--r--ydb/services/ext_index/service/executor.cpp116
-rw-r--r--ydb/services/ext_index/service/executor.h58
-rw-r--r--ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt82
-rw-r--r--ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt84
-rw-r--r--ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt86
-rw-r--r--ydb/services/ext_index/ut/CMakeLists.txt17
-rw-r--r--ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt74
-rw-r--r--ydb/services/ext_index/ut/ut_ext_index.cpp146
78 files changed, 2655 insertions, 5 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index 7ee00f8332..4c1257368e 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -159,6 +159,7 @@ struct TKikimrEvents : TEvents {
ES_TEST_LOAD,
ES_GRPC_CANCELATION,
ES_DISCOVERY,
+ ES_EXT_INDEX,
};
};
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
index 411c78bad7..a08d5d9d87 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
@@ -130,6 +130,8 @@ target_link_libraries(run PUBLIC
ydb-services-metadata
services-bg_tasks-ds_table
ydb-services-bg_tasks
+ services-ext_index-service
+ services-ext_index-metadata
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 0f62710fa1..15d3892351 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -131,6 +131,8 @@ target_link_libraries(run PUBLIC
ydb-services-metadata
services-bg_tasks-ds_table
ydb-services-bg_tasks
+ services-ext_index-service
+ services-ext_index-metadata
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
index 0f62710fa1..15d3892351 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
@@ -131,6 +131,8 @@ target_link_libraries(run PUBLIC
ydb-services-metadata
services-bg_tasks-ds_table
ydb-services-bg_tasks
+ services-ext_index-service
+ services-ext_index-metadata
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
index 411c78bad7..a08d5d9d87 100644
--- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
@@ -130,6 +130,8 @@ target_link_libraries(run PUBLIC
ydb-services-metadata
services-bg_tasks-ds_table
ydb-services-bg_tasks
+ services-ext_index-service
+ services-ext_index-metadata
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 1e859c1705..b584d31ea6 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -69,6 +69,7 @@ union TBasicKikimrServicesMask {
bool EnableMetadataProvider:1;
bool EnableReplicationService:1;
bool EnableBackgroundTasks:1;
+ bool ExternalIndex:1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 0efe573019..ad8726f439 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -159,6 +159,8 @@
#include <ydb/services/bg_tasks/ds_table/executor.h>
#include <ydb/services/bg_tasks/service.h>
+#include <ydb/services/ext_index/common/config.h>
+#include <ydb/services/ext_index/service/executor.h>
#include <library/cpp/actors/protos/services_common.pb.h>
@@ -2297,9 +2299,26 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}
+TExternalIndexInitializer::TExternalIndexInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig) {
+}
+
+void TExternalIndexInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NCSIndex::TConfig serviceConfig;
+ if (Config.HasExternalIndexConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetExternalIndexConfig()));
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ auto service = NCSIndex::CreateService(serviceConfig);
+ setup->LocalServices.push_back(std::make_pair(
+ NCSIndex::MakeServiceId(NodeId),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
TMetadataProviderInitializer::TMetadataProviderInitializer(const TKikimrRunConfig& runConfig)
- : IKikimrServicesInitializer(runConfig)
-{
+ : IKikimrServicesInitializer(runConfig) {
}
void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 0bb7ebddb0..1c36096e13 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -383,6 +383,12 @@ private:
std::shared_ptr<TModuleFactories> Factories;
};
+class TExternalIndexInitializer: public IKikimrServicesInitializer {
+public:
+ TExternalIndexInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
class TMetadataProviderInitializer: public IKikimrServicesInitializer {
public:
TMetadataProviderInitializer(const TKikimrRunConfig& runConfig);
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index 4b12f5fb51..1d5715c57f 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1485,6 +1485,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig));
}
+ if (serviceMask.ExternalIndex) {
+ sil->AddServiceInitializer(new TExternalIndexInitializer(runConfig));
+ }
+
if (serviceMask.EnableBackgroundTasks) {
sil->AddServiceInitializer(new TBackgroundTasksInitializer(runConfig));
}
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
index 7b1e0c5eed..3a3ed2a3d9 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
@@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
api-protos
public-lib-operation_id
cpp-client-resources
+ services-ext_index-common
)
target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 76a073859f..c2c39ae694 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -56,6 +56,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
api-protos
public-lib-operation_id
cpp-client-resources
+ services-ext_index-common
)
target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
index 76a073859f..c2c39ae694 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
@@ -56,6 +56,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
api-protos
public-lib-operation_id
cpp-client-resources
+ services-ext_index-common
)
target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp
diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
index 7b1e0c5eed..3a3ed2a3d9 100644
--- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
@@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
api-protos
public-lib-operation_id
cpp-client-resources
+ services-ext_index-common
)
target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/audit_log.cpp
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index 8b8eb20a36..a7369c1026 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -14,6 +14,7 @@
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
+#include <ydb/services/ext_index/common/service.h>
#include <library/cpp/actors/wilson/wilson_profile_span.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/compute/api.h>
@@ -467,6 +468,13 @@ protected:
ui64 tableId = entry.TableId.PathId.LocalPathId;
+ if (NCSIndex::TServiceOperator::IsEnabled()) {
+ TBase::Send(NCSIndex::MakeServiceId(TBase::SelfId().NodeId()),
+ new NCSIndex::TEvAddData(GetDeserializedBatch(), Path, std::make_shared<NCSIndex::TNaiveDataUpsertController>(TBase::SelfId())));
+ } else {
+ IndexReady = true;
+ }
+
if (sharding.HasRandomSharding()) {
ui64 shard = sharding.GetColumnShards(0);
SendWriteRequest(shard, tableId, DedupId, GetSerializedData());
@@ -537,6 +545,7 @@ private:
hFunc(TEvColumnShard::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
+ hFunc(NCSIndex::TEvAddDataResult, Handle);
}
}
@@ -614,6 +623,7 @@ private:
Y_UNUSED(ctx);
switch (ev->GetTypeRewrite()) {
hFunc(TEvLongTxService::TEvAttachColumnShardWritesResult, Handle);
+ hFunc(NCSIndex::TEvAddDataResult, Handle);
}
}
@@ -629,8 +639,27 @@ private:
}
return ReplyError(msg->Record.GetStatus());
}
+ if (IndexReady) {
+ ReplySuccess();
+ } else {
+ ColumnShardReady = true;
+ }
+ }
+
+ void Handle(NCSIndex::TEvAddDataResult::TPtr& ev) {
+ const auto* msg = ev->Get();
+ if (msg->GetErrorMessage()) {
+ NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "NCSIndex::TEvAddDataResult");
+ RaiseIssue(NYql::TIssue(msg->GetErrorMessage()));
+ return ReplyError(Ydb::StatusIds::GENERIC_ERROR, msg->GetErrorMessage());
+ } else {
+ if (ColumnShardReady) {
+ ReplySuccess();
+ } else {
+ IndexReady = true;
+ }
+ }
- ReplySuccess();
}
private:
@@ -665,6 +694,8 @@ private:
THashSet<ui64> ShardsToRetry;
NWilson::TProfileSpan ActorSpan;
TActorId TimeoutTimerActorId;
+ bool ColumnShardReady = false;
+ bool IndexReady = false;
};
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index e1618601c0..b8a8a78b3e 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -601,6 +601,12 @@ message TInternalRequestConfig {
optional uint32 RetryPeriodFinishSeconds = 2 [default = 30];
}
+message TExternalIndexConfig {
+ optional bool Enabled = 1 [default = true];
+ optional TInternalRequestConfig RequestConfig = 2;
+ optional string InternalTablePath = 3;
+}
+
message TMetadataProviderConfig {
optional bool Enabled = 1 [default = true];
optional uint32 RefreshPeriodSeconds = 2 [default = 10];
@@ -1787,6 +1793,7 @@ message TAppConfig {
optional TBackgroundTasksConfig BackgroundTasksConfig = 60;
optional TAuditConfig AuditConfig = 61;
optional TClientCertificateAuthorization ClientCertificateAuthorization = 62;
+ optional TExternalIndexConfig ExternalIndexConfig = 63;
optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 7f385a7310..3992f2c43a 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -360,6 +360,8 @@ enum EServiceKikimr {
// Discovery
DISCOVERY = 1800;
DISCOVERY_CACHE = 1801;
+
+ EXT_INDEX = 1900;
};
message TActivity {
diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
index 699c067000..82e750dc08 100644
--- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
@@ -85,6 +85,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ services-ext_index-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
index 0439fe96da..2a6f45b5b4 100644
--- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
@@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ services-ext_index-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
index 0439fe96da..2a6f45b5b4 100644
--- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
@@ -86,6 +86,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ services-ext_index-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
index 699c067000..82e750dc08 100644
--- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
@@ -85,6 +85,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ services-ext_index-service
ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index de7607c7e8..145194530d 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -94,6 +94,9 @@
#include <ydb/services/metadata/service.h>
#include <ydb/services/bg_tasks/ds_table/executor.h>
#include <ydb/services/bg_tasks/service.h>
+#include <ydb/services/ext_index/common/config.h>
+#include <ydb/services/ext_index/common/service.h>
+#include <ydb/services/ext_index/service/executor.h>
#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
@@ -724,6 +727,11 @@ namespace Tests {
const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NBackgroundTasks::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
}
+ if (Settings->IsEnableExternalIndex()) {
+ auto* actor = NCSIndex::CreateService(NCSIndex::TConfig());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NCSIndex::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ }
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto sysViewService = NSysView::CreateSysViewServiceForTests();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 21de04466e..a2e20023fc 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -212,6 +212,8 @@ namespace Tests {
private:
YDB_FLAG_ACCESSOR(EnableMetadataProvider, true);
YDB_FLAG_ACCESSOR(EnableBackgroundTasks, false);
+ YDB_FLAG_ACCESSOR(EnableExternalIndex, false);
+
};
class TServer : public TThrRefBase, TMoveOnly {
diff --git a/ydb/core/tx/tx_proxy/upload_rows.h b/ydb/core/tx/tx_proxy/upload_rows.h
index 93d116b04e..63590b2bff 100644
--- a/ydb/core/tx/tx_proxy/upload_rows.h
+++ b/ydb/core/tx/tx_proxy/upload_rows.h
@@ -16,10 +16,13 @@ enum class EUploadRowsMode {
WriteToTableShadow,
};
+using TUploadTypes = TVector<std::pair<TString, Ydb::Type>>;
+using TUploadRows = TVector<std::pair<TSerializedCellVec, TString>>;
+
IActor* CreateUploadRowsInternal(const TActorId& sender,
const TString& table,
- std::shared_ptr<TVector<std::pair<TString, Ydb::Type> > > types,
- std::shared_ptr<TVector<std::pair<TSerializedCellVec, TString> > > rows,
+ std::shared_ptr<TUploadTypes> types,
+ std::shared_ptr<TUploadRows> rows,
EUploadRowsMode mode = EUploadRowsMode::Normal,
bool writeToPrivateTable = false);
} // namespace NTxProxy
diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt
index 764b3c3ebf..4b697f067a 100644
--- a/ydb/services/CMakeLists.txt
+++ b/ydb/services/CMakeLists.txt
@@ -11,6 +11,7 @@ add_subdirectory(bg_tasks)
add_subdirectory(cms)
add_subdirectory(datastreams)
add_subdirectory(discovery)
+add_subdirectory(ext_index)
add_subdirectory(fq)
add_subdirectory(kesus)
add_subdirectory(lib)
diff --git a/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..2694d34fb3
--- /dev/null
+++ b/ydb/services/ext_index/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(common)
+add_subdirectory(metadata)
+add_subdirectory(service)
+add_subdirectory(ut)
+
+add_library(ydb-services-ext_index INTERFACE)
+target_link_libraries(ydb-services-ext_index INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+ services-ext_index-metadata
+ services-ext_index-service
+ services-ext_index-common
+)
diff --git a/ydb/services/ext_index/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..6e822cd7ce
--- /dev/null
+++ b/ydb/services/ext_index/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(common)
+add_subdirectory(metadata)
+add_subdirectory(service)
+add_subdirectory(ut)
+
+add_library(ydb-services-ext_index INTERFACE)
+target_link_libraries(ydb-services-ext_index INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ services-ext_index-metadata
+ services-ext_index-service
+ services-ext_index-common
+)
diff --git a/ydb/services/ext_index/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..6e822cd7ce
--- /dev/null
+++ b/ydb/services/ext_index/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(common)
+add_subdirectory(metadata)
+add_subdirectory(service)
+add_subdirectory(ut)
+
+add_library(ydb-services-ext_index INTERFACE)
+target_link_libraries(ydb-services-ext_index INTERFACE
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ services-ext_index-metadata
+ services-ext_index-service
+ services-ext_index-common
+)
diff --git a/ydb/services/ext_index/CMakeLists.txt b/ydb/services/ext_index/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..2694d34fb3
--- /dev/null
+++ b/ydb/services/ext_index/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(common)
+add_subdirectory(metadata)
+add_subdirectory(service)
+add_subdirectory(ut)
+
+add_library(ydb-services-ext_index INTERFACE)
+target_link_libraries(ydb-services-ext_index INTERFACE
+ contrib-libs-cxxsupp
+ yutil
+ services-ext_index-metadata
+ services-ext_index-service
+ services-ext_index-common
+)
diff --git a/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..d748daffc9
--- /dev/null
+++ b/ydb/services/ext_index/common/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-common)
+target_link_libraries(services-ext_index-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ api-protos
+ ydb-core-protos
+ libs-apache-arrow
+)
+target_sources(services-ext_index-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp
+)
diff --git a/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..5b8fcd6cd8
--- /dev/null
+++ b/ydb/services/ext_index/common/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-common)
+target_link_libraries(services-ext_index-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ api-protos
+ ydb-core-protos
+ libs-apache-arrow
+)
+target_sources(services-ext_index-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp
+)
diff --git a/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..5b8fcd6cd8
--- /dev/null
+++ b/ydb/services/ext_index/common/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-common)
+target_link_libraries(services-ext_index-common PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ api-protos
+ ydb-core-protos
+ libs-apache-arrow
+)
+target_sources(services-ext_index-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp
+)
diff --git a/ydb/services/ext_index/common/CMakeLists.txt b/ydb/services/ext_index/common/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/common/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..d748daffc9
--- /dev/null
+++ b/ydb/services/ext_index/common/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-common)
+target_link_libraries(services-ext_index-common PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ api-protos
+ ydb-core-protos
+ libs-apache-arrow
+)
+target_sources(services-ext_index-common PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/common/events.cpp
+)
diff --git a/ydb/services/ext_index/common/config.cpp b/ydb/services/ext_index/common/config.cpp
new file mode 100644
index 0000000000..3245521881
--- /dev/null
+++ b/ydb/services/ext_index/common/config.cpp
@@ -0,0 +1,21 @@
+#include "config.h"
+#include <ydb/core/base/appdata.h>
+
+namespace NKikimr::NCSIndex {
+
+bool TConfig::DeserializeFromProto(const NKikimrConfig::TExternalIndexConfig& config) {
+ EnabledFlag = config.GetEnabled();
+ if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) {
+ return false;
+ }
+ if (config.HasInternalTablePath()) {
+ InternalTablePath = config.GetInternalTablePath();
+ }
+ return true;
+}
+
+TString TConfig::GetTablePath() const {
+ return AppData()->TenantName + "/" + InternalTablePath;
+}
+
+}
diff --git a/ydb/services/ext_index/common/config.h b/ydb/services/ext_index/common/config.h
new file mode 100644
index 0000000000..be48feb29b
--- /dev/null
+++ b/ydb/services/ext_index/common/config.h
@@ -0,0 +1,19 @@
+#pragma once
+#include <ydb/core/protos/config.pb.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/request/config.h>
+
+#include <util/datetime/base.h>
+
+namespace NKikimr::NCSIndex {
+
+class TConfig {
+private:
+ YDB_READONLY_DEF(NMetadata::NRequest::TConfig, RequestConfig);
+ YDB_READONLY(TString, InternalTablePath, ".ext_index/tasks");
+ YDB_READONLY_FLAG(Enabled, true);
+public:
+ bool DeserializeFromProto(const NKikimrConfig::TExternalIndexConfig& config);
+ TString GetTablePath() const;
+};
+}
diff --git a/ydb/services/ext_index/common/events.cpp b/ydb/services/ext_index/common/events.cpp
new file mode 100644
index 0000000000..805374e568
--- /dev/null
+++ b/ydb/services/ext_index/common/events.cpp
@@ -0,0 +1,5 @@
+#include "events.h"
+
+namespace NKikimr::NBackgroundTasks {
+
+}
diff --git a/ydb/services/ext_index/common/events.h b/ydb/services/ext_index/common/events.h
new file mode 100644
index 0000000000..8b2bfa1ee8
--- /dev/null
+++ b/ydb/services/ext_index/common/events.h
@@ -0,0 +1,16 @@
+#pragma once
+#include <ydb/core/base/events.h>
+
+#include <library/cpp/actors/core/events.h>
+
+namespace NKikimr::NCSIndex {
+
+enum EEvents {
+ EvAddData = EventSpaceBegin(TKikimrEvents::ES_EXT_INDEX),
+ EvAddDataResult,
+ EvEnd
+};
+
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_EXT_INDEX), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_EXT_INDEX)");
+
+}
diff --git a/ydb/services/ext_index/common/service.cpp b/ydb/services/ext_index/common/service.cpp
new file mode 100644
index 0000000000..afd9acee60
--- /dev/null
+++ b/ydb/services/ext_index/common/service.cpp
@@ -0,0 +1,23 @@
+#include "service.h"
+
+#include "config.h"
+
+namespace NKikimr::NCSIndex {
+
+NActors::TActorId MakeServiceId(const ui32 nodeId) {
+ return NActors::TActorId(nodeId, "SrvcExtIndex");
+}
+
+void TServiceOperator::Register(const TConfig& config) {
+ auto* service = Singleton<TServiceOperator>();
+ std::unique_lock<std::shared_mutex> lock(service->Lock);
+ service->EnabledFlag = config.IsEnabled();
+}
+
+bool TServiceOperator::IsEnabled() {
+ auto* service = Singleton<TServiceOperator>();
+ std::shared_lock<std::shared_mutex> lock(service->Lock);
+ return service->EnabledFlag;
+}
+
+}
diff --git a/ydb/services/ext_index/common/service.h b/ydb/services/ext_index/common/service.h
new file mode 100644
index 0000000000..904025cad0
--- /dev/null
+++ b/ydb/services/ext_index/common/service.h
@@ -0,0 +1,76 @@
+#pragma once
+#include "events.h"
+
+#include <ydb/library/accessor/accessor.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <shared_mutex>
+
+namespace NKikimr::NCSIndex {
+
+class IDataUpsertController {
+public:
+ using TPtr = std::shared_ptr<IDataUpsertController>;
+ virtual ~IDataUpsertController() = default;
+ virtual void OnAllIndexesUpserted() = 0;
+ virtual void OnAllIndexesUpsertionFailed(const TString& errorMessage) = 0;
+};
+
+class TEvAddData: public NActors::TEventLocal<TEvAddData, EEvents::EvAddData> {
+private:
+ YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Data);
+ YDB_READONLY_DEF(TString, TablePath);
+ YDB_READONLY_DEF(IDataUpsertController::TPtr, ExternalController);
+public:
+ TEvAddData(std::shared_ptr<arrow::RecordBatch> data, const TString& tablePath, IDataUpsertController::TPtr externalController)
+ : Data(data)
+ , TablePath(tablePath)
+ , ExternalController(externalController)
+ {
+
+ }
+};
+
+class TEvAddDataResult: public NActors::TEventLocal<TEvAddDataResult, EEvents::EvAddDataResult> {
+private:
+ YDB_READONLY_DEF(TString, ErrorMessage);
+public:
+ TEvAddDataResult() = default;
+
+ TEvAddDataResult(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+};
+
+class TNaiveDataUpsertController: public IDataUpsertController {
+private:
+ const TActorIdentity OwnerId;
+public:
+ TNaiveDataUpsertController(const TActorIdentity& ownerId)
+ : OwnerId(ownerId) {
+
+ }
+ virtual void OnAllIndexesUpserted() override {
+ OwnerId.Send(OwnerId, new TEvAddDataResult());
+ }
+ virtual void OnAllIndexesUpsertionFailed(const TString& errorMessage) override {
+ OwnerId.Send(OwnerId, new TEvAddDataResult(errorMessage));
+ }
+};
+
+NActors::TActorId MakeServiceId(const ui32 node);
+
+class TConfig;
+
+class TServiceOperator {
+private:
+ friend class TExecutor;
+ std::shared_mutex Lock;
+ bool EnabledFlag = false;
+ static void Register(const TConfig& config);
+public:
+ static bool IsEnabled();
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..ab68de15b7
--- /dev/null
+++ b/ydb/services/ext_index/metadata/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,50 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp
+)
+
+add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
+)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..3d32b2799c
--- /dev/null
+++ b/ydb/services/ext_index/metadata/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,52 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp
+)
+
+add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
+)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..3d32b2799c
--- /dev/null
+++ b/ydb/services/ext_index/metadata/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,52 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp
+)
+
+add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata.global PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
+)
diff --git a/ydb/services/ext_index/metadata/CMakeLists.txt b/ydb/services/ext_index/metadata/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/metadata/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..ab68de15b7
--- /dev/null
+++ b/ydb/services/ext_index/metadata/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,50 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/initializer.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/snapshot.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/fetcher.cpp
+)
+
+add_global_library_for(services-ext_index-metadata.global services-ext_index-metadata)
+target_compile_options(services-ext_index-metadata.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(services-ext_index-metadata.global PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+ core-tx-sharding
+)
+target_sources(services-ext_index-metadata.global PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/object.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/metadata/behaviour.cpp
+)
diff --git a/ydb/services/ext_index/metadata/behaviour.cpp b/ydb/services/ext_index/metadata/behaviour.cpp
new file mode 100644
index 0000000000..7d50e968cf
--- /dev/null
+++ b/ydb/services/ext_index/metadata/behaviour.cpp
@@ -0,0 +1,30 @@
+#include "behaviour.h"
+#include "initializer.h"
+#include "manager.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+TBehaviour::TFactory::TRegistrator<TBehaviour> TBehaviour::Registrator(TObject::GetTypeId());
+
+TString TBehaviour::GetInternalStorageTablePath() const {
+ return "cs_index/external";
+}
+
+NModifications::IOperationsManager::TPtr TBehaviour::ConstructOperationsManager() const {
+ return std::make_shared<TManager>();
+}
+
+NInitializer::IInitializationBehaviour::TPtr TBehaviour::ConstructInitializer() const {
+ return std::make_shared<TInitializer>();
+}
+
+TString TBehaviour::GetTypeId() const {
+ return TObject::GetTypeId();
+}
+
+IClassBehaviour::TPtr TBehaviour::GetInstance() {
+ static std::shared_ptr<TBehaviour> result = std::make_shared<TBehaviour>();
+ return result;
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/behaviour.h b/ydb/services/ext_index/metadata/behaviour.h
new file mode 100644
index 0000000000..1bfe358492
--- /dev/null
+++ b/ydb/services/ext_index/metadata/behaviour.h
@@ -0,0 +1,25 @@
+#pragma once
+#include "object.h"
+
+#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/metadata/abstract/initialization.h>
+#include <ydb/services/metadata/manager/abstract.h>
+#include <ydb/services/metadata/manager/common.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TBehaviour: public TClassBehaviour<TObject> {
+private:
+ static TFactory::TRegistrator<TBehaviour> Registrator;
+protected:
+ virtual NInitializer::IInitializationBehaviour::TPtr ConstructInitializer() const override;
+ virtual NModifications::IOperationsManager::TPtr ConstructOperationsManager() const override;
+ virtual TString GetInternalStorageTablePath() const override;
+
+public:
+ TBehaviour() = default;
+ virtual TString GetTypeId() const override;
+ static IClassBehaviour::TPtr GetInstance();
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/fetcher.cpp b/ydb/services/ext_index/metadata/fetcher.cpp
new file mode 100644
index 0000000000..52061b6129
--- /dev/null
+++ b/ydb/services/ext_index/metadata/fetcher.cpp
@@ -0,0 +1,12 @@
+#include "fetcher.h"
+#include "behaviour.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+std::vector<IClassBehaviour::TPtr> TFetcher::DoGetManagers() const {
+ return {
+ TBehaviour::GetInstance()
+ };
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/fetcher.h b/ydb/services/ext_index/metadata/fetcher.h
new file mode 100644
index 0000000000..d4e34a29c8
--- /dev/null
+++ b/ydb/services/ext_index/metadata/fetcher.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include "snapshot.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TFetcher: public NFetcher::TSnapshotsFetcher<TSnapshot> {
+protected:
+ virtual std::vector<IClassBehaviour::TPtr> DoGetManagers() const override;
+public:
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/initializer.cpp b/ydb/services/ext_index/metadata/initializer.cpp
new file mode 100644
index 0000000000..d18a0e21ac
--- /dev/null
+++ b/ydb/services/ext_index/metadata/initializer.cpp
@@ -0,0 +1,48 @@
+#include "initializer.h"
+#include "object.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+void TInitializer::DoPrepare(NInitializer::IInitializerInput::TPtr controller) const {
+ TVector<NInitializer::ITableModifier::TPtr> result;
+ {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TObject::GetBehaviour()->GetStorageTablePath());
+ request.add_primary_key(TObject::TDecoder::IndexId);
+ request.add_primary_key(TObject::TDecoder::TablePath);
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TObject::TDecoder::IndexId);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TObject::TDecoder::TablePath);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TObject::TDecoder::Active);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TObject::TDecoder::Delete);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::BOOL);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name(TObject::TDecoder::Extractor);
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UTF8);
+ }
+ result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(request, "create"));
+ auto hRequest = TObject::AddHistoryTableScheme(request);
+ result.emplace_back(new NInitializer::TGenericTableModifier<NRequest::TDialogCreateTable>(hRequest, "create_history"));
+ }
+ result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TObject::GetBehaviour()->GetStorageTablePath(), "acl"));
+ result.emplace_back(NInitializer::TACLModifierConstructor::GetReadOnlyModifier(TObject::GetBehaviour()->GetStorageHistoryTablePath(), "acl_history"));
+ controller->OnPreparationFinished(result);
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/initializer.h b/ydb/services/ext_index/metadata/initializer.h
new file mode 100644
index 0000000000..3fc41ccdc5
--- /dev/null
+++ b/ydb/services/ext_index/metadata/initializer.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/abstract/initialization.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TInitializer: public NInitializer::IInitializationBehaviour {
+protected:
+ virtual void DoPrepare(NInitializer::IInitializerInput::TPtr controller) const override;
+public:
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/manager.cpp b/ydb/services/ext_index/metadata/manager.cpp
new file mode 100644
index 0000000000..cfdd7d681c
--- /dev/null
+++ b/ydb/services/ext_index/metadata/manager.cpp
@@ -0,0 +1,90 @@
+#include "manager.h"
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+#include <ydb/services/metadata/ds_table/scheme_describe.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TPreparationController: public NProvider::ISchemeDescribeController {
+private:
+ NModifications::IAlterPreparationController<TObject>::TPtr ExtController;
+ TObject Object;
+public:
+ TPreparationController(NModifications::IAlterPreparationController<TObject>::TPtr extController, TObject&& object)
+ : ExtController(extController)
+ , Object(std::move(object))
+ {
+
+ }
+
+ virtual void OnDescriptionFailed(const TString& errorMessage, const TString& /*requestId*/) override {
+ ExtController->OnPreparationProblem(errorMessage);
+ }
+ virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) override {
+ if (!result->ColumnTableInfo) {
+ ExtController->OnPreparationProblem("we cannot use this indexes for data-shard tables");
+ } else if (!Object.TryProvideTtl(result->ColumnTableInfo->Description, nullptr)) {
+ ExtController->OnPreparationProblem("unavailable cs-table ttl type for index construction");
+ } else {
+ ExtController->OnPreparationFinished({ std::move(Object) });
+ }
+ }
+};
+
+void TManager::DoPrepareObjectsBeforeModification(std::vector<TObject>&& patchedObjects,
+ NModifications::IAlterPreparationController<TObject>::TPtr controller,
+ const TInternalModificationContext& /*context*/) const {
+ if (patchedObjects.size() != 1) {
+ controller->OnPreparationProblem("modification possible for one object only");
+ return;
+ }
+ const TString path = patchedObjects.front().GetTablePath();
+ std::shared_ptr<TPreparationController> pController = std::make_shared<TPreparationController>(controller, std::move(patchedObjects.front()));
+ TActivationContext::Register(new NProvider::TSchemeDescriptionActor(pController, "", path));
+}
+
+NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings,
+ TInternalModificationContext& context) const {
+ if (context.GetActivityType() == IOperationsManager::EActivityType::Alter) {
+ return "index modification currently unsupported";
+ } else {
+ NInternal::TTableRecord result;
+ TStringBuf sb(settings.GetObjectId().data(), settings.GetObjectId().size());
+ TStringBuf l;
+ TStringBuf r;
+ if (!sb.TrySplit(':', l, r)) {
+ return "incorrect objectId format (path:index_id)";
+ }
+ result.SetColumn(TObject::TDecoder::TablePath, NInternal::TYDBValue::Utf8(l));
+ result.SetColumn(TObject::TDecoder::IndexId, NInternal::TYDBValue::Utf8(r));
+
+ if (context.GetActivityType() == IOperationsManager::EActivityType::Drop) {
+ context.SetActivityType(IOperationsManager::EActivityType::Alter);
+ result.SetColumn(TObject::TDecoder::Delete, NInternal::TYDBValue::Bool(true));
+ } else if (context.GetActivityType() == IOperationsManager::EActivityType::Create) {
+ if (auto dValue = settings.GetFeature<bool>(TObject::TDecoder::Delete, false)) {
+ result.SetColumn(TObject::TDecoder::Delete, NInternal::TYDBValue::Bool(*dValue));
+ } else {
+ return "'delete' flag is incorrect";
+ }
+
+ if (auto extractorStr = settings.GetFeature<TString>(TObject::TDecoder::Extractor)) {
+ TInterfaceContainer<IIndexExtractor> object;
+ if (!object.DeserializeFromJson(*extractorStr)) {
+ return "cannot parse extractor info";
+ }
+ result.SetColumn(TObject::TDecoder::Extractor, NInternal::TYDBValue::Utf8(object.SerializeToJson().GetStringRobust()));
+ } else {
+ return "cannot found extractor info";
+ }
+ if (auto aValue = settings.GetFeature<bool>(TObject::TDecoder::Active, false)) {
+ result.SetColumn(TObject::TDecoder::Active, NInternal::TYDBValue::Bool(*aValue));
+ } else {
+ return "'active' flag is incorrect";
+ }
+ }
+
+ return result;
+ }
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/manager.h b/ydb/services/ext_index/metadata/manager.h
new file mode 100644
index 0000000000..8a97b4e561
--- /dev/null
+++ b/ydb/services/ext_index/metadata/manager.h
@@ -0,0 +1,21 @@
+#pragma once
+#include "object.h"
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/manager/generic_manager.h>
+#include <ydb/library/accessor/accessor.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TManager: public NModifications::TGenericOperationsManager<TObject> {
+protected:
+ virtual void DoPrepareObjectsBeforeModification(std::vector<TObject>&& patchedObjects,
+ NModifications::IAlterPreparationController<TObject>::TPtr controller,
+ const TInternalModificationContext& context) const override;
+
+ virtual NModifications::TOperationParsingResult DoBuildPatchFromSettings(
+ const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override;
+
+public:
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/object.cpp b/ydb/services/ext_index/metadata/object.cpp
new file mode 100644
index 0000000000..85185cd5bc
--- /dev/null
+++ b/ydb/services/ext_index/metadata/object.cpp
@@ -0,0 +1,92 @@
+#include "object.h"
+#include "behaviour.h"
+#include <ydb/core/tx/sharding/sharding.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+#include <util/folder/path.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+TExtractorCityHash64::TFactory::TRegistrator<TExtractorCityHash64> TExtractorCityHash64::Registrator(TExtractorCityHash64::ClassName);
+
+IClassBehaviour::TPtr TObject::GetBehaviour() {
+ return TBehaviour::GetInstance();
+}
+
+bool TObject::DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue) {
+ if (!decoder.Read(decoder.GetIndexIdIdx(), IndexId, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetTablePathIdx(), TablePath, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetActiveIdx(), ActiveFlag, rawValue)) {
+ return false;
+ }
+ if (!decoder.Read(decoder.GetDeleteIdx(), DeleteFlag, rawValue)) {
+ return false;
+ }
+ if (!decoder.ReadFromJson(decoder.GetExtractorIdx(), Extractor, rawValue)) {
+ return false;
+ }
+ return true;
+}
+
+NKikimr::NMetadata::NInternal::TTableRecord TObject::SerializeToRecord() const {
+ NInternal::TTableRecord result;
+ result.SetColumn(TDecoder::IndexId, NInternal::TYDBValue::Utf8(IndexId));
+ result.SetColumn(TDecoder::TablePath, NInternal::TYDBValue::Utf8(TablePath));
+ result.SetColumn(TDecoder::Delete, NInternal::TYDBValue::Bool(DeleteFlag));
+ result.SetColumn(TDecoder::Active, NInternal::TYDBValue::Bool(ActiveFlag));
+ result.SetColumn(TDecoder::Extractor, NInternal::TYDBValue::Utf8(Extractor.SerializeToJson().GetStringRobust()));
+ return result;
+}
+
+std::optional<NKikimr::NMetadata::NCSIndex::TObject> TObject::Build(const TString& tablePath, const TString& indexId, IIndexExtractor::TPtr extractor) {
+ if (!extractor) {
+ return {};
+ }
+ if (!tablePath || !indexId) {
+ return {};
+ }
+ TObject result;
+ result.TablePath = TFsPath(tablePath).Fix().GetPath();
+ result.IndexId = indexId;
+ result.Extractor = TInterfaceContainer<IIndexExtractor>(extractor);
+ return result;
+}
+
+TString TObject::GetIndexTablePath() const {
+ return GetBehaviour()->GetStorageTableDirectory() + "/" + GetUniqueId();
+}
+
+bool TObject::TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDescription, Ydb::Table::CreateTableRequest* cRequest) {
+ if (csDescription.HasTtlSettings() && csDescription.GetTtlSettings().HasEnabled()) {
+ auto& ttl = csDescription.GetTtlSettings().GetEnabled();
+ if (!ttl.HasExpireAfterSeconds()) {
+ return false;
+ }
+ bool found = false;
+ for (auto&& i : csDescription.GetSchema().GetKeyColumnNames()) {
+ if (ttl.GetColumnName() == i) {
+ found = true;
+ }
+ }
+ if (!found) {
+ return false;
+ }
+ if (cRequest) {
+ auto& newTtl = *cRequest->mutable_ttl_settings()->mutable_date_type_column();
+ newTtl.set_column_name("pk_" + ttl.GetColumnName());
+ newTtl.set_expire_after_seconds(ttl.GetExpireAfterSeconds());
+ }
+ }
+ return true;
+}
+
+std::vector<ui64> TExtractorCityHash64::DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ NSharding::THashSharding hashSharding(0, Fields);
+ return hashSharding.MakeHashes(batch);
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/object.h b/ydb/services/ext_index/metadata/object.h
new file mode 100644
index 0000000000..f188029d05
--- /dev/null
+++ b/ydb/services/ext_index/metadata/object.h
@@ -0,0 +1,203 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/abstract/decoder.h>
+#include <ydb/services/metadata/abstract/kqp_common.h>
+#include <ydb/services/metadata/manager/object.h>
+#include <ydb/public/api/protos/ydb_value.pb.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <library/cpp/object_factory/object_factory.h>
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/json/json_reader.h>
+#include <util/string/cast.h>
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class IIndexExtractor {
+protected:
+ virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const = 0;
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) = 0;
+ virtual NJson::TJsonValue DoSerializeToJson() const = 0;
+public:
+ using TPtr = std::shared_ptr<IIndexExtractor>;
+ using TFactory = NObjectFactory::TObjectFactory<IIndexExtractor, TString>;
+
+ virtual ~IIndexExtractor() = default;
+
+ std::vector<ui64> ExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const {
+ return DoExtractIndex(batch);
+ }
+
+ bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+ return DoDeserializeFromJson(jsonInfo);
+ }
+
+ NJson::TJsonValue SerializeToJson() const {
+ return DoSerializeToJson();
+ }
+
+ virtual TString GetClassName() const = 0;
+};
+
+class TExtractorCityHash64: public IIndexExtractor {
+private:
+ YDB_READONLY_DEF(std::vector<TString>, Fields);
+ static TFactory::TRegistrator<TExtractorCityHash64> Registrator;
+protected:
+ virtual std::vector<ui64> DoExtractIndex(const std::shared_ptr<arrow::RecordBatch>& batch) const override;
+ virtual bool DoDeserializeFromJson(const NJson::TJsonValue& jsonInfo) override {
+ const NJson::TJsonValue::TArray* jsonFields;
+ if (!jsonInfo["fields"].GetArrayPointer(&jsonFields)) {
+ return false;
+ }
+ for (auto&& i : *jsonFields) {
+ TString fieldId;
+ if (!i["id"].GetString(&fieldId)) {
+ return false;
+ }
+ Fields.emplace_back(fieldId);
+ }
+ if (Fields.size() == 0) {
+ return false;
+ }
+ return true;
+ }
+ virtual NJson::TJsonValue DoSerializeToJson() const override {
+ NJson::TJsonValue result;
+ auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY);
+ for (auto&& i : Fields) {
+ auto& jsonField = jsonFields.AppendValue(NJson::JSON_MAP);
+ jsonField.InsertValue("id", i);
+ }
+ return result;
+ }
+public:
+ static inline TString ClassName = "city64";
+
+ virtual TString GetClassName() const override {
+ return ClassName;
+ }
+};
+
+template <class TInterface>
+class TInterfaceContainer {
+private:
+ using TPtr = typename TInterface::TPtr;
+ using TFactory = typename TInterface::TFactory;
+ TPtr Object;
+public:
+ TInterfaceContainer() = default;
+
+ explicit TInterfaceContainer(TPtr object)
+ : Object(object)
+ {
+
+ }
+
+ const TInterface* operator->() const {
+ return Object.get();
+ }
+
+ TInterface* operator->() {
+ return Object.get();
+ }
+
+ TString DebugString() const {
+ return SerializeToJson().GetStringRobust();
+ }
+
+ bool DeserializeFromJson(const TString& jsonString) {
+ NJson::TJsonValue jsonInfo;
+ if (!NJson::ReadJsonFastTree(jsonString, &jsonInfo)) {
+ return false;
+ }
+ return DeserializeFromJson(jsonInfo);
+ }
+
+ bool DeserializeFromJson(const NJson::TJsonValue& jsonInfo) {
+ TString className;
+ if (!jsonInfo["class_name"].GetString(&className)) {
+ return false;
+ }
+ TPtr result(TFactory::Construct(className));
+ if (!result) {
+ return false;
+ }
+ if (!result->DeserializeFromJson(jsonInfo["object"])) {
+ return false;
+ }
+ Object = result;
+ return true;
+ }
+
+ NJson::TJsonValue SerializeToJson() const {
+ if (!Object) {
+ return NJson::JSON_NULL;
+ }
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("class_name", Object->GetClassName());
+ result.InsertValue("object", Object->SerializeToJson());
+ return result;
+ }
+};
+
+class TObject: public NModifications::TObject<TObject> {
+private:
+ YDB_READONLY_DEF(TString, IndexId);
+
+ YDB_READONLY_DEF(TString, TablePath);
+ YDB_READONLY_FLAG(Active, false);
+ YDB_READONLY_FLAG(Delete, false);
+ YDB_READONLY_DEF(TInterfaceContainer<IIndexExtractor>, Extractor);
+public:
+
+ bool TryProvideTtl(const NKikimrSchemeOp::TColumnTableDescription& csDescription, Ydb::Table::CreateTableRequest* cRequest);
+
+ TString DebugString() const {
+ return TablePath + ";" + IndexId + ";" + Extractor.DebugString() + ";" + ::ToString(ActiveFlag) + ";" + ::ToString(DeleteFlag);
+ }
+
+ TObject() = default;
+ static std::optional<TObject> Build(const TString& tablePath, const TString& indexId, IIndexExtractor::TPtr extractor);
+
+ TString GetIndexTablePath() const;
+
+ TString GetUniqueId() const {
+ return TablePath + "/" + IndexId;
+ }
+
+ class TDecoder: public NInternal::TDecoderBase {
+ private:
+ YDB_ACCESSOR(i32, IndexIdIdx, -1);
+ YDB_ACCESSOR(i32, TablePathIdx, -1);
+ YDB_ACCESSOR(i32, ExtractorIdx, -1);
+ YDB_ACCESSOR(i32, ActiveIdx, -1);
+ YDB_ACCESSOR(i32, DeleteIdx, -1);
+ public:
+ static inline const TString IndexId = "indexId";
+ static inline const TString TablePath = "tablePath";
+ static inline const TString Extractor = "extractor";
+ static inline const TString Active = "active";
+ static inline const TString Delete = "delete";
+
+ TDecoder(const Ydb::ResultSet& rawData) {
+ IndexIdIdx = GetFieldIndex(rawData, IndexId);
+ TablePathIdx = GetFieldIndex(rawData, TablePath);
+ ExtractorIdx = GetFieldIndex(rawData, Extractor);
+ ActiveIdx = GetFieldIndex(rawData, Active);
+ DeleteIdx = GetFieldIndex(rawData, Delete);
+ }
+ };
+
+ static IClassBehaviour::TPtr GetBehaviour();
+
+ bool DeserializeFromRecord(const TDecoder& decoder, const Ydb::Value& rawValue);
+
+ NInternal::TTableRecord SerializeToRecord() const;
+
+ static TString GetTypeId() {
+ return "CS_EXT_INDEX";
+ }
+};
+
+}
diff --git a/ydb/services/ext_index/metadata/snapshot.cpp b/ydb/services/ext_index/metadata/snapshot.cpp
new file mode 100644
index 0000000000..f0dd6feea4
--- /dev/null
+++ b/ydb/services/ext_index/metadata/snapshot.cpp
@@ -0,0 +1,46 @@
+#include "snapshot.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+bool TSnapshot::DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawDataResult) {
+ Y_VERIFY(rawDataResult.result_sets().size() == 1);
+ ParseSnapshotObjects<TObject>(rawDataResult.result_sets()[0], [this](TObject&& s) {
+ const TString tablePath = s.GetTablePath();
+ Objects[tablePath].emplace_back(std::move(s));
+ });
+ return true;
+}
+
+TString TSnapshot::DoSerializeToString() const {
+ TStringBuilder sb;
+ sb << "OBJECTS:";
+ for (auto&& i : Objects) {
+ for (auto&& object : i.second) {
+ sb << object.DebugString() << ";";
+ }
+ }
+ return sb;
+}
+
+void TSnapshot::GetObjectsForActivity(std::vector<TObject>& activation, std::vector<TObject>& remove) const {
+ for (auto&& i : Objects) {
+ for (auto&& object : i.second) {
+ if (!object.IsActive() && !object.IsDelete()) {
+ activation.emplace_back(object);
+ }
+ if (object.IsDelete()) {
+ remove.emplace_back(object);
+ }
+ }
+ }
+}
+
+std::vector<NKikimr::NMetadata::NCSIndex::TObject> TSnapshot::GetIndexes(const TString& tablePath) const {
+ auto it = Objects.find(TFsPath(tablePath).Fix().GetPath());
+ if (it == Objects.end()) {
+ return {};
+ }
+ return it->second;
+}
+
+}
diff --git a/ydb/services/ext_index/metadata/snapshot.h b/ydb/services/ext_index/metadata/snapshot.h
new file mode 100644
index 0000000000..4df6f37c82
--- /dev/null
+++ b/ydb/services/ext_index/metadata/snapshot.h
@@ -0,0 +1,23 @@
+#pragma once
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/library/accessor/accessor.h>
+#include "object.h"
+
+namespace NKikimr::NMetadata::NCSIndex {
+
+class TSnapshot: public NFetcher::ISnapshot {
+private:
+ using TBase = NFetcher::ISnapshot;
+ using TObjects = std::map<TString, std::vector<TObject>>;
+ YDB_READONLY_DEF(TObjects, Objects);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
+ virtual TString DoSerializeToString() const override;
+public:
+ using TBase::TBase;
+
+ std::vector<TObject> GetIndexes(const TString& tablePath) const;
+ void GetObjectsForActivity(std::vector<TObject>& activation, std::vector<TObject>& remove) const;
+};
+
+}
diff --git a/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..5f0e02728b
--- /dev/null
+++ b/ydb/services/ext_index/service/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-service)
+target_link_libraries(services-ext_index-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-ext_index-metadata
+ services-ext_index-common
+ api-protos
+)
+target_sources(services-ext_index-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp
+)
diff --git a/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..814de1a95c
--- /dev/null
+++ b/ydb/services/ext_index/service/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-service)
+target_link_libraries(services-ext_index-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-ext_index-metadata
+ services-ext_index-common
+ api-protos
+)
+target_sources(services-ext_index-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp
+)
diff --git a/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..814de1a95c
--- /dev/null
+++ b/ydb/services/ext_index/service/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,26 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-service)
+target_link_libraries(services-ext_index-service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-ext_index-metadata
+ services-ext_index-common
+ api-protos
+)
+target_sources(services-ext_index-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp
+)
diff --git a/ydb/services/ext_index/service/CMakeLists.txt b/ydb/services/ext_index/service/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/service/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..5f0e02728b
--- /dev/null
+++ b/ydb/services/ext_index/service/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,25 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-ext_index-service)
+target_link_libraries(services-ext_index-service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-ext_index-metadata
+ services-ext_index-common
+ api-protos
+)
+target_sources(services-ext_index-service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_data.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/add_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/executor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/activation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/service/deleting.cpp
+)
diff --git a/ydb/services/ext_index/service/activation.cpp b/ydb/services/ext_index/service/activation.cpp
new file mode 100644
index 0000000000..3842b0505b
--- /dev/null
+++ b/ydb/services/ext_index/service/activation.cpp
@@ -0,0 +1,97 @@
+#include "activation.h"
+
+namespace NKikimr::NCSIndex {
+
+NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest TActivation::BuildUpdateRequest() const {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $indexId AS Utf8;" << Endl;
+ sb << "DECLARE $tablePath AS Utf8;" << Endl;
+ sb << "UPDATE `" + NMetadata::NCSIndex::TObject::GetBehaviour()->GetStorageTablePath() + "`" << Endl;
+ sb << "SET active = true" << Endl;
+ sb << "WHERE indexId = $indexId" << Endl;
+ sb << "AND tablePath = $tablePath" << Endl;
+ request.mutable_query()->set_yql_text(sb);
+
+ {
+ auto& param = (*request.mutable_parameters())["$indexId"];
+ param.mutable_value()->set_text_value(Object.GetIndexId());
+ param.mutable_type()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& param = (*request.mutable_parameters())["$tablePath"];
+ param.mutable_value()->set_text_value(Object.GetTablePath());
+ param.mutable_type()->set_type_id(Ydb::Type::UTF8);
+ }
+
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TActivation::OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) {
+ if (!result->ColumnTableInfo) {
+ ExternalController->OnActivationFailed("incorrect column table info", RequestId);
+ SelfContainer = nullptr;
+ return;
+ }
+ Ydb::Table::CreateTableRequest request;
+ if (!Object.TryProvideTtl(result->ColumnTableInfo->Description, &request)) {
+ ExternalController->OnActivationFailed("cannot convert ttl method from column tables", RequestId);
+ SelfContainer = nullptr;
+ return;
+ }
+
+ request.set_session_id("");
+ request.set_path(Object.GetIndexTablePath());
+ request.add_primary_key("index_hash");
+ auto pkFields = result.GetPKFields();
+ for (auto&& i : pkFields) {
+ request.add_primary_key("pk_" + i.Name);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("index_hash");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::UINT64);
+ }
+ for (auto&& i : pkFields) {
+ auto& column = *request.add_columns();
+ column.set_name("pk_" + i.Name);
+ auto primitiveType = NMetadata::NInternal::TYDBType::ConvertYQLToYDB(i.PType.GetTypeId());
+ if (!primitiveType) {
+ ExternalController->OnActivationFailed("cannot convert type yql -> ydb", RequestId);
+ SelfContainer = nullptr;
+ return;
+ }
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(*primitiveType);
+ }
+ NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogCreateTable> modifier(request, "create");
+ modifier.Execute(SelfContainer, Config.GetRequestConfig());
+}
+
+void TActivation::OnModificationFinished(const TString& modificationId) {
+ if (modificationId == "create") {
+ NMetadata::NInitializer::ITableModifier::TPtr modifier =
+ NMetadata::NInitializer::TACLModifierConstructor::GetReadOnlyModifier(Object.GetIndexTablePath(), "access");
+ modifier->Execute(SelfContainer, Config.GetRequestConfig());
+ } else if (modificationId == "access") {
+ TActivationContext::ActorSystem()->Register(new NMetadata::NRequest::TSessionedActorCallback<NMetadata::NRequest::TDialogYQLRequest>(
+ BuildUpdateRequest(), Config.GetRequestConfig(), NACLib::TSystemUsers::Metadata(), SelfContainer));
+ } else {
+ Y_VERIFY(false);
+ }
+}
+
+void TActivation::OnModificationFailed(const TString& errorMessage, const TString& /*modificationId*/) {
+ ExternalController->OnActivationFailed(errorMessage, RequestId);
+ SelfContainer = nullptr;
+}
+
+void TActivation::Start(std::shared_ptr<TActivation> selfContainer) {
+ Y_VERIFY(!!selfContainer);
+ SelfContainer = selfContainer;
+ TActivationContext::ActorSystem()->Register(new NMetadata::NProvider::TSchemeDescriptionActor(SelfContainer, RequestId, Object.GetTablePath()));
+}
+
+}
diff --git a/ydb/services/ext_index/service/activation.h b/ydb/services/ext_index/service/activation.h
new file mode 100644
index 0000000000..a7dcaf74ff
--- /dev/null
+++ b/ydb/services/ext_index/service/activation.h
@@ -0,0 +1,68 @@
+#pragma once
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/services/ext_index/metadata/object.h>
+#include <ydb/services/metadata/ds_table/scheme_describe.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/services/ext_index/common/config.h>
+
+namespace NKikimr::NCSIndex {
+
+class IActivationExternalController {
+public:
+ using TPtr = std::shared_ptr<IActivationExternalController>;
+ virtual ~IActivationExternalController() = default;
+ virtual void OnActivationFailed(const TString& errorMessage, const TString& requestId) = 0;
+ virtual void OnActivationSuccess(const TString& requestId) = 0;
+};
+
+class TActivation: public NMetadata::NProvider::ISchemeDescribeController,
+ public NMetadata::NRequest::IExternalController<NMetadata::NRequest::TDialogYQLRequest>,
+ public NMetadata::NInitializer::IModifierExternalController
+{
+private:
+ mutable std::shared_ptr<TActivation> SelfContainer;
+ NMetadata::NCSIndex::TObject Object;
+ IActivationExternalController::TPtr ExternalController;
+ const TString RequestId;
+ const TConfig Config;
+
+ NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest BuildUpdateRequest() const;
+
+protected:
+ virtual void OnDescriptionFailed(const TString& errorMessage, const TString& requestId) override {
+ ExternalController->OnActivationFailed(errorMessage, requestId);
+ SelfContainer = nullptr;
+ }
+ virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& requestId) override;
+
+ virtual void OnModificationFinished(const TString& modificationId) override;
+
+ virtual void OnModificationFailed(const TString& errorMessage, const TString& modificationId) override;
+
+ virtual void OnRequestResult(NMetadata::NRequest::TDialogYQLRequest::TResponse&& /*result*/) override {
+ ExternalController->OnActivationSuccess(RequestId);
+ SelfContainer = nullptr;
+ }
+
+ virtual void OnRequestFailed(const TString& errorMessage) override {
+ ExternalController->OnActivationFailed(errorMessage, RequestId);
+ SelfContainer = nullptr;
+ }
+public:
+ void Start(std::shared_ptr<TActivation> selfContainer);
+
+ TActivation(const NMetadata::NCSIndex::TObject& object,
+ IActivationExternalController::TPtr externalController,
+ const TString& requestId, const TConfig& config)
+ : Object(object)
+ , ExternalController(externalController)
+ , RequestId(requestId)
+ , Config(config)
+ {
+
+ }
+
+};
+
+}
diff --git a/ydb/services/ext_index/service/add_data.cpp b/ydb/services/ext_index/service/add_data.cpp
new file mode 100644
index 0000000000..0322940f21
--- /dev/null
+++ b/ydb/services/ext_index/service/add_data.cpp
@@ -0,0 +1,38 @@
+#include "add_data.h"
+
+namespace NKikimr::NCSIndex {
+
+void TDataUpserter::OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& /*requestId*/) {
+ Y_VERIFY(SelfContainer);
+ const std::vector<TString> pkFields = result.GetPKFieldNames();
+ AtomicCounter.Inc();
+ for (auto&& i : Indexes) {
+ if (i.IsDelete()) {
+ ALS_WARN(NKikimrServices::EXT_INDEX) << "extractor is removing";
+ } else if (!i.IsActive()) {
+ ALS_WARN(NKikimrServices::EXT_INDEX) << "extractor not active yet";
+ } else {
+ ALS_DEBUG(NKikimrServices::EXT_INDEX) << "add data";
+ AtomicCounter.Inc();
+ TActivationContext::ActorSystem()->Register(new TIndexUpsertActor(Data, i, pkFields, i.GetIndexTablePath(), SelfContainer));
+ }
+ }
+ OnIndexUpserted();
+}
+
+void TDataUpserter::Start(std::shared_ptr<TDataUpserter> selfContainer) {
+ if (Indexes.empty()) {
+ ExternalController->OnAllIndexesUpserted();
+ } else {
+ for (auto&& i : Indexes) {
+ if (Indexes.front().GetTablePath() != i.GetTablePath()) {
+ ExternalController->OnAllIndexesUpsertionFailed("inconsistency tables list in indexes");
+ return;
+ }
+ }
+ SelfContainer = selfContainer;
+ TActivationContext::ActorSystem()->Register(new NMetadata::NProvider::TSchemeDescriptionActor(SelfContainer, "", Indexes.front().GetTablePath()));
+ }
+}
+
+}
diff --git a/ydb/services/ext_index/service/add_data.h b/ydb/services/ext_index/service/add_data.h
new file mode 100644
index 0000000000..3f9015320c
--- /dev/null
+++ b/ydb/services/ext_index/service/add_data.h
@@ -0,0 +1,55 @@
+#pragma once
+#include <ydb/services/ext_index/metadata/object.h>
+#include <ydb/services/ext_index/common/service.h>
+#include <ydb/services/metadata/ds_table/scheme_describe.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include "add_index.h"
+
+namespace NKikimr::NCSIndex {
+
+class TDataUpserter:
+ public IIndexUpsertController,
+ public NMetadata::NProvider::ISchemeDescribeController {
+private:
+ mutable std::shared_ptr<TDataUpserter> SelfContainer;
+ std::vector<NMetadata::NCSIndex::TObject> Indexes;
+ mutable TAtomicCounter AtomicCounter = 0;
+ IDataUpsertController::TPtr ExternalController;
+ std::shared_ptr<arrow::RecordBatch> Data;
+public:
+ TDataUpserter(std::vector<NMetadata::NCSIndex::TObject>&& indexes,
+ IDataUpsertController::TPtr externalController, std::shared_ptr<arrow::RecordBatch> data)
+ : Indexes(std::move(indexes))
+ , ExternalController(externalController)
+ , Data(data)
+ {
+
+ }
+
+ virtual void OnIndexUpserted() override {
+ if (SelfContainer && AtomicCounter.Dec() == 0) {
+ ExternalController->OnAllIndexesUpserted();
+ SelfContainer = nullptr;
+ }
+ }
+ virtual void OnIndexUpsertionFailed(const TString& errorMessage) override {
+ if (SelfContainer) {
+ ExternalController->OnAllIndexesUpsertionFailed("IndexUpsertion:" + errorMessage);
+ SelfContainer = nullptr;
+ }
+ }
+
+ virtual void OnDescriptionFailed(const TString& errorMessage, const TString& /*requestId*/) override {
+ if (SelfContainer) {
+ ExternalController->OnAllIndexesUpsertionFailed("SchemeDescription:" + errorMessage);
+ SelfContainer = nullptr;
+ }
+ }
+
+ virtual void OnDescriptionSuccess(NMetadata::NProvider::TTableInfo&& result, const TString& requestId) override;
+
+ void Start(std::shared_ptr<TDataUpserter> selfContainer);
+};
+
+}
diff --git a/ydb/services/ext_index/service/add_index.cpp b/ydb/services/ext_index/service/add_index.cpp
new file mode 100644
index 0000000000..d8c2c1dd51
--- /dev/null
+++ b/ydb/services/ext_index/service/add_index.cpp
@@ -0,0 +1,127 @@
+#include "add_index.h"
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
+#include <ydb/core/formats/arrow_helpers.h>
+#include <ydb/core/scheme/scheme_tablecell.h>
+
+#include <ydb/public/api/protos/ydb_status_codes.pb.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+
+namespace NKikimr::NCSIndex {
+
+namespace {
+class TCellsWriter: public NArrow::IRowWriter {
+private:
+ std::shared_ptr<NTxProxy::TUploadRows> Rows = std::make_shared<NTxProxy::TUploadRows>();
+ const std::vector<ui64>& Hashes;
+ ui32 Index = 0;
+ const ui32 PKColumnsCount;
+public:
+ TCellsWriter(const ui32 rowsCount, const ui32 pkColumnsCount, const std::vector<ui64>& hashes)
+ : Hashes(hashes)
+ , PKColumnsCount(pkColumnsCount)
+ {
+ Rows->reserve(rowsCount);
+ }
+
+ std::shared_ptr<NTxProxy::TUploadRows> GetRows() const {
+ return Rows;
+ }
+
+ virtual void AddRow(const TConstArrayRef<TCell>& cells) override {
+ TVector<TCell> key;
+ key.reserve(1 + PKColumnsCount);
+ key.emplace_back(TCell::Make(Hashes[Index]));
+ Y_VERIFY(PKColumnsCount == cells.size());
+ for (auto&& c : cells) {
+ key.emplace_back(c);
+ }
+ TSerializedCellVec serializedKey(TSerializedCellVec::Serialize(key));
+ Rows->emplace_back(serializedKey, "");
+ ++Index;
+ }
+};
+}
+
+void TIndexUpsertActor::Bootstrap() {
+ std::shared_ptr<NTxProxy::TUploadTypes> types = std::make_shared<NTxProxy::TUploadTypes>();
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> yqlTypes;
+
+ std::vector<std::shared_ptr<arrow::Array>> pkColumns;
+ for (auto&& i : PKFields) {
+ auto f = Data->schema()->GetFieldByName(i);
+ if (!f) {
+ ExternalController->OnIndexUpsertionFailed("incorrect field for pk");
+ PassAway();
+ return;
+ }
+ pkColumns.emplace_back(Data->GetColumnByName(i));
+ switch (f->type()->id()) {
+ case arrow::Type::INT64:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::INT64));
+ break;
+ case arrow::Type::INT32:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::INT32));
+ break;
+ case arrow::Type::STRING:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UTF8));
+ break;
+ case arrow::Type::BINARY:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::STRING));
+ break;
+ case arrow::Type::UINT64:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT64));
+ break;
+ case arrow::Type::UINT32:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT32));
+ break;
+ case arrow::Type::TIMESTAMP:
+ types->emplace_back(i, NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::TIMESTAMP));
+ break;
+ default:
+ ExternalController->OnIndexUpsertionFailed("incorrect type for pk column");
+ PassAway();
+ return;
+ }
+ }
+
+ const std::vector<ui64> hashes = IndexInfo.GetExtractor()->ExtractIndex(Data);
+ if (hashes.size() != (size_t)Data->num_rows()) {
+ ExternalController->OnIndexUpsertionFailed("inconsistency hashes");
+ PassAway();
+ return;
+ }
+ TCellsWriter writer(Data->num_rows(), pkColumns.size(), hashes);
+ {
+ auto yqlTypes = NMetadata::NInternal::TYDBType::ConvertYDBToYQL(*types);
+ if (!yqlTypes) {
+ ExternalController->OnIndexUpsertionFailed("cannot convert types ydb -> yql");
+ PassAway();
+ return;
+ }
+ NArrow::TArrowToYdbConverter ydbConverter(*yqlTypes, writer);
+ TString errorMessage;
+ if (!ydbConverter.Process(*Data, errorMessage)) {
+ ExternalController->OnIndexUpsertionFailed(errorMessage);
+ PassAway();
+ return;
+ }
+ }
+ for (auto&& i : *types) {
+ i.first = "pk_" + i.first;
+ }
+ types->insert(types->begin(), std::make_pair("index_hash", NMetadata::NInternal::TYDBType::Primitive(Ydb::Type::UINT64)));
+ auto actor = NTxProxy::CreateUploadRowsInternal(SelfId(), IndexTablePath, types, writer.GetRows());
+ Become(&TIndexUpsertActor::StateMain);
+ Register(actor);
+}
+
+void TIndexUpsertActor::Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev) {
+ auto g = PassAwayGuard();
+ if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
+ ExternalController->OnIndexUpserted();
+ } else {
+ ExternalController->OnIndexUpsertionFailed("incorrect response code:" + ev->Get()->Issues.ToString());
+ }
+}
+
+}
diff --git a/ydb/services/ext_index/service/add_index.h b/ydb/services/ext_index/service/add_index.h
new file mode 100644
index 0000000000..cc3e23ce66
--- /dev/null
+++ b/ydb/services/ext_index/service/add_index.h
@@ -0,0 +1,51 @@
+#pragma once
+#include <ydb/services/ext_index/metadata/object.h>
+#include <ydb/services/metadata/ds_table/scheme_describe.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/services/ext_index/common/service.h>
+
+namespace NKikimr::NCSIndex {
+
+class IIndexUpsertController {
+public:
+ using TPtr = std::shared_ptr<IIndexUpsertController>;
+ virtual void OnIndexUpserted() = 0;
+ virtual void OnIndexUpsertionFailed(const TString& errorMessage) = 0;
+};
+
+class TIndexUpsertActor: public NActors::TActorBootstrapped<TIndexUpsertActor> {
+private:
+ std::shared_ptr<arrow::RecordBatch> Data;
+ const NMetadata::NCSIndex::TObject IndexInfo;
+ std::vector<TString> PKFields;
+ TString IndexTablePath;
+ IIndexUpsertController::TPtr ExternalController;
+protected:
+ void Handle(TEvTxUserProxy::TEvUploadRowsResponse::TPtr& ev);
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxUserProxy::TEvUploadRowsResponse, Handle);
+ default:
+ break;
+ }
+ }
+
+public:
+ void Bootstrap();
+
+ TIndexUpsertActor(const std::shared_ptr<arrow::RecordBatch>& data, const NMetadata::NCSIndex::TObject& indexInfo,
+ const std::vector<TString>& pKFields, const TString& indexTablePath, IIndexUpsertController::TPtr externalController)
+ : Data(data)
+ , IndexInfo(indexInfo)
+ , PKFields(pKFields)
+ , IndexTablePath(indexTablePath)
+ , ExternalController(externalController)
+ {
+
+ }
+
+};
+
+}
diff --git a/ydb/services/ext_index/service/deleting.cpp b/ydb/services/ext_index/service/deleting.cpp
new file mode 100644
index 0000000000..a357b0b146
--- /dev/null
+++ b/ydb/services/ext_index/service/deleting.cpp
@@ -0,0 +1,54 @@
+#include "deleting.h"
+#include <ydb/services/metadata/initializer/common.h>
+
+namespace NKikimr::NCSIndex {
+
+NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest TDeleting::BuildDeleteRequest() const {
+ Ydb::Table::ExecuteDataQueryRequest request;
+ TStringBuilder sb;
+ sb << "DECLARE $indexId AS Utf8;" << Endl;
+ sb << "DECLARE $tablePath AS Utf8;" << Endl;
+ sb << "DELETE FROM `" + NMetadata::NCSIndex::TObject::GetBehaviour()->GetStorageTablePath() + "`" << Endl;
+ sb << "WHERE indexId = $indexId" << Endl;
+ sb << "AND tablePath = $tablePath" << Endl;
+ request.mutable_query()->set_yql_text(sb);
+
+ {
+ auto& param = (*request.mutable_parameters())["$indexId"];
+ param.mutable_value()->set_text_value(Object.GetIndexId());
+ param.mutable_type()->set_type_id(Ydb::Type::UTF8);
+ }
+ {
+ auto& param = (*request.mutable_parameters())["$tablePath"];
+ param.mutable_value()->set_text_value(Object.GetTablePath());
+ param.mutable_type()->set_type_id(Ydb::Type::UTF8);
+ }
+
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
+ request.mutable_tx_control()->set_commit_tx(true);
+
+ return request;
+}
+
+void TDeleting::Start(std::shared_ptr<TDeleting> selfContainer) {
+ Y_VERIFY(!!selfContainer);
+ SelfContainer = selfContainer;
+
+ Ydb::Table::DropTableRequest request;
+ request.set_session_id("");
+ request.set_path(Object.GetIndexTablePath());
+ NMetadata::NInitializer::TGenericTableModifier<NMetadata::NRequest::TDialogDropTable> dropTable(request, "drop");
+ dropTable.Execute(SelfContainer, Config.GetRequestConfig());
+}
+
+void TDeleting::OnModificationFinished(const TString& /*modificationId*/) {
+ TActivationContext::ActorSystem()->Register(new NMetadata::NRequest::TSessionedActorCallback<NMetadata::NRequest::TDialogYQLRequest>(
+ BuildDeleteRequest(), Config.GetRequestConfig(), NACLib::TSystemUsers::Metadata(), SelfContainer));
+}
+
+void TDeleting::OnModificationFailed(const TString& errorMessage, const TString& /*modificationId*/) {
+ ExternalController->OnDeletingFailed(errorMessage, RequestId);
+ SelfContainer = nullptr;
+}
+
+}
diff --git a/ydb/services/ext_index/service/deleting.h b/ydb/services/ext_index/service/deleting.h
new file mode 100644
index 0000000000..61ac740c5a
--- /dev/null
+++ b/ydb/services/ext_index/service/deleting.h
@@ -0,0 +1,62 @@
+#pragma once
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+#include <ydb/services/metadata/request/request_actor.h>
+#include <ydb/services/ext_index/metadata/object.h>
+
+#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <ydb/services/ext_index/common/config.h>
+
+namespace NKikimr::NCSIndex {
+
+class IDeletingExternalController {
+public:
+ using TPtr = std::shared_ptr<IDeletingExternalController>;
+ virtual ~IDeletingExternalController() = default;
+ virtual void OnDeletingFailed(const TString& errorMessage, const TString& requestId) = 0;
+ virtual void OnDeletingSuccess(const TString& requestId) = 0;
+};
+
+class TDeleting:
+ public NMetadata::NRequest::IExternalController<NMetadata::NRequest::TDialogYQLRequest>,
+ public NMetadata::NInitializer::IModifierExternalController
+{
+private:
+ std::shared_ptr<TDeleting> SelfContainer;
+ NMetadata::NCSIndex::TObject Object;
+ IDeletingExternalController::TPtr ExternalController;
+ const TString RequestId;
+ const TConfig Config;
+
+ NKikimr::NMetadata::NRequest::TDialogYQLRequest::TRequest BuildDeleteRequest() const;
+
+protected:
+ virtual void OnModificationFinished(const TString& modificationId) override;
+
+ virtual void OnModificationFailed(const TString& errorMessage, const TString& modificationId) override;
+
+ virtual void OnRequestFailed(const TString& errorMessage) override {
+ ExternalController->OnDeletingFailed(errorMessage, RequestId);
+ SelfContainer = nullptr;
+ }
+
+ virtual void OnRequestResult(NMetadata::NRequest::TDialogYQLRequest::TResponse&& /*result*/) override {
+ ExternalController->OnDeletingSuccess(RequestId);
+ SelfContainer = nullptr;
+ }
+public:
+ void Start(std::shared_ptr<TDeleting> selfContainer);
+
+ TDeleting(const NMetadata::NCSIndex::TObject& object,
+ IDeletingExternalController::TPtr externalController, const TString& requestId, const TConfig& config)
+ : Object(object)
+ , ExternalController(externalController)
+ , RequestId(requestId)
+ , Config(config)
+ {
+
+ }
+
+};
+
+}
diff --git a/ydb/services/ext_index/service/executor.cpp b/ydb/services/ext_index/service/executor.cpp
new file mode 100644
index 0000000000..8491b52c80
--- /dev/null
+++ b/ydb/services/ext_index/service/executor.cpp
@@ -0,0 +1,116 @@
+#include "executor.h"
+
+#include <ydb/services/ext_index/metadata/fetcher.h>
+#include <ydb/services/metadata/initializer/fetcher.h>
+#include <ydb/services/metadata/initializer/manager.h>
+#include <ydb/services/metadata/service.h>
+#include <ydb/services/ext_index/common/service.h>
+#include "add_data.h"
+#include "activation.h"
+#include "deleting.h"
+
+namespace NKikimr::NCSIndex {
+
+void TExecutor::Handle(TEvAddData::TPtr& ev) {
+ if (CheckActivity()) {
+ std::vector<NMetadata::NCSIndex::TObject> indexes = IndexesSnapshot->GetIndexes(ev->Get()->GetTablePath());
+ std::shared_ptr<TDataUpserter> upserter = std::make_shared<TDataUpserter>(std::move(indexes), ev->Get()->GetExternalController(), ev->Get()->GetData());
+ upserter->Start(upserter);
+ } else {
+ if (!DeferredEventsOnIntialization.Add(*ev)) {
+ Send(ev->Sender, new TEvAddDataResult("too big queue for index construction"));
+ }
+ }
+}
+
+void TExecutor::Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& /*ev*/) {
+ ActivityState = EActivity::Active;
+ if (IndexesSnapshot) {
+ DeferredEventsOnIntialization.ResendAll(SelfId());
+ }
+}
+
+class TIndexesController: public IActivationExternalController, public IDeletingExternalController {
+public:
+ virtual void OnActivationFailed(const TString& errorMessage, const TString& requestId) override {
+ ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot activate index for " << requestId << ": " << errorMessage;
+ }
+ virtual void OnActivationSuccess(const TString& requestId) override {
+ ALS_NOTICE(NKikimrServices::EXT_INDEX) << "index activated " << requestId;
+ }
+ virtual void OnDeletingFailed(const TString& errorMessage, const TString& requestId) override {
+ ALS_ERROR(NKikimrServices::EXT_INDEX) << "cannot remove index for " << requestId << ": " << errorMessage;
+ }
+ virtual void OnDeletingSuccess(const TString& requestId) override {
+ ALS_NOTICE(NKikimrServices::EXT_INDEX) << "index deleted " << requestId;
+ }
+
+};
+
+void TExecutor::Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ {
+ auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NInitializer::TSnapshot>();
+ if (snapshot) {
+ if (snapshot->HasComponent("ext_index")) {
+ CheckActivity();
+ }
+ return;
+ }
+ }
+ {
+ auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NCSIndex::TSnapshot>();
+ if (snapshot) {
+ IndexesSnapshot = snapshot;
+ if (CheckActivity()) {
+ DeferredEventsOnIntialization.ResendAll(SelfId());
+ }
+ std::vector<NMetadata::NCSIndex::TObject> inactiveIndexes;
+ std::vector<NMetadata::NCSIndex::TObject> deletingIndexes;
+ IndexesSnapshot->GetObjectsForActivity(inactiveIndexes, deletingIndexes);
+ auto controller = std::make_shared<TIndexesController>();
+ for (auto&& i : inactiveIndexes) {
+ auto activation = std::make_shared<TActivation>(i, controller, i.GetUniqueId(), Config);
+ activation->Start(activation);
+ }
+ for (auto&& i : deletingIndexes) {
+ auto deleting = std::make_shared<TDeleting>(i, controller, i.GetUniqueId(), Config);
+ deleting->Start(deleting);
+ }
+ return;
+ }
+ }
+ Y_VERIFY(false, "unexpected snapshot");
+}
+
+void TExecutor::Bootstrap() {
+ Become(&TExecutor::StateMain);
+ auto managerInitializer = std::make_shared<NMetadata::NInitializer::TFetcher>();
+ Y_VERIFY(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service not active");
+ Sender<NMetadata::NProvider::TEvSubscribeExternal>(managerInitializer).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
+
+ auto managerIndexes = std::make_shared<NMetadata::NCSIndex::TFetcher>();
+ Sender<NMetadata::NProvider::TEvSubscribeExternal>(managerIndexes).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
+}
+
+bool TExecutor::CheckActivity() {
+ switch (ActivityState) {
+ case EActivity::Created:
+ ActivityState = EActivity::Preparation;
+ Sender<NMetadata::NProvider::TEvPrepareManager>(NMetadata::NCSIndex::TObject::GetBehaviour()).SendTo(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()));
+ break;
+ case EActivity::Preparation:
+ break;
+ case EActivity::Active:
+ if (!IndexesSnapshot) {
+ return false;
+ }
+ return true;
+ }
+ return false;
+}
+
+NActors::IActor* CreateService(const TConfig& config) {
+ return new TExecutor(config);
+}
+
+}
diff --git a/ydb/services/ext_index/service/executor.h b/ydb/services/ext_index/service/executor.h
new file mode 100644
index 0000000000..bd93c53fc2
--- /dev/null
+++ b/ydb/services/ext_index/service/executor.h
@@ -0,0 +1,58 @@
+#pragma once
+#include <ydb/services/ext_index/common/config.h>
+
+#include <ydb/services/metadata/initializer/accessor_init.h>
+#include <ydb/services/metadata/ds_table/service.h>
+#include <ydb/services/metadata/service.h>
+#include <ydb/services/ext_index/metadata/snapshot.h>
+#include <ydb/services/ext_index/common/service.h>
+
+namespace NKikimr::NCSIndex {
+
+class TExecutor: public NActors::TActorBootstrapped<TExecutor> {
+private:
+ using TBase = NActors::TActorBootstrapped<TExecutor>;
+ TString TableName;
+ const TString ExecutorId = TGUID::CreateTimebased().AsUuidString();
+ const TConfig Config;
+ std::set<TString> CurrentTaskIds;
+ NMetadata::NProvider::TEventsWaiter DeferredEventsOnIntialization;
+ std::shared_ptr<NMetadata::NCSIndex::TSnapshot> IndexesSnapshot;
+
+ enum class EActivity {
+ Created,
+ Preparation,
+ Active
+ };
+
+ EActivity ActivityState = EActivity::Created;
+
+ bool CheckActivity();
+
+protected:
+ void Handle(NMetadata::NProvider::TEvManagerPrepared::TPtr& ev);
+ void Handle(TEvAddData::TPtr& ev);
+ void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev);
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NMetadata::NProvider::TEvManagerPrepared, Handle);
+ hFunc(TEvAddData, Handle);
+ hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
+ default:
+ break;
+ }
+ }
+
+public:
+ void Bootstrap();
+
+ TExecutor(const TConfig& config)
+ : Config(config) {
+ TServiceOperator::Register(Config);
+ }
+};
+
+IActor* CreateService(const TConfig& config);
+
+}
diff --git a/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..e586329507
--- /dev/null
+++ b/ydb/services/ext_index/ut/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ext_index-ut)
+target_compile_options(ydb-services-ext_index-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index
+)
+target_link_libraries(ydb-services-ext_index-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ext_index
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ ydb-services-metadata
+ public-lib-yson_value
+)
+target_link_options(ydb-services-ext_index-ut PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ext_index-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 60
+)
+add_yunittest(
+ NAME
+ ydb-services-ext_index-ut
+ TEST_TARGET
+ ydb-services-ext_index-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-services-ext_index-ut)
diff --git a/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt b/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..0b356b7816
--- /dev/null
+++ b/ydb/services/ext_index/ut/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,84 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ext_index-ut)
+target_compile_options(ydb-services-ext_index-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index
+)
+target_link_libraries(ydb-services-ext_index-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-jemalloc
+ cpp-testing-unittest_main
+ ydb-services-ext_index
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ ydb-services-metadata
+ public-lib-yson_value
+)
+target_link_options(ydb-services-ext_index-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ext_index-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 60
+)
+add_yunittest(
+ NAME
+ ydb-services-ext_index-ut
+ TEST_TARGET
+ ydb-services-ext_index-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-services-ext_index-ut)
diff --git a/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..37e0035862
--- /dev/null
+++ b/ydb/services/ext_index/ut/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,86 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ext_index-ut)
+target_compile_options(ydb-services-ext_index-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index
+)
+target_link_libraries(ydb-services-ext_index-ut PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ext_index
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ ydb-services-metadata
+ public-lib-yson_value
+)
+target_link_options(ydb-services-ext_index-ut PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ext_index-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 60
+)
+add_yunittest(
+ NAME
+ ydb-services-ext_index-ut
+ TEST_TARGET
+ ydb-services-ext_index-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-services-ext_index-ut)
diff --git a/ydb/services/ext_index/ut/CMakeLists.txt b/ydb/services/ext_index/ut/CMakeLists.txt
new file mode 100644
index 0000000000..d90657116d
--- /dev/null
+++ b/ydb/services/ext_index/ut/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt b/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..0b60ddd3d8
--- /dev/null
+++ b/ydb/services/ext_index/ut/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,74 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-ext_index-ut)
+target_compile_options(ydb-services-ext_index-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index
+)
+target_link_libraries(ydb-services-ext_index-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-system
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-ext_index
+ library-cpp-getopt
+ cpp-regex-pcre
+ library-cpp-svnversion
+ core-testlib-default
+ ydb-services-metadata
+ public-lib-yson_value
+)
+target_sources(ydb-services-ext_index-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/ext_index/ut/ut_ext_index.cpp
+)
+set_property(
+ TARGET
+ ydb-services-ext_index-ut
+ PROPERTY
+ SPLIT_FACTOR
+ 60
+)
+add_yunittest(
+ NAME
+ ydb-services-ext_index-ut
+ TEST_TARGET
+ ydb-services-ext_index-ut
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-services-ext_index-ut
+ PROPERTY
+ TIMEOUT
+ 600
+)
+vcs_info(ydb-services-ext_index-ut)
diff --git a/ydb/services/ext_index/ut/ut_ext_index.cpp b/ydb/services/ext_index/ut/ut_ext_index.cpp
new file mode 100644
index 0000000000..74ffdf3259
--- /dev/null
+++ b/ydb/services/ext_index/ut/ut_ext_index.cpp
@@ -0,0 +1,146 @@
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/tiering/external_data.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/core/wrappers/fake_storage.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/services/metadata/manager/alter.h>
+#include <ydb/services/metadata/manager/common.h>
+#include <ydb/services/metadata/manager/table_record.h>
+#include <ydb/services/metadata/manager/ydb_value_operator.h>
+#include <ydb/services/metadata/service.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+using namespace NColumnShard;
+
+class TLocalHelper: public Tests::NCS::THelper {
+private:
+ using TBase = Tests::NCS::THelper;
+public:
+ using TBase::TBase;
+ void CreateTestOlapTable(TString tableName = "olapTable", ui32 tableShardsCount = 3,
+ TString storeName = "olapStore", ui32 storeShardsCount = 4,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ %s
+ }
+ }
+ )", storeName.c_str(), storeShardsCount, PROTO_SCHEMA));
+
+ TString shardingColumns = "[\"timestamp\", \"uid\"]";
+ if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
+ shardingColumns = "[\"uid\"]";
+ }
+
+ TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ TtlSettings: {
+ Enabled: {
+ ColumnName: "timestamp"
+ ExpireAfterSeconds : 60
+ }
+ }
+ Sharding {
+ HashSharding {
+ Function: %s
+ Columns: %s
+ }
+ }
+ )", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
+};
+
+
+Y_UNIT_TEST_SUITE(ExternalIndex) {
+
+ Y_UNIT_TEST(Simple) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableExternalIndex(true)
+ .SetEnableBackgroundTasks(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_NOTICE);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NLog::PRI_DEBUG);
+ runtime.SetLogPriority(NKikimrServices::BG_TASKS, NLog::PRI_DEBUG);
+ // runtime.SetLogPriority(NKikimrServices::TX_PROXY_SCHEME_CACHE, NLog::PRI_DEBUG);
+
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTable("olapTable", 2);
+ lHelper.StartSchemaRequest("CREATE OBJECT `/Root/olapStore/olapTable:ext_index_simple` ( "
+ "TYPE CS_EXT_INDEX) WITH (extractor = `{\"class_name\" : \"city64\", \"object\" : {\"fields\" : [{\"id\":\"uid\"}, {\"id\":\"message\"}]}}`)");
+ Cerr << "Wait tables" << Endl;
+ runtime.SimulateSleep(TDuration::Seconds(20));
+ Cerr << "Initialization tables" << Endl;
+ const TInstant pkStart = Now() - TDuration::Days(15);
+ ui32 idx = 0;
+
+ auto batch = lHelper.TestArrowBatch(0, (pkStart + TDuration::Seconds(2 * idx++)).GetValue(), 6000);
+ auto batchSize = NArrow::GetBatchDataSize(batch);
+ Cerr << "Inserting " << batchSize << " bytes..." << Endl;
+ UNIT_ASSERT(batchSize > 4 * 1024 * 1024); // NColumnShard::TLimits::MIN_BYTES_TO_INSERT
+ UNIT_ASSERT(batchSize < 8 * 1024 * 1024);
+
+ for (ui32 i = 0; i < 4; ++i) {
+ lHelper.SendDataViaActorSystem("/Root/olapStore/olapTable", batch);
+ }
+
+ {
+ TString resultData;
+ lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", true, &resultData);
+ UNIT_ASSERT_EQUAL(resultData, "[6000u]");
+ }
+ lHelper.StartSchemaRequest("DROP OBJECT `/Root/olapStore/olapTable:ext_index_simple` (TYPE CS_EXT_INDEX)");
+ for (ui32 i = 0; i < 10; ++i) {
+ server->GetRuntime()->SimulateSleep(TDuration::Seconds(10));
+ }
+ lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/Root/olapStore/olapTable/ext_index_simple`", false);
+ {
+ TString resultData;
+ lHelper.StartDataRequest("SELECT COUNT(*) FROM `/Root/.metadata/cs_index/external`", true, &resultData);
+ UNIT_ASSERT_EQUAL(resultData, "[0u]");
+ }
+ }
+
+}
+}