diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-23 10:01:09 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-23 10:01:09 +0300 |
commit | b0391b746c94239133124589d8387ce982c60c91 (patch) | |
tree | 0d18fb5d29d0d9d9c09d965f94c00b1f4056ffbf | |
parent | 3dcc4596463443e3f7dd3191c29cc2e02bc0f36f (diff) | |
download | ydb-b0391b746c94239133124589d8387ce982c60c91.tar.gz |
add metadata service
39 files changed, 1242 insertions, 8 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index b17cc32123..f30912e074 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -148,6 +148,7 @@ struct TKikimrEvents : TEvents { ES_HTTP_PROXY, ES_BLOB_DEPOT, ES_DATASHARD_LOAD, + ES_METADATA_PROVIDER, }; }; diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt index e4bdb347da..1dad9985c2 100644 --- a/ydb/core/driver_lib/run/CMakeLists.txt +++ b/ydb/core/driver_lib/run/CMakeLists.txt @@ -122,6 +122,8 @@ target_link_libraries(run PUBLIC ydb-services-fq ydb-services-kesus ydb-services-local_discovery + services-metadata-ds_table + ydb-services-metadata ydb-services-monitoring ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/driver_lib/run/config.h b/ydb/core/driver_lib/run/config.h index 19d597af21..f90503bf41 100644 --- a/ydb/core/driver_lib/run/config.h +++ b/ydb/core/driver_lib/run/config.h @@ -65,6 +65,7 @@ union TBasicKikimrServicesMask { bool EnableYandexQuery:1; bool EnableSequenceProxyService:1; bool EnableHttpProxy:1; + bool EnableMetadataProvider : 1; }; ui64 Raw; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 4fd6d21632..ecdb212a4e 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -148,6 +148,9 @@ #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> #include <ydb/library/yql/parser/pg_wrapper/interface/comp_factory.h> +#include <ydb/services/metadata/ds_table/service.h> +#include <ydb/services/metadata/service.h> + #include <library/cpp/actors/protos/services_common.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -2011,6 +2014,25 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu } } +TMetadataProviderInitializer::TMetadataProviderInitializer(const TKikimrRunConfig& runConfig) + : IKikimrServicesInitializer(runConfig) +{ +} + +void TMetadataProviderInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) { + NMetadataProvider::TConfig serviceConfig; + if (Config.HasMetadataProviderConfig()) { + Y_VERIFY(serviceConfig.DeserializeFromProto(Config.GetMetadataProviderConfig())); + } + + if (serviceConfig.IsEnabled()) { + auto service = NMetadataProvider::CreateService(serviceConfig); + setup->LocalServices.push_back(std::make_pair( + NMetadataProvider::MakeServiceId(), + TActorSetupCmd(service, TMailboxType::HTSwap, appData->UserPoolId))); + } +} + TMemoryLogInitializer::TMemoryLogInitializer( const TKikimrRunConfig& runConfig) : IKikimrServicesInitializer(runConfig) diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.h b/ydb/core/driver_lib/run/kikimr_services_initializers.h index 648026c5ac..9ec1bdccf4 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.h +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.h @@ -383,6 +383,12 @@ private: std::shared_ptr<TModuleFactories> Factories; }; +class TMetadataProviderInitializer: public IKikimrServicesInitializer { +public: + TMetadataProviderInitializer(const TKikimrRunConfig& runConfig); + void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override; +}; + class TMemoryLogInitializer : public IKikimrServicesInitializer { size_t LogBufferSize = 0; size_t LogGrainSize = 0; diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index e5ba487e6f..7ad5a71684 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -1429,6 +1429,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers sil->AddServiceInitializer(new TKqpServiceInitializer(runConfig, ModuleFactories)); } + if (serviceMask.EnableMetadataProvider) { + sil->AddServiceInitializer(new TMetadataProviderInitializer(runConfig)); + } + if (serviceMask.EnableCms) { sil->AddServiceInitializer(new TCmsServiceInitializer(runConfig)); } diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp index 3cd152fc70..3dd3ed00a6 100644 --- a/ydb/core/grpc_services/rpc_create_table.cpp +++ b/ydb/core/grpc_services/rpc_create_table.cpp @@ -233,5 +233,10 @@ void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvi TActivationContext::AsActorContext().Register(new TCreateTableRPC(p.release())); } +template<> +IActor* TEvCreateTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return new TCreateTableRPC(msg); +} + } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 375edaea36..5f62ffefae 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -260,5 +260,10 @@ void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacility TActivationContext::AsActorContext().Register(new TExecuteDataQueryRPC(p.release())); } +template<> +IActor* TEvExecuteDataQueryRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return new TExecuteDataQueryRPC(msg); +} + } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index e5ff07a69b..9dbbd9d748 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -574,6 +574,13 @@ message TKQPConfig { repeated NKikimrKqp.TKqpSetting Settings = 10; } +message TMetadataProviderConfig { + optional bool Enabled = 1 [default = true]; + optional uint32 RefreshPeriodSeconds = 2 [default = 10]; + optional uint32 RetryPeriodStartSeconds = 3 [default = 3]; + optional uint32 RetryPeriodFinishSeconds = 4 [default = 30]; +} + message TMemoryLogConfig { optional uint64 LogBufferSize = 1; optional uint64 LogGrainSize = 2; @@ -1633,6 +1640,7 @@ message TAppConfig { optional TTracingConfig TracingConfig = 55; optional TFailureInjectionConfig FailureInjectionConfig = 56; optional THttpProxyConfig PublicHttpConfig = 57; + optional TMetadataProviderConfig MetadataProviderConfig = 59; optional NYq.NConfig.TConfig YandexQueryConfig = 50; // TODO: remove after migration to FederatedQueryConfig diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 05b2da81c7..c2c24e2f06 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -326,6 +326,10 @@ enum EServiceKikimr { // DATASHARD section again DS_LOAD_TEST = 1400; + + // MetadataProvider + METADATA_PROVIDER = 1500; + }; message TActivity { diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt index 98c3ba82f1..c5ab86c1f7 100644 --- a/ydb/core/testlib/CMakeLists.txt +++ b/ydb/core/testlib/CMakeLists.txt @@ -61,6 +61,7 @@ target_link_libraries(ydb-core-testlib PUBLIC core-tx-long_tx_service core-tx-mediator tx-replication-controller + core-tx-schemeshard core-tx-sequenceproxy core-tx-sequenceshard core-tx-time_cast @@ -90,6 +91,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-persqueue_v1 ydb-services-rate_limiter ydb-services-monitoring + services-metadata-ds_table ydb-services-ydb ydb-services-yq ydb-core-http_proxy diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index c82ee285c2..c8c8bfac98 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -88,6 +88,8 @@ #include <ydb/library/security/ydb_credentials_provider_factory.h> #include <ydb/core/yq/libs/init/init.h> #include <ydb/core/yq/libs/mock/yql_mock.h> +#include <ydb/services/metadata/ds_table/service.h> +#include <ydb/services/metadata/service.h> #include <ydb/library/folder_service/mock/mock_folder_service.h> #include <ydb/core/client/server/msgbus_server_tracer.h> @@ -352,7 +354,50 @@ namespace Tests { ); } - void TServer::SetupDomains(TAppPrepare &app) { + void TServer::SetupRootStoragePools(const TActorId sender) const { + if (GetSettings().StoragePoolTypes.empty()) { + return; + } + + auto& runtime = *GetRuntime(); + auto& settings = GetSettings(); + + auto tid = ChangeStateStorage(SchemeRoot, settings.Domain); + const TDomainsInfo::TDomain& domain = runtime.GetAppData().DomainsInfo->GetDomain(settings.Domain); + + auto evTx = MakeHolder<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransaction>(1, tid); + auto transaction = evTx->Record.AddTransaction(); + transaction->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain); + transaction->SetWorkingDir("/"); + auto op = transaction->MutableSubDomain(); + op->SetName(domain.Name); + + for (const auto& [kind, pool] : settings.StoragePoolTypes) { + auto* p = op->AddStoragePools(); + p->SetKind(kind); + p->SetName(pool.GetName()); + } + + runtime.SendToPipe(tid, sender, evTx.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvModifySchemeTransactionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetSchemeshardId(), tid); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetStatus(), NKikimrScheme::EStatus::StatusAccepted); + } + + auto evSubscribe = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(1); + runtime.SendToPipe(tid, sender, evSubscribe.Release(), 0, GetPipeConfigWithRetries()); + + { + TAutoPtr<IEventHandle> handle; + auto event = runtime.GrabEdgeEvent<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(handle); + UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), 1); + } + } + + void TServer::SetupDomains(TAppPrepare& app) { const ui32 domainId = Settings->Domain; ui64 planResolution = Settings->DomainPlanResolution; if (!planResolution) { @@ -579,6 +624,11 @@ namespace Tests { auto aid = Runtime->Register(dispatcher, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConsole::MakeConfigsDispatcherID(Runtime->GetNodeId(nodeIdx)), aid); } + if (Settings->IsEnableMetadataProvider()) { + auto* actor = NMetadataProvider::CreateService(NMetadataProvider::TConfig()); + const auto aid = Runtime->Register(actor, nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); + Runtime->RegisterService(NMetadataProvider::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid); + } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index 9326e9718f..2401c11e54 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -17,7 +17,6 @@ #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/mkql_proto/protos/minikql.pb.h> #include <ydb/core/protos/flat_scheme_op.pb.h> -#include <library/cpp/grpc/server/grpc_server.h> #include <ydb/core/testlib/basics/runtime.h> #include <ydb/core/testlib/basics/appdata.h> #include <ydb/core/protos/kesus.pb.h> @@ -27,6 +26,9 @@ #include <ydb/core/persqueue/actor_persqueue_client_iface.h> #include <ydb/core/yq/libs/shared_resources/interface/shared_resources.h> #include <ydb/core/http_proxy/auth_factory.h> +#include <ydb/library/accessor/accessor.h> + +#include <library/cpp/grpc/server/grpc_server.h> #include <google/protobuf/text_format.h> @@ -209,6 +211,8 @@ namespace Tests { TServerSettings(const TServerSettings& settings) = default; TServerSettings& operator=(const TServerSettings& settings) = default; + private: + YDB_FLAG_ACCESSOR(EnableMetadataProvider, true); }; class TServer : public TThrRefBase, TMoveOnly { @@ -240,6 +244,7 @@ namespace Tests { void EnableGRpc(const NGrpc::TServerOptions& options); void EnableGRpc(ui16 port); + void SetupRootStoragePools(const TActorId sender) const; void SetupDefaultProfiles(); diff --git a/ydb/core/tx/columnshard/CMakeLists.txt b/ydb/core/tx/columnshard/CMakeLists.txt index 74b573329e..f1c69be237 100644 --- a/ydb/core/tx/columnshard/CMakeLists.txt +++ b/ydb/core/tx/columnshard/CMakeLists.txt @@ -18,6 +18,7 @@ target_link_libraries(core-tx-columnshard PUBLIC yutil tools-enum_parser-enum_serialization_runtime cpp-actors-core + ydb-services-metadata ydb-core-actorlib_impl ydb-core-base core-blobstorage-dsproxy @@ -61,6 +62,7 @@ target_sources(core-tx-columnshard PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/compaction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/eviction_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/export_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/external_data.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/indexing_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/read_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/write_actor.cpp diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp new file mode 100644 index 0000000000..e75ec77db8 --- /dev/null +++ b/ydb/core/tx/columnshard/external_data.cpp @@ -0,0 +1,63 @@ +#include "external_data.h" + +namespace NKikimr::NColumnShard { + +std::optional<TString> TCSKVSnapshot::GetValue(const TString& key) const { + auto it = Data.find(key); + if (it == Data.end()) { + return {}; + } else { + return it->second; + } +} + +bool TCSKVSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) { + if (rawData.columns().size() != 2) { + Cerr << "incorrect proto columns info" << Endl; + return false; + } + i32 keyIdx = -1; + i32 valueIdx = -1; + ui32 idx = 0; + for (auto&& i : rawData.columns()) { + if (i.name() == "key") { + keyIdx = idx; + } else if (i.name() == "value") { + valueIdx = idx; + } + ++idx; + } + if (keyIdx < 0 || valueIdx < 0) { + Cerr << "incorrect table columns"; + return false; + } + for (auto&& r : rawData.rows()) { + TString key(r.items()[keyIdx].bytes_value()); + TString value(r.items()[valueIdx].bytes_value()); + if (!Data.emplace(key, value).second) { + Cerr << "keys duplication: " << key; + return false; + } + } + return true; +} + +Ydb::Table::CreateTableRequest TSnapshotConstructor::DoGetTableSchema() const { + Ydb::Table::CreateTableRequest request; + request.set_session_id(""); + request.set_path(TablePath); + request.add_primary_key("key"); + { + auto& column = *request.add_columns(); + column.set_name("key"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + { + auto& column = *request.add_columns(); + column.set_name("value"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } + return request; +} + +} diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h new file mode 100644 index 0000000000..500e7d94d7 --- /dev/null +++ b/ydb/core/tx/columnshard/external_data.h @@ -0,0 +1,42 @@ +#pragma once +#include <ydb/services/metadata/service.h> + +namespace NKikimr::NColumnShard { + +class TCSKVSnapshot: public NMetadataProvider::ISnapshot { +private: + using TBase = NMetadataProvider::ISnapshot; + using TKVData = TMap<TString, TString>; + YDB_READONLY_DEF(TKVData, Data); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override; + virtual TString DoSerializeToString() const override { + TStringBuilder sb; + for (auto&& i : Data) { + sb << i.first << "=" << i.second << ";"; + } + return sb; + } +public: + std::optional<TString> GetValue(const TString& key) const; + + using TBase::TBase; +}; + +class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TCSKVSnapshot> { +private: + const TString TablePath; +protected: + virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const override; + virtual const TString& DoGetTablePath() const override { + return TablePath; + } +public: + TSnapshotConstructor(const TString& tablePath) + : TablePath(tablePath) + { + + } +}; + +} diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt index e61780de69..ac70305047 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt @@ -24,6 +24,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut PUBLIC cpp-regex-pcre library-cpp-svnversion core-testlib-default + ydb-services-metadata ydb-core-tx public-lib-yson_value ) diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt index 62ba940e34..4de436d122 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt @@ -26,6 +26,7 @@ target_link_libraries(ydb-core-tx-columnshard-ut PUBLIC cpp-regex-pcre library-cpp-svnversion core-testlib-default + ydb-services-metadata ydb-core-tx public-lib-yson_value ) diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index e8e6a8e65a..c0aebd1889 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -1,6 +1,16 @@ #include "columnshard_ut_common.h" +#include "external_data.h" #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/services/metadata/service.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +//#include <ydb/core/testlib/cs_helper.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <library/cpp/actors/core/av_bootstrapped.h> + #include <util/system/hostname.h> namespace NKikimr { @@ -574,8 +584,6 @@ void TestHotAndColdTiers(bool reboot) { TString bucket = "ydb"; TPortManager portManager; const ui16 port = portManager.GetPort(); - const TString connString = "fake"; - Cerr << "S3 at " << connString << "\n"; TS3Mock s3Mock({}, TS3Mock::TSettings(port)); UNIT_ASSERT(s3Mock.Start()); @@ -591,14 +599,14 @@ void TestHotAndColdTiers(bool reboot) { s3Config.SetVerifySSL(false); #if 0 s3Config.SetEndpoint("storage.cloud-preprod.yandex.net"); - s3Config.SetBucket("ch-s3"); - s3Config.SetAccessKey(); < -- - s3Config.SetSecretKey(); < -- + s3Config.SetBucket("tiering-test-01"); + s3Config.SetAccessKey("..."); + s3Config.SetSecretKey("..."); s3Config.SetProxyHost("localhost"); s3Config.SetProxyPort(8080); s3Config.SetProxyScheme(NKikimrSchemeOp::TS3Settings::HTTP); #else - s3Config.SetEndpoint(connString); + s3Config.SetEndpoint("fake"); s3Config.SetBucket(bucket); #endif s3Config.SetRequestTimeoutMs(10000); @@ -694,8 +702,238 @@ void TestDrop(bool reboots) { namespace NColumnShard { extern bool gAllowLogBatchingDefaultValue; } +/* +class TLocalHelper: public Tests::NCS::THelper { +private: + using TBase = Tests::NCS::THelper; +public: + using TBase::TBase; + void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + #Columns { Name: "resource_type" Type: "Utf8" } + Columns { Name: "resource_id" Type: "Utf8" } + Columns { Name: "uid" Type: "Utf8" } + Columns { Name: "level" Type: "Int32" } + Columns { Name: "message" Type: "Utf8" } + #Columns { Name: "json_payload" Type: "Json" } + #Columns { Name: "ingested_at" Type: "Timestamp" } + #Columns { Name: "saved_at" Type: "Timestamp" } + #Columns { Name: "request_id" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + StorageTiers { + Name: "tier1" + ObjectStorage { + Endpoint: "fake" + AccessKey: "$a" + } + } + } + } + )", storeName.c_str(), storeShardsCount)); + + TString shardingColumns = "[\"timestamp\", \"uid\"]"; + if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { + shardingColumns = "[\"uid\"]"; + } + + TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + Sharding { + HashSharding { + Function: %s + Columns: %s + } + })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } +}; +*/ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { + class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> { + private: + using TBase = NActors::TActorBootstrapped<TTestCSEmulator>; + std::shared_ptr<TSnapshotConstructor> ExternalDataManipulation; + TActorId ProviderId; + TInstant Start; + YDB_READONLY_FLAG(Found, false); + public: + STFUNC(StateInit) { + switch (ev->GetTypeRewrite()) { + HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + default: + Y_VERIFY(false); + } + } + + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) { + auto value = ev->Get()->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a"); + if (value && *value == "b") { + FoundFlag = true; + } else { + Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl; + } + } + + void Bootstrap() { + ProviderId = NMetadataProvider::MakeServiceId(1); + ExternalDataManipulation = std::make_shared<TSnapshotConstructor>("/Root/.external_data/kv_settings"); + Become(&TThis::StateInit); + Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); + Start = Now(); + } + }; + + Y_UNIT_TEST(DSConfigsStub) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableKqpSessionActor(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + // server->SetupDefaultProfiles(); + + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + { + TTestCSEmulator* emulator = new TTestCSEmulator; + runtime.Register(emulator); + { + const TInstant start = Now(); + while (Now() - start < TDuration::Seconds(10)) { + runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + Cerr << "Step " << event->Type << Endl; + return false; + }, {}, TDuration::Seconds(1)); + Sleep(TDuration::Seconds(1)); + Cerr << "Step finished" << Endl; + } + } + + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto sResult = f.GetValueSync(); + Cerr << sResult.GetIssues().ToString() << Endl; + auto session = sResult.GetSession(); + session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + } + Y_VERIFY(emulator->IsFound()); + } + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + } + + /* + Y_UNIT_TEST(DSConfigs) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableKqpSessionActor(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); +// server->SetupDefaultProfiles(); + + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->InitRoot(sender); + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTable(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + const TInstant start = Now(); + while (Now() - start < TDuration::Seconds(10)) { + runtime.WaitForEdgeEvents([](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event) { + Cerr << "Step " << event->Type << Endl; + return false; + }, {}, TDuration::Seconds(1)); + Sleep(TDuration::Seconds(1)); + Cerr << "Step finished" << Endl; + } + + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto sResult = f.GetValueSync(); + Cerr << sResult.GetIssues().ToString() << Endl; + auto session = sResult.GetSession(); + session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + + bool found = false; + bool* foundPtr = &found; + const auto pred = [foundPtr](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { + if (event->HasBuffer() && !event->HasEvent()) { + } else if (!event->GetBase()) { + Cerr << "Type nullptr" << Endl; + } else { + Cerr << "Step " << event->GetBase()->Type() << Endl; + auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); + if (ptr) { + auto value = ptr->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a"); + if (value && *value == "b") { + *foundPtr = true; + } else { + Cerr << ptr->GetSnapshot()->SerializeToString() << Endl; + } + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(pred); + + while (!found) { + runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + } + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + } +*/ Y_UNIT_TEST(CreateTable) { ui64 tableId = 1; diff --git a/ydb/services/CMakeLists.txt b/ydb/services/CMakeLists.txt index 7359c6a379..da7cf03301 100644 --- a/ydb/services/CMakeLists.txt +++ b/ydb/services/CMakeLists.txt @@ -14,6 +14,7 @@ add_subdirectory(fq) add_subdirectory(kesus) add_subdirectory(lib) add_subdirectory(local_discovery) +add_subdirectory(metadata) add_subdirectory(monitoring) add_subdirectory(persqueue_cluster_discovery) add_subdirectory(persqueue_v1) diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt new file mode 100644 index 0000000000..13463c41c2 --- /dev/null +++ b/ydb/services/metadata/CMakeLists.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(abstract) +add_subdirectory(ds_table) + +add_library(ydb-services-metadata) +target_link_libraries(ydb-services-metadata PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + services-metadata-abstract +) +target_sources(ydb-services-metadata PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/service.cpp +) diff --git a/ydb/services/metadata/abstract/CMakeLists.txt b/ydb/services/metadata/abstract/CMakeLists.txt new file mode 100644 index 0000000000..7ea18da9cf --- /dev/null +++ b/ydb/services/metadata/abstract/CMakeLists.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-abstract) +target_link_libraries(services-metadata-abstract PUBLIC + contrib-libs-cxxsupp + yutil + ydb-library-accessor + cpp-actors-core + api-protos + ydb-core-base +) +target_sources(services-metadata-abstract PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/abstract/common.cpp +) diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp new file mode 100644 index 0000000000..fa5484fdbe --- /dev/null +++ b/ydb/services/metadata/abstract/common.cpp @@ -0,0 +1,5 @@ +#include "common.h" + +namespace NKikimr::NMetadataProvider { + +} diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h new file mode 100644 index 0000000000..dd235bacdc --- /dev/null +++ b/ydb/services/metadata/abstract/common.h @@ -0,0 +1,111 @@ +#pragma once +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/actor_virtual.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/core/base/events.h> + +namespace NKikimr::NMetadataProvider { + +enum EEvSubscribe { + EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER), + EvCreateTableRequest, + EvCreateTableInternalResponse, + EvCreateTableResponse, + EvSelectRequest, + EvSelectInternalResponse, + EvSelectResponse, + EvCreateSessionRequest, + EvCreateSessionInternalResponse, + EvCreateSessionResponse, + EvRefresh, + EvSubscribeLocal, + EvUnsubscribeLocal, + EvSubscribeExternal, + EvUnsubscribeExternal, + EvEnd +}; + +static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET_PIPE)"); + +class ISnapshot { +private: + YDB_READONLY_DEF(TInstant, Actuality); +protected: + virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0; + virtual TString DoSerializeToString() const = 0; + +public: + using TPtr = std::shared_ptr<ISnapshot>; + ISnapshot(const TInstant actuality) + : Actuality(actuality) { + + } + + bool DeserializeFromResultSet(const Ydb::ResultSet& rawData) { + return DoDeserializeFromResultSet(rawData); + } + + TString SerializeToString() const { + return DoSerializeToString(); + } + + virtual ~ISnapshot() = default; +}; + +class ISnapshotParser { +protected: + virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; + virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const = 0; + virtual const TString& DoGetTablePath() const = 0; +public: + using TPtr = std::shared_ptr<ISnapshotParser>; + + ISnapshot::TPtr ParseSnapshot(const Ydb::ResultSet& rawData, const TInstant actuality) const { + ISnapshot::TPtr result = CreateSnapshot(actuality); + Y_VERIFY(result); + if (!result->DeserializeFromResultSet(rawData)) { + return nullptr; + } + return result; + } + + Ydb::Table::CreateTableRequest GetTableSchema() const { + return DoGetTableSchema(); + } + + const TString& GetTablePath() const { + return DoGetTablePath(); + } + + virtual ~ISnapshotParser() = default; +}; + +template <class TSnapshot> +class TGenericSnapshotParser: public ISnapshotParser { +protected: + virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const override { + return std::make_shared<TSnapshot>(actuality); + } +}; + +class TEvRefreshSubscriberData: public NActors::TEventLocal<TEvRefreshSubscriberData, EvRefreshSubscriberData> { +private: + YDB_READONLY_DEF(ISnapshot::TPtr, Snapshot); +public: + TEvRefreshSubscriberData(ISnapshot::TPtr snapshot) + : Snapshot(snapshot) { + + } + + template <class TSnapshot> + const TSnapshot* GetSnapshotAs() const { + return dynamic_cast<const TSnapshot*>(Snapshot.get()); + } +}; + +} diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt new file mode 100644 index 0000000000..696e31532c --- /dev/null +++ b/ydb/services/metadata/ds_table/CMakeLists.txt @@ -0,0 +1,27 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-ds_table) +target_link_libraries(services-metadata-ds_table PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services +) +target_sources(services-metadata-ds_table PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/request_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp +) diff --git a/ydb/services/metadata/ds_table/accessor_init.cpp b/ydb/services/metadata/ds_table/accessor_init.cpp new file mode 100644 index 0000000000..e410085817 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_init.cpp @@ -0,0 +1,16 @@ +#include "accessor_init.h" +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +namespace NKikimr::NMetadataProvider { + +bool TDSAccessorInitialized::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& /*ev*/) { + InitializedFlag = true; + return true; +} + +void TDSAccessorInitialized::Bootstrap(const NActors::TActorContext& /*ctx*/) { + RegisterState(); + Register(new TYDBRequest<TDialogCreateTable>(GetTableSchema(), SelfId(), Config)); +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_init.h b/ydb/services/metadata/ds_table/accessor_init.h new file mode 100644 index 0000000000..708ec3a3dd --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_init.h @@ -0,0 +1,38 @@ +#pragma once +#include "config.h" +#include "request_actor.h" + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/event_local.h> +#include <library/cpp/threading/future/core/future.h> +#include <library/cpp/actors/core/av_bootstrapped.h> + +namespace NKikimr::NMetadataProvider { + +class TDSAccessorInitialized; + +class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> { +private: + YDB_READONLY_FLAG(Initialized, false); +protected: + const TConfig& Config; + virtual Ydb::Table::CreateTableRequest GetTableSchema() const = 0; + virtual void RegisterState() = 0; +public: + TDSAccessorInitialized(const TConfig& config) + : Config(config) { + + } + void Bootstrap(const NActors::TActorContext& /*ctx*/); + virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRequestResult<TDialogCreateTable>, Handle); + default: + break; + } + } +}; + +} diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp new file mode 100644 index 0000000000..56828775e8 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -0,0 +1,68 @@ +#include "accessor_refresh.h" +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <library/cpp/actors/core/log.h> + +#include <util/string/escape.h> + +namespace NKikimr::NMetadataProvider { + +bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { + const TString startString = CurrentSelection.SerializeAsString(); + + auto currentFullReply = ev->Get()->GetResult(); + Ydb::Table::ExecuteQueryResult qResult; + currentFullReply.operation().result().UnpackTo(&qResult); + Y_VERIFY(qResult.result_sets().size() == 1); + CurrentSelection = qResult.result_sets()[0]; + auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(CurrentSelection, RequestedActuality); + if (!!parsedSnapshot) { + CurrentSnapshot = parsedSnapshot; + } else { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot"; + } + + RequestedActuality = TInstant::Zero(); + Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); + if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) { + ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString(); + return true; + } + return false; +} + +void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev) { + Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); + Ydb::Table::CreateSessionResult session; + currentFullReply.operation().result().UnpackTo(&session); + const TString sessionId = session.session_id(); + Y_VERIFY(sessionId); + Ydb::Table::ExecuteDataQueryRequest request; + request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(GetTableName()) + "`"); + request.set_session_id(sessionId); + request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); + + Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config)); +} + +void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) { + Y_VERIFY(IsInitialized()); + Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config)); +} + +bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) { + if (!TBase::Handle(ev)) { + return false; + } + Sender<TEvRefresh>().SendTo(SelfId()); + return true; +} + +TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor) + : TBase(config) + , SnapshotConstructor(snapshotConstructor) +{ +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h new file mode 100644 index 0000000000..c260f396b1 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -0,0 +1,48 @@ +#pragma once +#include "accessor_init.h" +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/services/metadata/abstract/common.h> + +namespace NKikimr::NMetadataProvider { + +class TDSAccessorRefresher; + +class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvSubscribe::EvRefresh> { +public: +}; + +class TDSAccessorRefresher: public TDSAccessorInitialized { +private: + using TBase = TDSAccessorInitialized; + ISnapshotParser::TPtr SnapshotConstructor; + YDB_READONLY_DEF(ISnapshot::TPtr, CurrentSnapshot); + YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection); + TInstant RequestedActuality = TInstant::Zero(); +protected: + virtual TString GetTableName() const = 0; + bool IsReady() const { + return !!CurrentSnapshot; + } +public: + using TBase::Handle; + virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) override; + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRequestResult<TDialogSelect>, Handle); + hFunc(TEvRequestResult<TDialogCreateTable>, Handle); + hFunc(TEvRequestResult<TDialogCreateSession>, Handle); + hFunc(TEvRefresh, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + + TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor); + + virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev); + void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev); + void Handle(TEvRefresh::TPtr& ev); +}; + +} diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp new file mode 100644 index 0000000000..707432c65a --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp @@ -0,0 +1,39 @@ +#include "accessor_subscribe.h" +#include <library/cpp/actors/core/log.h> + +namespace NKikimr::NMetadataProvider { + +bool TDSAccessorNotifier::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { + if (TBase::Handle(ev)) { + auto snapshot = GetCurrentSnapshot(); + if (!snapshot) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot"; + return false; + } + + for (auto&& i : Subscribed) { + Send(i, new TEvRefreshSubscriberData(snapshot)); + } + return true; + } else { + return false; + } +} + +void TDSAccessorNotifier::Handle(TEvSubscribe::TPtr& ev) { + Subscribed.emplace(ev->Get()->GetSubscriberId()); + if (TBase::IsReady()) { + auto snapshot = GetCurrentSnapshot(); + if (!snapshot) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot"; + return; + } + Send(ev->Get()->GetSubscriberId(), new TEvRefreshSubscriberData(snapshot)); + } +} + +void TDSAccessorNotifier::Handle(TEvUnsubscribe::TPtr& ev) { + Subscribed.erase(ev->Get()->GetSubscriberId()); +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h new file mode 100644 index 0000000000..5ce80b08dc --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_subscribe.h @@ -0,0 +1,80 @@ +#pragma once +#include "accessor_refresh.h" + +namespace NKikimr::NMetadataProvider { + +class TDSAccessorNotifier; + +class TEvSubscribe: public NActors::TEventLocal<TEvSubscribe, EEvSubscribe::EvSubscribeLocal> { +private: + YDB_READONLY_DEF(TActorId, SubscriberId); +public: + TEvSubscribe(const TActorId& subscriberId) + : SubscriberId(subscriberId) + { + + } +}; + +class TEvUnsubscribe: public NActors::TEventLocal<TEvUnsubscribe, EEvSubscribe::EvUnsubscribeLocal> { +private: + YDB_READONLY_DEF(TActorId, SubscriberId); +public: + TEvUnsubscribe(const TActorId& subscriberId) + : SubscriberId(subscriberId) { + + } +}; + +class TDSAccessorNotifier: public TDSAccessorRefresher { +private: + using TBase = TDSAccessorRefresher; + std::set<NActors::TActorId> Subscribed; +protected: + virtual void RegisterState() override { + Become(&TDSAccessorNotifier::StateMain); + } +public: + using TBase::Handle; + + TDSAccessorNotifier(const TConfig& config, ISnapshotParser::TPtr sParser) + : TBase(config, sParser) { + } + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSubscribe, Handle); + hFunc(TEvUnsubscribe, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + + + virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) override; + void Handle(TEvSubscribe::TPtr& context); + void Handle(TEvUnsubscribe::TPtr& context); +}; + +class TExternalData: public TDSAccessorNotifier { +private: + using TBase = TDSAccessorNotifier; + Ydb::Table::CreateTableRequest ExternalCreateTableRequest; + TString TableName; +protected: + virtual TString GetTableName() const override { + return TableName; + } + virtual Ydb::Table::CreateTableRequest GetTableSchema() const override { + return ExternalCreateTableRequest; + } +public: + TExternalData(const TConfig& config, ISnapshotParser::TPtr sParser) + : TBase(config, sParser) + , ExternalCreateTableRequest(sParser->GetTableSchema()) + , TableName(ExternalCreateTableRequest.path()) { + + } +}; + +} diff --git a/ydb/services/metadata/ds_table/config.cpp b/ydb/services/metadata/ds_table/config.cpp new file mode 100644 index 0000000000..b202f01e78 --- /dev/null +++ b/ydb/services/metadata/ds_table/config.cpp @@ -0,0 +1,24 @@ +#include "config.h" +#include <util/generic/ylimits.h> + +namespace NKikimr::NMetadataProvider { + +bool TConfig::DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config) { + EnabledFlag = config.GetEnabled(); + RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds()); + RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds()); + RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds()); + if (RetryPeriodStart > RetryPeriodFinish) { + Cerr << "incorrect metadata provider config start/finish periods"; + std::swap(RetryPeriodStart, RetryPeriodFinish); + return false; + } + return true; +} + +TDuration TConfig::GetRetryPeriod(const ui32 retry) const { + const double kff = 1.0 / Max<double>(1.0, retry); + return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff); +} + +} diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h new file mode 100644 index 0000000000..47ef42fed9 --- /dev/null +++ b/ydb/services/metadata/ds_table/config.h @@ -0,0 +1,20 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/protos/config.pb.h> +#include <util/datetime/base.h> + +namespace NKikimr::NMetadataProvider { + +class TConfig { +private: + YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10)); + TDuration RetryPeriodStart = TDuration::Seconds(3); + TDuration RetryPeriodFinish = TDuration::Seconds(30); + YDB_READONLY_FLAG(Enabled, true); +public: + TConfig() = default; + + TDuration GetRetryPeriod(const ui32 retry) const; + bool DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config); +}; +} diff --git a/ydb/services/metadata/ds_table/request_actor.cpp b/ydb/services/metadata/ds_table/request_actor.cpp new file mode 100644 index 0000000000..f9b541e80c --- /dev/null +++ b/ydb/services/metadata/ds_table/request_actor.cpp @@ -0,0 +1,5 @@ +#include "request_actor.h" + +namespace NKikimr::NMetadataProvider { + +} diff --git a/ydb/services/metadata/ds_table/request_actor.h b/ydb/services/metadata/ds_table/request_actor.h new file mode 100644 index 0000000000..5a2f44be52 --- /dev/null +++ b/ydb/services/metadata/ds_table/request_actor.h @@ -0,0 +1,138 @@ +#pragma once +#include "config.h" + +#include <library/cpp/actors/core/actor_virtual.h> +#include <library/cpp/actors/core/av_bootstrapped.h> +#include <library/cpp/actors/core/log.h> +#include <ydb/core/base/appdata.h> +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/abstract/common.h> + +namespace NKikimr::NMetadataProvider { + +template <class TRequestExt, class TResponseExt, ui32 EvStartExt, ui32 EvResultInternalExt, ui32 EvResultExt> +class TDialogPolicyImpl { +public: + using TRequest = TRequestExt; + using TResponse = TResponseExt; + static constexpr ui32 EvStart = EvStartExt; + static constexpr ui32 EvResultInternal = EvResultInternalExt; + static constexpr ui32 EvResult = EvResultExt; +}; + +using TDialogCreateTable = TDialogPolicyImpl<Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse, + EEvSubscribe::EvCreateTableRequest, EEvSubscribe::EvCreateTableInternalResponse, EEvSubscribe::EvCreateTableResponse>; +using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse, + EEvSubscribe::EvSelectRequest, EEvSubscribe::EvSelectInternalResponse, EEvSubscribe::EvSelectResponse>; +using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, + EEvSubscribe::EvCreateSessionRequest, EEvSubscribe::EvCreateSessionInternalResponse, EEvSubscribe::EvCreateSessionResponse>; + +template <class TDialogPolicy> +class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> { +private: + YDB_READONLY_DEF(typename TDialogPolicy::TResponse, Result); +public: + TEvRequestResult(typename TDialogPolicy::TResponse&& result) + : Result(std::move(result)) + { + + } +}; + +template <class TResponse> +class TOperatorChecker { +public: + static bool IsSuccess(const TResponse& r) { + return r.operation().status() == Ydb::StatusIds::SUCCESS; + } +}; + +template <> +class TOperatorChecker<Ydb::Table::CreateTableResponse> { +public: + static bool IsSuccess(const Ydb::Table::CreateTableResponse& r) { + return r.operation().status() == Ydb::StatusIds::SUCCESS || + r.operation().status() != Ydb::StatusIds::ALREADY_EXISTS; + } +}; + +template <class TDialogPolicy> +class TYDBRequest: public NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>> { +private: + using TBase = NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>>; + using TRequest = typename TDialogPolicy::TRequest; + using TResponse = typename TDialogPolicy::TResponse; + using TSelf = TYDBRequest<TDialogPolicy>; + TRequest ProtoRequest; + const NActors::TActorId ActorFinishId; + const TConfig& Config; + ui32 Retry = 0; +protected: + class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> { + private: + YDB_READONLY_DEF(NThreading::TFuture<typename TDialogPolicy::TResponse>, Future); + public: + TEvRequestInternalResult(const NThreading::TFuture<typename TDialogPolicy::TResponse>& f) + : Future(f) { + + } + }; + class TEvRequestStart: public NActors::TEventLocal<TEvRequestStart, TDialogPolicy::EvStart> { + public: + }; + +public: + void Bootstrap(const TActorContext& /*ctx*/) { + TBase::Become(&TBase::TThis::StateMain); + TBase::template Sender<TEvRequestStart>().SendTo(TBase::SelfId()); + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvRequestInternalResult, Handle); + hFunc(TEvRequestStart, Handle); + default: + break; + } + } + void Handle(typename TEvRequestInternalResult::TPtr& ev) { + if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization"; + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + return; + } + auto f = ev->Get()->GetFuture(); + TResponse response = f.ExtractValue(); + if (!TOperatorChecker<TResponse>::IsSuccess(response)) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString(); + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + return; + } + TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId); + TBase::Die(TActivationContext::AsActorContext()); + } + + void Handle(typename TEvRequestStart::TPtr& /*ev*/) { + auto aSystem = TActivationContext::ActorSystem(); + using TRpcRequest = NGRpcService::TGrpcRequestOperationCall<TRequest, TResponse>; + auto request = ProtoRequest; + auto result = NRpcService::DoLocalRpc<TRpcRequest>(std::move(request), AppData()->TenantName, "", aSystem); + const NActors::TActorId selfId = TBase::SelfId(); + const auto replyCallback = [aSystem, selfId](const NThreading::TFuture<TResponse>& f) { + aSystem->Send(selfId, new TEvRequestInternalResult(f)); + }; + result.Subscribe(replyCallback); + } + + TYDBRequest(const TRequest& request, const NActors::TActorId actorFinishId, const TConfig& config) + : ProtoRequest(request) + , ActorFinishId(actorFinishId) + , Config(config) + { + + } +}; + +} diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp new file mode 100644 index 0000000000..c083afe97e --- /dev/null +++ b/ydb/services/metadata/ds_table/service.cpp @@ -0,0 +1,33 @@ +#include "service.h" + +#include "accessor_subscribe.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> +#include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/service.h> + +namespace NKikimr::NMetadataProvider { + +IActor* CreateService(const TConfig& config) { + return new TService(config); +} + +void TService::Handle(TEvSubscribeExternal::TPtr& ev) { + auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath()); + if (it == Accessors.end()) { + THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetSnapshotParser()); + it = Accessors.emplace(ev->Get()->GetSnapshotParser()->GetTablePath(), Register(actor.Release())).first; + } + Send<TEvSubscribe>(it->second, ev->Sender); +} + +void TService::Handle(TEvUnsubscribeExternal::TPtr& ev) { + auto it = Accessors.find(ev->Get()->GetSnapshotParser()->GetTablePath()); + if (it != Accessors.end()) { + Send<TEvUnsubscribe>(it->second, ev->Sender); + } +} + +} diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h new file mode 100644 index 0000000000..e55d90ac91 --- /dev/null +++ b/ydb/services/metadata/ds_table/service.h @@ -0,0 +1,39 @@ +#pragma once +#include "config.h" + +#include <ydb/services/metadata/service.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NMetadataProvider { + +class TService: public NActors::TActorBootstrapped<TService> { +private: + using TBase = NActors::TActor<TService>; + std::map<TString, NActors::TActorId> Accessors; + const TConfig Config; +public: + void Handle(TEvSubscribeExternal::TPtr& ev); + void Handle(TEvUnsubscribeExternal::TPtr& ev); + + void Bootstrap(const NActors::TActorContext& /*ctx*/) { + Become(&TThis::StateMain); + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSubscribeExternal, Handle); + hFunc(TEvUnsubscribeExternal, Handle); + default: + Y_VERIFY(false); + } + } + + TService(const TConfig& config) + : Config(config) + { + } +}; + +NActors::IActor* CreateService(const TConfig& config); + +} diff --git a/ydb/services/metadata/service.cpp b/ydb/services/metadata/service.cpp new file mode 100644 index 0000000000..736d41bfc5 --- /dev/null +++ b/ydb/services/metadata/service.cpp @@ -0,0 +1,9 @@ +#include "service.h" + +namespace NKikimr::NMetadataProvider { + +NActors::TActorId MakeServiceId(ui32 node) { + return NActors::TActorId(node, "SrvcMetaData"); +} + +} diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h new file mode 100644 index 0000000000..72800a06d2 --- /dev/null +++ b/ydb/services/metadata/service.h @@ -0,0 +1,29 @@ +#pragma once +#include <ydb/services/metadata/abstract/common.h> +#include <library/cpp/actors/core/event_local.h> + +namespace NKikimr::NMetadataProvider { + +class TEvSubscribeExternal: public NActors::TEventLocal<TEvSubscribeExternal, EEvSubscribe::EvSubscribeExternal> { +private: + YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); +public: + TEvSubscribeExternal(ISnapshotParser::TPtr parser) + : SnapshotParser(parser) { + Y_VERIFY(!!SnapshotParser); + } +}; + +class TEvUnsubscribeExternal: public NActors::TEventLocal<TEvUnsubscribeExternal, EEvSubscribe::EvUnsubscribeExternal> { +private: + YDB_READONLY_DEF(ISnapshotParser::TPtr, SnapshotParser); +public: + TEvUnsubscribeExternal(ISnapshotParser::TPtr parser) + : SnapshotParser(parser) { + Y_VERIFY(!!SnapshotParser); + } +}; + +NActors::TActorId MakeServiceId(ui32 node = 0); + +} |