diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-24 18:22:51 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-10-24 18:22:51 +0300 |
commit | 0007642ecd1f43032cae2eefb5ede97d786bccbf (patch) | |
tree | 5a4220a39f33b416b0df8ff853c8adbae285a84a | |
parent | 7741bf7772cfd0c0421d37aaa53b1b200841b056 (diff) | |
download | ydb-0007642ecd1f43032cae2eefb5ede97d786bccbf.tar.gz |
multiple operations for initialization
prepare for internal requests reusage
move tests to appropriate file
31 files changed, 631 insertions, 396 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h index f30912e0748..e0d00c47671 100644 --- a/ydb/core/base/events.h +++ b/ydb/core/base/events.h @@ -149,6 +149,7 @@ struct TKikimrEvents : TEvents { ES_BLOB_DEPOT, ES_DATASHARD_LOAD, ES_METADATA_PROVIDER, + ES_INTERNAL_REQUEST, }; }; diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 9dbbd9d748b..c04909f14ab 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -574,11 +574,15 @@ message TKQPConfig { repeated NKikimrKqp.TKqpSetting Settings = 10; } +message TInternalRequestConfig { + optional uint32 RetryPeriodStartSeconds = 1 [default = 3]; + optional uint32 RetryPeriodFinishSeconds = 2 [default = 30]; +} + message TMetadataProviderConfig { optional bool Enabled = 1 [default = true]; optional uint32 RefreshPeriodSeconds = 2 [default = 10]; - optional uint32 RetryPeriodStartSeconds = 3 [default = 3]; - optional uint32 RetryPeriodFinishSeconds = 4 [default = 30]; + optional TInternalRequestConfig RequestConfig = 3; } message TMemoryLogConfig { diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp index e75ec77db85..5a95613ca81 100644 --- a/ydb/core/tx/columnshard/external_data.cpp +++ b/ydb/core/tx/columnshard/external_data.cpp @@ -1,63 +1,91 @@ #include "external_data.h" +#include <library/cpp/json/writer/json_value.h> +#include <library/cpp/protobuf/json/proto2json.h> -namespace NKikimr::NColumnShard { +namespace NKikimr::NColumnShard::NTiers { -std::optional<TString> TCSKVSnapshot::GetValue(const TString& key) const { - auto it = Data.find(key); - if (it == Data.end()) { - return {}; - } else { - return it->second; - } -} - -bool TCSKVSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) { - if (rawData.columns().size() != 2) { - Cerr << "incorrect proto columns info" << Endl; - return false; - } - i32 keyIdx = -1; - i32 valueIdx = -1; - ui32 idx = 0; - for (auto&& i : rawData.columns()) { - if (i.name() == "key") { - keyIdx = idx; - } else if (i.name() == "value") { - valueIdx = idx; - } - ++idx; - } - if (keyIdx < 0 || valueIdx < 0) { - Cerr << "incorrect table columns"; +bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) { + const i32 ownerPathIdx = GetFieldIndex(rawData, "ownerPath"); + const i32 tierNameIdx = GetFieldIndex(rawData, "tierName"); + const i32 tierConfigIdx = GetFieldIndex(rawData, "tierConfig"); + if (tierNameIdx < 0 || tierConfigIdx < 0 || ownerPathIdx < 0) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "incorrect tiers config table structure"; return false; } for (auto&& r : rawData.rows()) { - TString key(r.items()[keyIdx].bytes_value()); - TString value(r.items()[valueIdx].bytes_value()); - if (!Data.emplace(key, value).second) { - Cerr << "keys duplication: " << key; + TConfig config(r.items()[ownerPathIdx].bytes_value(), r.items()[tierNameIdx].bytes_value()); + if (!config.DeserializeFromString(r.items()[tierConfigIdx].bytes_value())) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot"; + return false; + } + if (!Data.emplace(config.GetConfigId(), config).second) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "tier names duplication: " << config.GetTierName(); return false; } } return true; } -Ydb::Table::CreateTableRequest TSnapshotConstructor::DoGetTableSchema() const { +std::optional<TConfig> TConfigsSnapshot::GetValue(const TString& key) const { + auto it = Data.find(key); + if (it == Data.end()) { + return {}; + } else { + return it->second; + } +} + +TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const { Ydb::Table::CreateTableRequest request; request.set_session_id(""); request.set_path(TablePath); - request.add_primary_key("key"); + request.add_primary_key("ownerPath"); + request.add_primary_key("tierName"); + { + auto& column = *request.add_columns(); + column.set_name("tierName"); + column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); + } { auto& column = *request.add_columns(); - column.set_name("key"); + column.set_name("ownerPath"); column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); } { auto& column = *request.add_columns(); - column.set_name("value"); + column.set_name("tierConfig"); column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING); } - return request; + NMetadataProvider::ITableModifier::TPtr result( + new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request)); + return { result }; +} + +TString NTiers::TConfigsSnapshot::DoSerializeToString() const { + NJson::TJsonValue result = NJson::JSON_MAP; + for (auto&& i : Data) { + result.InsertValue(i.first, i.second.SerializeToJson()); + } + return result.GetStringRobust(); +} + +bool NTiers::TConfig::DeserializeFromString(const TString& configProtoStr) { + if (!::google::protobuf::TextFormat::ParseFromString(configProtoStr, &ProtoConfig)) { + ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse proto string: " << configProtoStr; + return false; + } + return true; +} + +NJson::TJsonValue TConfig::SerializeToJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + result.InsertValue("tierName", TierName); + NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue("tierConfig", NJson::JSON_MAP)); + return result; +} + +bool TConfig::IsSame(const TConfig& item) const { + return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString(); } } diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h index 500e7d94d77..e070fa5ba21 100644 --- a/ydb/core/tx/columnshard/external_data.h +++ b/ydb/core/tx/columnshard/external_data.h @@ -1,33 +1,57 @@ #pragma once #include <ydb/services/metadata/service.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> -namespace NKikimr::NColumnShard { +#include <library/cpp/json/writer/json_value.h> -class TCSKVSnapshot: public NMetadataProvider::ISnapshot { +namespace NKikimr::NColumnShard::NTiers { + +class TConfig { +private: + using TTierProto = NKikimrSchemeOp::TStorageTierConfig; + using TS3SettingsProto = NKikimrSchemeOp::TS3Settings; + YDB_READONLY_DEF(TString, OwnerPath); + YDB_READONLY_DEF(TString, TierName); + YDB_READONLY_DEF(TTierProto, ProtoConfig); +public: + TConfig() = default; + TConfig(const TString& ownerPath, const TString& tierName) + : OwnerPath(ownerPath) + , TierName(tierName) + { + + } + + TString GetConfigId() const { + return OwnerPath + "." + TierName; + } + + bool NeedExport() const { + return ProtoConfig.HasObjectStorage(); + } + bool IsSame(const TConfig& item) const; + bool DeserializeFromString(const TString& configProtoStr); + NJson::TJsonValue SerializeToJson() const; +}; + +class TConfigsSnapshot: public NMetadataProvider::ISnapshot { private: using TBase = NMetadataProvider::ISnapshot; - using TKVData = TMap<TString, TString>; - YDB_READONLY_DEF(TKVData, Data); + using TConfigsMap = TMap<TString, TConfig>; + YDB_READONLY_DEF(TConfigsMap, Data); protected: virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override; - virtual TString DoSerializeToString() const override { - TStringBuilder sb; - for (auto&& i : Data) { - sb << i.first << "=" << i.second << ";"; - } - return sb; - } + virtual TString DoSerializeToString() const override; public: - std::optional<TString> GetValue(const TString& key) const; - + std::optional<TConfig> GetValue(const TString& key) const; using TBase::TBase; }; -class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TCSKVSnapshot> { +class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> { private: const TString TablePath; protected: - virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const override; + virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override; virtual const TString& DoGetTablePath() const override { return TablePath; } diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt index ac703050476..14d9ad8173b 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt @@ -40,6 +40,7 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp ) add_test( NAME diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt index 4de436d1222..1b40ae04947 100644 --- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt +++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt @@ -44,6 +44,7 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp ) add_test( NAME diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp index c0aebd18894..b530b362afd 100644 --- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp +++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp @@ -1,10 +1,8 @@ #include "columnshard_ut_common.h" -#include "external_data.h" #include <ydb/core/wrappers/ut_helpers/s3_mock.h> #include <ydb/core/wrappers/s3_wrapper.h> #include <ydb/services/metadata/service.h> #include <ydb/core/cms/console/configs_dispatcher.h> -//#include <ydb/core/testlib/cs_helper.h> #include <ydb/core/tx/tx_proxy/proxy.h> #include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> @@ -702,238 +700,9 @@ void TestDrop(bool reboots) { namespace NColumnShard { extern bool gAllowLogBatchingDefaultValue; } -/* -class TLocalHelper: public Tests::NCS::THelper { -private: - using TBase = Tests::NCS::THelper; -public: - using TBase::TBase; - void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", - ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, - TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { - TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); - CreateTestOlapStore(sender, Sprintf(R"( - Name: "%s" - ColumnShardCount: %d - SchemaPresets { - Name: "default" - Schema { - Columns { Name: "timestamp" Type: "Timestamp" } - #Columns { Name: "resource_type" Type: "Utf8" } - Columns { Name: "resource_id" Type: "Utf8" } - Columns { Name: "uid" Type: "Utf8" } - Columns { Name: "level" Type: "Int32" } - Columns { Name: "message" Type: "Utf8" } - #Columns { Name: "json_payload" Type: "Json" } - #Columns { Name: "ingested_at" Type: "Timestamp" } - #Columns { Name: "saved_at" Type: "Timestamp" } - #Columns { Name: "request_id" Type: "Utf8" } - KeyColumnNames: "timestamp" - Engine: COLUMN_ENGINE_REPLACING_TIMESERIES - StorageTiers { - Name: "tier1" - ObjectStorage { - Endpoint: "fake" - AccessKey: "$a" - } - } - } - } - )", storeName.c_str(), storeShardsCount)); - - TString shardingColumns = "[\"timestamp\", \"uid\"]"; - if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { - shardingColumns = "[\"uid\"]"; - } - - TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( - Name: "%s" - ColumnShardCount: %d - Sharding { - HashSharding { - Function: %s - Columns: %s - } - })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); - } -}; -*/ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) { - class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> { - private: - using TBase = NActors::TActorBootstrapped<TTestCSEmulator>; - std::shared_ptr<TSnapshotConstructor> ExternalDataManipulation; - TActorId ProviderId; - TInstant Start; - YDB_READONLY_FLAG(Found, false); - public: - STFUNC(StateInit) { - switch (ev->GetTypeRewrite()) { - HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); - default: - Y_VERIFY(false); - } - } - void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) { - auto value = ev->Get()->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a"); - if (value && *value == "b") { - FoundFlag = true; - } else { - Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl; - } - } - - void Bootstrap() { - ProviderId = NMetadataProvider::MakeServiceId(1); - ExternalDataManipulation = std::make_shared<TSnapshotConstructor>("/Root/.external_data/kv_settings"); - Become(&TThis::StateInit); - Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); - Start = Now(); - } - }; - - Y_UNIT_TEST(DSConfigsStub) { - TPortManager pm; - - ui32 grpcPort = pm.GetPort(); - ui32 msgbPort = pm.GetPort(); - - Tests::TServerSettings serverSettings(msgbPort); - serverSettings.Port = msgbPort; - serverSettings.GrpcPort = grpcPort; - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetEnableMetadataProvider(true) - .SetEnableKqpSessionActor(true) - .SetEnableOlapSchemaOperations(true); - ; - - Tests::TServer::TPtr server = new Tests::TServer(serverSettings); - server->EnableGRpc(grpcPort); - // server->SetupDefaultProfiles(); - - Tests::TClient client(serverSettings); - - auto& runtime = *server->GetRuntime(); - - auto sender = runtime.AllocateEdgeActor(); - server->SetupRootStoragePools(sender); - { - TTestCSEmulator* emulator = new TTestCSEmulator; - runtime.Register(emulator); - { - const TInstant start = Now(); - while (Now() - start < TDuration::Seconds(10)) { - runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { - Cerr << "Step " << event->Type << Endl; - return false; - }, {}, TDuration::Seconds(1)); - Sleep(TDuration::Seconds(1)); - Cerr << "Step finished" << Endl; - } - } - - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto sResult = f.GetValueSync(); - Cerr << sResult.GetIssues().ToString() << Endl; - auto session = sResult.GetSession(); - session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - - const TInstant start = Now(); - while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { - runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); - } - Y_VERIFY(emulator->IsFound()); - } - //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); - } - - /* - Y_UNIT_TEST(DSConfigs) { - TPortManager pm; - - ui32 grpcPort = pm.GetPort(); - ui32 msgbPort = pm.GetPort(); - - Tests::TServerSettings serverSettings(msgbPort); - serverSettings.Port = msgbPort; - serverSettings.GrpcPort = grpcPort; - serverSettings.SetDomainName("Root") - .SetUseRealThreads(false) - .SetEnableMetadataProvider(true) - .SetEnableKqpSessionActor(true) - .SetEnableOlapSchemaOperations(true); - ; - - Tests::TServer::TPtr server = new Tests::TServer(serverSettings); - server->EnableGRpc(grpcPort); -// server->SetupDefaultProfiles(); - - Tests::TClient client(serverSettings); - - auto& runtime = *server->GetRuntime(); - - auto sender = runtime.AllocateEdgeActor(); - server->InitRoot(sender); - TLocalHelper lHelper(*server); - lHelper.CreateTestOlapTable(); - - runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); - const TInstant start = Now(); - while (Now() - start < TDuration::Seconds(10)) { - runtime.WaitForEdgeEvents([](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event) { - Cerr << "Step " << event->Type << Endl; - return false; - }, {}, TDuration::Seconds(1)); - Sleep(TDuration::Seconds(1)); - Cerr << "Step finished" << Endl; - } - - NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); - tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { - auto sResult = f.GetValueSync(); - Cerr << sResult.GetIssues().ToString() << Endl; - auto session = sResult.GetSession(); - session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')" - , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); - }); - - bool found = false; - bool* foundPtr = &found; - const auto pred = [foundPtr](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { - if (event->HasBuffer() && !event->HasEvent()) { - } else if (!event->GetBase()) { - Cerr << "Type nullptr" << Endl; - } else { - Cerr << "Step " << event->GetBase()->Type() << Endl; - auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); - if (ptr) { - auto value = ptr->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a"); - if (value && *value == "b") { - *foundPtr = true; - } else { - Cerr << ptr->GetSnapshot()->SerializeToString() << Endl; - } - } - } - return TTestActorRuntimeBase::EEventAction::PROCESS; - }; - - runtime.SetObserverFunc(pred); - - while (!found) { - runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); - } - //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); - //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); - } -*/ Y_UNIT_TEST(CreateTable) { ui64 tableId = 1; diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp new file mode 100644 index 00000000000..521ed7aa450 --- /dev/null +++ b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp @@ -0,0 +1,254 @@ +#include "columnshard_ut_common.h" +#include "external_data.h" +#include <ydb/core/wrappers/ut_helpers/s3_mock.h> +#include <ydb/core/wrappers/s3_wrapper.h> +#include <ydb/services/metadata/service.h> +#include <ydb/core/cms/console/configs_dispatcher.h> +#include <ydb/core/testlib/cs_helper.h> +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <library/cpp/actors/core/av_bootstrapped.h> + +#include <util/system/hostname.h> + +namespace NKikimr { + +using namespace NColumnShard; + +class TLocalHelper: public Tests::NCS::THelper { +private: + using TBase = Tests::NCS::THelper; +public: + using TBase::TBase; + void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore", + ui32 storeShardsCount = 4, ui32 tableShardsCount = 3, + TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") { + TActorId sender = Server.GetRuntime()->AllocateEdgeActor(); + CreateTestOlapStore(sender, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + SchemaPresets { + Name: "default" + Schema { + Columns { Name: "timestamp" Type: "Timestamp" } + #Columns { Name: "resource_type" Type: "Utf8" } + Columns { Name: "resource_id" Type: "Utf8" } + Columns { Name: "uid" Type: "Utf8" } + Columns { Name: "level" Type: "Int32" } + Columns { Name: "message" Type: "Utf8" } + #Columns { Name: "json_payload" Type: "Json" } + #Columns { Name: "ingested_at" Type: "Timestamp" } + #Columns { Name: "saved_at" Type: "Timestamp" } + #Columns { Name: "request_id" Type: "Utf8" } + KeyColumnNames: "timestamp" + Engine: COLUMN_ENGINE_REPLACING_TIMESERIES + StorageTiers { + Name: "tier1" + ObjectStorage { + Endpoint: "fake" + AccessKey: "$a" + } + } + } + } + )", storeName.c_str(), storeShardsCount)); + + TString shardingColumns = "[\"timestamp\", \"uid\"]"; + if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") { + shardingColumns = "[\"uid\"]"; + } + + TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"( + Name: "%s" + ColumnShardCount: %d + Sharding { + HashSharding { + Function: %s + Columns: %s + } + })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str())); + } +}; + + +Y_UNIT_TEST_SUITE(ColumnShardTiers) { + + const TString ConfigProtoStr = "Name : \"abc\""; + + class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> { + private: + using TBase = NActors::TActorBootstrapped<TTestCSEmulator>; + std::shared_ptr<NTiers::TSnapshotConstructor> ExternalDataManipulation; + TActorId ProviderId; + TInstant Start; + YDB_READONLY_FLAG(Found, false); + public: + STFUNC(StateInit) { + switch (ev->GetTypeRewrite()) { + HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + default: + Y_VERIFY(false); + } + } + + void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) { + auto value = ev->Get()->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1"); + if (value && value->GetProtoConfig().GetName() == "abc") { + FoundFlag = true; + } else { + Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl; + } + } + + void Bootstrap() { + ProviderId = NMetadataProvider::MakeServiceId(1); + ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/.external_data/tiers"); + Become(&TThis::StateInit); + Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId); + Start = Now(); + } + }; + + Y_UNIT_TEST(DSConfigsStub) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableKqpSessionActor(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + // server->SetupDefaultProfiles(); + + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + { + TTestCSEmulator* emulator = new TTestCSEmulator; + runtime.Register(emulator); + { + const TInstant start = Now(); + while (Now() - start < TDuration::Seconds(10)) { + runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + Cerr << "Step " << event->Type << Endl; + return false; + }, {}, TDuration::Seconds(1)); + Sleep(TDuration::Seconds(1)); + Cerr << "Step finished" << Endl; + } + } + + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto sResult = f.GetValueSync(); + Cerr << sResult.GetIssues().ToString() << Endl; + auto session = sResult.GetSession(); + session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + + const TInstant start = Now(); + while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { + runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + } + Y_VERIFY(emulator->IsFound()); + } + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + } +/* + Y_UNIT_TEST(DSConfigs) { + TPortManager pm; + + ui32 grpcPort = pm.GetPort(); + ui32 msgbPort = pm.GetPort(); + + Tests::TServerSettings serverSettings(msgbPort); + serverSettings.Port = msgbPort; + serverSettings.GrpcPort = grpcPort; + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false) + .SetEnableMetadataProvider(true) + .SetEnableKqpSessionActor(true) + .SetEnableOlapSchemaOperations(true); + ; + + Tests::TServer::TPtr server = new Tests::TServer(serverSettings); + server->EnableGRpc(grpcPort); + // server->SetupDefaultProfiles(); + + Tests::TClient client(serverSettings); + + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + TLocalHelper lHelper(*server); + lHelper.CreateTestOlapTable(); + + runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE); + const TInstant start = Now(); + while (Now() - start < TDuration::Seconds(10)) { + runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) { + Cerr << "Step " << event->Type << Endl; + return false; + }, {}, TDuration::Seconds(1)); + Sleep(TDuration::Seconds(1)); + Cerr << "Step finished" << Endl; + } + + NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + auto sResult = f.GetValueSync(); + Cerr << sResult.GetIssues().ToString() << Endl; + auto session = sResult.GetSession(); + session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')" + , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()); + }); + + bool found = false; + bool* foundPtr = &found; + const auto pred = [foundPtr](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction { + if (event->HasBuffer() && !event->HasEvent()) { + } else if (!event->GetBase()) { + Cerr << "Type nullptr" << Endl; + } else { + Cerr << "Step " << event->GetBase()->Type() << Endl; + auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase()); + if (ptr) { + auto value = ptr->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1"); + if (value && value->GetProtoConfig().GetName() == "abc") { + *foundPtr = true; + } else { + Cerr << ptr->GetSnapshot()->SerializeToString() << Endl; + } + } + } + return TTestActorRuntimeBase::EEventAction::PROCESS; + }; + + runtime.SetObserverFunc(pred); + + while (!found) { + runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10)); + } + //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE); + //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE); + } +*/ +} +} diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt index 13463c41c23..55a81045b7a 100644 --- a/ydb/services/metadata/CMakeLists.txt +++ b/ydb/services/metadata/CMakeLists.txt @@ -8,6 +8,8 @@ add_subdirectory(abstract) add_subdirectory(ds_table) +add_subdirectory(initializer) +add_subdirectory(request) add_library(ydb-services-metadata) target_link_libraries(ydb-services-metadata PUBLIC diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp index fa5484fdbea..eddddb14b11 100644 --- a/ydb/services/metadata/abstract/common.cpp +++ b/ydb/services/metadata/abstract/common.cpp @@ -2,4 +2,15 @@ namespace NKikimr::NMetadataProvider { +i32 ISnapshot::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const { + i32 idx = 0; + for (auto&& i : rawData.columns()) { + if (i.name() == columnId) { + return idx; + } + ++idx; + } + return -1; +} + } diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index dd235bacdca..782d9695fbb 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -5,23 +5,15 @@ #include <library/cpp/actors/core/actor_virtual.h> #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/core/base/events.h> #include <ydb/library/accessor/accessor.h> #include <ydb/public/api/protos/ydb_table.pb.h> -#include <ydb/core/base/events.h> +#include <ydb/services/metadata/initializer/common.h> namespace NKikimr::NMetadataProvider { enum EEvSubscribe { EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER), - EvCreateTableRequest, - EvCreateTableInternalResponse, - EvCreateTableResponse, - EvSelectRequest, - EvSelectInternalResponse, - EvSelectResponse, - EvCreateSessionRequest, - EvCreateSessionInternalResponse, - EvCreateSessionResponse, EvRefresh, EvSubscribeLocal, EvUnsubscribeLocal, @@ -30,7 +22,7 @@ enum EEvSubscribe { EvEnd }; -static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET_PIPE)"); +static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)"); class ISnapshot { private: @@ -38,7 +30,7 @@ private: protected: virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0; virtual TString DoSerializeToString() const = 0; - + i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const; public: using TPtr = std::shared_ptr<ISnapshot>; ISnapshot(const TInstant actuality) @@ -60,7 +52,7 @@ public: class ISnapshotParser { protected: virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0; - virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const = 0; + virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0; virtual const TString& DoGetTablePath() const = 0; public: using TPtr = std::shared_ptr<ISnapshotParser>; @@ -74,7 +66,7 @@ public: return result; } - Ydb::Table::CreateTableRequest GetTableSchema() const { + TVector<ITableModifier::TPtr> GetTableSchema() const { return DoGetTableSchema(); } diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt index 696e31532c4..137a58c0d35 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.txt @@ -16,12 +16,11 @@ target_link_libraries(services-metadata-ds_table PUBLIC core-grpc_services-local_rpc core-grpc_services-base ydb-core-grpc_services + services-metadata-initializer ) target_sources(services-metadata-ds_table PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_init.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/request_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp ) diff --git a/ydb/services/metadata/ds_table/accessor_init.cpp b/ydb/services/metadata/ds_table/accessor_init.cpp deleted file mode 100644 index e4100858173..00000000000 --- a/ydb/services/metadata/ds_table/accessor_init.cpp +++ /dev/null @@ -1,16 +0,0 @@ -#include "accessor_init.h" -#include <ydb/core/grpc_services/local_rpc/local_rpc.h> - -namespace NKikimr::NMetadataProvider { - -bool TDSAccessorInitialized::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& /*ev*/) { - InitializedFlag = true; - return true; -} - -void TDSAccessorInitialized::Bootstrap(const NActors::TActorContext& /*ctx*/) { - RegisterState(); - Register(new TYDBRequest<TDialogCreateTable>(GetTableSchema(), SelfId(), Config)); -} - -} diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index 56828775e88..339b5916ee7 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -7,6 +7,7 @@ #include <util/string/escape.h> namespace NKikimr::NMetadataProvider { +using namespace NInternal::NRequest; bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { const TString startString = CurrentSelection.SerializeAsString(); @@ -27,6 +28,7 @@ bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString(); + OnSnapshotModified(); return true; } return false; @@ -39,28 +41,23 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr& const TString sessionId = session.session_id(); Y_VERIFY(sessionId); Ydb::Table::ExecuteDataQueryRequest request; - request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(GetTableName()) + "`"); + request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(SnapshotConstructor->GetTablePath()) + "`"); request.set_session_id(sessionId); request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only(); - Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config)); + Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config.GetRequestConfig())); } void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) { - Y_VERIFY(IsInitialized()); - Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config)); + Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config.GetRequestConfig())); } -bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) { - if (!TBase::Handle(ev)) { - return false; - } +void TDSAccessorRefresher::OnInitialized() { Sender<TEvRefresh>().SendTo(SelfId()); - return true; } TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor) - : TBase(config) + : TBase(config, snapshotConstructor->GetTableSchema()) , SnapshotConstructor(snapshotConstructor) { } diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index c260f396b18..ceb21233648 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -1,7 +1,7 @@ #pragma once -#include "accessor_init.h" #include <ydb/public/api/protos/ydb_value.pb.h> #include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/initializer/accessor_init.h> namespace NKikimr::NMetadataProvider { @@ -19,19 +19,18 @@ private: YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection); TInstant RequestedActuality = TInstant::Zero(); protected: - virtual TString GetTableName() const = 0; bool IsReady() const { return !!CurrentSnapshot; } + virtual void OnInitialized() override; + virtual void OnSnapshotModified() = 0; public: using TBase::Handle; - virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) override; STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TEvRequestResult<TDialogSelect>, Handle); - hFunc(TEvRequestResult<TDialogCreateTable>, Handle); - hFunc(TEvRequestResult<TDialogCreateSession>, Handle); + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle); + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); hFunc(TEvRefresh, Handle); default: TBase::StateMain(ev, ctx); @@ -40,8 +39,8 @@ public: TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor); - virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev); - void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev); + bool Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev); + void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev); void Handle(TEvRefresh::TPtr& ev); }; diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp index 707432c65a2..93a41f8c481 100644 --- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp +++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp @@ -3,20 +3,15 @@ namespace NKikimr::NMetadataProvider { -bool TDSAccessorNotifier::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) { - if (TBase::Handle(ev)) { - auto snapshot = GetCurrentSnapshot(); - if (!snapshot) { - ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot"; - return false; - } - - for (auto&& i : Subscribed) { - Send(i, new TEvRefreshSubscriberData(snapshot)); - } - return true; - } else { - return false; +void TDSAccessorNotifier::OnSnapshotModified() { + auto snapshot = GetCurrentSnapshot(); + if (!snapshot) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot"; + return; + } + + for (auto&& i : Subscribed) { + Send(i, new TEvRefreshSubscriberData(snapshot)); } } diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h index 5ce80b08dc5..e082a714e89 100644 --- a/ydb/services/metadata/ds_table/accessor_subscribe.h +++ b/ydb/services/metadata/ds_table/accessor_subscribe.h @@ -34,6 +34,7 @@ protected: virtual void RegisterState() override { Become(&TDSAccessorNotifier::StateMain); } + virtual void OnSnapshotModified() override; public: using TBase::Handle; @@ -51,7 +52,6 @@ public: } - virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) override; void Handle(TEvSubscribe::TPtr& context); void Handle(TEvUnsubscribe::TPtr& context); }; @@ -59,20 +59,9 @@ public: class TExternalData: public TDSAccessorNotifier { private: using TBase = TDSAccessorNotifier; - Ydb::Table::CreateTableRequest ExternalCreateTableRequest; - TString TableName; -protected: - virtual TString GetTableName() const override { - return TableName; - } - virtual Ydb::Table::CreateTableRequest GetTableSchema() const override { - return ExternalCreateTableRequest; - } public: TExternalData(const TConfig& config, ISnapshotParser::TPtr sParser) - : TBase(config, sParser) - , ExternalCreateTableRequest(sParser->GetTableSchema()) - , TableName(ExternalCreateTableRequest.path()) { + : TBase(config, sParser) { } }; diff --git a/ydb/services/metadata/ds_table/config.cpp b/ydb/services/metadata/ds_table/config.cpp index b202f01e78d..45003ceca6a 100644 --- a/ydb/services/metadata/ds_table/config.cpp +++ b/ydb/services/metadata/ds_table/config.cpp @@ -5,20 +5,11 @@ namespace NKikimr::NMetadataProvider { bool TConfig::DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config) { EnabledFlag = config.GetEnabled(); - RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds()); - RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds()); - RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds()); - if (RetryPeriodStart > RetryPeriodFinish) { - Cerr << "incorrect metadata provider config start/finish periods"; - std::swap(RetryPeriodStart, RetryPeriodFinish); + if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) { return false; } + RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds()); return true; } -TDuration TConfig::GetRetryPeriod(const ui32 retry) const { - const double kff = 1.0 / Max<double>(1.0, retry); - return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff); -} - } diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h index 47ef42fed9d..372f446771c 100644 --- a/ydb/services/metadata/ds_table/config.h +++ b/ydb/services/metadata/ds_table/config.h @@ -1,5 +1,6 @@ #pragma once #include <ydb/library/accessor/accessor.h> +#include <ydb/services/metadata/request/config.h> #include <ydb/core/protos/config.pb.h> #include <util/datetime/base.h> @@ -7,9 +8,8 @@ namespace NKikimr::NMetadataProvider { class TConfig { private: + YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig); YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10)); - TDuration RetryPeriodStart = TDuration::Seconds(3); - TDuration RetryPeriodFinish = TDuration::Seconds(30); YDB_READONLY_FLAG(Enabled, true); public: TConfig() = default; diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt new file mode 100644 index 00000000000..55e553e7830 --- /dev/null +++ b/ydb/services/metadata/initializer/CMakeLists.txt @@ -0,0 +1,24 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-initializer) +target_link_libraries(services-metadata-initializer PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services + services-metadata-request +) +target_sources(services-metadata-initializer PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/accessor_init.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp +) diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp new file mode 100644 index 00000000000..99cf9a5a698 --- /dev/null +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -0,0 +1,32 @@ +#include "accessor_init.h" +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +namespace NKikimr::NMetadataProvider { + +void TDSAccessorInitialized::Bootstrap() { + RegisterState(); + Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig()); +} + +void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) { + if (Modifiers.empty()) { + return; + } + Modifiers.pop_front(); + if (Modifiers.size()) { + Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig()); + } else { + OnInitialized(); + } +} + +TDSAccessorInitialized::TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers) + : Config(config) +{ + Y_VERIFY(modifiers.size()); + for (auto&& i : modifiers) { + Modifiers.emplace_back(i); + } +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h index 708ec3a3ddc..d278408ae2e 100644 --- a/ydb/services/metadata/ds_table/accessor_init.h +++ b/ydb/services/metadata/initializer/accessor_init.h @@ -1,6 +1,7 @@ #pragma once -#include "config.h" -#include "request_actor.h" +#include "common.h" + +#include <ydb/services/metadata/ds_table/config.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/event_local.h> @@ -9,26 +10,21 @@ namespace NKikimr::NMetadataProvider { -class TDSAccessorInitialized; - class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> { private: - YDB_READONLY_FLAG(Initialized, false); + TDeque<ITableModifier::TPtr> Modifiers; protected: const TConfig& Config; - virtual Ydb::Table::CreateTableRequest GetTableSchema() const = 0; virtual void RegisterState() = 0; + virtual void OnInitialized() = 0; public: - TDSAccessorInitialized(const TConfig& config) - : Config(config) { - - } - void Bootstrap(const NActors::TActorContext& /*ctx*/); - virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev); + void Bootstrap(); + TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers); + void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TEvRequestResult<TDialogCreateTable>, Handle); + hFunc(NInternal::NRequest::TEvRequestFinished, Handle); default: break; } diff --git a/ydb/services/metadata/ds_table/request_actor.cpp b/ydb/services/metadata/initializer/common.cpp index f9b541e80c0..fa5484fdbea 100644 --- a/ydb/services/metadata/ds_table/request_actor.cpp +++ b/ydb/services/metadata/initializer/common.cpp @@ -1,4 +1,4 @@ -#include "request_actor.h" +#include "common.h" namespace NKikimr::NMetadataProvider { diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h new file mode 100644 index 00000000000..f9fac852785 --- /dev/null +++ b/ydb/services/metadata/initializer/common.h @@ -0,0 +1,34 @@ +#pragma once +#include <ydb/services/metadata/request/config.h> +#include <ydb/services/metadata/request/request_actor.h> + +namespace NKikimr::NMetadataProvider { + +class ITableModifier { +protected: + virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const = 0; +public: + using TPtr = std::shared_ptr<ITableModifier>; + virtual ~ITableModifier() = default; + bool Execute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const { + return DoExecute(resultCallbackId, config); + } +}; + +template <class TDialogPolicy> +class TGenericTableModifier: public ITableModifier { +private: + YDB_READONLY_DEF(typename TDialogPolicy::TRequest, Request); +protected: + virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const override { + TActivationContext::ActorSystem()->Register(new NInternal::NRequest::TYDBRequest<TDialogPolicy>(Request, resultCallbackId, config)); + return true; + } +public: + TGenericTableModifier(typename TDialogPolicy::TRequest& request) + : Request(request) { + + } +}; + +} diff --git a/ydb/services/metadata/request/CMakeLists.txt b/ydb/services/metadata/request/CMakeLists.txt new file mode 100644 index 00000000000..fe88f4b7c89 --- /dev/null +++ b/ydb/services/metadata/request/CMakeLists.txt @@ -0,0 +1,24 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(services-metadata-request) +target_link_libraries(services-metadata-request PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + ydb-core-base + core-grpc_services-local_rpc + core-grpc_services-base + ydb-core-grpc_services +) +target_sources(services-metadata-request PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/request_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/config.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/common.cpp +) diff --git a/ydb/services/metadata/request/common.cpp b/ydb/services/metadata/request/common.cpp new file mode 100644 index 00000000000..4ec9f4dde00 --- /dev/null +++ b/ydb/services/metadata/request/common.cpp @@ -0,0 +1,5 @@ +#include "common.h" + +namespace NKikimr::NInternal::NRequest { + +} diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h new file mode 100644 index 00000000000..eb7100c9389 --- /dev/null +++ b/ydb/services/metadata/request/common.h @@ -0,0 +1,30 @@ +#pragma once +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/actorid.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/actor_virtual.h> +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <ydb/library/accessor/accessor.h> +#include <ydb/public/api/protos/ydb_table.pb.h> +#include <ydb/core/base/events.h> + +namespace NKikimr::NInternal::NRequest { + +enum EEvents { + EvCreateTableRequest = EventSpaceBegin(TKikimrEvents::ES_INTERNAL_REQUEST), + EvCreateTableInternalResponse, + EvCreateTableResponse, + EvSelectRequest, + EvSelectInternalResponse, + EvSelectResponse, + EvCreateSessionRequest, + EvCreateSessionInternalResponse, + EvCreateSessionResponse, + EvRequestFinished, + EvEnd +}; + +static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_INTERNAL_REQUEST), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_INTERNAL_REQUESTS)"); + +} diff --git a/ydb/services/metadata/request/config.cpp b/ydb/services/metadata/request/config.cpp new file mode 100644 index 00000000000..f70fb705bda --- /dev/null +++ b/ydb/services/metadata/request/config.cpp @@ -0,0 +1,21 @@ +#include "config.h" +#include <util/generic/ylimits.h> + +namespace NKikimr::NInternal::NRequest { + +bool TConfig::DeserializeFromProto(const NKikimrConfig::TInternalRequestConfig& config) { + RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds()); + RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds()); + if (RetryPeriodStart > RetryPeriodFinish) { + Cerr << "incorrect metadata provider config start/finish periods"; + std::swap(RetryPeriodStart, RetryPeriodFinish); + } + return true; +} + +TDuration TConfig::GetRetryPeriod(const ui32 retry) const { + const double kff = 1.0 / Max<double>(1.0, retry); + return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff); +} + +} diff --git a/ydb/services/metadata/request/config.h b/ydb/services/metadata/request/config.h new file mode 100644 index 00000000000..a770b8b3379 --- /dev/null +++ b/ydb/services/metadata/request/config.h @@ -0,0 +1,18 @@ +#pragma once +#include <ydb/library/accessor/accessor.h> +#include <ydb/core/protos/config.pb.h> +#include <util/datetime/base.h> + +namespace NKikimr::NInternal::NRequest { + +class TConfig { +private: + TDuration RetryPeriodStart = TDuration::Seconds(3); + TDuration RetryPeriodFinish = TDuration::Seconds(30); +public: + TConfig() = default; + + TDuration GetRetryPeriod(const ui32 retry) const; + bool DeserializeFromProto(const NKikimrConfig::TInternalRequestConfig& config); +}; +} diff --git a/ydb/services/metadata/request/request_actor.cpp b/ydb/services/metadata/request/request_actor.cpp new file mode 100644 index 00000000000..97ac6877667 --- /dev/null +++ b/ydb/services/metadata/request/request_actor.cpp @@ -0,0 +1,5 @@ +#include "request_actor.h" + +namespace NKikimr::NMetadataProvider::NRequest { + +} diff --git a/ydb/services/metadata/ds_table/request_actor.h b/ydb/services/metadata/request/request_actor.h index 5a2f44be52c..a5bf6064abc 100644 --- a/ydb/services/metadata/ds_table/request_actor.h +++ b/ydb/services/metadata/request/request_actor.h @@ -1,4 +1,5 @@ #pragma once +#include "common.h" #include "config.h" #include <library/cpp/actors/core/actor_virtual.h> @@ -8,9 +9,8 @@ #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/library/accessor/accessor.h> -#include <ydb/services/metadata/abstract/common.h> -namespace NKikimr::NMetadataProvider { +namespace NKikimr::NInternal::NRequest { template <class TRequestExt, class TResponseExt, ui32 EvStartExt, ui32 EvResultInternalExt, ui32 EvResultExt> class TDialogPolicyImpl { @@ -23,11 +23,11 @@ public: }; using TDialogCreateTable = TDialogPolicyImpl<Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse, - EEvSubscribe::EvCreateTableRequest, EEvSubscribe::EvCreateTableInternalResponse, EEvSubscribe::EvCreateTableResponse>; + EEvents::EvCreateTableRequest, EEvents::EvCreateTableInternalResponse, EEvents::EvCreateTableResponse>; using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse, - EEvSubscribe::EvSelectRequest, EEvSubscribe::EvSelectInternalResponse, EEvSubscribe::EvSelectResponse>; + EEvents::EvSelectRequest, EEvents::EvSelectInternalResponse, EEvents::EvSelectResponse>; using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, - EEvSubscribe::EvCreateSessionRequest, EEvSubscribe::EvCreateSessionInternalResponse, EEvSubscribe::EvCreateSessionResponse>; + EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, EEvents::EvCreateSessionResponse>; template <class TDialogPolicy> class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> { @@ -35,12 +35,16 @@ private: YDB_READONLY_DEF(typename TDialogPolicy::TResponse, Result); public: TEvRequestResult(typename TDialogPolicy::TResponse&& result) - : Result(std::move(result)) - { + : Result(std::move(result)) { } }; +class TEvRequestFinished: public NActors::TEventLocal<TEvRequestFinished, EEvents::EvRequestFinished> { +public: + TEvRequestFinished() = default; +}; + template <class TResponse> class TOperatorChecker { public: @@ -111,6 +115,7 @@ public: return; } TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId); + TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId); TBase::Die(TActivationContext::AsActorContext()); } |