aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-10-24 18:22:51 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-10-24 18:22:51 +0300
commit0007642ecd1f43032cae2eefb5ede97d786bccbf (patch)
tree5a4220a39f33b416b0df8ff853c8adbae285a84a
parent7741bf7772cfd0c0421d37aaa53b1b200841b056 (diff)
downloadydb-0007642ecd1f43032cae2eefb5ede97d786bccbf.tar.gz
multiple operations for initialization
prepare for internal requests reusage move tests to appropriate file
-rw-r--r--ydb/core/base/events.h1
-rw-r--r--ydb/core/protos/config.proto8
-rw-r--r--ydb/core/tx/columnshard/external_data.cpp102
-rw-r--r--ydb/core/tx/columnshard/external_data.h54
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/tx/columnshard/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_schema.cpp231
-rw-r--r--ydb/core/tx/columnshard/ut_columnshard_tiers.cpp254
-rw-r--r--ydb/services/metadata/CMakeLists.txt2
-rw-r--r--ydb/services/metadata/abstract/common.cpp11
-rw-r--r--ydb/services/metadata/abstract/common.h20
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.txt3
-rw-r--r--ydb/services/metadata/ds_table/accessor_init.cpp16
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp17
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h15
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.cpp23
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.h15
-rw-r--r--ydb/services/metadata/ds_table/config.cpp13
-rw-r--r--ydb/services/metadata/ds_table/config.h4
-rw-r--r--ydb/services/metadata/initializer/CMakeLists.txt24
-rw-r--r--ydb/services/metadata/initializer/accessor_init.cpp32
-rw-r--r--ydb/services/metadata/initializer/accessor_init.h (renamed from ydb/services/metadata/ds_table/accessor_init.h)22
-rw-r--r--ydb/services/metadata/initializer/common.cpp (renamed from ydb/services/metadata/ds_table/request_actor.cpp)2
-rw-r--r--ydb/services/metadata/initializer/common.h34
-rw-r--r--ydb/services/metadata/request/CMakeLists.txt24
-rw-r--r--ydb/services/metadata/request/common.cpp5
-rw-r--r--ydb/services/metadata/request/common.h30
-rw-r--r--ydb/services/metadata/request/config.cpp21
-rw-r--r--ydb/services/metadata/request/config.h18
-rw-r--r--ydb/services/metadata/request/request_actor.cpp5
-rw-r--r--ydb/services/metadata/request/request_actor.h (renamed from ydb/services/metadata/ds_table/request_actor.h)19
31 files changed, 631 insertions, 396 deletions
diff --git a/ydb/core/base/events.h b/ydb/core/base/events.h
index f30912e0748..e0d00c47671 100644
--- a/ydb/core/base/events.h
+++ b/ydb/core/base/events.h
@@ -149,6 +149,7 @@ struct TKikimrEvents : TEvents {
ES_BLOB_DEPOT,
ES_DATASHARD_LOAD,
ES_METADATA_PROVIDER,
+ ES_INTERNAL_REQUEST,
};
};
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 9dbbd9d748b..c04909f14ab 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -574,11 +574,15 @@ message TKQPConfig {
repeated NKikimrKqp.TKqpSetting Settings = 10;
}
+message TInternalRequestConfig {
+ optional uint32 RetryPeriodStartSeconds = 1 [default = 3];
+ optional uint32 RetryPeriodFinishSeconds = 2 [default = 30];
+}
+
message TMetadataProviderConfig {
optional bool Enabled = 1 [default = true];
optional uint32 RefreshPeriodSeconds = 2 [default = 10];
- optional uint32 RetryPeriodStartSeconds = 3 [default = 3];
- optional uint32 RetryPeriodFinishSeconds = 4 [default = 30];
+ optional TInternalRequestConfig RequestConfig = 3;
}
message TMemoryLogConfig {
diff --git a/ydb/core/tx/columnshard/external_data.cpp b/ydb/core/tx/columnshard/external_data.cpp
index e75ec77db85..5a95613ca81 100644
--- a/ydb/core/tx/columnshard/external_data.cpp
+++ b/ydb/core/tx/columnshard/external_data.cpp
@@ -1,63 +1,91 @@
#include "external_data.h"
+#include <library/cpp/json/writer/json_value.h>
+#include <library/cpp/protobuf/json/proto2json.h>
-namespace NKikimr::NColumnShard {
+namespace NKikimr::NColumnShard::NTiers {
-std::optional<TString> TCSKVSnapshot::GetValue(const TString& key) const {
- auto it = Data.find(key);
- if (it == Data.end()) {
- return {};
- } else {
- return it->second;
- }
-}
-
-bool TCSKVSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) {
- if (rawData.columns().size() != 2) {
- Cerr << "incorrect proto columns info" << Endl;
- return false;
- }
- i32 keyIdx = -1;
- i32 valueIdx = -1;
- ui32 idx = 0;
- for (auto&& i : rawData.columns()) {
- if (i.name() == "key") {
- keyIdx = idx;
- } else if (i.name() == "value") {
- valueIdx = idx;
- }
- ++idx;
- }
- if (keyIdx < 0 || valueIdx < 0) {
- Cerr << "incorrect table columns";
+bool TConfigsSnapshot::DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) {
+ const i32 ownerPathIdx = GetFieldIndex(rawData, "ownerPath");
+ const i32 tierNameIdx = GetFieldIndex(rawData, "tierName");
+ const i32 tierConfigIdx = GetFieldIndex(rawData, "tierConfig");
+ if (tierNameIdx < 0 || tierConfigIdx < 0 || ownerPathIdx < 0) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "incorrect tiers config table structure";
return false;
}
for (auto&& r : rawData.rows()) {
- TString key(r.items()[keyIdx].bytes_value());
- TString value(r.items()[valueIdx].bytes_value());
- if (!Data.emplace(key, value).second) {
- Cerr << "keys duplication: " << key;
+ TConfig config(r.items()[ownerPathIdx].bytes_value(), r.items()[tierNameIdx].bytes_value());
+ if (!config.DeserializeFromString(r.items()[tierConfigIdx].bytes_value())) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse tier config from snapshot";
+ return false;
+ }
+ if (!Data.emplace(config.GetConfigId(), config).second) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "tier names duplication: " << config.GetTierName();
return false;
}
}
return true;
}
-Ydb::Table::CreateTableRequest TSnapshotConstructor::DoGetTableSchema() const {
+std::optional<TConfig> TConfigsSnapshot::GetValue(const TString& key) const {
+ auto it = Data.find(key);
+ if (it == Data.end()) {
+ return {};
+ } else {
+ return it->second;
+ }
+}
+
+TVector<NMetadataProvider::ITableModifier::TPtr> TSnapshotConstructor::DoGetTableSchema() const {
Ydb::Table::CreateTableRequest request;
request.set_session_id("");
request.set_path(TablePath);
- request.add_primary_key("key");
+ request.add_primary_key("ownerPath");
+ request.add_primary_key("tierName");
+ {
+ auto& column = *request.add_columns();
+ column.set_name("tierName");
+ column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
+ }
{
auto& column = *request.add_columns();
- column.set_name("key");
+ column.set_name("ownerPath");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
}
{
auto& column = *request.add_columns();
- column.set_name("value");
+ column.set_name("tierConfig");
column.mutable_type()->mutable_optional_type()->mutable_item()->set_type_id(Ydb::Type::STRING);
}
- return request;
+ NMetadataProvider::ITableModifier::TPtr result(
+ new NMetadataProvider::TGenericTableModifier<NInternal::NRequest::TDialogCreateTable>(request));
+ return { result };
+}
+
+TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ for (auto&& i : Data) {
+ result.InsertValue(i.first, i.second.SerializeToJson());
+ }
+ return result.GetStringRobust();
+}
+
+bool NTiers::TConfig::DeserializeFromString(const TString& configProtoStr) {
+ if (!::google::protobuf::TextFormat::ParseFromString(configProtoStr, &ProtoConfig)) {
+ ALS_ERROR(NKikimrServices::TX_COLUMNSHARD) << "cannot parse proto string: " << configProtoStr;
+ return false;
+ }
+ return true;
+}
+
+NJson::TJsonValue TConfig::SerializeToJson() const {
+ NJson::TJsonValue result = NJson::JSON_MAP;
+ result.InsertValue("tierName", TierName);
+ NProtobufJson::Proto2Json(ProtoConfig, result.InsertValue("tierConfig", NJson::JSON_MAP));
+ return result;
+}
+
+bool TConfig::IsSame(const TConfig& item) const {
+ return TierName == item.TierName && ProtoConfig.SerializeAsString() == item.ProtoConfig.SerializeAsString();
}
}
diff --git a/ydb/core/tx/columnshard/external_data.h b/ydb/core/tx/columnshard/external_data.h
index 500e7d94d77..e070fa5ba21 100644
--- a/ydb/core/tx/columnshard/external_data.h
+++ b/ydb/core/tx/columnshard/external_data.h
@@ -1,33 +1,57 @@
#pragma once
#include <ydb/services/metadata/service.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
-namespace NKikimr::NColumnShard {
+#include <library/cpp/json/writer/json_value.h>
-class TCSKVSnapshot: public NMetadataProvider::ISnapshot {
+namespace NKikimr::NColumnShard::NTiers {
+
+class TConfig {
+private:
+ using TTierProto = NKikimrSchemeOp::TStorageTierConfig;
+ using TS3SettingsProto = NKikimrSchemeOp::TS3Settings;
+ YDB_READONLY_DEF(TString, OwnerPath);
+ YDB_READONLY_DEF(TString, TierName);
+ YDB_READONLY_DEF(TTierProto, ProtoConfig);
+public:
+ TConfig() = default;
+ TConfig(const TString& ownerPath, const TString& tierName)
+ : OwnerPath(ownerPath)
+ , TierName(tierName)
+ {
+
+ }
+
+ TString GetConfigId() const {
+ return OwnerPath + "." + TierName;
+ }
+
+ bool NeedExport() const {
+ return ProtoConfig.HasObjectStorage();
+ }
+ bool IsSame(const TConfig& item) const;
+ bool DeserializeFromString(const TString& configProtoStr);
+ NJson::TJsonValue SerializeToJson() const;
+};
+
+class TConfigsSnapshot: public NMetadataProvider::ISnapshot {
private:
using TBase = NMetadataProvider::ISnapshot;
- using TKVData = TMap<TString, TString>;
- YDB_READONLY_DEF(TKVData, Data);
+ using TConfigsMap = TMap<TString, TConfig>;
+ YDB_READONLY_DEF(TConfigsMap, Data);
protected:
virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) override;
- virtual TString DoSerializeToString() const override {
- TStringBuilder sb;
- for (auto&& i : Data) {
- sb << i.first << "=" << i.second << ";";
- }
- return sb;
- }
+ virtual TString DoSerializeToString() const override;
public:
- std::optional<TString> GetValue(const TString& key) const;
-
+ std::optional<TConfig> GetValue(const TString& key) const;
using TBase::TBase;
};
-class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TCSKVSnapshot> {
+class TSnapshotConstructor: public NMetadataProvider::TGenericSnapshotParser<TConfigsSnapshot> {
private:
const TString TablePath;
protected:
- virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const override;
+ virtual TVector<NMetadataProvider::ITableModifier::TPtr> DoGetTableSchema() const override;
virtual const TString& DoGetTablePath() const override {
return TablePath;
}
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
index ac703050476..14d9ad8173b 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.darwin.txt
@@ -40,6 +40,7 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
)
add_test(
NAME
diff --git a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
index 4de436d1222..1b40ae04947 100644
--- a/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
+++ b/ydb/core/tx/columnshard/ut/CMakeLists.linux.txt
@@ -44,6 +44,7 @@ target_sources(ydb-core-tx-columnshard-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/columnshard_ut_common.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_read_write.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
)
add_test(
NAME
diff --git a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
index c0aebd18894..b530b362afd 100644
--- a/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
+++ b/ydb/core/tx/columnshard/ut_columnshard_schema.cpp
@@ -1,10 +1,8 @@
#include "columnshard_ut_common.h"
-#include "external_data.h"
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/services/metadata/service.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
-//#include <ydb/core/testlib/cs_helper.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
@@ -702,238 +700,9 @@ void TestDrop(bool reboots) {
namespace NColumnShard {
extern bool gAllowLogBatchingDefaultValue;
}
-/*
-class TLocalHelper: public Tests::NCS::THelper {
-private:
- using TBase = Tests::NCS::THelper;
-public:
- using TBase::TBase;
- void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
- ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
- TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
- TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
- CreateTestOlapStore(sender, Sprintf(R"(
- Name: "%s"
- ColumnShardCount: %d
- SchemaPresets {
- Name: "default"
- Schema {
- Columns { Name: "timestamp" Type: "Timestamp" }
- #Columns { Name: "resource_type" Type: "Utf8" }
- Columns { Name: "resource_id" Type: "Utf8" }
- Columns { Name: "uid" Type: "Utf8" }
- Columns { Name: "level" Type: "Int32" }
- Columns { Name: "message" Type: "Utf8" }
- #Columns { Name: "json_payload" Type: "Json" }
- #Columns { Name: "ingested_at" Type: "Timestamp" }
- #Columns { Name: "saved_at" Type: "Timestamp" }
- #Columns { Name: "request_id" Type: "Utf8" }
- KeyColumnNames: "timestamp"
- Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
- StorageTiers {
- Name: "tier1"
- ObjectStorage {
- Endpoint: "fake"
- AccessKey: "$a"
- }
- }
- }
- }
- )", storeName.c_str(), storeShardsCount));
-
- TString shardingColumns = "[\"timestamp\", \"uid\"]";
- if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
- shardingColumns = "[\"uid\"]";
- }
-
- TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
- Name: "%s"
- ColumnShardCount: %d
- Sharding {
- HashSharding {
- Function: %s
- Columns: %s
- }
- })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
- }
-};
-*/
Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
- class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> {
- private:
- using TBase = NActors::TActorBootstrapped<TTestCSEmulator>;
- std::shared_ptr<TSnapshotConstructor> ExternalDataManipulation;
- TActorId ProviderId;
- TInstant Start;
- YDB_READONLY_FLAG(Found, false);
- public:
- STFUNC(StateInit) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
- default:
- Y_VERIFY(false);
- }
- }
- void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) {
- auto value = ev->Get()->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a");
- if (value && *value == "b") {
- FoundFlag = true;
- } else {
- Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl;
- }
- }
-
- void Bootstrap() {
- ProviderId = NMetadataProvider::MakeServiceId(1);
- ExternalDataManipulation = std::make_shared<TSnapshotConstructor>("/Root/.external_data/kv_settings");
- Become(&TThis::StateInit);
- Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
- Start = Now();
- }
- };
-
- Y_UNIT_TEST(DSConfigsStub) {
- TPortManager pm;
-
- ui32 grpcPort = pm.GetPort();
- ui32 msgbPort = pm.GetPort();
-
- Tests::TServerSettings serverSettings(msgbPort);
- serverSettings.Port = msgbPort;
- serverSettings.GrpcPort = grpcPort;
- serverSettings.SetDomainName("Root")
- .SetUseRealThreads(false)
- .SetEnableMetadataProvider(true)
- .SetEnableKqpSessionActor(true)
- .SetEnableOlapSchemaOperations(true);
- ;
-
- Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
- server->EnableGRpc(grpcPort);
- // server->SetupDefaultProfiles();
-
- Tests::TClient client(serverSettings);
-
- auto& runtime = *server->GetRuntime();
-
- auto sender = runtime.AllocateEdgeActor();
- server->SetupRootStoragePools(sender);
- {
- TTestCSEmulator* emulator = new TTestCSEmulator;
- runtime.Register(emulator);
- {
- const TInstant start = Now();
- while (Now() - start < TDuration::Seconds(10)) {
- runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
- Cerr << "Step " << event->Type << Endl;
- return false;
- }, {}, TDuration::Seconds(1));
- Sleep(TDuration::Seconds(1));
- Cerr << "Step finished" << Endl;
- }
- }
-
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto sResult = f.GetValueSync();
- Cerr << sResult.GetIssues().ToString() << Endl;
- auto session = sResult.GetSession();
- session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
-
- const TInstant start = Now();
- while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
- runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
- }
- Y_VERIFY(emulator->IsFound());
- }
- //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
- }
-
- /*
- Y_UNIT_TEST(DSConfigs) {
- TPortManager pm;
-
- ui32 grpcPort = pm.GetPort();
- ui32 msgbPort = pm.GetPort();
-
- Tests::TServerSettings serverSettings(msgbPort);
- serverSettings.Port = msgbPort;
- serverSettings.GrpcPort = grpcPort;
- serverSettings.SetDomainName("Root")
- .SetUseRealThreads(false)
- .SetEnableMetadataProvider(true)
- .SetEnableKqpSessionActor(true)
- .SetEnableOlapSchemaOperations(true);
- ;
-
- Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
- server->EnableGRpc(grpcPort);
-// server->SetupDefaultProfiles();
-
- Tests::TClient client(serverSettings);
-
- auto& runtime = *server->GetRuntime();
-
- auto sender = runtime.AllocateEdgeActor();
- server->InitRoot(sender);
- TLocalHelper lHelper(*server);
- lHelper.CreateTestOlapTable();
-
- runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
- const TInstant start = Now();
- while (Now() - start < TDuration::Seconds(10)) {
- runtime.WaitForEdgeEvents([](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event) {
- Cerr << "Step " << event->Type << Endl;
- return false;
- }, {}, TDuration::Seconds(1));
- Sleep(TDuration::Seconds(1));
- Cerr << "Step finished" << Endl;
- }
-
- NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
- tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
- auto sResult = f.GetValueSync();
- Cerr << sResult.GetIssues().ToString() << Endl;
- auto session = sResult.GetSession();
- session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/kv_settings` (key, value) VALUES ('/Root/olapStore.tier1.a', 'b')"
- , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
- });
-
- bool found = false;
- bool* foundPtr = &found;
- const auto pred = [foundPtr](TTestActorRuntimeBase& , TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
- if (event->HasBuffer() && !event->HasEvent()) {
- } else if (!event->GetBase()) {
- Cerr << "Type nullptr" << Endl;
- } else {
- Cerr << "Step " << event->GetBase()->Type() << Endl;
- auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
- if (ptr) {
- auto value = ptr->GetSnapshotAs<TCSKVSnapshot>()->GetValue("/Root/olapStore.tier1.a");
- if (value && *value == "b") {
- *foundPtr = true;
- } else {
- Cerr << ptr->GetSnapshot()->SerializeToString() << Endl;
- }
- }
- }
- return TTestActorRuntimeBase::EEventAction::PROCESS;
- };
-
- runtime.SetObserverFunc(pred);
-
- while (!found) {
- runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
- }
- //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
- //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
- }
-*/
Y_UNIT_TEST(CreateTable) {
ui64 tableId = 1;
diff --git a/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
new file mode 100644
index 00000000000..521ed7aa450
--- /dev/null
+++ b/ydb/core/tx/columnshard/ut_columnshard_tiers.cpp
@@ -0,0 +1,254 @@
+#include "columnshard_ut_common.h"
+#include "external_data.h"
+#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
+#include <ydb/core/wrappers/s3_wrapper.h>
+#include <ydb/services/metadata/service.h>
+#include <ydb/core/cms/console/configs_dispatcher.h>
+#include <ydb/core/testlib/cs_helper.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <library/cpp/actors/core/av_bootstrapped.h>
+
+#include <util/system/hostname.h>
+
+namespace NKikimr {
+
+using namespace NColumnShard;
+
+class TLocalHelper: public Tests::NCS::THelper {
+private:
+ using TBase = Tests::NCS::THelper;
+public:
+ using TBase::TBase;
+ void CreateTestOlapTable(TString tableName = "olapTable", TString storeName = "olapStore",
+ ui32 storeShardsCount = 4, ui32 tableShardsCount = 3,
+ TString shardingFunction = "HASH_FUNCTION_CLOUD_LOGS") {
+ TActorId sender = Server.GetRuntime()->AllocateEdgeActor();
+ CreateTestOlapStore(sender, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ SchemaPresets {
+ Name: "default"
+ Schema {
+ Columns { Name: "timestamp" Type: "Timestamp" }
+ #Columns { Name: "resource_type" Type: "Utf8" }
+ Columns { Name: "resource_id" Type: "Utf8" }
+ Columns { Name: "uid" Type: "Utf8" }
+ Columns { Name: "level" Type: "Int32" }
+ Columns { Name: "message" Type: "Utf8" }
+ #Columns { Name: "json_payload" Type: "Json" }
+ #Columns { Name: "ingested_at" Type: "Timestamp" }
+ #Columns { Name: "saved_at" Type: "Timestamp" }
+ #Columns { Name: "request_id" Type: "Utf8" }
+ KeyColumnNames: "timestamp"
+ Engine: COLUMN_ENGINE_REPLACING_TIMESERIES
+ StorageTiers {
+ Name: "tier1"
+ ObjectStorage {
+ Endpoint: "fake"
+ AccessKey: "$a"
+ }
+ }
+ }
+ }
+ )", storeName.c_str(), storeShardsCount));
+
+ TString shardingColumns = "[\"timestamp\", \"uid\"]";
+ if (shardingFunction != "HASH_FUNCTION_CLOUD_LOGS") {
+ shardingColumns = "[\"uid\"]";
+ }
+
+ TBase::CreateTestOlapTable(sender, storeName, Sprintf(R"(
+ Name: "%s"
+ ColumnShardCount: %d
+ Sharding {
+ HashSharding {
+ Function: %s
+ Columns: %s
+ }
+ })", tableName.c_str(), tableShardsCount, shardingFunction.c_str(), shardingColumns.c_str()));
+ }
+};
+
+
+Y_UNIT_TEST_SUITE(ColumnShardTiers) {
+
+ const TString ConfigProtoStr = "Name : \"abc\"";
+
+ class TTestCSEmulator: public NActors::TActorBootstrapped<TTestCSEmulator> {
+ private:
+ using TBase = NActors::TActorBootstrapped<TTestCSEmulator>;
+ std::shared_ptr<NTiers::TSnapshotConstructor> ExternalDataManipulation;
+ TActorId ProviderId;
+ TInstant Start;
+ YDB_READONLY_FLAG(Found, false);
+ public:
+ STFUNC(StateInit) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ default:
+ Y_VERIFY(false);
+ }
+ }
+
+ void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev, const TActorContext&) {
+ auto value = ev->Get()->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1");
+ if (value && value->GetProtoConfig().GetName() == "abc") {
+ FoundFlag = true;
+ } else {
+ Cerr << ev->Get()->GetSnapshot()->SerializeToString() << Endl;
+ }
+ }
+
+ void Bootstrap() {
+ ProviderId = NMetadataProvider::MakeServiceId(1);
+ ExternalDataManipulation = std::make_shared<NTiers::TSnapshotConstructor>("/Root/.external_data/tiers");
+ Become(&TThis::StateInit);
+ Sender<NMetadataProvider::TEvSubscribeExternal>(ExternalDataManipulation).SendTo(ProviderId);
+ Start = Now();
+ }
+ };
+
+ Y_UNIT_TEST(DSConfigsStub) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableKqpSessionActor(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ // server->SetupDefaultProfiles();
+
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+ {
+ TTestCSEmulator* emulator = new TTestCSEmulator;
+ runtime.Register(emulator);
+ {
+ const TInstant start = Now();
+ while (Now() - start < TDuration::Seconds(10)) {
+ runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ Cerr << "Step " << event->Type << Endl;
+ return false;
+ }, {}, TDuration::Seconds(1));
+ Sleep(TDuration::Seconds(1));
+ Cerr << "Step finished" << Endl;
+ }
+ }
+
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto sResult = f.GetValueSync();
+ Cerr << sResult.GetIssues().ToString() << Endl;
+ auto session = sResult.GetSession();
+ session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+
+ const TInstant start = Now();
+ while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
+ runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ }
+ Y_VERIFY(emulator->IsFound());
+ }
+ //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+ }
+/*
+ Y_UNIT_TEST(DSConfigs) {
+ TPortManager pm;
+
+ ui32 grpcPort = pm.GetPort();
+ ui32 msgbPort = pm.GetPort();
+
+ Tests::TServerSettings serverSettings(msgbPort);
+ serverSettings.Port = msgbPort;
+ serverSettings.GrpcPort = grpcPort;
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetEnableMetadataProvider(true)
+ .SetEnableKqpSessionActor(true)
+ .SetEnableOlapSchemaOperations(true);
+ ;
+
+ Tests::TServer::TPtr server = new Tests::TServer(serverSettings);
+ server->EnableGRpc(grpcPort);
+ // server->SetupDefaultProfiles();
+
+ Tests::TClient client(serverSettings);
+
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+ TLocalHelper lHelper(*server);
+ lHelper.CreateTestOlapTable();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ const TInstant start = Now();
+ while (Now() - start < TDuration::Seconds(10)) {
+ runtime.WaitForEdgeEvents([](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) {
+ Cerr << "Step " << event->Type << Endl;
+ return false;
+ }, {}, TDuration::Seconds(1));
+ Sleep(TDuration::Seconds(1));
+ Cerr << "Step finished" << Endl;
+ }
+
+ NYdb::NTable::TTableClient tClient(server->GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ tClient.CreateSession().Subscribe([](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ auto sResult = f.GetValueSync();
+ Cerr << sResult.GetIssues().ToString() << Endl;
+ auto session = sResult.GetSession();
+ session.ExecuteDataQuery("INSERT INTO `/Root/.external_data/tiers` (ownerPath, tierName, tierConfig) VALUES ('/Root/olapStore', 'tier1', '" + ConfigProtoStr + "')"
+ , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx());
+ });
+
+ bool found = false;
+ bool* foundPtr = &found;
+ const auto pred = [foundPtr](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event)->TTestActorRuntimeBase::EEventAction {
+ if (event->HasBuffer() && !event->HasEvent()) {
+ } else if (!event->GetBase()) {
+ Cerr << "Type nullptr" << Endl;
+ } else {
+ Cerr << "Step " << event->GetBase()->Type() << Endl;
+ auto ptr = dynamic_cast<NMetadataProvider::TEvRefreshSubscriberData*>(event->GetBase());
+ if (ptr) {
+ auto value = ptr->GetSnapshotAs<NTiers::TConfigsSnapshot>()->GetValue("/Root/olapStore.tier1");
+ if (value && value->GetProtoConfig().GetName() == "abc") {
+ *foundPtr = true;
+ } else {
+ Cerr << ptr->GetSnapshot()->SerializeToString() << Endl;
+ }
+ }
+ }
+ return TTestActorRuntimeBase::EEventAction::PROCESS;
+ };
+
+ runtime.SetObserverFunc(pred);
+
+ while (!found) {
+ runtime.DispatchEvents(TDispatchOptions(), TDuration::Seconds(10));
+ }
+ //runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_TRACE);
+ //runtime.SetLogPriority(NKikimrServices::KQP_YQL, NLog::PRI_TRACE);
+ }
+*/
+}
+}
diff --git a/ydb/services/metadata/CMakeLists.txt b/ydb/services/metadata/CMakeLists.txt
index 13463c41c23..55a81045b7a 100644
--- a/ydb/services/metadata/CMakeLists.txt
+++ b/ydb/services/metadata/CMakeLists.txt
@@ -8,6 +8,8 @@
add_subdirectory(abstract)
add_subdirectory(ds_table)
+add_subdirectory(initializer)
+add_subdirectory(request)
add_library(ydb-services-metadata)
target_link_libraries(ydb-services-metadata PUBLIC
diff --git a/ydb/services/metadata/abstract/common.cpp b/ydb/services/metadata/abstract/common.cpp
index fa5484fdbea..eddddb14b11 100644
--- a/ydb/services/metadata/abstract/common.cpp
+++ b/ydb/services/metadata/abstract/common.cpp
@@ -2,4 +2,15 @@
namespace NKikimr::NMetadataProvider {
+i32 ISnapshot::GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const {
+ i32 idx = 0;
+ for (auto&& i : rawData.columns()) {
+ if (i.name() == columnId) {
+ return idx;
+ }
+ ++idx;
+ }
+ return -1;
+}
+
}
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index dd235bacdca..782d9695fbb 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -5,23 +5,15 @@
#include <library/cpp/actors/core/actor_virtual.h>
#include <library/cpp/actors/core/actorsystem.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/core/base/events.h>
#include <ydb/library/accessor/accessor.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
-#include <ydb/core/base/events.h>
+#include <ydb/services/metadata/initializer/common.h>
namespace NKikimr::NMetadataProvider {
enum EEvSubscribe {
EvRefreshSubscriberData = EventSpaceBegin(TKikimrEvents::ES_METADATA_PROVIDER),
- EvCreateTableRequest,
- EvCreateTableInternalResponse,
- EvCreateTableResponse,
- EvSelectRequest,
- EvSelectInternalResponse,
- EvSelectResponse,
- EvCreateSessionRequest,
- EvCreateSessionInternalResponse,
- EvCreateSessionResponse,
EvRefresh,
EvSubscribeLocal,
EvUnsubscribeLocal,
@@ -30,7 +22,7 @@ enum EEvSubscribe {
EvEnd
};
-static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_TABLET_PIPE)");
+static_assert(EEvSubscribe::EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_METADATA_PROVIDER)");
class ISnapshot {
private:
@@ -38,7 +30,7 @@ private:
protected:
virtual bool DoDeserializeFromResultSet(const Ydb::ResultSet& rawData) = 0;
virtual TString DoSerializeToString() const = 0;
-
+ i32 GetFieldIndex(const Ydb::ResultSet& rawData, const TString& columnId) const;
public:
using TPtr = std::shared_ptr<ISnapshot>;
ISnapshot(const TInstant actuality)
@@ -60,7 +52,7 @@ public:
class ISnapshotParser {
protected:
virtual ISnapshot::TPtr CreateSnapshot(const TInstant actuality) const = 0;
- virtual Ydb::Table::CreateTableRequest DoGetTableSchema() const = 0;
+ virtual TVector<ITableModifier::TPtr> DoGetTableSchema() const = 0;
virtual const TString& DoGetTablePath() const = 0;
public:
using TPtr = std::shared_ptr<ISnapshotParser>;
@@ -74,7 +66,7 @@ public:
return result;
}
- Ydb::Table::CreateTableRequest GetTableSchema() const {
+ TVector<ITableModifier::TPtr> GetTableSchema() const {
return DoGetTableSchema();
}
diff --git a/ydb/services/metadata/ds_table/CMakeLists.txt b/ydb/services/metadata/ds_table/CMakeLists.txt
index 696e31532c4..137a58c0d35 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.txt
@@ -16,12 +16,11 @@ target_link_libraries(services-metadata-ds_table PUBLIC
core-grpc_services-local_rpc
core-grpc_services-base
ydb-core-grpc_services
+ services-metadata-initializer
)
target_sources(services-metadata-ds_table PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_init.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
- ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/request_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
)
diff --git a/ydb/services/metadata/ds_table/accessor_init.cpp b/ydb/services/metadata/ds_table/accessor_init.cpp
deleted file mode 100644
index e4100858173..00000000000
--- a/ydb/services/metadata/ds_table/accessor_init.cpp
+++ /dev/null
@@ -1,16 +0,0 @@
-#include "accessor_init.h"
-#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
-
-namespace NKikimr::NMetadataProvider {
-
-bool TDSAccessorInitialized::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& /*ev*/) {
- InitializedFlag = true;
- return true;
-}
-
-void TDSAccessorInitialized::Bootstrap(const NActors::TActorContext& /*ctx*/) {
- RegisterState();
- Register(new TYDBRequest<TDialogCreateTable>(GetTableSchema(), SelfId(), Config));
-}
-
-}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index 56828775e88..339b5916ee7 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -7,6 +7,7 @@
#include <util/string/escape.h>
namespace NKikimr::NMetadataProvider {
+using namespace NInternal::NRequest;
bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
const TString startString = CurrentSelection.SerializeAsString();
@@ -27,6 +28,7 @@ bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != startString) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << CurrentSelection.DebugString();
+ OnSnapshotModified();
return true;
}
return false;
@@ -39,28 +41,23 @@ void TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateSession>::TPtr&
const TString sessionId = session.session_id();
Y_VERIFY(sessionId);
Ydb::Table::ExecuteDataQueryRequest request;
- request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(GetTableName()) + "`");
+ request.mutable_query()->set_yql_text("SELECT * FROM `" + EscapeC(SnapshotConstructor->GetTablePath()) + "`");
request.set_session_id(sessionId);
request.mutable_tx_control()->mutable_begin_tx()->mutable_snapshot_read_only();
- Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config));
+ Register(new TYDBRequest<TDialogSelect>(std::move(request), SelfId(), Config.GetRequestConfig()));
}
void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
- Y_VERIFY(IsInitialized());
- Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config));
+ Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), SelfId(), Config.GetRequestConfig()));
}
-bool TDSAccessorRefresher::Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) {
- if (!TBase::Handle(ev)) {
- return false;
- }
+void TDSAccessorRefresher::OnInitialized() {
Sender<TEvRefresh>().SendTo(SelfId());
- return true;
}
TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor)
- : TBase(config)
+ : TBase(config, snapshotConstructor->GetTableSchema())
, SnapshotConstructor(snapshotConstructor)
{
}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index c260f396b18..ceb21233648 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -1,7 +1,7 @@
#pragma once
-#include "accessor_init.h"
#include <ydb/public/api/protos/ydb_value.pb.h>
#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/initializer/accessor_init.h>
namespace NKikimr::NMetadataProvider {
@@ -19,19 +19,18 @@ private:
YDB_READONLY_DEF(Ydb::ResultSet, CurrentSelection);
TInstant RequestedActuality = TInstant::Zero();
protected:
- virtual TString GetTableName() const = 0;
bool IsReady() const {
return !!CurrentSnapshot;
}
+ virtual void OnInitialized() override;
+ virtual void OnSnapshotModified() = 0;
public:
using TBase::Handle;
- virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev) override;
STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvRequestResult<TDialogSelect>, Handle);
- hFunc(TEvRequestResult<TDialogCreateTable>, Handle);
- hFunc(TEvRequestResult<TDialogCreateSession>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle);
hFunc(TEvRefresh, Handle);
default:
TBase::StateMain(ev, ctx);
@@ -40,8 +39,8 @@ public:
TDSAccessorRefresher(const TConfig& config, ISnapshotParser::TPtr snapshotConstructor);
- virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev);
- void Handle(TEvRequestResult<TDialogCreateSession>::TPtr& ev);
+ bool Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogSelect>::TPtr& ev);
+ void Handle(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev);
void Handle(TEvRefresh::TPtr& ev);
};
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
index 707432c65a2..93a41f8c481 100644
--- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
@@ -3,20 +3,15 @@
namespace NKikimr::NMetadataProvider {
-bool TDSAccessorNotifier::Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) {
- if (TBase::Handle(ev)) {
- auto snapshot = GetCurrentSnapshot();
- if (!snapshot) {
- ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot";
- return false;
- }
-
- for (auto&& i : Subscribed) {
- Send(i, new TEvRefreshSubscriberData(snapshot));
- }
- return true;
- } else {
- return false;
+void TDSAccessorNotifier::OnSnapshotModified() {
+ auto snapshot = GetCurrentSnapshot();
+ if (!snapshot) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot construct snapshot";
+ return;
+ }
+
+ for (auto&& i : Subscribed) {
+ Send(i, new TEvRefreshSubscriberData(snapshot));
}
}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h
index 5ce80b08dc5..e082a714e89 100644
--- a/ydb/services/metadata/ds_table/accessor_subscribe.h
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.h
@@ -34,6 +34,7 @@ protected:
virtual void RegisterState() override {
Become(&TDSAccessorNotifier::StateMain);
}
+ virtual void OnSnapshotModified() override;
public:
using TBase::Handle;
@@ -51,7 +52,6 @@ public:
}
- virtual bool Handle(TEvRequestResult<TDialogSelect>::TPtr& ev) override;
void Handle(TEvSubscribe::TPtr& context);
void Handle(TEvUnsubscribe::TPtr& context);
};
@@ -59,20 +59,9 @@ public:
class TExternalData: public TDSAccessorNotifier {
private:
using TBase = TDSAccessorNotifier;
- Ydb::Table::CreateTableRequest ExternalCreateTableRequest;
- TString TableName;
-protected:
- virtual TString GetTableName() const override {
- return TableName;
- }
- virtual Ydb::Table::CreateTableRequest GetTableSchema() const override {
- return ExternalCreateTableRequest;
- }
public:
TExternalData(const TConfig& config, ISnapshotParser::TPtr sParser)
- : TBase(config, sParser)
- , ExternalCreateTableRequest(sParser->GetTableSchema())
- , TableName(ExternalCreateTableRequest.path()) {
+ : TBase(config, sParser) {
}
};
diff --git a/ydb/services/metadata/ds_table/config.cpp b/ydb/services/metadata/ds_table/config.cpp
index b202f01e78d..45003ceca6a 100644
--- a/ydb/services/metadata/ds_table/config.cpp
+++ b/ydb/services/metadata/ds_table/config.cpp
@@ -5,20 +5,11 @@ namespace NKikimr::NMetadataProvider {
bool TConfig::DeserializeFromProto(const NKikimrConfig::TMetadataProviderConfig& config) {
EnabledFlag = config.GetEnabled();
- RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds());
- RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds());
- RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds());
- if (RetryPeriodStart > RetryPeriodFinish) {
- Cerr << "incorrect metadata provider config start/finish periods";
- std::swap(RetryPeriodStart, RetryPeriodFinish);
+ if (!RequestConfig.DeserializeFromProto(config.GetRequestConfig())) {
return false;
}
+ RefreshPeriod = TDuration::Seconds(config.GetRefreshPeriodSeconds());
return true;
}
-TDuration TConfig::GetRetryPeriod(const ui32 retry) const {
- const double kff = 1.0 / Max<double>(1.0, retry);
- return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff);
-}
-
}
diff --git a/ydb/services/metadata/ds_table/config.h b/ydb/services/metadata/ds_table/config.h
index 47ef42fed9d..372f446771c 100644
--- a/ydb/services/metadata/ds_table/config.h
+++ b/ydb/services/metadata/ds_table/config.h
@@ -1,5 +1,6 @@
#pragma once
#include <ydb/library/accessor/accessor.h>
+#include <ydb/services/metadata/request/config.h>
#include <ydb/core/protos/config.pb.h>
#include <util/datetime/base.h>
@@ -7,9 +8,8 @@ namespace NKikimr::NMetadataProvider {
class TConfig {
private:
+ YDB_READONLY_DEF(NInternal::NRequest::TConfig, RequestConfig);
YDB_READONLY(TDuration, RefreshPeriod, TDuration::Seconds(10));
- TDuration RetryPeriodStart = TDuration::Seconds(3);
- TDuration RetryPeriodFinish = TDuration::Seconds(30);
YDB_READONLY_FLAG(Enabled, true);
public:
TConfig() = default;
diff --git a/ydb/services/metadata/initializer/CMakeLists.txt b/ydb/services/metadata/initializer/CMakeLists.txt
new file mode 100644
index 00000000000..55e553e7830
--- /dev/null
+++ b/ydb/services/metadata/initializer/CMakeLists.txt
@@ -0,0 +1,24 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-initializer)
+target_link_libraries(services-metadata-initializer PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+ services-metadata-request
+)
+target_sources(services-metadata-initializer PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/accessor_init.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/initializer/common.cpp
+)
diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp
new file mode 100644
index 00000000000..99cf9a5a698
--- /dev/null
+++ b/ydb/services/metadata/initializer/accessor_init.cpp
@@ -0,0 +1,32 @@
+#include "accessor_init.h"
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+namespace NKikimr::NMetadataProvider {
+
+void TDSAccessorInitialized::Bootstrap() {
+ RegisterState();
+ Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig());
+}
+
+void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPtr& /*ev*/) {
+ if (Modifiers.empty()) {
+ return;
+ }
+ Modifiers.pop_front();
+ if (Modifiers.size()) {
+ Modifiers.front()->Execute(SelfId(), Config.GetRequestConfig());
+ } else {
+ OnInitialized();
+ }
+}
+
+TDSAccessorInitialized::TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers)
+ : Config(config)
+{
+ Y_VERIFY(modifiers.size());
+ for (auto&& i : modifiers) {
+ Modifiers.emplace_back(i);
+ }
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_init.h b/ydb/services/metadata/initializer/accessor_init.h
index 708ec3a3ddc..d278408ae2e 100644
--- a/ydb/services/metadata/ds_table/accessor_init.h
+++ b/ydb/services/metadata/initializer/accessor_init.h
@@ -1,6 +1,7 @@
#pragma once
-#include "config.h"
-#include "request_actor.h"
+#include "common.h"
+
+#include <ydb/services/metadata/ds_table/config.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/event_local.h>
@@ -9,26 +10,21 @@
namespace NKikimr::NMetadataProvider {
-class TDSAccessorInitialized;
-
class TDSAccessorInitialized: public NActors::TActorBootstrapped<TDSAccessorInitialized> {
private:
- YDB_READONLY_FLAG(Initialized, false);
+ TDeque<ITableModifier::TPtr> Modifiers;
protected:
const TConfig& Config;
- virtual Ydb::Table::CreateTableRequest GetTableSchema() const = 0;
virtual void RegisterState() = 0;
+ virtual void OnInitialized() = 0;
public:
- TDSAccessorInitialized(const TConfig& config)
- : Config(config) {
-
- }
- void Bootstrap(const NActors::TActorContext& /*ctx*/);
- virtual bool Handle(TEvRequestResult<TDialogCreateTable>::TPtr& ev);
+ void Bootstrap();
+ TDSAccessorInitialized(const TConfig& config, const TVector<ITableModifier::TPtr>& modifiers);
+ void Handle(NInternal::NRequest::TEvRequestFinished::TPtr& ev);
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvRequestResult<TDialogCreateTable>, Handle);
+ hFunc(NInternal::NRequest::TEvRequestFinished, Handle);
default:
break;
}
diff --git a/ydb/services/metadata/ds_table/request_actor.cpp b/ydb/services/metadata/initializer/common.cpp
index f9b541e80c0..fa5484fdbea 100644
--- a/ydb/services/metadata/ds_table/request_actor.cpp
+++ b/ydb/services/metadata/initializer/common.cpp
@@ -1,4 +1,4 @@
-#include "request_actor.h"
+#include "common.h"
namespace NKikimr::NMetadataProvider {
diff --git a/ydb/services/metadata/initializer/common.h b/ydb/services/metadata/initializer/common.h
new file mode 100644
index 00000000000..f9fac852785
--- /dev/null
+++ b/ydb/services/metadata/initializer/common.h
@@ -0,0 +1,34 @@
+#pragma once
+#include <ydb/services/metadata/request/config.h>
+#include <ydb/services/metadata/request/request_actor.h>
+
+namespace NKikimr::NMetadataProvider {
+
+class ITableModifier {
+protected:
+ virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const = 0;
+public:
+ using TPtr = std::shared_ptr<ITableModifier>;
+ virtual ~ITableModifier() = default;
+ bool Execute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const {
+ return DoExecute(resultCallbackId, config);
+ }
+};
+
+template <class TDialogPolicy>
+class TGenericTableModifier: public ITableModifier {
+private:
+ YDB_READONLY_DEF(typename TDialogPolicy::TRequest, Request);
+protected:
+ virtual bool DoExecute(const TActorId& resultCallbackId, const NInternal::NRequest::TConfig& config) const override {
+ TActivationContext::ActorSystem()->Register(new NInternal::NRequest::TYDBRequest<TDialogPolicy>(Request, resultCallbackId, config));
+ return true;
+ }
+public:
+ TGenericTableModifier(typename TDialogPolicy::TRequest& request)
+ : Request(request) {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/request/CMakeLists.txt b/ydb/services/metadata/request/CMakeLists.txt
new file mode 100644
index 00000000000..fe88f4b7c89
--- /dev/null
+++ b/ydb/services/metadata/request/CMakeLists.txt
@@ -0,0 +1,24 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(services-metadata-request)
+target_link_libraries(services-metadata-request PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ ydb-core-base
+ core-grpc_services-local_rpc
+ core-grpc_services-base
+ ydb-core-grpc_services
+)
+target_sources(services-metadata-request PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/request_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/config.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/request/common.cpp
+)
diff --git a/ydb/services/metadata/request/common.cpp b/ydb/services/metadata/request/common.cpp
new file mode 100644
index 00000000000..4ec9f4dde00
--- /dev/null
+++ b/ydb/services/metadata/request/common.cpp
@@ -0,0 +1,5 @@
+#include "common.h"
+
+namespace NKikimr::NInternal::NRequest {
+
+}
diff --git a/ydb/services/metadata/request/common.h b/ydb/services/metadata/request/common.h
new file mode 100644
index 00000000000..eb7100c9389
--- /dev/null
+++ b/ydb/services/metadata/request/common.h
@@ -0,0 +1,30 @@
+#pragma once
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/actorid.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/actor_virtual.h>
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/public/api/protos/ydb_table.pb.h>
+#include <ydb/core/base/events.h>
+
+namespace NKikimr::NInternal::NRequest {
+
+enum EEvents {
+ EvCreateTableRequest = EventSpaceBegin(TKikimrEvents::ES_INTERNAL_REQUEST),
+ EvCreateTableInternalResponse,
+ EvCreateTableResponse,
+ EvSelectRequest,
+ EvSelectInternalResponse,
+ EvSelectResponse,
+ EvCreateSessionRequest,
+ EvCreateSessionInternalResponse,
+ EvCreateSessionResponse,
+ EvRequestFinished,
+ EvEnd
+};
+
+static_assert(EEvents::EvEnd < EventSpaceEnd(TKikimrEvents::ES_INTERNAL_REQUEST), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_INTERNAL_REQUESTS)");
+
+}
diff --git a/ydb/services/metadata/request/config.cpp b/ydb/services/metadata/request/config.cpp
new file mode 100644
index 00000000000..f70fb705bda
--- /dev/null
+++ b/ydb/services/metadata/request/config.cpp
@@ -0,0 +1,21 @@
+#include "config.h"
+#include <util/generic/ylimits.h>
+
+namespace NKikimr::NInternal::NRequest {
+
+bool TConfig::DeserializeFromProto(const NKikimrConfig::TInternalRequestConfig& config) {
+ RetryPeriodStart = TDuration::Seconds(config.GetRetryPeriodStartSeconds());
+ RetryPeriodFinish = TDuration::Seconds(config.GetRetryPeriodFinishSeconds());
+ if (RetryPeriodStart > RetryPeriodFinish) {
+ Cerr << "incorrect metadata provider config start/finish periods";
+ std::swap(RetryPeriodStart, RetryPeriodFinish);
+ }
+ return true;
+}
+
+TDuration TConfig::GetRetryPeriod(const ui32 retry) const {
+ const double kff = 1.0 / Max<double>(1.0, retry);
+ return RetryPeriodStart * kff + RetryPeriodFinish * (1 - kff);
+}
+
+}
diff --git a/ydb/services/metadata/request/config.h b/ydb/services/metadata/request/config.h
new file mode 100644
index 00000000000..a770b8b3379
--- /dev/null
+++ b/ydb/services/metadata/request/config.h
@@ -0,0 +1,18 @@
+#pragma once
+#include <ydb/library/accessor/accessor.h>
+#include <ydb/core/protos/config.pb.h>
+#include <util/datetime/base.h>
+
+namespace NKikimr::NInternal::NRequest {
+
+class TConfig {
+private:
+ TDuration RetryPeriodStart = TDuration::Seconds(3);
+ TDuration RetryPeriodFinish = TDuration::Seconds(30);
+public:
+ TConfig() = default;
+
+ TDuration GetRetryPeriod(const ui32 retry) const;
+ bool DeserializeFromProto(const NKikimrConfig::TInternalRequestConfig& config);
+};
+}
diff --git a/ydb/services/metadata/request/request_actor.cpp b/ydb/services/metadata/request/request_actor.cpp
new file mode 100644
index 00000000000..97ac6877667
--- /dev/null
+++ b/ydb/services/metadata/request/request_actor.cpp
@@ -0,0 +1,5 @@
+#include "request_actor.h"
+
+namespace NKikimr::NMetadataProvider::NRequest {
+
+}
diff --git a/ydb/services/metadata/ds_table/request_actor.h b/ydb/services/metadata/request/request_actor.h
index 5a2f44be52c..a5bf6064abc 100644
--- a/ydb/services/metadata/ds_table/request_actor.h
+++ b/ydb/services/metadata/request/request_actor.h
@@ -1,4 +1,5 @@
#pragma once
+#include "common.h"
#include "config.h"
#include <library/cpp/actors/core/actor_virtual.h>
@@ -8,9 +9,8 @@
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/library/accessor/accessor.h>
-#include <ydb/services/metadata/abstract/common.h>
-namespace NKikimr::NMetadataProvider {
+namespace NKikimr::NInternal::NRequest {
template <class TRequestExt, class TResponseExt, ui32 EvStartExt, ui32 EvResultInternalExt, ui32 EvResultExt>
class TDialogPolicyImpl {
@@ -23,11 +23,11 @@ public:
};
using TDialogCreateTable = TDialogPolicyImpl<Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse,
- EEvSubscribe::EvCreateTableRequest, EEvSubscribe::EvCreateTableInternalResponse, EEvSubscribe::EvCreateTableResponse>;
+ EEvents::EvCreateTableRequest, EEvents::EvCreateTableInternalResponse, EEvents::EvCreateTableResponse>;
using TDialogSelect = TDialogPolicyImpl<Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse,
- EEvSubscribe::EvSelectRequest, EEvSubscribe::EvSelectInternalResponse, EEvSubscribe::EvSelectResponse>;
+ EEvents::EvSelectRequest, EEvents::EvSelectInternalResponse, EEvents::EvSelectResponse>;
using TDialogCreateSession = TDialogPolicyImpl<Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse,
- EEvSubscribe::EvCreateSessionRequest, EEvSubscribe::EvCreateSessionInternalResponse, EEvSubscribe::EvCreateSessionResponse>;
+ EEvents::EvCreateSessionRequest, EEvents::EvCreateSessionInternalResponse, EEvents::EvCreateSessionResponse>;
template <class TDialogPolicy>
class TEvRequestResult: public NActors::TEventLocal<TEvRequestResult<TDialogPolicy>, TDialogPolicy::EvResult> {
@@ -35,12 +35,16 @@ private:
YDB_READONLY_DEF(typename TDialogPolicy::TResponse, Result);
public:
TEvRequestResult(typename TDialogPolicy::TResponse&& result)
- : Result(std::move(result))
- {
+ : Result(std::move(result)) {
}
};
+class TEvRequestFinished: public NActors::TEventLocal<TEvRequestFinished, EEvents::EvRequestFinished> {
+public:
+ TEvRequestFinished() = default;
+};
+
template <class TResponse>
class TOperatorChecker {
public:
@@ -111,6 +115,7 @@ public:
return;
}
TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId);
+ TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId);
TBase::Die(TActivationContext::AsActorContext());
}