aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-10-23 10:01:09 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-10-23 10:01:09 +0300
commitb0391b746c94239133124589d8387ce982c60c91 (patch)
tree0d18fb5d29d0d9d9c09d965f94c00b1f4056ffbf
parent3dcc4596463443e3f7dd3191c29cc2e02bc0f36f (diff)
downloadydb-b0391b746c94239133124589d8387ce982c60c91.tar.gz
add metadata service
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.txt2
-rw-r--r--ydb/core/driver_lib/run/config.h1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp22
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.h6
-rw-r--r--ydb/core/driver_lib/run/run.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_create_table.cpp5
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp5
-rw-r--r--ydb/core/protos/config.proto8
-rw-r--r--ydb/core/protos/services.proto4
-rw-r--r--ydb/core/testlib/CMakeLists.txt2
-rw-r--r--ydb/core/testlib/test_client.cpp52
-rw-r--r--ydb/core/testlib/test_client.h7
-rw-r--r--ydb/core/tx/columnshard/CMakeLists.txt2
-rw-r--r--ydb/core/tx/columnshard/external_data.cpp63
-rw-r--r--ydb/core/tx/columnshard/external_data.h42
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp250
-rw-r--r--ydb/services/CMakeLists.txt1
-rw-r--r--ydb/services/metadata/CMakeLists.txt21
-rw-r--r--ydb/services/metadata/abstract/CMakeLists.txt21
-rw-r--r--ydb/services/metadata/abstract/common.cpp5
-rw-r--r--ydb/services/metadata/abstract/common.h111
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.txt27
-rw-r--r--ydb/services/metadata/ds_table/accessor_init.cpp16
-rw-r--r--ydb/services/metadata/ds_table/accessor_init.h38
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp68
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h48
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.cpp39
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.h80
-rw-r--r--ydb/services/metadata/ds_table/config.cpp24
-rw-r--r--ydb/services/metadata/ds_table/config.h20
-rw-r--r--ydb/services/metadata/ds_table/request_actor.cpp5
-rw-r--r--ydb/services/metadata/ds_table/request_actor.h138
-rw-r--r--ydb/services/metadata/ds_table/service.cpp33
-rw-r--r--ydb/services/metadata/ds_table/service.h39
-rw-r--r--ydb/services/metadata/service.cpp9
-rw-r--r--ydb/services/metadata/service.h29
39 files changed, 1242 insertions, 8 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index b17cc32123..f30912e074 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -148,6 +148,7 @@ struct TKikimrEvents : TEvents {
ES_HTTP_PROXY,
ES_BLOB_DEPOT,
ES_DATASHARD_LOAD,
+ ES_METADATA_PROVIDER,
};
};
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt
index e4bdb347da..1dad9985c2 100644
--- a/ydb/core/driver_lib/run/CMakeLists.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.txt
@@ -122,6 +122,8 @@ target_link_libraries(run PUBLIC
ydb-services-fq
ydb-services-kesus
ydb-services-local_discovery
+ services-metadata-ds_table
+ ydb-services-metadata
ydb-services-monitoring
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h
index 19d597af21..f90503bf41 100644
--- a/ydb/core/driver_lib/run/config.h
+++ b/ydb/core/driver_lib/run/config.h
@@ -65,6 +65,7 @@ union TBasicKikimrServicesMask {
bool EnableYandexQuery:1;
bool EnableSequenceProxyService:1;
bool EnableHttpProxy:1;
+ bool EnableMetadataProvider : 1;
};
ui64 Raw;
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 4fd6d21632..ecdb212a4e 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -148,6 +148,9 @@
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h>
+#include <ydb/services/metadata/ds_table/service.h>
+#include <ydb/services/metadata/service.h>
+
#include <library/cpp/actors/protos/services_common.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -2011,6 +2014,25 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
}
}
+TMetadataProviderInitializer::TMetadataProviderInitializer(const TKikimrRunConfig& runConfig)
+ : IKikimrServicesInitializer(runConfig)
+{
+}
+
+void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
+ NMetadataProvider::TConfig serviceConfig;
+ if (Config.HasMetadataProviderConfig()) {
+ Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetMetadataProviderConfig()));
+ }
+
+ if (serviceConfig.IsEnabled()) {
+ auto service = NMetadataProvider::CreateService(serviceConfig);
+ setup->LocalServices.push_back(std::make_pair(
+ NMetadataProvider::MakeServiceId(),
+ TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId)));
+ }
+}
+
TMemoryLogInitializer::TMemoryLogInitializer(
const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig)
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h
index 648026c5ac..9ec1bdccf4 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.h
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h
@@ -383,6 +383,12 @@ private:
std::shared_ptr<TModuleFactories> Factories;
};
+class TMetadataProviderInitializer: public IKikimrServicesInitializer {
+public:
+ TMetadataProviderInitializer(const TKikimrRunConfig& runConfig);
+ void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
+};
+
class TMemoryLogInitializer : public IKikimrServicesInitializer {
size_t LogBufferSize = 0;
size_t LogGrainSize = 0;
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index e5ba487e6f..7ad5a71684 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -1429,6 +1429,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TKqpServiceInitializer(runConfig, ModuleFactories));
}
+ if (serviceMask.EnableMetadataProvider) {
+ sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig));
+ }
+
if (serviceMask.EnableCms) {
sil->AddServiceInitializer(new TCmsServiceInitializer(runConfig));
}
diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp
index 3cd152fc70..3dd3ed00a6 100644
--- a/ydb/core/grpc_services/rpc_create_table.cpp
+++ b/ydb/core/grpc_services/rpc_create_table.cpp
@@ -233,5 +233,10 @@ void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvi
TActivationContext::AsActorContext().Register(new TCreateTableRPC(p.release()));
}
+template<>
+IActor* TEvCreateTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ return new TCreateTableRPC(msg);
+}
+
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 375edaea36..5f62ffefae 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -260,5 +260,10 @@ void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacility
TActivationContext::AsActorContext().Register(new TExecuteDataQueryRPC(p.release()));
}
+template<>
+IActor* TEvExecuteDataQueryRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ return new TExecuteDataQueryRPC(msg);
+}
+
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index e5ff07a69b..9dbbd9d748 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -574,6 +574,13 @@ message TKQPConfig {
repeated NKikimrKqp.TKqpSetting Settings = 10;
}
+message TMetadataProviderConfig {
+ optional bool Enabled = 1 [default = true];
+ optional uint32 RefreshPeriodSeconds = 2 [default = 10];
+ optional uint32 RetryPeriodStartSeconds = 3 [default = 3];
+ optional uint32 RetryPeriodFinishSeconds = 4 [default = 30];
+}
+
message TMemoryLogConfig {
optional uint64 LogBufferSize = 1;
optional uint64 LogGrainSize = 2;
@@ -1633,6 +1640,7 @@ message TAppConfig {
optional TTracingConfig TracingConfig = 55;
optional TFailureInjectionConfig FailureInjectionConfig = 56;
optional THttpProxyConfig PublicHttpConfig = 57;
+ optional TMetadataProviderConfig MetadataProviderConfig = 59;
optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 05b2da81c7..c2c24e2f06 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -326,6 +326,10 @@ enum EServiceKikimr {
// DATASHARD section again
DS_LOAD_TEST = 1400;
+
+ // MetadataProvider
+ METADATA_PROVIDER = 1500;
+
};
message TActivity {
diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt
index 98c3ba82f1..c5ab86c1f7 100644
--- a/ydb/core/testlib/CMakeLists.txt
+++ b/ydb/core/testlib/CMakeLists.txt
@@ -61,6 +61,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-tx-long_tx_service
core-tx-mediator
tx-replication-controller
+ core-tx-schemeshard
core-tx-sequenceproxy
core-tx-sequenceshard
core-tx-time_cast
@@ -90,6 +91,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-persqueue_v1
ydb-services-rate_limiter
ydb-services-monitoring
+ services-metadata-ds_table
ydb-services-ydb
ydb-services-yq
ydb-core-http_proxy
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index c82ee285c2..c8c8bfac98 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -88,6 +88,8 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>
#include <ydb/core/yq/libs/init/init.h>
#include <ydb/core/yq/libs/mock/yql_mock.h>
+#include <ydb/services/metadata/ds_table/service.h>
+#include <ydb/services/metadata/service.h>
#include <ydb/library/folder_service/mock/mock_folder_service.h>
#include <ydb/core/client/server/msgbus_server_tracer.h>
@@ -352,7 +354,50 @@ namespace Tests {
);
}
- void TServer::SetupDomains(TAppPrepare &app) {
+ void TServer::SetupRootStoragePools(const TActorId sender) const {
+ if (GetSettings().StoragePoolTypes.empty()) {
+ return;
+ }
+
+ auto& runtime = *GetRuntime();
+ auto& settings = GetSettings();
+
+ auto tid = ChangeStateStorage(SchemeRoot, settings.Domain);
+ const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain);
+
+ auto evTx = MakeHolder<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(1, tid);
+ auto transaction = evTx->Record.AddTransaction();
+ transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain);
+ transaction->SetWorkingDir("/");
+ auto op = transaction->MutableSubDomain();
+ op->SetName(domain.Name);
+
+ for (const auto& [kind, pool] : settings.StoragePoolTypes) {
+ auto* p = op->AddStoragePools();
+ p->SetKind(kind);
+ p->SetName(pool.GetName());
+ }
+
+ runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted);
+ }
+
+ auto evSubscribe = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(1);
+ runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries());
+
+ {
+ TAutoPtr<IEventHandle> handle;
+ auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(handle);
+ UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1);
+ }
+ }
+
+ void TServer::SetupDomains(TAppPrepare& app) {
const ui32 domainId = Settings->Domain;
ui64 planResolution = Settings->DomainPlanResolution;
if (!planResolution) {
@@ -579,6 +624,11 @@ namespace Tests {
auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid);
}
+ if (Settings->IsEnableMetadataProvider()) {
+ auto* actor = NMetadataProvider::CreateService(NMetadataProvider::TConfig());
+ const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
+ Runtime->RegisterService(NMetadataProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid);
+ }
Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0);
auto sysViewService = NSysView::CreateSysViewServiceForTests();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index 9326e9718f..2401c11e54 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -17,7 +17,6 @@
#include <ydb/library/yql/minikql/mkql_function_registry.h>
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
-#include <library/cpp/grpc/server/grpc_server.h>
#include <ydb/core/testlib/basics/runtime.h>
#include <ydb/core/testlib/basics/appdata.h>
#include <ydb/core/protos/kesus.pb.h>
@@ -27,6 +26,9 @@
#include <ydb/core/persqueue/actor_persqueue_client_iface.h>
#include <ydb/core/yq/libs/shared_resources/interface/shared_resources.h>
#include <ydb/core/http_proxy/auth_factory.h>
+#include <ydb/library/accessor/accessor.h>
+
+#include <library/cpp/grpc/server/grpc_server.h>
#include <google/protobuf/text_format.h>
@@ -209,6 +211,8 @@ namespace Tests {
TServerSettings(const TServerSettings& settings) = default;
TServerSettings& operator=(const TServerSettings& settings) = default;
+ private:
+ YDB_FLAG_ACCESSOR(EnableMetadataProvider, true);
};
class TServer : public TThrRefBase, TMoveOnly {
@@ -240,6 +244,7 @@ namespace Tests {
void EnableGRpc(const NGrpc::TServerOptions& options);
void EnableGRpc(ui16 port);
+ void SetupRootStoragePools(const TActorId sender) const;
void SetupDefaultProfiles();
diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt
index 74b573329e..f1c69be237 100644
--- a/ydb/core/tx/columnshard/CMakeLists.txt
+++ b/ydb/core/tx/columnshard/CMakeLists.txt
@@ -18,6 +18,7 @@ target_link_libraries(core-tx-columnshard PUBLIC
yutil
tools-enum_parser-enum_serialization_runtime
cpp-actors-core
+ ydb-services-metadata
ydb-core-actorlib_impl
ydb-core-base
core-blobstorage-dsproxy
@@ -61,6 +62,7 @@ target_sources(core-tx-columnshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/external_data.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp
diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp
new file mode 100644
index 0000000000..e75ec77db8
--- /dev/null
+++ b/ydb/core/tx/columnshard/external_data.cpp
@@ -0,0 +1,63 @@
+#include "external_data.h"
+
+namespace NKikimr::NColumnShard {
+
+std::optional<TString> TCSKVSnapshot::GetValue(const TString& key) const {
+ auto it = Data.find(key);
+ if (it == Data.end()) {
+ return {};
+ } else {
+ return it->second;
+ }
+}
+
+bool TCSKVSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) {
+ if (rawData.columns().size() != 2) {
+ Cerr << "incorrect proto columns info" << Endl;
+ return false;
+ }
+ i32 keyIdx = -1;
+ i32 valueIdx = -1;
+ ui32 idx = 0;
+ for (auto&& i : rawData.columns()) {
+ if (i.name() == "key") {
+ keyIdx = idx;
+ } else if (i.name() == "value") {
+ valueIdx = idx;
+ }
+ ++idx;
+ }
+ if (keyIdx < 0 || valueIdx < 0) {
+ Cerr << "incorrect table columns";
+ return false;
+ }
+ for (auto&& r : rawData.rows()) {
+ TString key(r.items()[keyIdx].bytes_value());
+ TString value(r.items()[valueIdx].bytes_value());
+ if (!Data.emplace(key, value).second) {
+ Cerr << "keys duplication: " << key;
+ return false;
+ }
+ }
+ return true;
+}
+
+Ydb::Table::CreateTableRequest TSnapshotConstructor::DoGetTableSchema() const {
+ Ydb::Table::CreateTableRequest request;
+ request.set_session_id("");
+ request.set_path(TablePath);
+ request.add_primary_key("key");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("key");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ {
+ auto& column = *request.add_columns();
+ column.set_name("value");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
+ return request;
+}
+
+}
diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h
new file mode 100644
index 0000000000..500e7d94d7
--- /dev/null
+++ b/ydb/core/tx/columnshard/external_data.h
@@ -0,0 +1,42 @@
+#pragma once
+#include <ydb/services/metadata/service.h>
+
+namespace NKikimr::NColumnShard {
+
+class TCSKVSnapshot: public NMetadataProvider::ISnapshot {
+private:
+ using TBase = NMetadataProvider::ISnapshot;
+ using TKVData = TMap<TString, TString>;
+ YDB_READONLY_DEF(TKVData, Data);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override;
+ virtual TString DoSerializeToString() const override {
+ TStringBuilder sb;
+ for (auto&& i : Data) {
+ sb << i.first << "=" << i.second << ";";
+ }
+ return sb;
+ }
+public:
+ std::optional<TString> GetValue(const TString& key) const;
+
+ using TBase::TBase;
+};
+
+class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TCSKVSnapshot> {
+private:
+ const TString TablePath;
+protected:
+ virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const override;
+ virtual const TString& DoGetTablePath() const override {
+ return TablePath;
+ }
+public:
+ TSnapshotConstructor(const TString& tablePath)
+ : TablePath(tablePath)
+ {
+
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
index e61780de69..ac70305047 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
@@ -24,6 +24,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut PUBLIC
cpp-regex-pcre
library-cpp-svnversion
core-testlib-default
+ ydb-services-metadata
ydb-core-tx
public-lib-yson_value
)
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
index 62ba940e34..4de436d122 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
@@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut PUBLIC
cpp-regex-pcre
library-cpp-svnversion
core-testlib-default
+ ydb-services-metadata
ydb-core-tx
public-lib-yson_value
)
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index e8e6a8e65a..c0aebd1889 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -1,6 +1,16 @@
#include "columnshard_ut_common.h"
+#include "external_data.h"
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/services/metadata/service.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+//#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+
#include <util/system/hostname.h>
namespace NKikimr {
@@ -574,8 +584,6 @@ void TestHotAndColdTiers(bool reboot) {
TString bucket = "ydb";
TPortManager portManager;
const ui16 port = portManager.GetPort();
- const TString connString = "fake";
- Cerr << "S3 at " << connString << "\n";
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());
@@ -591,14 +599,14 @@ void TestHotAndColdTiers(bool reboot) {
s3Config.SetVerifySSL(false);
#if 0
s3Config.SetEndpoint("storage.cloud-preprod.yandex.net");
- s3Config.SetBucket("ch-s3");
- s3Config.SetAccessKey(); < --
- s3Config.SetSecretKey(); < --
+ s3Config.SetBucket("tiering-test-01");
+ s3Config.SetAccessKey("...");
+ s3Config.SetSecretKey("...");
s3Config.SetProxyHost("localhost");
s3Config.SetProxyPort(8080);
s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP);
#else
- s3Config.SetEndpoint(connString);
+ s3Config.SetEndpoint("fake");
s3Config.SetBucket(bucket);
#endif
s3Config.SetRequestTimeoutMs(10000);
@@ -694,8 +702,238 @@ void TestDrop(bool reboots) {
namespace NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
}
+/*
+class TLocalHelper: public Tests::NCS::THelper {
+private:
+ using TBase = Tests::NCS::THelper;
+public:
+ using TBase::TBase;
+ void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ #Columns { Name: "resource_type" Type: "Utf8" }
+ Columns { Name: "resource_id" Type: "Utf8" }
+ Columns { Name: "uid" Type: "Utf8" }
+ Columns { Name: "level" Type: "Int32" }
+ Columns { Name: "message" Type: "Utf8" }
+ #Columns { Name: "json_payload" Type: "Json" }
+ #Columns { Name: "ingested_at" Type: "Timestamp" }
+ #Columns { Name: "saved_at" Type: "Timestamp" }
+ #Columns { Name: "request_id" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ StorageTiers {
+ Name: "tier1"
+ ObjectStorage {
+ Endpoint: "fake"
+ AccessKey: "$a"
+ }
+ }
+ }
+ }
+ )", storeName.c_str(), storeShardsCount));
+
+ TString shardingColumns = "[\"timestamp\", \"uid\"]";
+ if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
+ shardingColumns = "[\"uid\"]";
+ }
+
+ TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ Sharding {
+ HashSharding {
+ Function: %s
+ Columns: %s
+ }
+ })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
+};
+*/
Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
+ class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> {
+ private:
+ using TBase = NActors::TActorBootstrapped<TTestCSEmulator>;
+ std::shared_ptr<TSnapshotConstructor> ExternalDataManipulation;
+ TActorId ProviderId;
+ TInstant Start;
+ YDB_READONLY_FLAG(Found, false);
+ public:
+ STFUNC(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) {
+ auto value = ev->Get()->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a");
+ if (value && *value == "b") {
+ FoundFlag = true;
+ } else {
+ Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl;
+ }
+ }
+
+ void Bootstrap() {
+ ProviderId = NMetadataProvider::MakeServiceId(1);
+ ExternalDataManipulation = std::make_shared<TSnapshotConstructor>("/Root/.external_data/kv_settings");
+ Become(&TThis::StateInit);
+ Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
+ Start = Now();
+ }
+ };
+
+ Y_UNIT_TEST(DSConfigsStub) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableKqpSessionActor(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ // server->SetupDefaultProfiles();
+
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+ {
+ TTestCSEmulator* emulator = new TTestCSEmulator;
+ runtime.Register(emulator);
+ {
+ const TInstant start = Now();
+ while (Now() - start < TDuration::Seconds(10)) {
+ runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ Cerr << "Step " << event->Type << Endl;
+ return false;
+ }, {}, TDuration::Seconds(1));
+ Sleep(TDuration::Seconds(1));
+ Cerr << "Step finished" << Endl;
+ }
+ }
+
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto sResult = f.GetValueSync();
+ Cerr << sResult.GetIssues().ToString() << Endl;
+ auto session = sResult.GetSession();
+ session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+ //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+ }
+
+ /*
+ Y_UNIT_TEST(DSConfigs) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableKqpSessionActor(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+// server->SetupDefaultProfiles();
+
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->InitRoot(sender);
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTable();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ const TInstant start = Now();
+ while (Now() - start < TDuration::Seconds(10)) {
+ runtime.WaitForEdgeEvents([](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event) {
+ Cerr << "Step " << event->Type << Endl;
+ return false;
+ }, {}, TDuration::Seconds(1));
+ Sleep(TDuration::Seconds(1));
+ Cerr << "Step finished" << Endl;
+ }
+
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto sResult = f.GetValueSync();
+ Cerr << sResult.GetIssues().ToString() << Endl;
+ auto session = sResult.GetSession();
+ session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+
+ bool found = false;
+ bool* foundPtr = &found;
+ const auto pred = [foundPtr](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
+ if (event->HasBuffer() && !event->HasEvent()) {
+ } else if (!event->GetBase()) {
+ Cerr << "Type nullptr" << Endl;
+ } else {
+ Cerr << "Step " << event->GetBase()->Type() << Endl;
+ auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
+ if (ptr) {
+ auto value = ptr->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a");
+ if (value && *value == "b") {
+ *foundPtr = true;
+ } else {
+ Cerr << ptr->GetSnapshot()->SerializeToString() << Endl;
+ }
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+
+ runtime.SetObserverFunc(pred);
+
+ while (!found) {
+ runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ }
+ //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+ }
+*/
Y_UNIT_TEST(CreateTable) {
ui64 tableId = 1;
diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt
index 7359c6a379..da7cf03301 100644
--- a/ydb/services/CMakeLists.txt
+++ b/ydb/services/CMakeLists.txt
@@ -14,6 +14,7 @@ add_subdirectory(fq)
add_subdirectory(kesus)
add_subdirectory(lib)
add_subdirectory(local_discovery)
+add_subdirectory(metadata)
add_subdirectory(monitoring)
add_subdirectory(persqueue_cluster_discovery)
add_subdirectory(persqueue_v1)
diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt
new file mode 100644
index 0000000000..13463c41c2
--- /dev/null
+++ b/ydb/services/metadata/CMakeLists.txt
@@ -0,0 +1,21 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+add_subdirectory(abstract)
+add_subdirectory(ds_table)
+
+add_library(ydb-services-metadata)
+target_link_libraries(ydb-services-metadata PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ services-metadata-abstract
+)
+target_sources(ydb-services-metadata PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/service.cpp
+)
diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt
new file mode 100644
index 0000000000..7ea18da9cf
--- /dev/null
+++ b/ydb/services/metadata/abstract/CMakeLists.txt
@@ -0,0 +1,21 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-abstract)
+target_link_libraries(services-metadata-abstract PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ ydb-library-accessor
+ cpp-actors-core
+ api-protos
+ ydb-core-base
+)
+target_sources(services-metadata-abstract PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/common.cpp
+)
diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp
new file mode 100644
index 0000000000..fa5484fdbe
--- /dev/null
+++ b/ydb/services/metadata/abstract/common.cpp
@@ -0,0 +1,5 @@
+#include "common.h"
+
+namespace NKikimr::NMetadataProvider {
+
+}
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
new file mode 100644
index 0000000000..dd235bacdc
--- /dev/null
+++ b/ydb/services/metadata/abstract/common.h
@@ -0,0 +1,111 @@
+#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/actor_virtual.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/core/base/events.h>
+
+namespace NKikimr::NMetadataProvider {
+
+enum EEvSubscribe {
+ EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER),
+ EvCreateTableRequest,
+ EvCreateTableInternalResponse,
+ EvCreateTableResponse,
+ EvSelectRequest,
+ EvSelectInternalResponse,
+ EvSelectResponse,
+ EvCreateSessionRequest,
+ EvCreateSessionInternalResponse,
+ EvCreateSessionResponse,
+ EvRefresh,
+ EvSubscribeLocal,
+ EvUnsubscribeLocal,
+ EvSubscribeExternal,
+ EvUnsubscribeExternal,
+ EvEnd
+};
+
+static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET_PIPE)");
+
+class ISnapshot {
+private:
+ YDB_READONLY_DEF(TInstant, Actuality);
+protected:
+ virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0;
+ virtual TString DoSerializeToString() const = 0;
+
+public:
+ using TPtr = std::shared_ptr<ISnapshot>;
+ ISnapshot(const TInstant actuality)
+ : Actuality(actuality) {
+
+ }
+
+ bool DeserializeFromResultSet(const Ydb::ResultSet& rawData) {
+ return DoDeserializeFromResultSet(rawData);
+ }
+
+ TString SerializeToString() const {
+ return DoSerializeToString();
+ }
+
+ virtual ~ISnapshot() = default;
+};
+
+class ISnapshotParser {
+protected:
+ virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
+ virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const = 0;
+ virtual const TString& DoGetTablePath() const = 0;
+public:
+ using TPtr = std::shared_ptr<ISnapshotParser>;
+
+ ISnapshot::TPtr ParseSnapshot(const Ydb::ResultSet& rawData, const TInstant actuality) const {
+ ISnapshot::TPtr result = CreateSnapshot(actuality);
+ Y_VERIFY(result);
+ if (!result->DeserializeFromResultSet(rawData)) {
+ return nullptr;
+ }
+ return result;
+ }
+
+ Ydb::Table::CreateTableRequest GetTableSchema() const {
+ return DoGetTableSchema();
+ }
+
+ const TString& GetTablePath() const {
+ return DoGetTablePath();
+ }
+
+ virtual ~ISnapshotParser() = default;
+};
+
+template <class TSnapshot>
+class TGenericSnapshotParser: public ISnapshotParser {
+protected:
+ virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override {
+ return std::make_shared<TSnapshot>(actuality);
+ }
+};
+
+class TEvRefreshSubscriberData: public NActors::TEventLocal<TEvRefreshSubscriberData, EvRefreshSubscriberData> {
+private:
+ YDB_READONLY_DEF(ISnapshot::TPtr, Snapshot);
+public:
+ TEvRefreshSubscriberData(ISnapshot::TPtr snapshot)
+ : Snapshot(snapshot) {
+
+ }
+
+ template <class TSnapshot>
+ const TSnapshot* GetSnapshotAs() const {
+ return dynamic_cast<const TSnapshot*>(Snapshot.get());
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt
new file mode 100644
index 0000000000..696e31532c
--- /dev/null
+++ b/ydb/services/metadata/ds_table/CMakeLists.txt
@@ -0,0 +1,27 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-ds_table)
+target_link_libraries(services-metadata-ds_table PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+)
+target_sources(services-metadata-ds_table PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/request_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
+)
diff --git a/ydb/services/metadata/ds_table/accessor_init.cpp b/ydb/services/metadata/ds_table/accessor_init.cpp
new file mode 100644
index 0000000000..e410085817
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_init.cpp
@@ -0,0 +1,16 @@
+#include "accessor_init.h"
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+namespace NKikimr::NMetadataProvider {
+
+bool TDSAccessorInitialized::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& /*ev*/) {
+ InitializedFlag = true;
+ return true;
+}
+
+void TDSAccessorInitialized::Bootstrap(const NActors::TActorContext& /*ctx*/) {
+ RegisterState();
+ Register(new TYDBRequest<TDialogCreateTable>(GetTableSchema(), SelfId(), Config));
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_init.h b/ydb/services/metadata/ds_table/accessor_init.h
new file mode 100644
index 0000000000..708ec3a3dd
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_init.h
@@ -0,0 +1,38 @@
+#pragma once
+#include "config.h"
+#include "request_actor.h"
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/event_local.h>
+#include <library/cpp/threading/future/core/future.h>
+#include <library/cpp/actors/core/av_bootstrapped.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class TDSAccessorInitialized;
+
+class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> {
+private:
+ YDB_READONLY_FLAG(Initialized, false);
+protected:
+ const TConfig& Config;
+ virtual Ydb::Table::CreateTableRequest GetTableSchema() const = 0;
+ virtual void RegisterState() = 0;
+public:
+ TDSAccessorInitialized(const TConfig& config)
+ : Config(config) {
+
+ }
+ void Bootstrap(const NActors::TActorContext& /*ctx*/);
+ virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev);
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvRequestResult<TDialogCreateTable>, Handle);
+ default:
+ break;
+ }
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
new file mode 100644
index 0000000000..56828775e8
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -0,0 +1,68 @@
+#include "accessor_refresh.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+#include <library/cpp/actors/core/log.h>
+
+#include <util/string/escape.h>
+
+namespace NKikimr::NMetadataProvider {
+
+bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
+ const TString startString = CurrentSelection.SerializeAsString();
+
+ auto currentFullReply = ev->Get()->GetResult();
+ Ydb::Table::ExecuteQueryResult qResult;
+ currentFullReply.operation().result().UnpackTo(&qResult);
+ Y_VERIFY(qResult.result_sets().size() == 1);
+ CurrentSelection = qResult.result_sets()[0];
+ auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality);
+ if (!!parsedSnapshot) {
+ CurrentSnapshot = parsedSnapshot;
+ } else {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot";
+ }
+
+ RequestedActuality = TInstant::Zero();
+ Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
+ if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) {
+ ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString();
+ return true;
+ }
+ return false;
+}
+
+void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) {
+ Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult();
+ Ydb::Table::CreateSessionResult session;
+ currentFullReply.operation().result().UnpackTo(&session);
+ const TString sessionId = session.session_id();
+ Y_VERIFY(sessionId);
+ Ydb::Table::ExecuteDataQueryRequest request;
+ request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(GetTableName()) + "`");
+ request.set_session_id(sessionId);
+ request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
+
+ Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config));
+}
+
+void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
+ Y_VERIFY(IsInitialized());
+ Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config));
+}
+
+bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) {
+ if (!TBase::Handle(ev)) {
+ return false;
+ }
+ Sender<TEvRefresh>().SendTo(SelfId());
+ return true;
+}
+
+TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor)
+ : TBase(config)
+ , SnapshotConstructor(snapshotConstructor)
+{
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
new file mode 100644
index 0000000000..c260f396b1
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -0,0 +1,48 @@
+#pragma once
+#include "accessor_init.h"
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/services/metadata/abstract/common.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class TDSAccessorRefresher;
+
+class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefresh> {
+public:
+};
+
+class TDSAccessorRefresher: public TDSAccessorInitialized {
+private:
+ using TBase = TDSAccessorInitialized;
+ ISnapshotParser::TPtr SnapshotConstructor;
+ YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot);
+ YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection);
+ TInstant RequestedActuality = TInstant::Zero();
+protected:
+ virtual TString GetTableName() const = 0;
+ bool IsReady() const {
+ return !!CurrentSnapshot;
+ }
+public:
+ using TBase::Handle;
+ virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) override;
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvRequestResult<TDialogSelect>, Handle);
+ hFunc(TEvRequestResult<TDialogCreateTable>, Handle);
+ hFunc(TEvRequestResult<TDialogCreateSession>, Handle);
+ hFunc(TEvRefresh, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+
+ TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor);
+
+ virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev);
+ void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev);
+ void Handle(TEvRefresh::TPtr& ev);
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
new file mode 100644
index 0000000000..707432c65a
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
@@ -0,0 +1,39 @@
+#include "accessor_subscribe.h"
+#include <library/cpp/actors/core/log.h>
+
+namespace NKikimr::NMetadataProvider {
+
+bool TDSAccessorNotifier::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
+ if (TBase::Handle(ev)) {
+ auto snapshot = GetCurrentSnapshot();
+ if (!snapshot) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot";
+ return false;
+ }
+
+ for (auto&& i : Subscribed) {
+ Send(i, new TEvRefreshSubscriberData(snapshot));
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void TDSAccessorNotifier::Handle(TEvSubscribe::TPtr& ev) {
+ Subscribed.emplace(ev->Get()->GetSubscriberId());
+ if (TBase::IsReady()) {
+ auto snapshot = GetCurrentSnapshot();
+ if (!snapshot) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot";
+ return;
+ }
+ Send(ev->Get()->GetSubscriberId(), new TEvRefreshSubscriberData(snapshot));
+ }
+}
+
+void TDSAccessorNotifier::Handle(TEvUnsubscribe::TPtr& ev) {
+ Subscribed.erase(ev->Get()->GetSubscriberId());
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h
new file mode 100644
index 0000000000..5ce80b08dc
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.h
@@ -0,0 +1,80 @@
+#pragma once
+#include "accessor_refresh.h"
+
+namespace NKikimr::NMetadataProvider {
+
+class TDSAccessorNotifier;
+
+class TEvSubscribe: public NActors::TEventLocal<TEvSubscribe, EEvSubscribe::EvSubscribeLocal> {
+private:
+ YDB_READONLY_DEF(TActorId, SubscriberId);
+public:
+ TEvSubscribe(const TActorId& subscriberId)
+ : SubscriberId(subscriberId)
+ {
+
+ }
+};
+
+class TEvUnsubscribe: public NActors::TEventLocal<TEvUnsubscribe, EEvSubscribe::EvUnsubscribeLocal> {
+private:
+ YDB_READONLY_DEF(TActorId, SubscriberId);
+public:
+ TEvUnsubscribe(const TActorId& subscriberId)
+ : SubscriberId(subscriberId) {
+
+ }
+};
+
+class TDSAccessorNotifier: public TDSAccessorRefresher {
+private:
+ using TBase = TDSAccessorRefresher;
+ std::set<NActors::TActorId> Subscribed;
+protected:
+ virtual void RegisterState() override {
+ Become(&TDSAccessorNotifier::StateMain);
+ }
+public:
+ using TBase::Handle;
+
+ TDSAccessorNotifier(const TConfig& config, ISnapshotParser::TPtr sParser)
+ : TBase(config, sParser) {
+ }
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSubscribe, Handle);
+ hFunc(TEvUnsubscribe, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+
+
+ virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) override;
+ void Handle(TEvSubscribe::TPtr& context);
+ void Handle(TEvUnsubscribe::TPtr& context);
+};
+
+class TExternalData: public TDSAccessorNotifier {
+private:
+ using TBase = TDSAccessorNotifier;
+ Ydb::Table::CreateTableRequest ExternalCreateTableRequest;
+ TString TableName;
+protected:
+ virtual TString GetTableName() const override {
+ return TableName;
+ }
+ virtual Ydb::Table::CreateTableRequest GetTableSchema() const override {
+ return ExternalCreateTableRequest;
+ }
+public:
+ TExternalData(const TConfig& config, ISnapshotParser::TPtr sParser)
+ : TBase(config, sParser)
+ , ExternalCreateTableRequest(sParser->GetTableSchema())
+ , TableName(ExternalCreateTableRequest.path()) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/config.cpp b/ydb/services/metadata/ds_table/config.cpp
new file mode 100644
index 0000000000..b202f01e78
--- /dev/null
+++ b/ydb/services/metadata/ds_table/config.cpp
@@ -0,0 +1,24 @@
+#include "config.h"
+#include <util/generic/ylimits.h>
+
+namespace NKikimr::NMetadataProvider {
+
+bool TConfig::DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config) {
+ EnabledFlag = config.GetEnabled();
+ RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds());
+ RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds());
+ RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds());
+ if (RetryPeriodStart > RetryPeriodFinish) {
+ Cerr << "incorrect metadata provider config start/finish periods";
+ std::swap(RetryPeriodStart, RetryPeriodFinish);
+ return false;
+ }
+ return true;
+}
+
+TDuration TConfig::GetRetryPeriod(const ui32 retry) const {
+ const double kff = 1.0 / Max<double>(1.0, retry);
+ return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff);
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h
new file mode 100644
index 0000000000..47ef42fed9
--- /dev/null
+++ b/ydb/services/metadata/ds_table/config.h
@@ -0,0 +1,20 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/protos/config.pb.h>
+#include <util/datetime/base.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class TConfig {
+private:
+ YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10));
+ TDuration RetryPeriodStart = TDuration::Seconds(3);
+ TDuration RetryPeriodFinish = TDuration::Seconds(30);
+ YDB_READONLY_FLAG(Enabled, true);
+public:
+ TConfig() = default;
+
+ TDuration GetRetryPeriod(const ui32 retry) const;
+ bool DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config);
+};
+}
diff --git a/ydb/services/metadata/ds_table/request_actor.cpp b/ydb/services/metadata/ds_table/request_actor.cpp
new file mode 100644
index 0000000000..f9b541e80c
--- /dev/null
+++ b/ydb/services/metadata/ds_table/request_actor.cpp
@@ -0,0 +1,5 @@
+#include "request_actor.h"
+
+namespace NKikimr::NMetadataProvider {
+
+}
diff --git a/ydb/services/metadata/ds_table/request_actor.h b/ydb/services/metadata/ds_table/request_actor.h
new file mode 100644
index 0000000000..5a2f44be52
--- /dev/null
+++ b/ydb/services/metadata/ds_table/request_actor.h
@@ -0,0 +1,138 @@
+#pragma once
+#include "config.h"
+
+#include <library/cpp/actors/core/actor_virtual.h>
+#include <library/cpp/actors/core/av_bootstrapped.h>
+#include <library/cpp/actors/core/log.h>
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/abstract/common.h>
+
+namespace NKikimr::NMetadataProvider {
+
+template <class TRequestExt, class TResponseExt, ui32 EvStartExt, ui32 EvResultInternalExt, ui32 EvResultExt>
+class TDialogPolicyImpl {
+public:
+ using TRequest = TRequestExt;
+ using TResponse = TResponseExt;
+ static constexpr ui32 EvStart = EvStartExt;
+ static constexpr ui32 EvResultInternal = EvResultInternalExt;
+ static constexpr ui32 EvResult = EvResultExt;
+};
+
+using TDialogCreateTable = TDialogPolicyImpl<Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse,
+ EEvSubscribe::EvCreateTableRequest, EEvSubscribe::EvCreateTableInternalResponse, EEvSubscribe::EvCreateTableResponse>;
+using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse,
+ EEvSubscribe::EvSelectRequest, EEvSubscribe::EvSelectInternalResponse, EEvSubscribe::EvSelectResponse>;
+using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse,
+ EEvSubscribe::EvCreateSessionRequest, EEvSubscribe::EvCreateSessionInternalResponse, EEvSubscribe::EvCreateSessionResponse>;
+
+template <class TDialogPolicy>
+class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> {
+private:
+ YDB_READONLY_DEF(typename TDialogPolicy::TResponse, Result);
+public:
+ TEvRequestResult(typename TDialogPolicy::TResponse&& result)
+ : Result(std::move(result))
+ {
+
+ }
+};
+
+template <class TResponse>
+class TOperatorChecker {
+public:
+ static bool IsSuccess(const TResponse& r) {
+ return r.operation().status() == Ydb::StatusIds::SUCCESS;
+ }
+};
+
+template <>
+class TOperatorChecker<Ydb::Table::CreateTableResponse> {
+public:
+ static bool IsSuccess(const Ydb::Table::CreateTableResponse& r) {
+ return r.operation().status() == Ydb::StatusIds::SUCCESS ||
+ r.operation().status() != Ydb::StatusIds::ALREADY_EXISTS;
+ }
+};
+
+template <class TDialogPolicy>
+class TYDBRequest: public NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>> {
+private:
+ using TBase = NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>>;
+ using TRequest = typename TDialogPolicy::TRequest;
+ using TResponse = typename TDialogPolicy::TResponse;
+ using TSelf = TYDBRequest<TDialogPolicy>;
+ TRequest ProtoRequest;
+ const NActors::TActorId ActorFinishId;
+ const TConfig& Config;
+ ui32 Retry = 0;
+protected:
+ class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> {
+ private:
+ YDB_READONLY_DEF(NThreading::TFuture<typename TDialogPolicy::TResponse>, Future);
+ public:
+ TEvRequestInternalResult(const NThreading::TFuture<typename TDialogPolicy::TResponse>& f)
+ : Future(f) {
+
+ }
+ };
+ class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, TDialogPolicy::EvStart> {
+ public:
+ };
+
+public:
+ void Bootstrap(const TActorContext& /*ctx*/) {
+ TBase::Become(&TBase::TThis::StateMain);
+ TBase::template Sender<TEvRequestStart>().SendTo(TBase::SelfId());
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvRequestInternalResult, Handle);
+ hFunc(TEvRequestStart, Handle);
+ default:
+ break;
+ }
+ }
+ void Handle(typename TEvRequestInternalResult::TPtr& ev) {
+ if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization";
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ return;
+ }
+ auto f = ev->Get()->GetFuture();
+ TResponse response = f.ExtractValue();
+ if (!TOperatorChecker<TResponse>::IsSuccess(response)) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString();
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ return;
+ }
+ TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId);
+ TBase::Die(TActivationContext::AsActorContext());
+ }
+
+ void Handle(typename TEvRequestStart::TPtr& /*ev*/) {
+ auto aSystem = TActivationContext::ActorSystem();
+ using TRpcRequest = NGRpcService::TGrpcRequestOperationCall<TRequest, TResponse>;
+ auto request = ProtoRequest;
+ auto result = NRpcService::DoLocalRpc<TRpcRequest>(std::move(request), AppData()->TenantName, "", aSystem);
+ const NActors::TActorId selfId = TBase::SelfId();
+ const auto replyCallback = [aSystem, selfId](const NThreading::TFuture<TResponse>& f) {
+ aSystem->Send(selfId, new TEvRequestInternalResult(f));
+ };
+ result.Subscribe(replyCallback);
+ }
+
+ TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config)
+ : ProtoRequest(request)
+ , ActorFinishId(actorFinishId)
+ , Config(config)
+ {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
new file mode 100644
index 0000000000..c083afe97e
--- /dev/null
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -0,0 +1,33 @@
+#include "service.h"
+
+#include "accessor_subscribe.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+#include <ydb/core/grpc_services/grpc_request_proxy.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/service.h>
+
+namespace NKikimr::NMetadataProvider {
+
+IActor* CreateService(const TConfig& config) {
+ return new TService(config);
+}
+
+void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
+ auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath());
+ if (it == Accessors.end()) {
+ THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser());
+ it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetTablePath(), Register(actor.Release())).first;
+ }
+ Send<TEvSubscribe>(it->second, ev->Sender);
+}
+
+void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) {
+ auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath());
+ if (it != Accessors.end()) {
+ Send<TEvUnsubscribe>(it->second, ev->Sender);
+ }
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h
new file mode 100644
index 0000000000..e55d90ac91
--- /dev/null
+++ b/ydb/services/metadata/ds_table/service.h
@@ -0,0 +1,39 @@
+#pragma once
+#include "config.h"
+
+#include <ydb/services/metadata/service.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class TService: public NActors::TActorBootstrapped<TService> {
+private:
+ using TBase = NActors::TActor<TService>;
+ std::map<TString, NActors::TActorId> Accessors;
+ const TConfig Config;
+public:
+ void Handle(TEvSubscribeExternal::TPtr& ev);
+ void Handle(TEvUnsubscribeExternal::TPtr& ev);
+
+ void Bootstrap(const NActors::TActorContext& /*ctx*/) {
+ Become(&TThis::StateMain);
+ }
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSubscribeExternal, Handle);
+ hFunc(TEvUnsubscribeExternal, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+ TService(const TConfig& config)
+ : Config(config)
+ {
+ }
+};
+
+NActors::IActor* CreateService(const TConfig& config);
+
+}
diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp
new file mode 100644
index 0000000000..736d41bfc5
--- /dev/null
+++ b/ydb/services/metadata/service.cpp
@@ -0,0 +1,9 @@
+#include "service.h"
+
+namespace NKikimr::NMetadataProvider {
+
+NActors::TActorId MakeServiceId(ui32 node) {
+ return NActors::TActorId(node, "SrvcMetaData");
+}
+
+}
diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h
new file mode 100644
index 0000000000..72800a06d2
--- /dev/null
+++ b/ydb/services/metadata/service.h
@@ -0,0 +1,29 @@
+#pragma once
+#include <ydb/services/metadata/abstract/common.h>
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> {
+private:
+ YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
+public:
+ TEvSubscribeExternal(ISnapshotParser::TPtr parser)
+ : SnapshotParser(parser) {
+ Y_VERIFY(!!SnapshotParser);
+ }
+};
+
+class TEvUnsubscribeExternal: public NActors::TEventLocal<TEvUnsubscribeExternal, EEvSubscribe::EvUnsubscribeExternal> {
+private:
+ YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser);
+public:
+ TEvUnsubscribeExternal(ISnapshotParser::TPtr parser)
+ : SnapshotParser(parser) {
+ Y_VERIFY(!!SnapshotParser);
+ }
+};
+
+NActors::TActorId MakeServiceId(ui32 node = 0);
+
+}