aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-12-21 08:56:00 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-12-21 08:56:00 +0300
commitece732c43ec9915c75884b02478cf9b9abd189e9 (patch)
tree09013f2800e7ac740d2bcbc6b647a091abccc547
parentfd79d5c797534fcce5d66ff6b53283b64f995e7e (diff)
downloadydb-ece732c43ec9915c75884b02478cf9b9abd189e9.tar.gz
initialize snapshot for migrations on start withno constructor
-rw-r--r--ydb/core/testlib/common_helper.cpp42
-rw-r--r--ydb/core/testlib/common_helper.h2
-rw-r--r--ydb/core/tx/tiering/external_data.cpp2
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp19
-rw-r--r--ydb/services/metadata/abstract/common.h6
-rw-r--r--ydb/services/metadata/abstract/fetcher.h6
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.darwin.txt3
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt3
-rw-r--r--ydb/services/metadata/ds_table/CMakeLists.linux.txt3
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.cpp70
-rw-r--r--ydb/services/metadata/ds_table/accessor_refresh.h107
-rw-r--r--ydb/services/metadata/ds_table/accessor_snapshot_base.cpp69
-rw-r--r--ydb/services/metadata/ds_table/accessor_snapshot_base.h104
-rw-r--r--ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp48
-rw-r--r--ydb/services/metadata/ds_table/accessor_snapshot_simple.h112
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.cpp5
-rw-r--r--ydb/services/metadata/ds_table/accessor_subscribe.h6
-rw-r--r--ydb/services/metadata/ds_table/scheme_describe.cpp10
-rw-r--r--ydb/services/metadata/ds_table/service.cpp78
-rw-r--r--ydb/services/metadata/ds_table/service.h47
-rw-r--r--ydb/services/metadata/ds_table/table_exists.cpp60
-rw-r--r--ydb/services/metadata/ds_table/table_exists.h86
-rw-r--r--ydb/services/metadata/initializer/behaviour.cpp8
-rw-r--r--ydb/services/metadata/initializer/ut/ut_init.cpp5
-rw-r--r--ydb/services/metadata/manager/abstract.h4
-rw-r--r--ydb/services/metadata/manager/alter_impl.h2
-rw-r--r--ydb/services/metadata/manager/modification.h4
-rw-r--r--ydb/services/metadata/manager/restore.h2
-rw-r--r--ydb/services/metadata/request/request_actor.h105
-rw-r--r--ydb/services/metadata/secret/ut/ut_secret.cpp5
30 files changed, 768 insertions, 255 deletions
diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp
index 5c780b0ba7..182e02b163 100644
--- a/ydb/core/testlib/common_helper.cpp
+++ b/ydb/core/testlib/common_helper.cpp
@@ -20,45 +20,59 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) {
}
void THelper::StartDataRequest(const TString& request, const bool expectSuccess) const {
- NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false));
+ NYdb::NTable::TTableClient tClient(Server.GetDriver(),
+ NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin"));
auto expectation = expectSuccess;
- tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ bool resultReady = false;
+ bool* rrPtr = &resultReady;
+ tClient.CreateSession().Subscribe([rrPtr, request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
auto session = f.GetValueSync().GetSession();
session.ExecuteDataQuery(request
, NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx())
- .Subscribe([expectation](NYdb::NTable::TAsyncDataQueryResult f)
+ .Subscribe([rrPtr, expectation, request](NYdb::NTable::TAsyncDataQueryResult f)
{
TStringStream ss;
f.GetValueSync().GetIssues().PrintTo(ss, false);
- Cerr << ss.Str() << Endl;
+ Cerr << "REQUEST=" << request << ";RESULT=" << ss.Str() << ";EXPECTATION=" << expectation << Endl;
Y_VERIFY(expectation == f.GetValueSync().IsSuccess());
+ *rrPtr = true;
});
});
+ const TInstant start = TInstant::Now();
+ while (!resultReady && start + TDuration::Seconds(200) > TInstant::Now()) {
+ Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1));
+ }
+ Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << Endl;
+ Y_VERIFY(resultReady);
}
-void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess) const {
+void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess, const bool waiting) const {
NYdb::NTable::TTableClient tClient(Server.GetDriver(),
NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin"));
auto expectation = expectSuccess;
- bool resultReady = false;
- bool* rrPtr = &resultReady;
- tClient.CreateSession().Subscribe([rrPtr, request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
+ std::shared_ptr<bool> rrPtr = std::make_shared<bool>(false);
+ TString requestInt = request;
+ tClient.CreateSession().Subscribe([rrPtr, requestInt, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) {
auto session = f.GetValueSync().GetSession();
- session.ExecuteSchemeQuery(request).Subscribe([rrPtr, expectation](NYdb::TAsyncStatus f)
+ session.ExecuteSchemeQuery(requestInt).Subscribe([rrPtr, expectation, requestInt](NYdb::TAsyncStatus f)
{
TStringStream ss;
f.GetValueSync().GetIssues().PrintTo(ss, false);
- Cerr << ss.Str() << Endl;
+ Cerr << "REQUEST=" << requestInt << ";RESULT=" << ss.Str() << ";EXPECTATION=" << expectation << Endl;
Y_VERIFY(expectation == f.GetValueSync().IsSuccess(), "%d", expectation ? 1 : 0);
*rrPtr = true;
});
});
- const TInstant start = TInstant::Now();
- while (!resultReady && start + TDuration::Seconds(20) > TInstant::Now()) {
- Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1));
+ Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << ";WAITING=" << waiting << Endl;
+ if (waiting) {
+ const TInstant start = TInstant::Now();
+ while (!*rrPtr && start + TDuration::Seconds(20) > TInstant::Now()) {
+ Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1));
+ }
+ Y_VERIFY(*rrPtr);
+ Cerr << "FINISHED_REQUEST=" << request << ";EXPECTATION=" << expectation << ";WAITING=" << waiting << Endl;
}
- Y_VERIFY(resultReady);
}
void THelper::DropTable(const TString& tablePath) {
diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h
index 4218e1f447..d25393972d 100644
--- a/ydb/core/testlib/common_helper.h
+++ b/ydb/core/testlib/common_helper.h
@@ -17,6 +17,6 @@ public:
void DropTable(const TString& tablePath);
void StartDataRequest(const TString& request, const bool expectSuccess = true) const;
- void StartSchemaRequest(const TString& request, const bool expectSuccess = true) const;
+ void StartSchemaRequest(const TString& request, const bool expectSuccess = true, const bool waiting = true) const;
};
}
diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp
index 446b880c64..b6616bc760 100644
--- a/ydb/core/tx/tiering/external_data.cpp
+++ b/ydb/core/tx/tiering/external_data.cpp
@@ -12,7 +12,7 @@
namespace NKikimr::NColumnShard::NTiers {
void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadata::NFetcher::ISnapshotAcceptorController::TPtr controller) const {
- controller->Enriched(original);
+ controller->OnSnapshotEnriched(original);
}
TSnapshotConstructor::TSnapshotConstructor() {
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 4a324e66f5..02d6fce31e 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -291,11 +291,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
Y_VERIFY(emulator->IsFound());
}
{
- lHelper.StartSchemaRequest("ALTER OBJECT tier1 (TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`");
-
emulator->ResetConditions();
emulator->SetExpectedTiersCount(2);
emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
+
+ lHelper.StartSchemaRequest("ALTER OBJECT tier1 (TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`");
+
{
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) {
@@ -312,6 +313,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
patch.SetColumn("tierName", NMetadata::NInternal::TYDBValue::Bytes("tier1"));
patches.emplace_back(std::move(patch));
}
+ emulator->ResetConditions();
+ emulator->SetExpectedTieringsCount(0);
+ emulator->SetExpectedTiersCount(0);
lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)", false);
lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)", false);
@@ -320,9 +324,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)");
lHelper.StartSchemaRequest("DROP OBJECT tier2(TYPE TIER)");
- emulator->ResetConditions();
- emulator->SetExpectedTieringsCount(0);
- emulator->SetExpectedTiersCount(0);
{
const TInstant start = Now();
while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) {
@@ -365,7 +366,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
- lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + ConfigProtoStr1 + "`");
+ lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + ConfigProtoStr1 + "`", true, false);
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
@@ -378,7 +379,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.StartSchemaRequest("CREATE OBJECT tiering1 (TYPE TIERING_RULE) "
"WITH (defaultColumn = timestamp, description = `" + ConfigTiering1Str + "`)");
lHelper.StartSchemaRequest("CREATE OBJECT tiering2 (TYPE TIERING_RULE) "
- "WITH (defaultColumn = timestamp, description = `" + ConfigTiering2Str + "` )");
+ "WITH (defaultColumn = timestamp, description = `" + ConfigTiering2Str + "` )", true, false);
{
TTestCSEmulator emulator;
emulator.MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1"));
@@ -393,7 +394,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.StartSchemaRequest("DROP OBJECT tiering2 (TYPE TIERING_RULE)");
lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", false);
lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`");
- lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)");
+ lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", true, false);
{
TTestCSEmulator emulator;
emulator.SetExpectedTieringsCount(0);
@@ -401,7 +402,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
emulator.CheckRuntime(runtime);
}
lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)");
- lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)");
+ lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", true, false);
{
TTestCSEmulator emulator;
emulator.SetExpectedTieringsCount(0);
diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h
index 864f031295..90998755b0 100644
--- a/ydb/services/metadata/abstract/common.h
+++ b/ydb/services/metadata/abstract/common.h
@@ -31,6 +31,12 @@ enum EEvents {
EvTimeout,
EvTableDescriptionFailed,
EvTableDescriptionSuccess,
+ EvAccessorSimpleResult,
+ EvAccessorSimpleError,
+ EvAccessorSimpleTableAbsent,
+ EvPathExistsCheckFailed,
+ EvPathExistsCheckResult,
+ EvStartMetadataService,
EvEnd
};
diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h
index 48ebc5918c..dca8a0ab0f 100644
--- a/ydb/services/metadata/abstract/fetcher.h
+++ b/ydb/services/metadata/abstract/fetcher.h
@@ -23,8 +23,8 @@ class ISnapshotAcceptorController {
public:
using TPtr = std::shared_ptr<ISnapshotAcceptorController>;
virtual ~ISnapshotAcceptorController() = default;
- virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0;
- virtual void EnrichProblem(const TString& errorMessage) = 0;
+ virtual void OnSnapshotEnriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0;
+ virtual void OnSnapshotEnrichError(const TString& errorMessage) = 0;
};
class ISnapshot {
@@ -64,7 +64,7 @@ public:
ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const;
virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const {
- controller->Enriched(original);
+ controller->OnSnapshotEnriched(original);
}
const std::vector<IClassBehaviour::TPtr>& GetManagers() const {
diff --git a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
index fbb641ff7b..34212fd99c 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt
@@ -23,7 +23,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC
target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
)
diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
index cbd8ba545c..409457cc70 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt
@@ -24,7 +24,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC
target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
)
diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux.txt b/ydb/services/metadata/ds_table/CMakeLists.linux.txt
index cbd8ba545c..409457cc70 100644
--- a/ydb/services/metadata/ds_table/CMakeLists.linux.txt
+++ b/ydb/services/metadata/ds_table/CMakeLists.linux.txt
@@ -24,7 +24,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC
target_sources(services-metadata-ds_table PRIVATE
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp
${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp
)
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp
index f3557615b8..bd9417afb4 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.cpp
+++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp
@@ -1,71 +1,45 @@
#include "accessor_refresh.h"
-#include <ydb/core/grpc_services/base/base.h>
-#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
-#include <library/cpp/actors/core/log.h>
+namespace NKikimr::NMetadata::NProvider {
-#include <util/string/escape.h>
+void TDSAccessorRefresher::OnBootstrap() {
+ TBase::OnBootstrap();
+ Become(&TDSAccessorRefresher::StateMain);
+ Sender<TEvRefresh>().SendTo(SelfId());
+}
-namespace NKikimr::NMetadata::NProvider {
+void TDSAccessorRefresher::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) {
+ Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
+ CurrentSnapshot = snapshot;
+ *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets());
+ OnSnapshotModified();
+ OnSnapshotRefresh();
+}
-void TDSAccessorRefresher::Handle(TEvYQLResponse::TPtr& ev) {
- auto& currentFullReply = ev->Get()->GetResponse();
- Ydb::Table::ExecuteQueryResult qResult;
- currentFullReply.operation().result().UnpackTo(&qResult);
- Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size());
+void TDSAccessorRefresher::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) {
*ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets());
- auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(ProposedProto, RequestedActuality);
- if (!parsedSnapshot) {
- ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot";
- }
-
- if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) {
+ if (CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString();
- SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, InternalController);
+ SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController);
} else {
- CurrentSnapshot->SetActuality(RequestedActuality);
+ CurrentSnapshot->SetActuality(GetRequestedActuality());
OnSnapshotRefresh();
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
}
}
-void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) {
- RequestedActuality = TInstant::Zero();
+void TDSAccessorRefresher::OnIncorrectSnapshotFromYQL(const TString& errorMessage) {
+ TBase::OnIncorrectSnapshotFromYQL(errorMessage);
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
- CurrentSnapshot = ev->Get()->GetEnrichedSnapshot();
- *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets());
- OnSnapshotModified();
- OnSnapshotRefresh();
}
-void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) {
- RequestedActuality = TInstant::Zero();
+void TDSAccessorRefresher::OnSnapshotEnrichingError(const TString& errorMessage) {
+ TBase::OnSnapshotEnrichingError(errorMessage);
Schedule(Config.GetRefreshPeriod(), new TEvRefresh());
- ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText();
}
void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) {
- TStringBuilder sb;
- RequestedActuality = TInstant::Now();
- auto& managers = SnapshotConstructor->GetManagers();
- Y_VERIFY(managers.size());
- for (auto&& i : managers) {
- sb << "SELECT * FROM `" + EscapeC(i->GetStorageTablePath()) + "`;";
- }
- Register(new NRequest::TYQLQuerySessionedActor(sb, NACLib::TSystemUsers::Metadata(), Config.GetRequestConfig(), InternalController));
-}
-
-TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor)
- : SnapshotConstructor(snapshotConstructor)
- , Config(config)
-{
-
-}
-
-void TDSAccessorRefresher::Bootstrap() {
- RegisterState();
- InternalController = std::make_shared<TRefreshInternalController>(SelfId());
- Sender<TEvRefresh>().SendTo(SelfId());
+ TBase::StartSnapshotsFetching();
}
}
diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h
index da76f6dd81..f7600af28a 100644
--- a/ydb/services/metadata/ds_table/accessor_refresh.h
+++ b/ydb/services/metadata/ds_table/accessor_refresh.h
@@ -1,110 +1,47 @@
#pragma once
-#include <ydb/public/api/protos/ydb_value.pb.h>
-#include <ydb/services/metadata/abstract/common.h>
-#include <ydb/services/metadata/initializer/accessor_init.h>
-#include <ydb/services/metadata/request/request_actor.h>
-#include <library/cpp/actors/core/hfunc.h>
+#include "accessor_snapshot_base.h"
namespace NKikimr::NMetadata::NProvider {
-class TDSAccessorRefresher;
-
-class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvents::EvRefresh> {
-public:
-};
-
-class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvents::EvYQLResponse> {
-private:
- YDB_READONLY_DEF(NRequest::TDialogYQLRequest::TResponse, Response);
-public:
- TEvYQLResponse(const NRequest::TDialogYQLRequest::TResponse& r)
- : Response(r)
- {
-
- }
-};
-
-class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvents::EvEnrichSnapshotResult> {
-private:
- YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, EnrichedSnapshot);
-public:
- TEvEnrichSnapshotResult(NFetcher::ISnapshot::TPtr snapshot)
- : EnrichedSnapshot(snapshot) {
-
- }
-};
-
-class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvents::EvEnrichSnapshotProblem> {
-private:
- YDB_READONLY_DEF(TString, ErrorText);
-public:
- TEvEnrichSnapshotProblem(const TString& errorText)
- : ErrorText(errorText) {
-
- }
-};
-
-class TRefreshInternalController: public NFetcher::ISnapshotAcceptorController, public NRequest::IQueryOutput {
+class TDSAccessorRefresher: public TDSAccessorBase {
private:
- const TActorIdentity ActorId;
-public:
- TRefreshInternalController(const TActorIdentity& actorId)
- : ActorId(actorId) {
+ using TBase = TDSAccessorBase;
+ YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, CurrentSnapshot);
+ YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
+ Ydb::Table::ExecuteQueryResult ProposedProto;
+ const TConfig Config;
- }
+ void Handle(TEvRefresh::TPtr& ev);
- virtual void EnrichProblem(const TString& errorMessage) override {
- ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage));
- }
+ virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) override final;
- virtual void Enriched(NFetcher::ISnapshot::TPtr enrichedSnapshot) override {
- ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot));
- }
+ virtual void OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) override final;
- virtual void OnReply(const NRequest::TDialogYQLRequest::TResponse& response) override {
- ActorId.Send(ActorId, new TEvYQLResponse(response));
- }
-};
+ virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage) override final;
-class TDSAccessorRefresher: public NActors::TActorBootstrapped<TDSAccessorRefresher> {
-private:
- using TBase = NActors::TActorBootstrapped<TDSAccessorRefresher>;
- NFetcher::ISnapshotsFetcher::TPtr SnapshotConstructor;
- std::shared_ptr<TRefreshInternalController> InternalController;
- YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, CurrentSnapshot);
- YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection);
- Ydb::Table::ExecuteQueryResult ProposedProto;
- TInstant RequestedActuality = TInstant::Zero();
- const TConfig Config;
+ virtual void OnSnapshotEnrichingError(const TString& errorMessage) override final;
protected:
- virtual void RegisterState() {
- Become(&TDSAccessorRefresher::StateMain);
- }
+ virtual void OnBootstrap() override;
+ virtual void OnSnapshotModified() = 0;
+ virtual void OnSnapshotRefresh() = 0;
bool IsReady() const {
return !!CurrentSnapshot;
}
- virtual void OnSnapshotModified() = 0;
- virtual void OnSnapshotRefresh() = 0;
public:
- void Bootstrap();
-
- STATEFN(StateMain) {
+ STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvYQLResponse, Handle);
hFunc(TEvRefresh, Handle);
- hFunc(TEvEnrichSnapshotResult, Handle);
- hFunc(TEvEnrichSnapshotProblem, Handle);
default:
- break;
+ TBase::StateMain(ev, ctx);
}
}
+ TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor)
+ : TBase(config.GetRequestConfig(), snapshotConstructor)
+ , Config(config)
+ {
- TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor);
+ }
- void Handle(TEvEnrichSnapshotResult::TPtr& ev);
- void Handle(TEvEnrichSnapshotProblem::TPtr& ev);
- void Handle(TEvYQLResponse::TPtr& ev);
- void Handle(TEvRefresh::TPtr& ev);
};
}
diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp b/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
new file mode 100644
index 0000000000..ce516994d4
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp
@@ -0,0 +1,69 @@
+#include "accessor_snapshot_base.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+#include <library/cpp/actors/core/log.h>
+
+#include <util/string/escape.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+void TDSAccessorBase::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& /*qResult*/, NFetcher::ISnapshot::TPtr snapshot) {
+ SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController);
+}
+
+void TDSAccessorBase::OnIncorrectSnapshotFromYQL(const TString& errorMessage) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot: " << errorMessage;
+}
+
+void TDSAccessorBase::Handle(TEvYQLResponse::TPtr& ev) {
+ auto& currentFullReply = ev->Get()->GetResponse();
+ Ydb::Table::ExecuteQueryResult qResult;
+ currentFullReply.operation().result().UnpackTo(&qResult);
+ Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size());
+ auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(qResult, RequestedActuality);
+ if (!parsedSnapshot) {
+ OnIncorrectSnapshotFromYQL("snapshot is null after parsing");
+ } else {
+ OnNewParsedSnapshot(std::move(qResult), parsedSnapshot);
+ }
+}
+
+void TDSAccessorBase::Handle(TEvEnrichSnapshotResult::TPtr& ev) {
+ RequestedActuality = TInstant::Zero();
+ OnNewEnrichedSnapshot(ev->Get()->GetEnrichedSnapshot());
+}
+
+void TDSAccessorBase::OnSnapshotEnrichingError(const TString& errorMessage) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot enrich current snapshot: " << errorMessage;
+}
+
+void TDSAccessorBase::Handle(TEvEnrichSnapshotProblem::TPtr& ev) {
+ RequestedActuality = TInstant::Zero();
+ OnSnapshotEnrichingError(ev->Get()->GetErrorText());
+}
+
+void TDSAccessorBase::StartSnapshotsFetching() {
+ TStringBuilder sb;
+ RequestedActuality = TInstant::Now();
+ auto& managers = SnapshotConstructor->GetManagers();
+ Y_VERIFY(managers.size());
+ for (auto&& i : managers) {
+ sb << "SELECT * FROM `" + EscapeC(i->GetStorageTablePath()) + "`;";
+ }
+ Register(new NRequest::TYQLQuerySessionedActor(sb, NACLib::TSystemUsers::Metadata(), Config, InternalController));
+}
+
+TDSAccessorBase::TDSAccessorBase(const NRequest::TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor)
+ : Config(config)
+ , SnapshotConstructor(snapshotConstructor)
+{
+
+}
+
+void TDSAccessorBase::Bootstrap() {
+ InternalController = std::make_shared<TRefreshInternalController>(SelfId());
+ OnBootstrap();
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_base.h b/ydb/services/metadata/ds_table/accessor_snapshot_base.h
new file mode 100644
index 0000000000..d74c1ddfe2
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_snapshot_base.h
@@ -0,0 +1,104 @@
+#pragma once
+#include <ydb/public/api/protos/ydb_value.pb.h>
+#include <ydb/services/metadata/abstract/common.h>
+#include <ydb/services/metadata/initializer/accessor_init.h>
+#include <ydb/services/metadata/request/request_actor.h>
+#include <library/cpp/actors/core/hfunc.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvents::EvRefresh> {
+public:
+};
+
+class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvents::EvYQLResponse> {
+private:
+ YDB_READONLY_DEF(NRequest::TDialogYQLRequest::TResponse, Response);
+public:
+ TEvYQLResponse(const NRequest::TDialogYQLRequest::TResponse& r)
+ : Response(r)
+ {
+
+ }
+};
+
+class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvents::EvEnrichSnapshotResult> {
+private:
+ YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, EnrichedSnapshot);
+public:
+ TEvEnrichSnapshotResult(NFetcher::ISnapshot::TPtr snapshot)
+ : EnrichedSnapshot(snapshot) {
+
+ }
+};
+
+class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvents::EvEnrichSnapshotProblem> {
+private:
+ YDB_READONLY_DEF(TString, ErrorText);
+public:
+ TEvEnrichSnapshotProblem(const TString& errorText)
+ : ErrorText(errorText) {
+
+ }
+};
+
+class TRefreshInternalController: public NFetcher::ISnapshotAcceptorController, public NRequest::IQueryOutput {
+private:
+ const TActorIdentity ActorId;
+public:
+ TRefreshInternalController(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void OnSnapshotEnrichError(const TString& errorMessage) override {
+ ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage));
+ }
+
+ virtual void OnSnapshotEnriched(NFetcher::ISnapshot::TPtr enrichedSnapshot) override {
+ ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot));
+ }
+
+ virtual void OnYQLQueryReply(const NRequest::TDialogYQLRequest::TResponse& response) override {
+ ActorId.Send(ActorId, new TEvYQLResponse(response));
+ }
+};
+
+class TDSAccessorBase: public NActors::TActorBootstrapped<TDSAccessorBase> {
+private:
+ using TBase = NActors::TActorBootstrapped<TDSAccessorBase>;
+ YDB_READONLY(TInstant, RequestedActuality, TInstant::Zero());
+ const NRequest::TConfig Config;
+protected:
+ std::shared_ptr<TRefreshInternalController> InternalController;
+ NFetcher::ISnapshotsFetcher::TPtr SnapshotConstructor;
+
+ virtual void OnBootstrap() {
+ Become(&TDSAccessorBase::StateMain);
+ }
+ virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) = 0;
+ virtual void OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot);
+ virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage);
+ virtual void OnSnapshotEnrichingError(const TString& errorMessage);
+ void StartSnapshotsFetching();
+
+ void Handle(TEvEnrichSnapshotResult::TPtr& ev);
+ void Handle(TEvEnrichSnapshotProblem::TPtr& ev);
+ void Handle(TEvYQLResponse::TPtr& ev);
+public:
+ void Bootstrap();
+
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvYQLResponse, Handle);
+ hFunc(TEvEnrichSnapshotResult, Handle);
+ hFunc(TEvEnrichSnapshotProblem, Handle);
+ default:
+ break;
+ }
+ }
+
+ TDSAccessorBase(const NRequest::TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor);
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp b/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
new file mode 100644
index 0000000000..5a9e424909
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp
@@ -0,0 +1,48 @@
+#include "accessor_snapshot_simple.h"
+
+namespace NKikimr::NMetadata::NProvider {
+
+void TDSAccessorSimple::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) {
+ auto g = PassAwayGuard();
+ OutputController->OnSnapshotConstructionResult(snapshot);
+}
+
+void TDSAccessorSimple::OnIncorrectSnapshotFromYQL(const TString& errorMessage) {
+ TBase::OnIncorrectSnapshotFromYQL(errorMessage);
+ auto g = PassAwayGuard();
+ OutputController->OnSnapshotConstructionError(errorMessage);
+}
+
+void TDSAccessorSimple::OnSnapshotEnrichingError(const TString& errorMessage) {
+ TBase::OnSnapshotEnrichingError(errorMessage);
+ auto g = PassAwayGuard();
+ OutputController->OnSnapshotConstructionError(errorMessage);
+}
+
+void TDSAccessorSimple::OnBootstrap() {
+ Become(&TDSAccessorSimple::StateMain);
+ InputController = std::make_shared<TInputController>(SelfId());
+ for (auto&& i : SnapshotConstructor->GetManagers()) {
+ PathesInCheck.emplace(i->GetStorageTablePath());
+ Register(new TTableExistsActor(InputController, i->GetStorageTablePath(), TDuration::Seconds(2)));
+ }
+}
+
+void TDSAccessorSimple::Handle(TTableExistsActor::TEvController::TEvError::TPtr& ev) {
+ auto g = PassAwayGuard();
+ OutputController->OnSnapshotConstructionError(ev->Get()->GetErrorMessage());
+}
+
+void TDSAccessorSimple::Handle(TTableExistsActor::TEvController::TEvResult::TPtr& ev) {
+ if (!ev->Get()->IsTableExists()) {
+ OutputController->OnSnapshotConstructionTableAbsent(ev->Get()->GetTablePath());
+ PassAway();
+ return;
+ }
+ Y_VERIFY(PathesInCheck.erase(ev->Get()->GetTablePath()) == 1);
+ if (PathesInCheck.empty()) {
+ TBase::StartSnapshotsFetching();
+ }
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_simple.h b/ydb/services/metadata/ds_table/accessor_snapshot_simple.h
new file mode 100644
index 0000000000..b97b01bdb3
--- /dev/null
+++ b/ydb/services/metadata/ds_table/accessor_snapshot_simple.h
@@ -0,0 +1,112 @@
+#pragma once
+#include "accessor_snapshot_base.h"
+#include "table_exists.h"
+
+namespace NKikimr::NMetadata::NProvider {
+
+class ISnapshotConstructorController {
+public:
+ using TPtr = std::shared_ptr<ISnapshotConstructorController>;
+ virtual ~ISnapshotConstructorController() = default;
+ virtual void OnSnapshotConstructionResult(NFetcher::ISnapshot::TPtr snapshot) const = 0;
+ virtual void OnSnapshotConstructionError(const TString& errorMessage) const = 0;
+ virtual void OnSnapshotConstructionTableAbsent(const TString& path) const = 0;
+};
+
+class TDSAccessorSimple: public TDSAccessorBase {
+private:
+ using TBase = TDSAccessorBase;
+ std::set<TString> PathesInCheck;
+
+ class TInputController: public TTableExistsActor::TEvController {
+ public:
+ TInputController(const TActorIdentity& actorId)
+ : TTableExistsActor::TEvController(actorId) {
+
+ }
+ };
+
+ std::shared_ptr<TInputController> InputController;
+ ISnapshotConstructorController::TPtr OutputController;
+protected:
+ virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) final;
+ virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage) final;
+ virtual void OnSnapshotEnrichingError(const TString& errorMessage) final;
+
+ virtual void OnBootstrap() override;
+
+ void Handle(TTableExistsActor::TEvController::TEvError::TPtr& ev);
+
+ void Handle(TTableExistsActor::TEvController::TEvResult::TPtr& ev);
+
+public:
+ class TEvController: public ISnapshotConstructorController {
+ private:
+ const TActorIdentity ActorId;
+ public:
+ class TEvResult: public TEventLocal<TEvResult, EEvents::EvAccessorSimpleResult> {
+ private:
+ YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, Result);
+ public:
+ TEvResult(NFetcher::ISnapshot::TPtr snapshot)
+ : Result(snapshot) {
+
+ }
+ };
+
+ class TEvError: public TEventLocal<TEvError, EEvents::EvAccessorSimpleError> {
+ private:
+ YDB_READONLY_DEF(TString, ErrorMessage);
+ public:
+ TEvError(const TString& errorMessage)
+ : ErrorMessage(errorMessage) {
+
+ }
+ };
+
+ class TEvTableAbsent: public TEventLocal<TEvTableAbsent, EEvents::EvAccessorSimpleTableAbsent> {
+ private:
+ YDB_READONLY_DEF(TString, Path);
+ public:
+ TEvTableAbsent(const TString& path)
+ : Path(path) {
+
+ }
+ };
+
+ virtual void OnSnapshotConstructionResult(NFetcher::ISnapshot::TPtr snapshot) const override {
+ ActorId.Send(ActorId, new TEvResult(snapshot));
+ }
+
+ virtual void OnSnapshotConstructionError(const TString& errorMessage) const override {
+ ActorId.Send(ActorId, new TEvError(errorMessage));
+ }
+
+ virtual void OnSnapshotConstructionTableAbsent(const TString& path) const override {
+ ActorId.Send(ActorId, new TEvTableAbsent(path));
+ }
+
+ TEvController(const TActorIdentity& actorId)
+ : ActorId(actorId)
+ {
+
+ }
+ };
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TTableExistsActor::TEvController::TEvError, Handle);
+ hFunc(TTableExistsActor::TEvController::TEvResult, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+ TDSAccessorSimple(const NRequest::TConfig& config, ISnapshotConstructorController::TPtr outputController,
+ NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor)
+ : TBase(config, snapshotConstructor)
+ , OutputController(outputController){
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
index 47b6f7d217..b34b6324bd 100644
--- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp
@@ -54,4 +54,9 @@ void TDSAccessorNotifier::OnSnapshotRefresh() {
}
}
+void TDSAccessorNotifier::OnBootstrap() {
+ TBase::OnBootstrap();
+ Become(&TDSAccessorNotifier::StateMain);
+}
+
}
diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h
index 809d543631..e57b5e398a 100644
--- a/ydb/services/metadata/ds_table/accessor_subscribe.h
+++ b/ydb/services/metadata/ds_table/accessor_subscribe.h
@@ -41,14 +41,10 @@ private:
std::set<NActors::TActorId> Subscribed;
std::map<TInstant, std::set<NActors::TActorId>> Asked;
protected:
- virtual void RegisterState() override {
- Become(&TDSAccessorNotifier::StateMain);
- }
+ virtual void OnBootstrap() override;
virtual void OnSnapshotModified() override;
virtual void OnSnapshotRefresh() override;
public:
- using TBase::Handle;
-
TDSAccessorNotifier(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr sParser)
: TBase(config, sParser) {
}
diff --git a/ydb/services/metadata/ds_table/scheme_describe.cpp b/ydb/services/metadata/ds_table/scheme_describe.cpp
index 30ed84e1ec..83f824b1ea 100644
--- a/ydb/services/metadata/ds_table/scheme_describe.cpp
+++ b/ydb/services/metadata/ds_table/scheme_describe.cpp
@@ -11,20 +11,22 @@ namespace NKikimr::NMetadata::NProvider {
void TSchemeDescriptionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
auto* info = ev->Get();
const auto& request = info->Request;
+ auto g = PassAwayGuard();
if (request->ResultSet.empty()) {
Controller->OnDescriptionFailed("navigation problems for path " + Path, RequestId);
- PassAway();
return;
}
if (request->ResultSet.size() != 1) {
Controller->OnDescriptionFailed("cannot resolve database path " + Path, RequestId);
- PassAway();
return;
}
auto& entity = request->ResultSet.front();
- auto g = PassAwayGuard();
- Controller->OnDescriptionSuccess(std::move(entity.Columns), RequestId);
+ if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ Controller->OnDescriptionSuccess(std::move(entity.Columns), RequestId);
+ } else {
+ Controller->OnDescriptionFailed("incorrect path status: " + ::ToString(entity.Status), RequestId);
+ }
}
void TSchemeDescriptionActor::Bootstrap() {
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp
index 13ee11730a..f327a4cd84 100644
--- a/ydb/services/metadata/ds_table/service.cpp
+++ b/ydb/services/metadata/ds_table/service.cpp
@@ -16,20 +16,21 @@ IActor* CreateService(const TConfig& config) {
void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) {
TManagersId id(managers);
- const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId();
- if (ActiveFlag || (PreparationFlag && isInitialization)) {
- for (auto&& manager : managers) {
- Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId()));
- if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) {
- ManagersInRegistration.emplace(manager->GetTypeId(), manager);
- Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(),
- manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot));
+ if (InitializationSnapshot) {
+ const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId();
+ if (ActiveFlag || (PreparationFlag && isInitialization)) {
+ for (auto&& manager : managers) {
+ Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId()));
+ if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) {
+ ManagersInRegistration.emplace(manager->GetTypeId(), manager);
+ Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(),
+ manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot));
+ }
}
+ } else if (!PreparationFlag) {
+ PreparationFlag = true;
+ Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher));
}
- } else if (!PreparationFlag) {
- PreparationFlag = true;
- InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
- Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher));
}
EventsWaiting[id].emplace_back(ev, sender);
}
@@ -107,26 +108,31 @@ void TService::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) {
void TService::Handle(TEvSubscribeExternal::TPtr& ev) {
const TActorId senderId = ev->Sender;
- ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) {
+ ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) {
Send<TEvSubscribe>(actorId, senderId);
});
}
void TService::Handle(TEvAskSnapshot::TPtr& ev) {
const TActorId senderId = ev->Sender;
- ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) {
+ ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) {
Send<TEvAsk>(actorId, senderId);
});
}
void TService::Handle(TEvObjectsOperation::TPtr& ev) {
- auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId());
- if (it != RegisteredManagers.end()) {
- ev->Get()->GetCommand()->SetManager(it->second);
+ if (ev->Get()->GetCommand()->GetManager()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) {
+ ev->Get()->GetCommand()->SetManager(NInitializer::TDBInitialization::GetBehaviour());
ev->Get()->GetCommand()->Execute();
} else {
- auto m = ev->Get()->GetCommand()->GetManager();
- PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender);
+ auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId());
+ if (it != RegisteredManagers.end()) {
+ ev->Get()->GetCommand()->SetManager(it->second);
+ ev->Get()->GetCommand()->Execute();
+ } else {
+ auto m = ev->Get()->GetCommand()->GetManager();
+ PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender);
+ }
}
}
@@ -143,17 +149,43 @@ void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) {
Y_VERIFY(InitializationSnapshot);
if (!ActiveFlag) {
ActiveFlag = true;
- for (auto&& i : EventsWaiting) {
- i.second.front().Resend(SelfId());
- i.second.pop_front();
- }
+ Activate();
+ }
+}
+
+void TService::Activate() {
+ for (auto&& i : EventsWaiting) {
+ i.second.front().Resend(SelfId());
+ i.second.pop_front();
}
}
+void TService::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) {
+ InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(ev->Get()->GetResult());
+ Y_VERIFY(InitializationSnapshot);
+ Activate();
+}
+
+void TService::Handle(TEvStartMetadataService::TPtr& /*ev*/) {
+ Register(new TDSAccessorSimple(Config.GetRequestConfig(), InternalController, InitializationFetcher));
+}
+
+void TService::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) {
+ ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl;
+ Schedule(TDuration::Seconds(1), new TEvStartMetadataService());
+}
+
+void TService::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) {
+ InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero());
+ Activate();
+}
+
void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) {
ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service started" << Endl;
Become(&TService::StateMain);
InternalController = std::make_shared<TServiceInternalController>(SelfId());
+ InitializationFetcher = std::make_shared<NInitializer::TFetcher>();
+ Sender<TEvStartMetadataService>().SendTo(SelfId());
}
void TServiceInternalController::InitializationFinished(const TString& id) const {
diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h
index 35dd823e76..a22597d4a4 100644
--- a/ydb/services/metadata/ds_table/service.h
+++ b/ydb/services/metadata/ds_table/service.h
@@ -2,6 +2,7 @@
#include "accessor_subscribe.h"
#include "config.h"
#include "scheme_describe.h"
+#include "accessor_snapshot_simple.h"
#include <ydb/services/metadata/service.h>
#include <ydb/services/metadata/initializer/common.h>
@@ -15,6 +16,9 @@
namespace NKikimr::NMetadata::NProvider {
+class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> {
+};
+
class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> {
private:
YDB_READONLY_DEF(TString, ErrorMessage);
@@ -44,12 +48,14 @@ public:
}
};
-class TServiceInternalController: public NInitializer::IInitializerOutput, public ISchemeDescribeController {
+class TServiceInternalController: public NInitializer::IInitializerOutput,
+ public TDSAccessorSimple::TEvController, public ISchemeDescribeController {
private:
const NActors::TActorIdentity ActorId;
public:
TServiceInternalController(const NActors::TActorIdentity& actorId)
- : ActorId(actorId)
+ : TDSAccessorSimple::TEvController(actorId)
+ , ActorId(actorId)
{
}
@@ -120,10 +126,6 @@ public:
}
};
-class TWaitingWatcher {
-private:
-};
-
class TService: public NActors::TActorBootstrapped<TService> {
private:
using TBase = NActors::TActor<TService>;
@@ -139,46 +141,57 @@ private:
std::shared_ptr<TServiceInternalController> InternalController;
const TConfig Config;
- void Handle(NInitializer::TEvInitializationFinished::TPtr& ev);
+ void Handle(TEvStartMetadataService::TPtr& ev);
+ void Handle(NInitializer::TEvInitializationFinished::TPtr & ev);
void Handle(TEvRefreshSubscriberData::TPtr& ev);
+
void Handle(TEvAskSnapshot::TPtr& ev);
void Handle(TEvPrepareManager::TPtr& ev);
void Handle(TEvSubscribeExternal::TPtr& ev);
void Handle(TEvUnsubscribeExternal::TPtr& ev);
void Handle(TEvObjectsOperation::TPtr& ev);
+
void Handle(TEvTableDescriptionSuccess::TPtr& ev);
void Handle(TEvTableDescriptionFailed::TPtr& ev);
+ void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev);
+ void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev);
+ void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev);
+
void PrepareManagers(std::vector<IClassBehaviour::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender);
void InitializationFinished(const TString& initId);
- void RequestTableDescription(const TString& path) const;
+ void Activate();
- template <class TEventPtr, class TAction>
- void ProcessEventWithFetcher(TEventPtr& ev, TAction action) {
+ template <class TAction>
+ void ProcessEventWithFetcher(IEventHandle& ev, NFetcher::ISnapshotsFetcher::TPtr fetcher, TAction action) {
std::vector<IClassBehaviour::TPtr> needManagers;
- for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) {
+ for (auto&& i : fetcher->GetManagers()) {
if (!RegisteredManagers.contains(i->GetTypeId())) {
needManagers.emplace_back(i);
}
}
if (needManagers.empty()) {
- auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId());
+ auto it = Accessors.find(fetcher->GetComponentId());
if (it == Accessors.end()) {
- THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher());
- it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first;
+ THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, fetcher);
+ it = Accessors.emplace(fetcher->GetComponentId(), Register(actor.Release())).first;
}
action(it->second);
} else {
- PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender);
+ PrepareManagers(needManagers, ev.ReleaseBase(), ev.Sender);
}
}
public:
- void Bootstrap(const NActors::TActorContext& /*ctx*/);
+ void Bootstrap(const NActors::TActorContext& ctx);
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle);
+ hFunc(TDSAccessorSimple::TEvController::TEvError, Handle);
+ hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle);
+
hFunc(TEvObjectsOperation, Handle);
hFunc(TEvRefreshSubscriberData, Handle);
hFunc(TEvAskSnapshot, Handle);
@@ -188,6 +201,8 @@ public:
hFunc(TEvTableDescriptionSuccess, Handle);
hFunc(TEvTableDescriptionFailed, Handle);
+
+ hFunc(TEvStartMetadataService, Handle);
hFunc(NInitializer::TEvInitializationFinished, Handle);
default:
diff --git a/ydb/services/metadata/ds_table/table_exists.cpp b/ydb/services/metadata/ds_table/table_exists.cpp
new file mode 100644
index 0000000000..330dd722b1
--- /dev/null
+++ b/ydb/services/metadata/ds_table/table_exists.cpp
@@ -0,0 +1,60 @@
+#include "table_exists.h"
+
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/ydb_convert/ydb_convert.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+void TTableExistsActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ auto* info = ev->Get();
+ const auto& request = info->Request;
+ auto g = PassAwayGuard();
+
+ if (request->ResultSet.empty()) {
+ OutputController->OnPathExistsCheckFailed("navigation problems for path", Path);
+ return;
+ }
+ if (request->ResultSet.size() != 1) {
+ OutputController->OnPathExistsCheckFailed("cannot resolve database path", Path);
+ return;
+ }
+ auto& entity = request->ResultSet.front();
+ if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) {
+ if (entity.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindTable) {
+ OutputController->OnPathExistsCheckResult(true, Path);
+ } else {
+ OutputController->OnPathExistsCheckResult(false, Path);
+ }
+ } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath) {
+ OutputController->OnPathExistsCheckResult(false, Path);
+ } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable) {
+ OutputController->OnPathExistsCheckResult(false, Path);
+ } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown) {
+ OutputController->OnPathExistsCheckResult(false, Path);
+ } else {
+ OutputController->OnPathExistsCheckFailed("incorrect path status: " + ::ToString(entity.Status), Path);
+ }
+}
+
+NKikimrServices::TActivity::EType TTableExistsActor::ActorActivityType() {
+ return NKikimrServices::TActivity::METADATA_SCHEME_DESCRIPTION_ACTOR;
+}
+
+void TTableExistsActor::OnBootstrap() {
+ Become(&TTableExistsActor::StateMain);
+
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
+ entry.Path = NKikimr::SplitPath(Path);
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+}
+
+void TTableExistsActor::OnTimeout() {
+ OutputController->OnPathExistsCheckFailed("timeout", Path);
+}
+
+}
diff --git a/ydb/services/metadata/ds_table/table_exists.h b/ydb/services/metadata/ds_table/table_exists.h
new file mode 100644
index 0000000000..6394ba848c
--- /dev/null
+++ b/ydb/services/metadata/ds_table/table_exists.h
@@ -0,0 +1,86 @@
+#pragma once
+
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/services/metadata/common/ss_dialog.h>
+
+namespace NKikimr::NMetadata::NProvider {
+
+class ITableExistsController {
+public:
+ using TPtr = std::shared_ptr<ITableExistsController>;
+ virtual ~ITableExistsController() = default;
+ virtual void OnPathExistsCheckFailed(const TString& errorMessage, const TString& path) const = 0;
+ virtual void OnPathExistsCheckResult(const bool exists, const TString& path) const = 0;
+};
+
+class TTableExistsActor: public NInternal::TTimeoutActor<TTableExistsActor> {
+private:
+ using TBase = NInternal::TTimeoutActor<TTableExistsActor>;
+ ITableExistsController::TPtr OutputController;
+ const TString Path;
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+ virtual void OnBootstrap() override;
+ virtual void OnTimeout() override;
+public:
+
+ class TEvController: public ITableExistsController {
+ private:
+ const TActorIdentity ActorId;
+ public:
+ class TEvError: public TEventLocal<TEvError, EvPathExistsCheckFailed> {
+ private:
+ YDB_READONLY_DEF(TString, ErrorMessage);
+ YDB_READONLY_DEF(TString, Path);
+ public:
+ TEvError(const TString& errorMessage, const TString& path)
+ : ErrorMessage(errorMessage)
+ , Path(path)
+ {
+
+ }
+ };
+
+ class TEvResult: public TEventLocal<TEvResult, EvPathExistsCheckResult> {
+ private:
+ YDB_READONLY_FLAG(TableExists, false);
+ YDB_READONLY_DEF(TString, TablePath);
+ public:
+ TEvResult(const bool exists, const TString& path)
+ : TableExistsFlag(exists)
+ , TablePath(path) {
+
+ }
+ };
+
+ virtual void OnPathExistsCheckFailed(const TString& errorMessage, const TString& path) const override {
+ ActorId.Send(ActorId, new TEvError(errorMessage, path));
+ }
+ virtual void OnPathExistsCheckResult(const bool exists, const TString& path) const override {
+ ActorId.Send(ActorId, new TEvResult(exists, path));
+ }
+
+ TEvController(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+ };
+
+ static NKikimrServices::TActivity::EType ActorActivityType();
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+ TTableExistsActor(ITableExistsController::TPtr controller, const TString& path, const TDuration d)
+ : TBase(d)
+ , OutputController(controller)
+ , Path(path)
+ {
+
+ }
+};
+
+}
diff --git a/ydb/services/metadata/initializer/behaviour.cpp b/ydb/services/metadata/initializer/behaviour.cpp
index 4f3562af24..e106530587 100644
--- a/ydb/services/metadata/initializer/behaviour.cpp
+++ b/ydb/services/metadata/initializer/behaviour.cpp
@@ -20,7 +20,13 @@ IInitializationBehaviour::TPtr TDBObjectBehaviour::ConstructInitializer() const
}
std::shared_ptr<NKikimr::NMetadata::NModifications::IOperationsManager> TDBObjectBehaviour::ConstructOperationsManager() const {
- return std::make_shared<TManager>();
+ auto result = std::make_shared<TManager>();
+ NModifications::TTableSchema schema;
+ schema.AddColumn(true, NInternal::TYDBColumn::Bytes(TDBInitialization::TDecoder::ComponentId));
+ schema.AddColumn(true, NInternal::TYDBColumn::Bytes(TDBInitialization::TDecoder::ModificationId));
+ schema.AddColumn(false, NInternal::TYDBColumn::UInt32(TDBInitialization::TDecoder::Instant));
+ result->SetActualSchema(schema);
+ return result;
}
}
diff --git a/ydb/services/metadata/initializer/ut/ut_init.cpp b/ydb/services/metadata/initializer/ut/ut_init.cpp
index 5f0a9cdb65..16ea1a4be1 100644
--- a/ydb/services/metadata/initializer/ut/ut_init.cpp
+++ b/ydb/services/metadata/initializer/ut/ut_init.cpp
@@ -134,7 +134,7 @@ Y_UNIT_TEST_SUITE(Initializer) {
runtime.Register(emulator);
const TInstant start = Now();
- while (Now() - start < TDuration::Seconds(15) && !emulator->IsInitialized()) {
+ while (Now() - start < TDuration::Seconds(35) && !emulator->IsInitialized()) {
runtime.SimulateSleep(TDuration::Seconds(1));
}
UNIT_ASSERT(emulator->IsInitialized());
@@ -142,6 +142,9 @@ Y_UNIT_TEST_SUITE(Initializer) {
lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/test`");
lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/test`", false);
+ lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`");
+ lHelper.StartSchemaRequest("DELETE FROM `/Root/.metadata/initialization/migrations`", false);
+ lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/initialization/migrations`", false);
}
}
}
diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h
index 5f2c156d23..c5130d9dcf 100644
--- a/ydb/services/metadata/manager/abstract.h
+++ b/ydb/services/metadata/manager/abstract.h
@@ -116,9 +116,11 @@ private:
YDB_READONLY_DEF(std::vector<Ydb::Column>, PKColumns);
YDB_READONLY_DEF(std::vector<TString>, PKColumnIds);
- TTableSchema& AddColumn(const bool primary, const Ydb::Column& info) noexcept;
public:
+ TTableSchema() = default;
TTableSchema(const THashMap<ui32, TSysTables::TTableColumnInfo>& description);
+
+ TTableSchema& AddColumn(const bool primary, const Ydb::Column& info) noexcept;
};
class IOperationsManager {
diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h
index a6098325db..ffc3564601 100644
--- a/ydb/services/metadata/manager/alter_impl.h
+++ b/ydb/services/metadata/manager/alter_impl.h
@@ -126,7 +126,7 @@ public:
return TBase::PassAway();
}
- TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogCreateSession>(
+ TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogCreateSession>(
NRequest::TDialogCreateSession::TRequest(), UserToken, TBase::SelfId()));
}
diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h
index 58b83f31cc..7a85bdb418 100644
--- a/ydb/services/metadata/manager/modification.h
+++ b/ydb/services/metadata/manager/modification.h
@@ -49,7 +49,7 @@ protected:
void Handle(NRequest::TEvRequestResult<NRequest::TDialogYQLRequest>::TPtr& /*ev*/) {
if (Requests.size()) {
- TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogYQLRequest>(
+ TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(
Requests.front(), SystemUserToken, TBase::SelfId()));
Requests.pop_front();
} else {
@@ -94,7 +94,7 @@ public:
Y_VERIFY(Requests.size());
Requests.back().mutable_tx_control()->set_commit_tx(true);
- TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogYQLRequest>(
+ TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>(
Requests.front(), SystemUserToken, TBase::SelfId()));
Requests.pop_front();
}
diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h
index 0fd04a1a1d..a50caf7de8 100644
--- a/ydb/services/metadata/manager/restore.h
+++ b/ydb/services/metadata/manager/restore.h
@@ -71,7 +71,7 @@ public:
request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write();
request.set_session_id(SessionId);
TBase::Become(&TRestoreObjectsActor::StateMain);
- TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogSelect>(request, UserToken, TBase::SelfId()));
+ TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogSelect>(request, UserToken, TBase::SelfId()));
}
};
diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h
index d251ff55a8..83a00e35e7 100644
--- a/ydb/services/metadata/request/request_actor.h
+++ b/ydb/services/metadata/request/request_actor.h
@@ -91,17 +91,12 @@ public:
};
template <class TDialogPolicy>
-class TYDBRequest: public NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>> {
+class TYDBOneRequest: public NActors::TActorBootstrapped<TYDBOneRequest<TDialogPolicy>> {
private:
- using TBase = NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>>;
+ using TBase = NActors::TActorBootstrapped<TYDBOneRequest<TDialogPolicy>>;
using TRequest = typename TDialogPolicy::TRequest;
using TResponse = typename TDialogPolicy::TResponse;
- using TSelf = TYDBRequest<TDialogPolicy>;
TRequest ProtoRequest;
- const NActors::TActorId ActorFinishId;
- const NActors::TActorId ActorRestartId;
- const TConfig Config;
- ui32 Retry = 0;
const NACLib::TUserToken UserToken;
protected:
class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> {
@@ -113,6 +108,9 @@ protected:
}
};
+
+ virtual void OnInternalResultError(const TString& errorMessage) = 0;
+ virtual void OnInternalResultSuccess(TResponse&& response) = 0;
public:
void Bootstrap(const TActorContext& /*ctx*/) {
TBase::Become(&TBase::TThis::StateMain);
@@ -130,11 +128,7 @@ public:
void Handle(typename TEvRequestInternalResult::TPtr& ev) {
if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) {
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization";
- if (ActorRestartId) {
- TBase::template Sender<TEvRequestFailed>("incorrect future result").SendTo(ActorRestartId);
- } else {
- TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
- }
+ OnInternalResultError("cannot receive result from future");
return;
}
auto f = ev->Get()->GetFuture();
@@ -143,16 +137,11 @@ public:
ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString();
NYql::TIssues issue;
NYql::IssuesFromMessage(response.operation().issues(), issue);
- if (ActorRestartId) {
- TBase::template Sender<TEvRequestFailed>(issue.ToString()).SendTo(ActorRestartId);
- } else {
- TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
- }
+ OnInternalResultError(issue.ToString());
return;
}
- TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId);
- TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId);
- TBase::Die(TActivationContext::AsActorContext());
+ OnInternalResultSuccess(std::move(response));
+ TBase::PassAway();
}
void Handle(typename TEvRequestStart::TPtr& /*ev*/) {
@@ -168,23 +157,71 @@ public:
result.Subscribe(replyCallback);
}
+ TYDBOneRequest(const TRequest& request, const NACLib::TUserToken& uToken)
+ : ProtoRequest(request)
+ , UserToken(uToken) {
+
+ }
+};
+
+template <class TDialogPolicy>
+class TYDBRequest: public TYDBOneRequest<TDialogPolicy> {
+private:
+ using TBase = TYDBOneRequest<TDialogPolicy>;
+ using TRequest = typename TDialogPolicy::TRequest;
+ using TResponse = typename TDialogPolicy::TResponse;
+ const NActors::TActorId ActorFinishId;
+ const NActors::TActorId ActorRestartId;
+ const TConfig Config;
+ ui32 Retry = 0;
+protected:
+ virtual void OnInternalResultError(const TString& errorMessage) override {
+ if (ActorRestartId) {
+ TBase::template Sender<TEvRequestFailed>(errorMessage).SendTo(ActorRestartId);
+ TBase::PassAway();
+ } else {
+ TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart);
+ }
+ }
+ virtual void OnInternalResultSuccess(TResponse&& response) override {
+ TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId);
+ TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId);
+ }
+public:
+
TYDBRequest(const TRequest& request, const NACLib::TUserToken& uToken,
const NActors::TActorId actorFinishId, const TConfig& config, const NActors::TActorId& actorRestartId = {})
- : ProtoRequest(request)
+ : TBase(request, uToken)
, ActorFinishId(actorFinishId)
, ActorRestartId(actorRestartId)
- , Config(config)
- , UserToken(uToken)
- {
+ , Config(config) {
}
+};
- TYDBRequest(const TRequest& request, const NACLib::TUserToken& uToken, const NActors::TActorId actorCallbackId)
- : ProtoRequest(request)
- , ActorFinishId(actorCallbackId)
- , ActorRestartId(actorCallbackId)
- , UserToken(uToken)
- {
+template <class TDialogPolicy>
+class TYDBCallbackRequest: public TYDBOneRequest<TDialogPolicy> {
+private:
+ using TBase = TYDBOneRequest<TDialogPolicy>;
+ using TRequest = typename TDialogPolicy::TRequest;
+ using TResponse = typename TDialogPolicy::TResponse;
+ const NActors::TActorId CallbackActorId;
+ const TConfig Config;
+ ui32 Retry = 0;
+protected:
+ virtual void OnInternalResultError(const TString& errorMessage) override {
+ TBase::template Sender<TEvRequestFailed>(errorMessage).SendTo(CallbackActorId);
+ TBase::PassAway();
+ }
+ virtual void OnInternalResultSuccess(TResponse&& response) override {
+ TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(CallbackActorId);
+ TBase::template Sender<TEvRequestFinished>().SendTo(CallbackActorId);
+ }
+public:
+
+ TYDBCallbackRequest(const TRequest& request, const NACLib::TUserToken& uToken, const NActors::TActorId actorCallbackId)
+ : TBase(request, uToken)
+ , CallbackActorId(actorCallbackId) {
}
};
@@ -204,7 +241,7 @@ private:
Y_VERIFY(sessionId);
std::optional<typename TDialogPolicy::TRequest> nextRequest = OnSessionId(sessionId);
Y_VERIFY(nextRequest);
- TBase::Register(new TYDBRequest<TDialogPolicy>(*nextRequest, UserToken, TBase::SelfId(), Config, TBase::SelfId()));
+ TBase::Register(new TYDBCallbackRequest<TDialogPolicy>(*nextRequest, UserToken, TBase::SelfId()));
}
protected:
const TConfig Config;
@@ -244,7 +281,7 @@ public:
}
void Handle(typename TEvRequestStart::TPtr& /*ev*/) {
- TBase::Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), UserToken, TBase::SelfId(), Config, TBase::SelfId()));
+ TBase::Register(new TYDBCallbackRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), UserToken, TBase::SelfId()));
}
void Bootstrap() {
@@ -258,7 +295,7 @@ public:
using TPtr = std::shared_ptr<IQueryOutput>;
virtual ~IQueryOutput() = default;
- virtual void OnReply(const TDialogYQLRequest::TResponse& response) = 0;
+ virtual void OnYQLQueryReply(const TDialogYQLRequest::TResponse& response) = 0;
};
class TYQLQuerySessionedActor: public TSessionedActorImpl<TDialogYQLRequest> {
@@ -275,7 +312,7 @@ protected:
return request;
}
virtual void OnResult(const TDialogYQLRequest::TResponse& response) override {
- Output->OnReply(response);
+ Output->OnYQLQueryReply(response);
}
public:
TYQLQuerySessionedActor(const TString& query, const NACLib::TUserToken& uToken,
diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp
index 6d6f74bf4a..e4f09cb531 100644
--- a/ydb/services/metadata/secret/ut/ut_secret.cpp
+++ b/ydb/services/metadata/secret/ut/ut_secret.cpp
@@ -236,8 +236,6 @@ Y_UNIT_TEST_SUITE(Secret) {
auto sender = runtime.AllocateEdgeActor();
server->SetupRootStoragePools(sender);
- TSecretUserEmulator* emulator = new TSecretUserEmulator;
- runtime.Register(emulator);
{
runtime.SimulateSleep(TDuration::Seconds(10));
Cerr << "Initialization finished" << Endl;
@@ -250,9 +248,6 @@ Y_UNIT_TEST_SUITE(Secret) {
lHelper.StartSchemaRequest("CREATE OBJECT `secret1:test@test1` (TYPE SECRET_ACCESS)");
lHelper.StartSchemaRequest("CREATE OBJECT `secret2:test@test1` (TYPE SECRET_ACCESS)", false);
lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET)", false);
- lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`");
- lHelper.StartSchemaRequest("DELETE FROM `/Root/.metadata/initialization/migrations`", false);
- lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/initialization/migrations`", false);
lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/secrets/values`", false);
}
}