diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-21 08:56:00 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-21 08:56:00 +0300 |
commit | ece732c43ec9915c75884b02478cf9b9abd189e9 (patch) | |
tree | 09013f2800e7ac740d2bcbc6b647a091abccc547 | |
parent | fd79d5c797534fcce5d66ff6b53283b64f995e7e (diff) | |
download | ydb-ece732c43ec9915c75884b02478cf9b9abd189e9.tar.gz |
initialize snapshot for migrations on start withno constructor
30 files changed, 768 insertions, 255 deletions
diff --git a/ydb/core/testlib/common_helper.cpp b/ydb/core/testlib/common_helper.cpp index 5c780b0ba7..182e02b163 100644 --- a/ydb/core/testlib/common_helper.cpp +++ b/ydb/core/testlib/common_helper.cpp @@ -20,45 +20,59 @@ void THelper::WaitForSchemeOperation(TActorId sender, ui64 txId) { } void THelper::StartDataRequest(const TString& request, const bool expectSuccess) const { - NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false)); + NYdb::NTable::TTableClient tClient(Server.GetDriver(), + NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin")); auto expectation = expectSuccess; - tClient.CreateSession().Subscribe([request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + bool resultReady = false; + bool* rrPtr = &resultReady; + tClient.CreateSession().Subscribe([rrPtr, request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { auto session = f.GetValueSync().GetSession(); session.ExecuteDataQuery(request , NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx()) - .Subscribe([expectation](NYdb::NTable::TAsyncDataQueryResult f) + .Subscribe([rrPtr, expectation, request](NYdb::NTable::TAsyncDataQueryResult f) { TStringStream ss; f.GetValueSync().GetIssues().PrintTo(ss, false); - Cerr << ss.Str() << Endl; + Cerr << "REQUEST=" << request << ";RESULT=" << ss.Str() << ";EXPECTATION=" << expectation << Endl; Y_VERIFY(expectation == f.GetValueSync().IsSuccess()); + *rrPtr = true; }); }); + const TInstant start = TInstant::Now(); + while (!resultReady && start + TDuration::Seconds(200) > TInstant::Now()) { + Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1)); + } + Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << Endl; + Y_VERIFY(resultReady); } -void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess) const { +void THelper::StartSchemaRequest(const TString& request, const bool expectSuccess, const bool waiting) const { NYdb::NTable::TTableClient tClient(Server.GetDriver(), NYdb::NTable::TClientSettings().UseQueryCache(false).AuthToken("root@builtin")); auto expectation = expectSuccess; - bool resultReady = false; - bool* rrPtr = &resultReady; - tClient.CreateSession().Subscribe([rrPtr, request, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { + std::shared_ptr<bool> rrPtr = std::make_shared<bool>(false); + TString requestInt = request; + tClient.CreateSession().Subscribe([rrPtr, requestInt, expectation](NThreading::TFuture<NYdb::NTable::TCreateSessionResult> f) { auto session = f.GetValueSync().GetSession(); - session.ExecuteSchemeQuery(request).Subscribe([rrPtr, expectation](NYdb::TAsyncStatus f) + session.ExecuteSchemeQuery(requestInt).Subscribe([rrPtr, expectation, requestInt](NYdb::TAsyncStatus f) { TStringStream ss; f.GetValueSync().GetIssues().PrintTo(ss, false); - Cerr << ss.Str() << Endl; + Cerr << "REQUEST=" << requestInt << ";RESULT=" << ss.Str() << ";EXPECTATION=" << expectation << Endl; Y_VERIFY(expectation == f.GetValueSync().IsSuccess(), "%d", expectation ? 1 : 0); *rrPtr = true; }); }); - const TInstant start = TInstant::Now(); - while (!resultReady && start + TDuration::Seconds(20) > TInstant::Now()) { - Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1)); + Cerr << "REQUEST=" << request << ";EXPECTATION=" << expectation << ";WAITING=" << waiting << Endl; + if (waiting) { + const TInstant start = TInstant::Now(); + while (!*rrPtr && start + TDuration::Seconds(20) > TInstant::Now()) { + Server.GetRuntime()->SimulateSleep(TDuration::Seconds(1)); + } + Y_VERIFY(*rrPtr); + Cerr << "FINISHED_REQUEST=" << request << ";EXPECTATION=" << expectation << ";WAITING=" << waiting << Endl; } - Y_VERIFY(resultReady); } void THelper::DropTable(const TString& tablePath) { diff --git a/ydb/core/testlib/common_helper.h b/ydb/core/testlib/common_helper.h index 4218e1f447..d25393972d 100644 --- a/ydb/core/testlib/common_helper.h +++ b/ydb/core/testlib/common_helper.h @@ -17,6 +17,6 @@ public: void DropTable(const TString& tablePath); void StartDataRequest(const TString& request, const bool expectSuccess = true) const; - void StartSchemaRequest(const TString& request, const bool expectSuccess = true) const; + void StartSchemaRequest(const TString& request, const bool expectSuccess = true, const bool waiting = true) const; }; } diff --git a/ydb/core/tx/tiering/external_data.cpp b/ydb/core/tx/tiering/external_data.cpp index 446b880c64..b6616bc760 100644 --- a/ydb/core/tx/tiering/external_data.cpp +++ b/ydb/core/tx/tiering/external_data.cpp @@ -12,7 +12,7 @@ namespace NKikimr::NColumnShard::NTiers { void TSnapshotConstructor::EnrichSnapshotData(ISnapshot::TPtr original, NMetadata::NFetcher::ISnapshotAcceptorController::TPtr controller) const { - controller->Enriched(original); + controller->OnSnapshotEnriched(original); } TSnapshotConstructor::TSnapshotConstructor() { diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 4a324e66f5..02d6fce31e 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -291,11 +291,12 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { Y_VERIFY(emulator->IsFound()); } { - lHelper.StartSchemaRequest("ALTER OBJECT tier1 (TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`"); - emulator->ResetConditions(); emulator->SetExpectedTiersCount(2); emulator->MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1")); + + lHelper.StartSchemaRequest("ALTER OBJECT tier1 (TYPE TIER) SET tierConfig = `" + ConfigProtoStr1 + "`"); + { const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(2000)) { @@ -312,6 +313,9 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { patch.SetColumn("tierName", NMetadata::NInternal::TYDBValue::Bytes("tier1")); patches.emplace_back(std::move(patch)); } + emulator->ResetConditions(); + emulator->SetExpectedTieringsCount(0); + emulator->SetExpectedTiersCount(0); lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)", false); lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)", false); @@ -320,9 +324,6 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)"); lHelper.StartSchemaRequest("DROP OBJECT tier2(TYPE TIER)"); - emulator->ResetConditions(); - emulator->SetExpectedTieringsCount(0); - emulator->SetExpectedTiersCount(0); { const TInstant start = Now(); while (!emulator->IsFound() && Now() - start < TDuration::Seconds(20)) { @@ -365,7 +366,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; - lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + ConfigProtoStr1 + "`"); + lHelper.StartSchemaRequest("CREATE OBJECT tier1 (TYPE TIER) WITH tierConfig = `" + ConfigProtoStr1 + "`", true, false); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1")); @@ -378,7 +379,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { lHelper.StartSchemaRequest("CREATE OBJECT tiering1 (TYPE TIERING_RULE) " "WITH (defaultColumn = timestamp, description = `" + ConfigTiering1Str + "`)"); lHelper.StartSchemaRequest("CREATE OBJECT tiering2 (TYPE TIERING_RULE) " - "WITH (defaultColumn = timestamp, description = `" + ConfigTiering2Str + "` )"); + "WITH (defaultColumn = timestamp, description = `" + ConfigTiering2Str + "` )", true, false); { TTestCSEmulator emulator; emulator.MutableCheckers().emplace("TIER.tier1", TJsonChecker("Name", "abc1")); @@ -393,7 +394,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { lHelper.StartSchemaRequest("DROP OBJECT tiering2 (TYPE TIERING_RULE)"); lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", false); lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`"); - lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)"); + lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", true, false); { TTestCSEmulator emulator; emulator.SetExpectedTieringsCount(0); @@ -401,7 +402,7 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { emulator.CheckRuntime(runtime); } lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)"); - lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)"); + lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", true, false); { TTestCSEmulator emulator; emulator.SetExpectedTieringsCount(0); diff --git a/ydb/services/metadata/abstract/common.h b/ydb/services/metadata/abstract/common.h index 864f031295..90998755b0 100644 --- a/ydb/services/metadata/abstract/common.h +++ b/ydb/services/metadata/abstract/common.h @@ -31,6 +31,12 @@ enum EEvents { EvTimeout, EvTableDescriptionFailed, EvTableDescriptionSuccess, + EvAccessorSimpleResult, + EvAccessorSimpleError, + EvAccessorSimpleTableAbsent, + EvPathExistsCheckFailed, + EvPathExistsCheckResult, + EvStartMetadataService, EvEnd }; diff --git a/ydb/services/metadata/abstract/fetcher.h b/ydb/services/metadata/abstract/fetcher.h index 48ebc5918c..dca8a0ab0f 100644 --- a/ydb/services/metadata/abstract/fetcher.h +++ b/ydb/services/metadata/abstract/fetcher.h @@ -23,8 +23,8 @@ class ISnapshotAcceptorController { public: using TPtr = std::shared_ptr<ISnapshotAcceptorController>; virtual ~ISnapshotAcceptorController() = default; - virtual void Enriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0; - virtual void EnrichProblem(const TString& errorMessage) = 0; + virtual void OnSnapshotEnriched(std::shared_ptr<ISnapshot> enrichedSnapshot) = 0; + virtual void OnSnapshotEnrichError(const TString& errorMessage) = 0; }; class ISnapshot { @@ -64,7 +64,7 @@ public: ISnapshot::TPtr ParseSnapshot(const Ydb::Table::ExecuteQueryResult& rawData, const TInstant actuality) const; virtual void EnrichSnapshotData(ISnapshot::TPtr original, ISnapshotAcceptorController::TPtr controller) const { - controller->Enriched(original); + controller->OnSnapshotEnriched(original); } const std::vector<IClassBehaviour::TPtr>& GetManagers() const { diff --git a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt index fbb641ff7b..34212fd99c 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.darwin.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.darwin.txt @@ -23,7 +23,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp ) diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt index cbd8ba545c..409457cc70 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.linux-aarch64.txt @@ -24,7 +24,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp ) diff --git a/ydb/services/metadata/ds_table/CMakeLists.linux.txt b/ydb/services/metadata/ds_table/CMakeLists.linux.txt index cbd8ba545c..409457cc70 100644 --- a/ydb/services/metadata/ds_table/CMakeLists.linux.txt +++ b/ydb/services/metadata/ds_table/CMakeLists.linux.txt @@ -24,7 +24,10 @@ target_link_libraries(services-metadata-ds_table PUBLIC target_sources(services-metadata-ds_table PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_refresh.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_subscribe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/scheme_describe.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/table_exists.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/service.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/ds_table/config.cpp ) diff --git a/ydb/services/metadata/ds_table/accessor_refresh.cpp b/ydb/services/metadata/ds_table/accessor_refresh.cpp index f3557615b8..bd9417afb4 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.cpp +++ b/ydb/services/metadata/ds_table/accessor_refresh.cpp @@ -1,71 +1,45 @@ #include "accessor_refresh.h" -#include <ydb/core/grpc_services/base/base.h> -#include <ydb/core/grpc_services/local_rpc/local_rpc.h> -#include <library/cpp/actors/core/log.h> +namespace NKikimr::NMetadata::NProvider { -#include <util/string/escape.h> +void TDSAccessorRefresher::OnBootstrap() { + TBase::OnBootstrap(); + Become(&TDSAccessorRefresher::StateMain); + Sender<TEvRefresh>().SendTo(SelfId()); +} -namespace NKikimr::NMetadata::NProvider { +void TDSAccessorRefresher::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) { + Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); + CurrentSnapshot = snapshot; + *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets()); + OnSnapshotModified(); + OnSnapshotRefresh(); +} -void TDSAccessorRefresher::Handle(TEvYQLResponse::TPtr& ev) { - auto& currentFullReply = ev->Get()->GetResponse(); - Ydb::Table::ExecuteQueryResult qResult; - currentFullReply.operation().result().UnpackTo(&qResult); - Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size()); +void TDSAccessorRefresher::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) { *ProposedProto.mutable_result_sets() = std::move(*qResult.mutable_result_sets()); - auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(ProposedProto, RequestedActuality); - if (!parsedSnapshot) { - ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot"; - } - - if (!!parsedSnapshot && CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) { + if (CurrentSelection.SerializeAsString() != ProposedProto.SerializeAsString()) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "New refresher data: " << ProposedProto.DebugString(); - SnapshotConstructor->EnrichSnapshotData(parsedSnapshot, InternalController); + SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController); } else { - CurrentSnapshot->SetActuality(RequestedActuality); + CurrentSnapshot->SetActuality(GetRequestedActuality()); OnSnapshotRefresh(); Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); } } -void TDSAccessorRefresher::Handle(TEvEnrichSnapshotResult::TPtr& ev) { - RequestedActuality = TInstant::Zero(); +void TDSAccessorRefresher::OnIncorrectSnapshotFromYQL(const TString& errorMessage) { + TBase::OnIncorrectSnapshotFromYQL(errorMessage); Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); - CurrentSnapshot = ev->Get()->GetEnrichedSnapshot(); - *CurrentSelection.mutable_result_sets() = std::move(*ProposedProto.mutable_result_sets()); - OnSnapshotModified(); - OnSnapshotRefresh(); } -void TDSAccessorRefresher::Handle(TEvEnrichSnapshotProblem::TPtr& ev) { - RequestedActuality = TInstant::Zero(); +void TDSAccessorRefresher::OnSnapshotEnrichingError(const TString& errorMessage) { + TBase::OnSnapshotEnrichingError(errorMessage); Schedule(Config.GetRefreshPeriod(), new TEvRefresh()); - ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "enrich problem: " << ev->Get()->GetErrorText(); } void TDSAccessorRefresher::Handle(TEvRefresh::TPtr& /*ev*/) { - TStringBuilder sb; - RequestedActuality = TInstant::Now(); - auto& managers = SnapshotConstructor->GetManagers(); - Y_VERIFY(managers.size()); - for (auto&& i : managers) { - sb << "SELECT * FROM `" + EscapeC(i->GetStorageTablePath()) + "`;"; - } - Register(new NRequest::TYQLQuerySessionedActor(sb, NACLib::TSystemUsers::Metadata(), Config.GetRequestConfig(), InternalController)); -} - -TDSAccessorRefresher::TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor) - : SnapshotConstructor(snapshotConstructor) - , Config(config) -{ - -} - -void TDSAccessorRefresher::Bootstrap() { - RegisterState(); - InternalController = std::make_shared<TRefreshInternalController>(SelfId()); - Sender<TEvRefresh>().SendTo(SelfId()); + TBase::StartSnapshotsFetching(); } } diff --git a/ydb/services/metadata/ds_table/accessor_refresh.h b/ydb/services/metadata/ds_table/accessor_refresh.h index da76f6dd81..f7600af28a 100644 --- a/ydb/services/metadata/ds_table/accessor_refresh.h +++ b/ydb/services/metadata/ds_table/accessor_refresh.h @@ -1,110 +1,47 @@ #pragma once -#include <ydb/public/api/protos/ydb_value.pb.h> -#include <ydb/services/metadata/abstract/common.h> -#include <ydb/services/metadata/initializer/accessor_init.h> -#include <ydb/services/metadata/request/request_actor.h> -#include <library/cpp/actors/core/hfunc.h> +#include "accessor_snapshot_base.h" namespace NKikimr::NMetadata::NProvider { -class TDSAccessorRefresher; - -class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvents::EvRefresh> { -public: -}; - -class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvents::EvYQLResponse> { -private: - YDB_READONLY_DEF(NRequest::TDialogYQLRequest::TResponse, Response); -public: - TEvYQLResponse(const NRequest::TDialogYQLRequest::TResponse& r) - : Response(r) - { - - } -}; - -class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvents::EvEnrichSnapshotResult> { -private: - YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, EnrichedSnapshot); -public: - TEvEnrichSnapshotResult(NFetcher::ISnapshot::TPtr snapshot) - : EnrichedSnapshot(snapshot) { - - } -}; - -class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvents::EvEnrichSnapshotProblem> { -private: - YDB_READONLY_DEF(TString, ErrorText); -public: - TEvEnrichSnapshotProblem(const TString& errorText) - : ErrorText(errorText) { - - } -}; - -class TRefreshInternalController: public NFetcher::ISnapshotAcceptorController, public NRequest::IQueryOutput { +class TDSAccessorRefresher: public TDSAccessorBase { private: - const TActorIdentity ActorId; -public: - TRefreshInternalController(const TActorIdentity& actorId) - : ActorId(actorId) { + using TBase = TDSAccessorBase; + YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, CurrentSnapshot); + YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); + Ydb::Table::ExecuteQueryResult ProposedProto; + const TConfig Config; - } + void Handle(TEvRefresh::TPtr& ev); - virtual void EnrichProblem(const TString& errorMessage) override { - ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage)); - } + virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) override final; - virtual void Enriched(NFetcher::ISnapshot::TPtr enrichedSnapshot) override { - ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot)); - } + virtual void OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot) override final; - virtual void OnReply(const NRequest::TDialogYQLRequest::TResponse& response) override { - ActorId.Send(ActorId, new TEvYQLResponse(response)); - } -}; + virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage) override final; -class TDSAccessorRefresher: public NActors::TActorBootstrapped<TDSAccessorRefresher> { -private: - using TBase = NActors::TActorBootstrapped<TDSAccessorRefresher>; - NFetcher::ISnapshotsFetcher::TPtr SnapshotConstructor; - std::shared_ptr<TRefreshInternalController> InternalController; - YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, CurrentSnapshot); - YDB_READONLY_DEF(Ydb::Table::ExecuteQueryResult, CurrentSelection); - Ydb::Table::ExecuteQueryResult ProposedProto; - TInstant RequestedActuality = TInstant::Zero(); - const TConfig Config; + virtual void OnSnapshotEnrichingError(const TString& errorMessage) override final; protected: - virtual void RegisterState() { - Become(&TDSAccessorRefresher::StateMain); - } + virtual void OnBootstrap() override; + virtual void OnSnapshotModified() = 0; + virtual void OnSnapshotRefresh() = 0; bool IsReady() const { return !!CurrentSnapshot; } - virtual void OnSnapshotModified() = 0; - virtual void OnSnapshotRefresh() = 0; public: - void Bootstrap(); - - STATEFN(StateMain) { + STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TEvYQLResponse, Handle); hFunc(TEvRefresh, Handle); - hFunc(TEvEnrichSnapshotResult, Handle); - hFunc(TEvEnrichSnapshotProblem, Handle); default: - break; + TBase::StateMain(ev, ctx); } } + TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor) + : TBase(config.GetRequestConfig(), snapshotConstructor) + , Config(config) + { - TDSAccessorRefresher(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor); + } - void Handle(TEvEnrichSnapshotResult::TPtr& ev); - void Handle(TEvEnrichSnapshotProblem::TPtr& ev); - void Handle(TEvYQLResponse::TPtr& ev); - void Handle(TEvRefresh::TPtr& ev); }; } diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp b/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp new file mode 100644 index 0000000000..ce516994d4 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_snapshot_base.cpp @@ -0,0 +1,69 @@ +#include "accessor_snapshot_base.h" +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <library/cpp/actors/core/log.h> + +#include <util/string/escape.h> + +namespace NKikimr::NMetadata::NProvider { + +void TDSAccessorBase::OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& /*qResult*/, NFetcher::ISnapshot::TPtr snapshot) { + SnapshotConstructor->EnrichSnapshotData(snapshot, InternalController); +} + +void TDSAccessorBase::OnIncorrectSnapshotFromYQL(const TString& errorMessage) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot parse current snapshot: " << errorMessage; +} + +void TDSAccessorBase::Handle(TEvYQLResponse::TPtr& ev) { + auto& currentFullReply = ev->Get()->GetResponse(); + Ydb::Table::ExecuteQueryResult qResult; + currentFullReply.operation().result().UnpackTo(&qResult); + Y_VERIFY((size_t)qResult.result_sets().size() == SnapshotConstructor->GetManagers().size()); + auto parsedSnapshot = SnapshotConstructor->ParseSnapshot(qResult, RequestedActuality); + if (!parsedSnapshot) { + OnIncorrectSnapshotFromYQL("snapshot is null after parsing"); + } else { + OnNewParsedSnapshot(std::move(qResult), parsedSnapshot); + } +} + +void TDSAccessorBase::Handle(TEvEnrichSnapshotResult::TPtr& ev) { + RequestedActuality = TInstant::Zero(); + OnNewEnrichedSnapshot(ev->Get()->GetEnrichedSnapshot()); +} + +void TDSAccessorBase::OnSnapshotEnrichingError(const TString& errorMessage) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot enrich current snapshot: " << errorMessage; +} + +void TDSAccessorBase::Handle(TEvEnrichSnapshotProblem::TPtr& ev) { + RequestedActuality = TInstant::Zero(); + OnSnapshotEnrichingError(ev->Get()->GetErrorText()); +} + +void TDSAccessorBase::StartSnapshotsFetching() { + TStringBuilder sb; + RequestedActuality = TInstant::Now(); + auto& managers = SnapshotConstructor->GetManagers(); + Y_VERIFY(managers.size()); + for (auto&& i : managers) { + sb << "SELECT * FROM `" + EscapeC(i->GetStorageTablePath()) + "`;"; + } + Register(new NRequest::TYQLQuerySessionedActor(sb, NACLib::TSystemUsers::Metadata(), Config, InternalController)); +} + +TDSAccessorBase::TDSAccessorBase(const NRequest::TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor) + : Config(config) + , SnapshotConstructor(snapshotConstructor) +{ + +} + +void TDSAccessorBase::Bootstrap() { + InternalController = std::make_shared<TRefreshInternalController>(SelfId()); + OnBootstrap(); +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_base.h b/ydb/services/metadata/ds_table/accessor_snapshot_base.h new file mode 100644 index 0000000000..d74c1ddfe2 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_snapshot_base.h @@ -0,0 +1,104 @@ +#pragma once +#include <ydb/public/api/protos/ydb_value.pb.h> +#include <ydb/services/metadata/abstract/common.h> +#include <ydb/services/metadata/initializer/accessor_init.h> +#include <ydb/services/metadata/request/request_actor.h> +#include <library/cpp/actors/core/hfunc.h> + +namespace NKikimr::NMetadata::NProvider { + +class TEvRefresh: public NActors::TEventLocal<TEvRefresh, EEvents::EvRefresh> { +public: +}; + +class TEvYQLResponse: public NActors::TEventLocal<TEvYQLResponse, EEvents::EvYQLResponse> { +private: + YDB_READONLY_DEF(NRequest::TDialogYQLRequest::TResponse, Response); +public: + TEvYQLResponse(const NRequest::TDialogYQLRequest::TResponse& r) + : Response(r) + { + + } +}; + +class TEvEnrichSnapshotResult: public NActors::TEventLocal<TEvEnrichSnapshotResult, EEvents::EvEnrichSnapshotResult> { +private: + YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, EnrichedSnapshot); +public: + TEvEnrichSnapshotResult(NFetcher::ISnapshot::TPtr snapshot) + : EnrichedSnapshot(snapshot) { + + } +}; + +class TEvEnrichSnapshotProblem: public NActors::TEventLocal<TEvEnrichSnapshotProblem, EEvents::EvEnrichSnapshotProblem> { +private: + YDB_READONLY_DEF(TString, ErrorText); +public: + TEvEnrichSnapshotProblem(const TString& errorText) + : ErrorText(errorText) { + + } +}; + +class TRefreshInternalController: public NFetcher::ISnapshotAcceptorController, public NRequest::IQueryOutput { +private: + const TActorIdentity ActorId; +public: + TRefreshInternalController(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void OnSnapshotEnrichError(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvEnrichSnapshotProblem(errorMessage)); + } + + virtual void OnSnapshotEnriched(NFetcher::ISnapshot::TPtr enrichedSnapshot) override { + ActorId.Send(ActorId, new TEvEnrichSnapshotResult(enrichedSnapshot)); + } + + virtual void OnYQLQueryReply(const NRequest::TDialogYQLRequest::TResponse& response) override { + ActorId.Send(ActorId, new TEvYQLResponse(response)); + } +}; + +class TDSAccessorBase: public NActors::TActorBootstrapped<TDSAccessorBase> { +private: + using TBase = NActors::TActorBootstrapped<TDSAccessorBase>; + YDB_READONLY(TInstant, RequestedActuality, TInstant::Zero()); + const NRequest::TConfig Config; +protected: + std::shared_ptr<TRefreshInternalController> InternalController; + NFetcher::ISnapshotsFetcher::TPtr SnapshotConstructor; + + virtual void OnBootstrap() { + Become(&TDSAccessorBase::StateMain); + } + virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) = 0; + virtual void OnNewParsedSnapshot(Ydb::Table::ExecuteQueryResult&& qResult, NFetcher::ISnapshot::TPtr snapshot); + virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage); + virtual void OnSnapshotEnrichingError(const TString& errorMessage); + void StartSnapshotsFetching(); + + void Handle(TEvEnrichSnapshotResult::TPtr& ev); + void Handle(TEvEnrichSnapshotProblem::TPtr& ev); + void Handle(TEvYQLResponse::TPtr& ev); +public: + void Bootstrap(); + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvYQLResponse, Handle); + hFunc(TEvEnrichSnapshotResult, Handle); + hFunc(TEvEnrichSnapshotProblem, Handle); + default: + break; + } + } + + TDSAccessorBase(const NRequest::TConfig& config, NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor); +}; + +} diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp b/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp new file mode 100644 index 0000000000..5a9e424909 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_snapshot_simple.cpp @@ -0,0 +1,48 @@ +#include "accessor_snapshot_simple.h" + +namespace NKikimr::NMetadata::NProvider { + +void TDSAccessorSimple::OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) { + auto g = PassAwayGuard(); + OutputController->OnSnapshotConstructionResult(snapshot); +} + +void TDSAccessorSimple::OnIncorrectSnapshotFromYQL(const TString& errorMessage) { + TBase::OnIncorrectSnapshotFromYQL(errorMessage); + auto g = PassAwayGuard(); + OutputController->OnSnapshotConstructionError(errorMessage); +} + +void TDSAccessorSimple::OnSnapshotEnrichingError(const TString& errorMessage) { + TBase::OnSnapshotEnrichingError(errorMessage); + auto g = PassAwayGuard(); + OutputController->OnSnapshotConstructionError(errorMessage); +} + +void TDSAccessorSimple::OnBootstrap() { + Become(&TDSAccessorSimple::StateMain); + InputController = std::make_shared<TInputController>(SelfId()); + for (auto&& i : SnapshotConstructor->GetManagers()) { + PathesInCheck.emplace(i->GetStorageTablePath()); + Register(new TTableExistsActor(InputController, i->GetStorageTablePath(), TDuration::Seconds(2))); + } +} + +void TDSAccessorSimple::Handle(TTableExistsActor::TEvController::TEvError::TPtr& ev) { + auto g = PassAwayGuard(); + OutputController->OnSnapshotConstructionError(ev->Get()->GetErrorMessage()); +} + +void TDSAccessorSimple::Handle(TTableExistsActor::TEvController::TEvResult::TPtr& ev) { + if (!ev->Get()->IsTableExists()) { + OutputController->OnSnapshotConstructionTableAbsent(ev->Get()->GetTablePath()); + PassAway(); + return; + } + Y_VERIFY(PathesInCheck.erase(ev->Get()->GetTablePath()) == 1); + if (PathesInCheck.empty()) { + TBase::StartSnapshotsFetching(); + } +} + +} diff --git a/ydb/services/metadata/ds_table/accessor_snapshot_simple.h b/ydb/services/metadata/ds_table/accessor_snapshot_simple.h new file mode 100644 index 0000000000..b97b01bdb3 --- /dev/null +++ b/ydb/services/metadata/ds_table/accessor_snapshot_simple.h @@ -0,0 +1,112 @@ +#pragma once +#include "accessor_snapshot_base.h" +#include "table_exists.h" + +namespace NKikimr::NMetadata::NProvider { + +class ISnapshotConstructorController { +public: + using TPtr = std::shared_ptr<ISnapshotConstructorController>; + virtual ~ISnapshotConstructorController() = default; + virtual void OnSnapshotConstructionResult(NFetcher::ISnapshot::TPtr snapshot) const = 0; + virtual void OnSnapshotConstructionError(const TString& errorMessage) const = 0; + virtual void OnSnapshotConstructionTableAbsent(const TString& path) const = 0; +}; + +class TDSAccessorSimple: public TDSAccessorBase { +private: + using TBase = TDSAccessorBase; + std::set<TString> PathesInCheck; + + class TInputController: public TTableExistsActor::TEvController { + public: + TInputController(const TActorIdentity& actorId) + : TTableExistsActor::TEvController(actorId) { + + } + }; + + std::shared_ptr<TInputController> InputController; + ISnapshotConstructorController::TPtr OutputController; +protected: + virtual void OnNewEnrichedSnapshot(NFetcher::ISnapshot::TPtr snapshot) final; + virtual void OnIncorrectSnapshotFromYQL(const TString& errorMessage) final; + virtual void OnSnapshotEnrichingError(const TString& errorMessage) final; + + virtual void OnBootstrap() override; + + void Handle(TTableExistsActor::TEvController::TEvError::TPtr& ev); + + void Handle(TTableExistsActor::TEvController::TEvResult::TPtr& ev); + +public: + class TEvController: public ISnapshotConstructorController { + private: + const TActorIdentity ActorId; + public: + class TEvResult: public TEventLocal<TEvResult, EEvents::EvAccessorSimpleResult> { + private: + YDB_READONLY_DEF(NFetcher::ISnapshot::TPtr, Result); + public: + TEvResult(NFetcher::ISnapshot::TPtr snapshot) + : Result(snapshot) { + + } + }; + + class TEvError: public TEventLocal<TEvError, EEvents::EvAccessorSimpleError> { + private: + YDB_READONLY_DEF(TString, ErrorMessage); + public: + TEvError(const TString& errorMessage) + : ErrorMessage(errorMessage) { + + } + }; + + class TEvTableAbsent: public TEventLocal<TEvTableAbsent, EEvents::EvAccessorSimpleTableAbsent> { + private: + YDB_READONLY_DEF(TString, Path); + public: + TEvTableAbsent(const TString& path) + : Path(path) { + + } + }; + + virtual void OnSnapshotConstructionResult(NFetcher::ISnapshot::TPtr snapshot) const override { + ActorId.Send(ActorId, new TEvResult(snapshot)); + } + + virtual void OnSnapshotConstructionError(const TString& errorMessage) const override { + ActorId.Send(ActorId, new TEvError(errorMessage)); + } + + virtual void OnSnapshotConstructionTableAbsent(const TString& path) const override { + ActorId.Send(ActorId, new TEvTableAbsent(path)); + } + + TEvController(const TActorIdentity& actorId) + : ActorId(actorId) + { + + } + }; + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TTableExistsActor::TEvController::TEvError, Handle); + hFunc(TTableExistsActor::TEvController::TEvResult, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + TDSAccessorSimple(const NRequest::TConfig& config, ISnapshotConstructorController::TPtr outputController, + NFetcher::ISnapshotsFetcher::TPtr snapshotConstructor) + : TBase(config, snapshotConstructor) + , OutputController(outputController){ + + } +}; + +} diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.cpp b/ydb/services/metadata/ds_table/accessor_subscribe.cpp index 47b6f7d217..b34b6324bd 100644 --- a/ydb/services/metadata/ds_table/accessor_subscribe.cpp +++ b/ydb/services/metadata/ds_table/accessor_subscribe.cpp @@ -54,4 +54,9 @@ void TDSAccessorNotifier::OnSnapshotRefresh() { } } +void TDSAccessorNotifier::OnBootstrap() { + TBase::OnBootstrap(); + Become(&TDSAccessorNotifier::StateMain); +} + } diff --git a/ydb/services/metadata/ds_table/accessor_subscribe.h b/ydb/services/metadata/ds_table/accessor_subscribe.h index 809d543631..e57b5e398a 100644 --- a/ydb/services/metadata/ds_table/accessor_subscribe.h +++ b/ydb/services/metadata/ds_table/accessor_subscribe.h @@ -41,14 +41,10 @@ private: std::set<NActors::TActorId> Subscribed; std::map<TInstant, std::set<NActors::TActorId>> Asked; protected: - virtual void RegisterState() override { - Become(&TDSAccessorNotifier::StateMain); - } + virtual void OnBootstrap() override; virtual void OnSnapshotModified() override; virtual void OnSnapshotRefresh() override; public: - using TBase::Handle; - TDSAccessorNotifier(const TConfig& config, NFetcher::ISnapshotsFetcher::TPtr sParser) : TBase(config, sParser) { } diff --git a/ydb/services/metadata/ds_table/scheme_describe.cpp b/ydb/services/metadata/ds_table/scheme_describe.cpp index 30ed84e1ec..83f824b1ea 100644 --- a/ydb/services/metadata/ds_table/scheme_describe.cpp +++ b/ydb/services/metadata/ds_table/scheme_describe.cpp @@ -11,20 +11,22 @@ namespace NKikimr::NMetadata::NProvider { void TSchemeDescriptionActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { auto* info = ev->Get(); const auto& request = info->Request; + auto g = PassAwayGuard(); if (request->ResultSet.empty()) { Controller->OnDescriptionFailed("navigation problems for path " + Path, RequestId); - PassAway(); return; } if (request->ResultSet.size() != 1) { Controller->OnDescriptionFailed("cannot resolve database path " + Path, RequestId); - PassAway(); return; } auto& entity = request->ResultSet.front(); - auto g = PassAwayGuard(); - Controller->OnDescriptionSuccess(std::move(entity.Columns), RequestId); + if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + Controller->OnDescriptionSuccess(std::move(entity.Columns), RequestId); + } else { + Controller->OnDescriptionFailed("incorrect path status: " + ::ToString(entity.Status), RequestId); + } } void TSchemeDescriptionActor::Bootstrap() { diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index 13ee11730a..f327a4cd84 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -16,20 +16,21 @@ IActor* CreateService(const TConfig& config) { void TService::PrepareManagers(std::vector<IClassBehaviour::TPtr> managers, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender) { TManagersId id(managers); - const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId(); - if (ActiveFlag || (PreparationFlag && isInitialization)) { - for (auto&& manager : managers) { - Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId())); - if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) { - ManagersInRegistration.emplace(manager->GetTypeId(), manager); - Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), - manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot)); + if (InitializationSnapshot) { + const bool isInitialization = (managers.size() == 1) && managers.front()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId(); + if (ActiveFlag || (PreparationFlag && isInitialization)) { + for (auto&& manager : managers) { + Y_VERIFY(!RegisteredManagers.contains(manager->GetTypeId())); + if (!ManagersInRegistration.contains(manager->GetTypeId()) && !RegisteredManagers.contains(manager->GetTypeId())) { + ManagersInRegistration.emplace(manager->GetTypeId(), manager); + Register(new NInitializer::TDSAccessorInitialized(Config.GetRequestConfig(), + manager->GetTypeId(), manager->GetInitializer(), InternalController, InitializationSnapshot)); + } } + } else if (!PreparationFlag) { + PreparationFlag = true; + Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher)); } - } else if (!PreparationFlag) { - PreparationFlag = true; - InitializationFetcher = std::make_shared<NInitializer::TFetcher>(); - Send(SelfId(), new TEvSubscribeExternal(InitializationFetcher)); } EventsWaiting[id].emplace_back(ev, sender); } @@ -107,26 +108,31 @@ void TService::Handle(NInitializer::TEvInitializationFinished::TPtr& ev) { void TService::Handle(TEvSubscribeExternal::TPtr& ev) { const TActorId senderId = ev->Sender; - ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) { + ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) { Send<TEvSubscribe>(actorId, senderId); }); } void TService::Handle(TEvAskSnapshot::TPtr& ev) { const TActorId senderId = ev->Sender; - ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) { + ProcessEventWithFetcher(*ev, ev->Get()->GetFetcher(), [this, senderId](const TActorId& actorId) { Send<TEvAsk>(actorId, senderId); }); } void TService::Handle(TEvObjectsOperation::TPtr& ev) { - auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId()); - if (it != RegisteredManagers.end()) { - ev->Get()->GetCommand()->SetManager(it->second); + if (ev->Get()->GetCommand()->GetManager()->GetTypeId() == NInitializer::TDBInitialization::GetTypeId()) { + ev->Get()->GetCommand()->SetManager(NInitializer::TDBInitialization::GetBehaviour()); ev->Get()->GetCommand()->Execute(); } else { - auto m = ev->Get()->GetCommand()->GetManager(); - PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender); + auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId()); + if (it != RegisteredManagers.end()) { + ev->Get()->GetCommand()->SetManager(it->second); + ev->Get()->GetCommand()->Execute(); + } else { + auto m = ev->Get()->GetCommand()->GetManager(); + PrepareManagers({ m }, ev->ReleaseBase(), ev->Sender); + } } } @@ -143,17 +149,43 @@ void TService::Handle(TEvRefreshSubscriberData::TPtr& ev) { Y_VERIFY(InitializationSnapshot); if (!ActiveFlag) { ActiveFlag = true; - for (auto&& i : EventsWaiting) { - i.second.front().Resend(SelfId()); - i.second.pop_front(); - } + Activate(); + } +} + +void TService::Activate() { + for (auto&& i : EventsWaiting) { + i.second.front().Resend(SelfId()); + i.second.pop_front(); } } +void TService::Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev) { + InitializationSnapshot = dynamic_pointer_cast<NInitializer::TSnapshot>(ev->Get()->GetResult()); + Y_VERIFY(InitializationSnapshot); + Activate(); +} + +void TService::Handle(TEvStartMetadataService::TPtr& /*ev*/) { + Register(new TDSAccessorSimple(Config.GetRequestConfig(), InternalController, InitializationFetcher)); +} + +void TService::Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev) { + ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive initializer snapshot: " << ev->Get()->GetErrorMessage() << Endl; + Schedule(TDuration::Seconds(1), new TEvStartMetadataService()); +} + +void TService::Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& /*ev*/) { + InitializationSnapshot = std::make_shared<NInitializer::TSnapshot>(TInstant::Zero()); + Activate(); +} + void TService::Bootstrap(const NActors::TActorContext& /*ctx*/) { ALS_INFO(NKikimrServices::METADATA_PROVIDER) << "metadata service started" << Endl; Become(&TService::StateMain); InternalController = std::make_shared<TServiceInternalController>(SelfId()); + InitializationFetcher = std::make_shared<NInitializer::TFetcher>(); + Sender<TEvStartMetadataService>().SendTo(SelfId()); } void TServiceInternalController::InitializationFinished(const TString& id) const { diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h index 35dd823e76..a22597d4a4 100644 --- a/ydb/services/metadata/ds_table/service.h +++ b/ydb/services/metadata/ds_table/service.h @@ -2,6 +2,7 @@ #include "accessor_subscribe.h" #include "config.h" #include "scheme_describe.h" +#include "accessor_snapshot_simple.h" #include <ydb/services/metadata/service.h> #include <ydb/services/metadata/initializer/common.h> @@ -15,6 +16,9 @@ namespace NKikimr::NMetadata::NProvider { +class TEvStartMetadataService: public TEventLocal<TEvStartMetadataService, EEvents::EvStartMetadataService> { +}; + class TEvTableDescriptionFailed: public TEventLocal<TEvTableDescriptionFailed, EEvents::EvTableDescriptionFailed> { private: YDB_READONLY_DEF(TString, ErrorMessage); @@ -44,12 +48,14 @@ public: } }; -class TServiceInternalController: public NInitializer::IInitializerOutput, public ISchemeDescribeController { +class TServiceInternalController: public NInitializer::IInitializerOutput, + public TDSAccessorSimple::TEvController, public ISchemeDescribeController { private: const NActors::TActorIdentity ActorId; public: TServiceInternalController(const NActors::TActorIdentity& actorId) - : ActorId(actorId) + : TDSAccessorSimple::TEvController(actorId) + , ActorId(actorId) { } @@ -120,10 +126,6 @@ public: } }; -class TWaitingWatcher { -private: -}; - class TService: public NActors::TActorBootstrapped<TService> { private: using TBase = NActors::TActor<TService>; @@ -139,46 +141,57 @@ private: std::shared_ptr<TServiceInternalController> InternalController; const TConfig Config; - void Handle(NInitializer::TEvInitializationFinished::TPtr& ev); + void Handle(TEvStartMetadataService::TPtr& ev); + void Handle(NInitializer::TEvInitializationFinished::TPtr & ev); void Handle(TEvRefreshSubscriberData::TPtr& ev); + void Handle(TEvAskSnapshot::TPtr& ev); void Handle(TEvPrepareManager::TPtr& ev); void Handle(TEvSubscribeExternal::TPtr& ev); void Handle(TEvUnsubscribeExternal::TPtr& ev); void Handle(TEvObjectsOperation::TPtr& ev); + void Handle(TEvTableDescriptionSuccess::TPtr& ev); void Handle(TEvTableDescriptionFailed::TPtr& ev); + void Handle(TDSAccessorSimple::TEvController::TEvResult::TPtr& ev); + void Handle(TDSAccessorSimple::TEvController::TEvError::TPtr& ev); + void Handle(TDSAccessorSimple::TEvController::TEvTableAbsent::TPtr& ev); + void PrepareManagers(std::vector<IClassBehaviour::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender); void InitializationFinished(const TString& initId); - void RequestTableDescription(const TString& path) const; + void Activate(); - template <class TEventPtr, class TAction> - void ProcessEventWithFetcher(TEventPtr& ev, TAction action) { + template <class TAction> + void ProcessEventWithFetcher(IEventHandle& ev, NFetcher::ISnapshotsFetcher::TPtr fetcher, TAction action) { std::vector<IClassBehaviour::TPtr> needManagers; - for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) { + for (auto&& i : fetcher->GetManagers()) { if (!RegisteredManagers.contains(i->GetTypeId())) { needManagers.emplace_back(i); } } if (needManagers.empty()) { - auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); + auto it = Accessors.find(fetcher->GetComponentId()); if (it == Accessors.end()) { - THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher()); - it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first; + THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, fetcher); + it = Accessors.emplace(fetcher->GetComponentId(), Register(actor.Release())).first; } action(it->second); } else { - PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender); + PrepareManagers(needManagers, ev.ReleaseBase(), ev.Sender); } } public: - void Bootstrap(const NActors::TActorContext& /*ctx*/); + void Bootstrap(const NActors::TActorContext& ctx); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { + hFunc(TDSAccessorSimple::TEvController::TEvResult, Handle); + hFunc(TDSAccessorSimple::TEvController::TEvError, Handle); + hFunc(TDSAccessorSimple::TEvController::TEvTableAbsent, Handle); + hFunc(TEvObjectsOperation, Handle); hFunc(TEvRefreshSubscriberData, Handle); hFunc(TEvAskSnapshot, Handle); @@ -188,6 +201,8 @@ public: hFunc(TEvTableDescriptionSuccess, Handle); hFunc(TEvTableDescriptionFailed, Handle); + + hFunc(TEvStartMetadataService, Handle); hFunc(NInitializer::TEvInitializationFinished, Handle); default: diff --git a/ydb/services/metadata/ds_table/table_exists.cpp b/ydb/services/metadata/ds_table/table_exists.cpp new file mode 100644 index 0000000000..330dd722b1 --- /dev/null +++ b/ydb/services/metadata/ds_table/table_exists.cpp @@ -0,0 +1,60 @@ +#include "table_exists.h" + +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/ydb_convert/ydb_convert.h> + +namespace NKikimr::NMetadata::NProvider { + +void TTableExistsActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + auto* info = ev->Get(); + const auto& request = info->Request; + auto g = PassAwayGuard(); + + if (request->ResultSet.empty()) { + OutputController->OnPathExistsCheckFailed("navigation problems for path", Path); + return; + } + if (request->ResultSet.size() != 1) { + OutputController->OnPathExistsCheckFailed("cannot resolve database path", Path); + return; + } + auto& entity = request->ResultSet.front(); + if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::Ok) { + if (entity.Kind == NSchemeCache::TSchemeCacheNavigate::EKind::KindTable) { + OutputController->OnPathExistsCheckResult(true, Path); + } else { + OutputController->OnPathExistsCheckResult(false, Path); + } + } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotPath) { + OutputController->OnPathExistsCheckResult(false, Path); + } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathNotTable) { + OutputController->OnPathExistsCheckResult(false, Path); + } else if (entity.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown) { + OutputController->OnPathExistsCheckResult(false, Path); + } else { + OutputController->OnPathExistsCheckFailed("incorrect path status: " + ::ToString(entity.Status), Path); + } +} + +NKikimrServices::TActivity::EType TTableExistsActor::ActorActivityType() { + return NKikimrServices::TActivity::METADATA_SCHEME_DESCRIPTION_ACTOR; +} + +void TTableExistsActor::OnBootstrap() { + Become(&TTableExistsActor::StateMain); + + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable; + entry.Path = NKikimr::SplitPath(Path); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); +} + +void TTableExistsActor::OnTimeout() { + OutputController->OnPathExistsCheckFailed("timeout", Path); +} + +} diff --git a/ydb/services/metadata/ds_table/table_exists.h b/ydb/services/metadata/ds_table/table_exists.h new file mode 100644 index 0000000000..6394ba848c --- /dev/null +++ b/ydb/services/metadata/ds_table/table_exists.h @@ -0,0 +1,86 @@ +#pragma once + +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/services/metadata/common/ss_dialog.h> + +namespace NKikimr::NMetadata::NProvider { + +class ITableExistsController { +public: + using TPtr = std::shared_ptr<ITableExistsController>; + virtual ~ITableExistsController() = default; + virtual void OnPathExistsCheckFailed(const TString& errorMessage, const TString& path) const = 0; + virtual void OnPathExistsCheckResult(const bool exists, const TString& path) const = 0; +}; + +class TTableExistsActor: public NInternal::TTimeoutActor<TTableExistsActor> { +private: + using TBase = NInternal::TTimeoutActor<TTableExistsActor>; + ITableExistsController::TPtr OutputController; + const TString Path; + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + virtual void OnBootstrap() override; + virtual void OnTimeout() override; +public: + + class TEvController: public ITableExistsController { + private: + const TActorIdentity ActorId; + public: + class TEvError: public TEventLocal<TEvError, EvPathExistsCheckFailed> { + private: + YDB_READONLY_DEF(TString, ErrorMessage); + YDB_READONLY_DEF(TString, Path); + public: + TEvError(const TString& errorMessage, const TString& path) + : ErrorMessage(errorMessage) + , Path(path) + { + + } + }; + + class TEvResult: public TEventLocal<TEvResult, EvPathExistsCheckResult> { + private: + YDB_READONLY_FLAG(TableExists, false); + YDB_READONLY_DEF(TString, TablePath); + public: + TEvResult(const bool exists, const TString& path) + : TableExistsFlag(exists) + , TablePath(path) { + + } + }; + + virtual void OnPathExistsCheckFailed(const TString& errorMessage, const TString& path) const override { + ActorId.Send(ActorId, new TEvError(errorMessage, path)); + } + virtual void OnPathExistsCheckResult(const bool exists, const TString& path) const override { + ActorId.Send(ActorId, new TEvResult(exists, path)); + } + + TEvController(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + }; + + static NKikimrServices::TActivity::EType ActorActivityType(); + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + TTableExistsActor(ITableExistsController::TPtr controller, const TString& path, const TDuration d) + : TBase(d) + , OutputController(controller) + , Path(path) + { + + } +}; + +} diff --git a/ydb/services/metadata/initializer/behaviour.cpp b/ydb/services/metadata/initializer/behaviour.cpp index 4f3562af24..e106530587 100644 --- a/ydb/services/metadata/initializer/behaviour.cpp +++ b/ydb/services/metadata/initializer/behaviour.cpp @@ -20,7 +20,13 @@ IInitializationBehaviour::TPtr TDBObjectBehaviour::ConstructInitializer() const } std::shared_ptr<NKikimr::NMetadata::NModifications::IOperationsManager> TDBObjectBehaviour::ConstructOperationsManager() const { - return std::make_shared<TManager>(); + auto result = std::make_shared<TManager>(); + NModifications::TTableSchema schema; + schema.AddColumn(true, NInternal::TYDBColumn::Bytes(TDBInitialization::TDecoder::ComponentId)); + schema.AddColumn(true, NInternal::TYDBColumn::Bytes(TDBInitialization::TDecoder::ModificationId)); + schema.AddColumn(false, NInternal::TYDBColumn::UInt32(TDBInitialization::TDecoder::Instant)); + result->SetActualSchema(schema); + return result; } } diff --git a/ydb/services/metadata/initializer/ut/ut_init.cpp b/ydb/services/metadata/initializer/ut/ut_init.cpp index 5f0a9cdb65..16ea1a4be1 100644 --- a/ydb/services/metadata/initializer/ut/ut_init.cpp +++ b/ydb/services/metadata/initializer/ut/ut_init.cpp @@ -134,7 +134,7 @@ Y_UNIT_TEST_SUITE(Initializer) { runtime.Register(emulator); const TInstant start = Now(); - while (Now() - start < TDuration::Seconds(15) && !emulator->IsInitialized()) { + while (Now() - start < TDuration::Seconds(35) && !emulator->IsInitialized()) { runtime.SimulateSleep(TDuration::Seconds(1)); } UNIT_ASSERT(emulator->IsInitialized()); @@ -142,6 +142,9 @@ Y_UNIT_TEST_SUITE(Initializer) { lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/test`"); lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/test`", false); + lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`"); + lHelper.StartSchemaRequest("DELETE FROM `/Root/.metadata/initialization/migrations`", false); + lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/initialization/migrations`", false); } } } diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h index 5f2c156d23..c5130d9dcf 100644 --- a/ydb/services/metadata/manager/abstract.h +++ b/ydb/services/metadata/manager/abstract.h @@ -116,9 +116,11 @@ private: YDB_READONLY_DEF(std::vector<Ydb::Column>, PKColumns); YDB_READONLY_DEF(std::vector<TString>, PKColumnIds); - TTableSchema& AddColumn(const bool primary, const Ydb::Column& info) noexcept; public: + TTableSchema() = default; TTableSchema(const THashMap<ui32, TSysTables::TTableColumnInfo>& description); + + TTableSchema& AddColumn(const bool primary, const Ydb::Column& info) noexcept; }; class IOperationsManager { diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h index a6098325db..ffc3564601 100644 --- a/ydb/services/metadata/manager/alter_impl.h +++ b/ydb/services/metadata/manager/alter_impl.h @@ -126,7 +126,7 @@ public: return TBase::PassAway(); } - TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogCreateSession>( + TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogCreateSession>( NRequest::TDialogCreateSession::TRequest(), UserToken, TBase::SelfId())); } diff --git a/ydb/services/metadata/manager/modification.h b/ydb/services/metadata/manager/modification.h index 58b83f31cc..7a85bdb418 100644 --- a/ydb/services/metadata/manager/modification.h +++ b/ydb/services/metadata/manager/modification.h @@ -49,7 +49,7 @@ protected: void Handle(NRequest::TEvRequestResult<NRequest::TDialogYQLRequest>::TPtr& /*ev*/) { if (Requests.size()) { - TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogYQLRequest>( + TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>( Requests.front(), SystemUserToken, TBase::SelfId())); Requests.pop_front(); } else { @@ -94,7 +94,7 @@ public: Y_VERIFY(Requests.size()); Requests.back().mutable_tx_control()->set_commit_tx(true); - TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogYQLRequest>( + TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogYQLRequest>( Requests.front(), SystemUserToken, TBase::SelfId())); Requests.pop_front(); } diff --git a/ydb/services/metadata/manager/restore.h b/ydb/services/metadata/manager/restore.h index 0fd04a1a1d..a50caf7de8 100644 --- a/ydb/services/metadata/manager/restore.h +++ b/ydb/services/metadata/manager/restore.h @@ -71,7 +71,7 @@ public: request.mutable_tx_control()->mutable_begin_tx()->mutable_serializable_read_write(); request.set_session_id(SessionId); TBase::Become(&TRestoreObjectsActor::StateMain); - TBase::Register(new NRequest::TYDBRequest<NRequest::TDialogSelect>(request, UserToken, TBase::SelfId())); + TBase::Register(new NRequest::TYDBCallbackRequest<NRequest::TDialogSelect>(request, UserToken, TBase::SelfId())); } }; diff --git a/ydb/services/metadata/request/request_actor.h b/ydb/services/metadata/request/request_actor.h index d251ff55a8..83a00e35e7 100644 --- a/ydb/services/metadata/request/request_actor.h +++ b/ydb/services/metadata/request/request_actor.h @@ -91,17 +91,12 @@ public: }; template <class TDialogPolicy> -class TYDBRequest: public NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>> { +class TYDBOneRequest: public NActors::TActorBootstrapped<TYDBOneRequest<TDialogPolicy>> { private: - using TBase = NActors::TActorBootstrapped<TYDBRequest<TDialogPolicy>>; + using TBase = NActors::TActorBootstrapped<TYDBOneRequest<TDialogPolicy>>; using TRequest = typename TDialogPolicy::TRequest; using TResponse = typename TDialogPolicy::TResponse; - using TSelf = TYDBRequest<TDialogPolicy>; TRequest ProtoRequest; - const NActors::TActorId ActorFinishId; - const NActors::TActorId ActorRestartId; - const TConfig Config; - ui32 Retry = 0; const NACLib::TUserToken UserToken; protected: class TEvRequestInternalResult: public NActors::TEventLocal<TEvRequestInternalResult, TDialogPolicy::EvResultInternal> { @@ -113,6 +108,9 @@ protected: } }; + + virtual void OnInternalResultError(const TString& errorMessage) = 0; + virtual void OnInternalResultSuccess(TResponse&& response) = 0; public: void Bootstrap(const TActorContext& /*ctx*/) { TBase::Become(&TBase::TThis::StateMain); @@ -130,11 +128,7 @@ public: void Handle(typename TEvRequestInternalResult::TPtr& ev) { if (!ev->Get()->GetFuture().HasValue() || ev->Get()->GetFuture().HasException()) { ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "cannot receive result on initialization"; - if (ActorRestartId) { - TBase::template Sender<TEvRequestFailed>("incorrect future result").SendTo(ActorRestartId); - } else { - TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); - } + OnInternalResultError("cannot receive result from future"); return; } auto f = ev->Get()->GetFuture(); @@ -143,16 +137,11 @@ public: ALS_ERROR(NKikimrServices::METADATA_PROVIDER) << "incorrect reply: " << response.DebugString(); NYql::TIssues issue; NYql::IssuesFromMessage(response.operation().issues(), issue); - if (ActorRestartId) { - TBase::template Sender<TEvRequestFailed>(issue.ToString()).SendTo(ActorRestartId); - } else { - TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); - } + OnInternalResultError(issue.ToString()); return; } - TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId); - TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId); - TBase::Die(TActivationContext::AsActorContext()); + OnInternalResultSuccess(std::move(response)); + TBase::PassAway(); } void Handle(typename TEvRequestStart::TPtr& /*ev*/) { @@ -168,23 +157,71 @@ public: result.Subscribe(replyCallback); } + TYDBOneRequest(const TRequest& request, const NACLib::TUserToken& uToken) + : ProtoRequest(request) + , UserToken(uToken) { + + } +}; + +template <class TDialogPolicy> +class TYDBRequest: public TYDBOneRequest<TDialogPolicy> { +private: + using TBase = TYDBOneRequest<TDialogPolicy>; + using TRequest = typename TDialogPolicy::TRequest; + using TResponse = typename TDialogPolicy::TResponse; + const NActors::TActorId ActorFinishId; + const NActors::TActorId ActorRestartId; + const TConfig Config; + ui32 Retry = 0; +protected: + virtual void OnInternalResultError(const TString& errorMessage) override { + if (ActorRestartId) { + TBase::template Sender<TEvRequestFailed>(errorMessage).SendTo(ActorRestartId); + TBase::PassAway(); + } else { + TBase::Schedule(Config.GetRetryPeriod(++Retry), new TEvRequestStart); + } + } + virtual void OnInternalResultSuccess(TResponse&& response) override { + TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(ActorFinishId); + TBase::template Sender<TEvRequestFinished>().SendTo(ActorFinishId); + } +public: + TYDBRequest(const TRequest& request, const NACLib::TUserToken& uToken, const NActors::TActorId actorFinishId, const TConfig& config, const NActors::TActorId& actorRestartId = {}) - : ProtoRequest(request) + : TBase(request, uToken) , ActorFinishId(actorFinishId) , ActorRestartId(actorRestartId) - , Config(config) - , UserToken(uToken) - { + , Config(config) { } +}; - TYDBRequest(const TRequest& request, const NACLib::TUserToken& uToken, const NActors::TActorId actorCallbackId) - : ProtoRequest(request) - , ActorFinishId(actorCallbackId) - , ActorRestartId(actorCallbackId) - , UserToken(uToken) - { +template <class TDialogPolicy> +class TYDBCallbackRequest: public TYDBOneRequest<TDialogPolicy> { +private: + using TBase = TYDBOneRequest<TDialogPolicy>; + using TRequest = typename TDialogPolicy::TRequest; + using TResponse = typename TDialogPolicy::TResponse; + const NActors::TActorId CallbackActorId; + const TConfig Config; + ui32 Retry = 0; +protected: + virtual void OnInternalResultError(const TString& errorMessage) override { + TBase::template Sender<TEvRequestFailed>(errorMessage).SendTo(CallbackActorId); + TBase::PassAway(); + } + virtual void OnInternalResultSuccess(TResponse&& response) override { + TBase::template Sender<TEvRequestResult<TDialogPolicy>>(std::move(response)).SendTo(CallbackActorId); + TBase::template Sender<TEvRequestFinished>().SendTo(CallbackActorId); + } +public: + + TYDBCallbackRequest(const TRequest& request, const NACLib::TUserToken& uToken, const NActors::TActorId actorCallbackId) + : TBase(request, uToken) + , CallbackActorId(actorCallbackId) { } }; @@ -204,7 +241,7 @@ private: Y_VERIFY(sessionId); std::optional<typename TDialogPolicy::TRequest> nextRequest = OnSessionId(sessionId); Y_VERIFY(nextRequest); - TBase::Register(new TYDBRequest<TDialogPolicy>(*nextRequest, UserToken, TBase::SelfId(), Config, TBase::SelfId())); + TBase::Register(new TYDBCallbackRequest<TDialogPolicy>(*nextRequest, UserToken, TBase::SelfId())); } protected: const TConfig Config; @@ -244,7 +281,7 @@ public: } void Handle(typename TEvRequestStart::TPtr& /*ev*/) { - TBase::Register(new TYDBRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), UserToken, TBase::SelfId(), Config, TBase::SelfId())); + TBase::Register(new TYDBCallbackRequest<TDialogCreateSession>(TDialogCreateSession::TRequest(), UserToken, TBase::SelfId())); } void Bootstrap() { @@ -258,7 +295,7 @@ public: using TPtr = std::shared_ptr<IQueryOutput>; virtual ~IQueryOutput() = default; - virtual void OnReply(const TDialogYQLRequest::TResponse& response) = 0; + virtual void OnYQLQueryReply(const TDialogYQLRequest::TResponse& response) = 0; }; class TYQLQuerySessionedActor: public TSessionedActorImpl<TDialogYQLRequest> { @@ -275,7 +312,7 @@ protected: return request; } virtual void OnResult(const TDialogYQLRequest::TResponse& response) override { - Output->OnReply(response); + Output->OnYQLQueryReply(response); } public: TYQLQuerySessionedActor(const TString& query, const NACLib::TUserToken& uToken, diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp index 6d6f74bf4a..e4f09cb531 100644 --- a/ydb/services/metadata/secret/ut/ut_secret.cpp +++ b/ydb/services/metadata/secret/ut/ut_secret.cpp @@ -236,8 +236,6 @@ Y_UNIT_TEST_SUITE(Secret) { auto sender = runtime.AllocateEdgeActor(); server->SetupRootStoragePools(sender); - TSecretUserEmulator* emulator = new TSecretUserEmulator; - runtime.Register(emulator); { runtime.SimulateSleep(TDuration::Seconds(10)); Cerr << "Initialization finished" << Endl; @@ -250,9 +248,6 @@ Y_UNIT_TEST_SUITE(Secret) { lHelper.StartSchemaRequest("CREATE OBJECT `secret1:test@test1` (TYPE SECRET_ACCESS)"); lHelper.StartSchemaRequest("CREATE OBJECT `secret2:test@test1` (TYPE SECRET_ACCESS)", false); lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET)", false); - lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/initialization/migrations`"); - lHelper.StartSchemaRequest("DELETE FROM `/Root/.metadata/initialization/migrations`", false); - lHelper.StartSchemaRequest("DROP TABLE `/Root/.metadata/initialization/migrations`", false); lHelper.StartDataRequest("SELECT * FROM `/Root/.metadata/secrets/values`", false); } } |