diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-01 19:33:54 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-01 19:33:54 +0300 |
commit | e05ad9efe2e6c957982dbd54241d153900a710e6 (patch) | |
tree | 7f4a3254bdf3366774a8fc4a9a62a734c3ca8b8b | |
parent | 982b61276eed7a349c97b51c3e823ff16bfb3d3e (diff) | |
download | ydb-e05ad9efe2e6c957982dbd54241d153900a710e6.tar.gz |
renames, splitting
-rw-r--r-- | ydb/services/metadata/ds_table/service.cpp | 43 | ||||
-rw-r--r-- | ydb/services/metadata/ds_table/service.h | 26 | ||||
-rw-r--r-- | ydb/services/metadata/initializer/accessor_init.cpp | 2 | ||||
-rw-r--r-- | ydb/services/metadata/manager/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/services/metadata/manager/alter.h | 254 | ||||
-rw-r--r-- | ydb/services/metadata/manager/alter_impl.cpp | 5 | ||||
-rw-r--r-- | ydb/services/metadata/manager/alter_impl.h | 268 | ||||
-rw-r--r-- | ydb/services/metadata/manager/generic_manager.h | 6 | ||||
-rw-r--r-- | ydb/services/metadata/secret/ut/ut_secret.cpp | 7 | ||||
-rw-r--r-- | ydb/services/metadata/service.h | 4 |
10 files changed, 317 insertions, 299 deletions
diff --git a/ydb/services/metadata/ds_table/service.cpp b/ydb/services/metadata/ds_table/service.cpp index 605dbc1e41..a1136b688e 100644 --- a/ydb/services/metadata/ds_table/service.cpp +++ b/ydb/services/metadata/ds_table/service.cpp @@ -69,46 +69,21 @@ void TService::Handle(NMetadataInitializer::TEvInitializationFinished::TPtr& ev) EventsWaiting.emplace(i.first, std::move(i.second)); } } - void TService::Handle(TEvSubscribeExternal::TPtr& ev) { - std::vector<NMetadata::IOperationsManager::TPtr> needManagers; - for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) { - if (!RegisteredManagers.contains(i->GetTypeId())) { - needManagers.emplace_back(i); - } - } - if (needManagers.empty()) { - auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); - if (it == Accessors.end()) { - THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher()); - it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first; - } - Send<TEvSubscribe>(it->second, ev->Sender); - } else { - PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender); - } + const TActorId senderId = ev->Sender; + ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) { + Send<TEvSubscribe>(actorId, senderId); + }); } void TService::Handle(TEvAskSnapshot::TPtr& ev) { - std::vector<NMetadata::IOperationsManager::TPtr> needManagers; - for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) { - if (!RegisteredManagers.contains(i->GetTypeId())) { - needManagers.emplace_back(i); - } - } - if (needManagers.empty()) { - auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); - if (it == Accessors.end()) { - THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher()); - it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first; - } - Send<TEvAsk>(it->second, ev->Sender); - } else { - PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender); - } + const TActorId senderId = ev->Sender; + ProcessEventWithFetcher(ev, [this, senderId](const TActorId& actorId) { + Send<TEvAsk>(actorId, senderId); + }); } -void TService::Handle(TEvAlterObjects::TPtr& ev) { +void TService::Handle(TEvObjectsOperation::TPtr& ev) { auto it = RegisteredManagers.find(ev->Get()->GetCommand()->GetManager()->GetTypeId()); if (it != RegisteredManagers.end()) { ev->Get()->GetCommand()->Execute(); diff --git a/ydb/services/metadata/ds_table/service.h b/ydb/services/metadata/ds_table/service.h index 5f667a796e..779970456f 100644 --- a/ydb/services/metadata/ds_table/service.h +++ b/ydb/services/metadata/ds_table/service.h @@ -1,4 +1,5 @@ #pragma once +#include "accessor_subscribe.h" #include "config.h" #include <ydb/services/metadata/service.h> @@ -109,15 +110,36 @@ private: void Handle(TEvAskSnapshot::TPtr& ev); void Handle(TEvSubscribeExternal::TPtr& ev); void Handle(TEvUnsubscribeExternal::TPtr& ev); - void Handle(TEvAlterObjects::TPtr& ev); + void Handle(TEvObjectsOperation::TPtr& ev); void PrepareManagers(std::vector<NMetadata::IOperationsManager::TPtr> manager, TAutoPtr<IEventBase> ev, const NActors::TActorId& sender); + + template <class TEventPtr, class TAction> + void ProcessEventWithFetcher(TEventPtr& ev, TAction action) { + std::vector<NMetadata::IOperationsManager::TPtr> needManagers; + for (auto&& i : ev->Get()->GetFetcher()->GetManagers()) { + if (!RegisteredManagers.contains(i->GetTypeId())) { + needManagers.emplace_back(i); + } + } + if (needManagers.empty()) { + auto it = Accessors.find(ev->Get()->GetFetcher()->GetComponentId()); + if (it == Accessors.end()) { + THolder<TExternalData> actor = MakeHolder<TExternalData>(Config, ev->Get()->GetFetcher()); + it = Accessors.emplace(ev->Get()->GetFetcher()->GetComponentId(), Register(actor.Release())).first; + } + action(it->second); + } else { + PrepareManagers(needManagers, ev->ReleaseBase(), ev->Sender); + } + } + public: void Bootstrap(const NActors::TActorContext& /*ctx*/); STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { - hFunc(TEvAlterObjects, Handle); + hFunc(TEvObjectsOperation, Handle); hFunc(TEvRefreshSubscriberData, Handle); hFunc(TEvAskSnapshot, Handle); hFunc(TEvSubscribeExternal, Handle); diff --git a/ydb/services/metadata/initializer/accessor_init.cpp b/ydb/services/metadata/initializer/accessor_init.cpp index bfe1d72644..64e76c4621 100644 --- a/ydb/services/metadata/initializer/accessor_init.cpp +++ b/ydb/services/metadata/initializer/accessor_init.cpp @@ -61,7 +61,7 @@ void TDSAccessorInitialized::Handle(NInternal::NRequest::TEvRequestFinished::TPt auto manager = std::make_shared<NMetadataInitializer::TManager>(); auto alterCommand = std::make_shared<NMetadataManager::TCreateCommand<TDBInitialization>>( dbInit.SerializeToRecord(), manager, InternalController, NMetadata::IOperationsManager::TModificationContext()); - Sender<NMetadataProvider::TEvAlterObjects>(alterCommand) + Sender<NMetadataProvider::TEvObjectsOperation>(alterCommand) .SendTo(NMetadataProvider::MakeServiceId(SelfId().NodeId())); } else { DoNextModifier(); diff --git a/ydb/services/metadata/manager/CMakeLists.txt b/ydb/services/metadata/manager/CMakeLists.txt index 019c6f158c..f62c406dac 100644 --- a/ydb/services/metadata/manager/CMakeLists.txt +++ b/ydb/services/metadata/manager/CMakeLists.txt @@ -22,6 +22,7 @@ target_link_libraries(services-metadata-manager PUBLIC ) target_sources(services-metadata-manager PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/alter.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/alter_impl.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/table_record.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/restore.cpp ${CMAKE_SOURCE_DIR}/ydb/services/metadata/manager/modification.cpp diff --git a/ydb/services/metadata/manager/alter.h b/ydb/services/metadata/manager/alter.h index 613feb4c3f..c4e1ca1542 100644 --- a/ydb/services/metadata/manager/alter.h +++ b/ydb/services/metadata/manager/alter.h @@ -1,4 +1,5 @@ #pragma once +#include "alter_impl.h" #include "modification_controller.h" #include "preparation_controller.h" #include "restore.h" @@ -11,259 +12,6 @@ namespace NKikimr::NMetadataManager { template <class TObject> -class TProcessingController: - public IRestoreObjectsController<TObject>, - public IModificationObjectsController, - public IAlterPreparationController<TObject> { -private: - const TActorIdentity ActorId; -public: - using TPtr = std::shared_ptr<TProcessingController>; - TProcessingController(const TActorIdentity actorId) - : ActorId(actorId) - { - - } - - virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override { - ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId)); - } - virtual void RestoreProblem(const TString& errorMessage) override { - ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage)); - } - virtual void ModificationFinished() override { - ActorId.Send(ActorId, new TEvModificationFinished()); - } - virtual void ModificationProblem(const TString& errorMessage) override { - ActorId.Send(ActorId, new TEvModificationProblem(errorMessage)); - } - virtual void PreparationProblem(const TString& errorMessage) override { - ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage)); - } - virtual void PreparationFinished(std::vector<TObject>&& objects) override { - ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects))); - } - -}; - -template <class TObject> -class TModificationActorImpl: public NActors::TActorBootstrapped<TModificationActorImpl<TObject>> { -private: - using TBase = NActors::TActorBootstrapped<TModificationActorImpl<TObject>>; -protected: - TString SessionId; - TString TransactionId; - typename TProcessingController<TObject>::TPtr InternalController; - IAlterController::TPtr ExternalController; - const NMetadata::IOperationsManager::TModificationContext Context; - std::vector<TTableRecord> Patches; - TTableRecords RestoreObjectIds; - const NACLib::TUserToken UserToken = NACLib::TSystemUsers::Metadata(); - virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const = 0; - virtual bool ProcessPreparedObjects(TTableRecords&& records) const = 0; - virtual void InitState() = 0; - virtual bool BuildRestoreObjectIds() = 0; -public: - TModificationActorImpl(TTableRecord&& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) - : ExternalController(controller) - , Context(context) { - Patches.emplace_back(std::move(patch)); - } - - TModificationActorImpl(const TTableRecord& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) - : ExternalController(controller) - , Context(context) { - Patches.emplace_back(patch); - } - - TModificationActorImpl(std::vector<TTableRecord>&& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) - : ExternalController(controller) - , Context(context) - , Patches(std::move(patches)) { - - } - - TModificationActorImpl(const std::vector<TTableRecord>& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) - : ExternalController(controller) - , Context(context) - , Patches(patches) { - - } - - STATEFN(StateMain) { - switch (ev->GetTypeRewrite()) { - hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); - hFunc(TEvRestoreFinished<TObject>, Handle); - hFunc(TEvAlterPreparationFinished<TObject>, Handle); - hFunc(NInternal::NRequest::TEvRequestFailed, Handle); - hFunc(TEvRestoreProblem, Handle); - hFunc(TEvAlterPreparationProblem, Handle); - default: - break; - } - } - - void Bootstrap() { - InitState(); - if (!Patches.size()) { - ExternalController->AlterProblem("no patches"); - return TBase::PassAway(); - } - if (!BuildRestoreObjectIds()) { - return TBase::PassAway(); - } - - TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogCreateSession>( - NInternal::NRequest::TDialogCreateSession::TRequest(), UserToken, TBase::SelfId())); - } - - void Handle(typename NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev) { - Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); - Ydb::Table::CreateSessionResult session; - currentFullReply.operation().result().UnpackTo(&session); - SessionId = session.session_id(); - Y_VERIFY(SessionId); - - InternalController = std::make_shared<TProcessingController<TObject>>(TBase::SelfId()); - TBase::Register(new TRestoreObjectsActor<TObject>(RestoreObjectIds, UserToken, InternalController, SessionId)); - } - - void Handle(typename TEvRestoreFinished<TObject>::TPtr& ev) { - TransactionId = ev->Get()->GetTransactionId(); - Y_VERIFY(TransactionId); - std::vector<TObject> objects = std::move(ev->Get()->MutableObjects()); - if (!PrepareRestoredObjects(objects)) { - TBase::PassAway(); - } else { - TObject::AlteringPreparation(std::move(objects), InternalController, Context); - } - } - - void Handle(typename TEvAlterPreparationFinished<TObject>::TPtr& ev) { - TTableRecords records; - records.InitColumns(TObject::TDecoder::GetColumns()); - records.ReserveRows(ev->Get()->GetObjects().size()); - for (auto&& i : ev->Get()->GetObjects()) { - if (!records.AddRecordNativeValues(i.SerializeToRecord())) { - ExternalController->AlterProblem("unexpected serialization inconsistency"); - return TBase::PassAway(); - } - } - if (!ProcessPreparedObjects(std::move(records))) { - ExternalController->AlterProblem("cannot process prepared objects"); - return TBase::PassAway(); - } - } - - void Handle(typename NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) { - auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("cannot initialize session"); - } - - void Handle(TEvAlterPreparationProblem::TPtr& ev) { - auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage()); - } - - void Handle(TEvRestoreProblem::TPtr& ev) { - auto g = TBase::PassAwayGuard(); - ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage()); - } - -}; - -template <class TObject> -class TModificationActor: public TModificationActorImpl<TObject> { -private: - using TBase = TModificationActorImpl<TObject>; -protected: - virtual void InitState() override { - TBase::Become(&TModificationActor<TObject>::StateMain); - } - - virtual bool BuildRestoreObjectIds() override { - TBase::RestoreObjectIds.InitColumns(TObject::TDecoder::GetPKColumns()); - for (auto&& i : TBase::Patches) { - if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { - TBase::ExternalController->AlterProblem("no pk columns in patch"); - return false; - } - } - return true; - } - - virtual TString GetModificationType() const = 0; - -public: - using TBase::TBase; - STFUNC(StateMain) { - switch (ev->GetTypeRewrite()) { - hFunc(TEvModificationFinished, Handle); - hFunc(TEvModificationProblem, Handle); - default: - TBase::StateMain(ev, ctx); - } - } - - virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const override { - std::vector<bool> realPatches; - realPatches.resize(TBase::Patches.size(), false); - for (auto&& i : objects) { - const TTableRecord* trPatch = nullptr; - TTableRecord trObject = i.SerializeToRecord(); - for (auto&& p : TBase::Patches) { - if (p.CompareColumns(trObject, TObject::TDecoder::GetPKColumnIds())) { - trPatch = &p; - break; - } - } - TObject objectPatched; - if (!trPatch) { - TBase::ExternalController->AlterProblem("cannot found patch for object"); - return false; - } else if (!trObject.TakeValuesFrom(*trPatch)) { - TBase::ExternalController->AlterProblem("cannot patch object"); - return false; - } else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) { - TBase::ExternalController->AlterProblem("cannot parse object after patch"); - return false; - } else { - i = std::move(objectPatched); - } - } - for (auto&& p : TBase::Patches) { - bool found = false; - for (auto&& i : objects) { - if (i.SerializeToRecord().CompareColumns(p, TObject::TDecoder::GetPKColumnIds())) { - found = true; - break; - } - } - if (!found) { - TObject object; - if (!TObject::TDecoder::DeserializeFromRecord(object, p)) { - TBase::ExternalController->AlterProblem("cannot parse new object"); - return false; - } - objects.emplace_back(std::move(object)); - } - } - return true; - } - - void Handle(TEvModificationFinished::TPtr& /*ev*/) { - auto g = TBase::PassAwayGuard(); - TBase::ExternalController->AlterFinished(); - } - - void Handle(TEvModificationProblem::TPtr& ev) { - auto g = TBase::PassAwayGuard(); - TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage()); - } - -}; - -template <class TObject> class TAlterActor: public TModificationActor<TObject> { private: using TBase = TModificationActor<TObject>; diff --git a/ydb/services/metadata/manager/alter_impl.cpp b/ydb/services/metadata/manager/alter_impl.cpp new file mode 100644 index 0000000000..433f924afd --- /dev/null +++ b/ydb/services/metadata/manager/alter_impl.cpp @@ -0,0 +1,5 @@ +#include "alter_impl.h" + +namespace NKikimr::NMetadataManager { + +} diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h new file mode 100644 index 0000000000..34ad418807 --- /dev/null +++ b/ydb/services/metadata/manager/alter_impl.h @@ -0,0 +1,268 @@ +#pragma once +#include "modification_controller.h" +#include "preparation_controller.h" +#include "restore.h" +#include "modification.h" + +#include <ydb/services/metadata/abstract/manager.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> + +namespace NKikimr::NMetadataManager { + +template <class TObject> +class TProcessingController: + public IRestoreObjectsController<TObject>, + public IModificationObjectsController, + public IAlterPreparationController<TObject> { +private: + const TActorIdentity ActorId; +public: + using TPtr = std::shared_ptr<TProcessingController>; + TProcessingController(const TActorIdentity actorId) + : ActorId(actorId) + { + + } + + virtual void RestoreFinished(std::vector<TObject>&& objects, const TString& transactionId) override { + ActorId.Send(ActorId, new TEvRestoreFinished<TObject>(std::move(objects), transactionId)); + } + virtual void RestoreProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvRestoreProblem(errorMessage)); + } + virtual void ModificationFinished() override { + ActorId.Send(ActorId, new TEvModificationFinished()); + } + virtual void ModificationProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvModificationProblem(errorMessage)); + } + virtual void PreparationProblem(const TString& errorMessage) override { + ActorId.Send(ActorId, new TEvAlterPreparationProblem(errorMessage)); + } + virtual void PreparationFinished(std::vector<TObject>&& objects) override { + ActorId.Send(ActorId, new TEvAlterPreparationFinished<TObject>(std::move(objects))); + } + +}; + +template <class TObject> +class TModificationActorImpl: public NActors::TActorBootstrapped<TModificationActorImpl<TObject>> { +private: + using TBase = NActors::TActorBootstrapped<TModificationActorImpl<TObject>>; +protected: + TString SessionId; + TString TransactionId; + typename TProcessingController<TObject>::TPtr InternalController; + IAlterController::TPtr ExternalController; + const NMetadata::IOperationsManager::TModificationContext Context; + std::vector<TTableRecord> Patches; + TTableRecords RestoreObjectIds; + const NACLib::TUserToken UserToken = NACLib::TSystemUsers::Metadata(); + virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const = 0; + virtual bool ProcessPreparedObjects(TTableRecords&& records) const = 0; + virtual void InitState() = 0; + virtual bool BuildRestoreObjectIds() = 0; +public: + TModificationActorImpl(TTableRecord&& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) { + Patches.emplace_back(std::move(patch)); + } + + TModificationActorImpl(const TTableRecord& patch, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) { + Patches.emplace_back(patch); + } + + TModificationActorImpl(std::vector<TTableRecord>&& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) + , Patches(std::move(patches)) { + + } + + TModificationActorImpl(const std::vector<TTableRecord>& patches, IAlterController::TPtr controller, const NMetadata::IOperationsManager::TModificationContext& context) + : ExternalController(controller) + , Context(context) + , Patches(patches) { + + } + + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>, Handle); + hFunc(TEvRestoreFinished<TObject>, Handle); + hFunc(TEvAlterPreparationFinished<TObject>, Handle); + hFunc(NInternal::NRequest::TEvRequestFailed, Handle); + hFunc(TEvRestoreProblem, Handle); + hFunc(TEvAlterPreparationProblem, Handle); + default: + break; + } + } + + void Bootstrap() { + InitState(); + if (!Patches.size()) { + ExternalController->AlterProblem("no patches"); + return TBase::PassAway(); + } + if (!BuildRestoreObjectIds()) { + return TBase::PassAway(); + } + + TBase::Register(new NInternal::NRequest::TYDBRequest<NInternal::NRequest::TDialogCreateSession>( + NInternal::NRequest::TDialogCreateSession::TRequest(), UserToken, TBase::SelfId())); + } + + void Handle(typename NInternal::NRequest::TEvRequestResult<NInternal::NRequest::TDialogCreateSession>::TPtr& ev) { + Ydb::Table::CreateSessionResponse currentFullReply = ev->Get()->GetResult(); + Ydb::Table::CreateSessionResult session; + currentFullReply.operation().result().UnpackTo(&session); + SessionId = session.session_id(); + Y_VERIFY(SessionId); + + InternalController = std::make_shared<TProcessingController<TObject>>(TBase::SelfId()); + TBase::Register(new TRestoreObjectsActor<TObject>(RestoreObjectIds, UserToken, InternalController, SessionId)); + } + + void Handle(typename TEvRestoreFinished<TObject>::TPtr& ev) { + TransactionId = ev->Get()->GetTransactionId(); + Y_VERIFY(TransactionId); + std::vector<TObject> objects = std::move(ev->Get()->MutableObjects()); + if (!PrepareRestoredObjects(objects)) { + TBase::PassAway(); + } else { + auto preparationController = std::dynamic_pointer_cast<IAlterPreparationController<TObject>>(InternalController); + Y_VERIFY(preparationController); + TObject::AlteringPreparation(std::move(objects), preparationController, Context); + } + } + + void Handle(typename TEvAlterPreparationFinished<TObject>::TPtr& ev) { + TTableRecords records; + records.InitColumns(TObject::TDecoder::GetColumns()); + records.ReserveRows(ev->Get()->GetObjects().size()); + for (auto&& i : ev->Get()->GetObjects()) { + if (!records.AddRecordNativeValues(i.SerializeToRecord())) { + ExternalController->AlterProblem("unexpected serialization inconsistency"); + return TBase::PassAway(); + } + } + if (!ProcessPreparedObjects(std::move(records))) { + ExternalController->AlterProblem("cannot process prepared objects"); + return TBase::PassAway(); + } + } + + void Handle(typename NInternal::NRequest::TEvRequestFailed::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("cannot initialize session"); + } + + void Handle(TEvAlterPreparationProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("preparation problem: " + ev->Get()->GetErrorMessage()); + } + + void Handle(TEvRestoreProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + ExternalController->AlterProblem("cannot restore objects: " + ev->Get()->GetErrorMessage()); + } + +}; + +template <class TObject> +class TModificationActor: public TModificationActorImpl<TObject> { +private: + using TBase = TModificationActorImpl<TObject>; +protected: + virtual void InitState() override { + TBase::Become(&TModificationActor<TObject>::StateMain); + } + + virtual bool BuildRestoreObjectIds() override { + TBase::RestoreObjectIds.InitColumns(TObject::TDecoder::GetPKColumns()); + for (auto&& i : TBase::Patches) { + if (!TBase::RestoreObjectIds.AddRecordNativeValues(i)) { + TBase::ExternalController->AlterProblem("no pk columns in patch"); + return false; + } + } + return true; + } + + virtual TString GetModificationType() const = 0; + +public: + using TBase::TBase; + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvModificationFinished, Handle); + hFunc(TEvModificationProblem, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + + virtual bool PrepareRestoredObjects(std::vector<TObject>& objects) const override { + std::vector<bool> realPatches; + realPatches.resize(TBase::Patches.size(), false); + for (auto&& i : objects) { + const TTableRecord* trPatch = nullptr; + TTableRecord trObject = i.SerializeToRecord(); + for (auto&& p : TBase::Patches) { + if (p.CompareColumns(trObject, TObject::TDecoder::GetPKColumnIds())) { + trPatch = &p; + break; + } + } + TObject objectPatched; + if (!trPatch) { + TBase::ExternalController->AlterProblem("cannot found patch for object"); + return false; + } else if (!trObject.TakeValuesFrom(*trPatch)) { + TBase::ExternalController->AlterProblem("cannot patch object"); + return false; + } else if (!TObject::TDecoder::DeserializeFromRecord(objectPatched, trObject)) { + TBase::ExternalController->AlterProblem("cannot parse object after patch"); + return false; + } else { + i = std::move(objectPatched); + } + } + for (auto&& p : TBase::Patches) { + bool found = false; + for (auto&& i : objects) { + if (i.SerializeToRecord().CompareColumns(p, TObject::TDecoder::GetPKColumnIds())) { + found = true; + break; + } + } + if (!found) { + TObject object; + if (!TObject::TDecoder::DeserializeFromRecord(object, p)) { + TBase::ExternalController->AlterProblem("cannot parse new object"); + return false; + } + objects.emplace_back(std::move(object)); + } + } + return true; + } + + void Handle(TEvModificationFinished::TPtr& /*ev*/) { + auto g = TBase::PassAwayGuard(); + TBase::ExternalController->AlterFinished(); + } + + void Handle(TEvModificationProblem::TPtr& ev) { + auto g = TBase::PassAwayGuard(); + TBase::ExternalController->AlterProblem("cannot " + GetModificationType() + " objects: " + ev->Get()->GetErrorMessage()); + } + +}; + +} diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h index 3d7eda96dc..40b1e7e535 100644 --- a/ydb/services/metadata/manager/generic_manager.h +++ b/ydb/services/metadata/manager/generic_manager.h @@ -41,7 +41,7 @@ protected: auto c = std::make_shared<TOperationsController>(std::move(promise)); auto command = std::make_shared<NMetadataManager::TCreateCommand<T>>(patch.GetRecord(), manager, c, context); TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, - new NMetadataProvider::TEvAlterObjects(command))); + new NMetadataProvider::TEvObjectsOperation(command))); return result; } virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoAlterObject( @@ -57,7 +57,7 @@ protected: auto c = std::make_shared<TOperationsController>(std::move(promise)); auto command = std::make_shared<NMetadataManager::TAlterCommand<T>>(patch.GetRecord(), manager, c, context); TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, - new NMetadataProvider::TEvAlterObjects(command))); + new NMetadataProvider::TEvObjectsOperation(command))); return result; } virtual NThreading::TFuture<NMetadata::TObjectOperatorResult> DoDropObject( @@ -73,7 +73,7 @@ protected: auto c = std::make_shared<TOperationsController>(std::move(promise)); auto command = std::make_shared<NMetadataManager::TDropCommand<T>>(patch.GetRecord(), manager, c, context); TActivationContext::Send(new IEventHandle(NMetadataProvider::MakeServiceId(nodeId), {}, - new NMetadataProvider::TEvAlterObjects(command))); + new NMetadataProvider::TEvObjectsOperation(command))); return result; } public: diff --git a/ydb/services/metadata/secret/ut/ut_secret.cpp b/ydb/services/metadata/secret/ut/ut_secret.cpp index d99f713baf..6e3d804afe 100644 --- a/ydb/services/metadata/secret/ut/ut_secret.cpp +++ b/ydb/services/metadata/secret/ut/ut_secret.cpp @@ -37,8 +37,7 @@ Y_UNIT_TEST_SUITE(Secret) { public: TJsonChecker(const TString& path, const TString& expectation) : Path(path) - , Expectation(expectation) - { + , Expectation(expectation) { } bool Check(const NJson::TJsonValue& jsonInfo) const { @@ -197,8 +196,8 @@ Y_UNIT_TEST_SUITE(Secret) { Y_VERIFY(emulator->IsFound()); } - lHelper.StartSchemaRequest("DROP OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS) WITH ownerUserId = `root@root`"); - lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET) WITH ownerUserId = `root@root`"); + lHelper.StartSchemaRequest("DROP OBJECT `secret1/test@test1` (TYPE SECRET_ACCESS)"); + lHelper.StartSchemaRequest("DROP OBJECT `secret1` (TYPE SECRET)"); emulator->SetExpectedSecretsCount(0).SetExpectedAccessCount(0); { diff --git a/ydb/services/metadata/service.h b/ydb/services/metadata/service.h index 96dcf1993a..72964ba0e7 100644 --- a/ydb/services/metadata/service.h +++ b/ydb/services/metadata/service.h @@ -5,11 +5,11 @@ namespace NKikimr::NMetadataProvider { -class TEvAlterObjects: public NActors::TEventLocal<TEvAlterObjects, EEvSubscribe::EvAlterObjects> { +class TEvObjectsOperation: public NActors::TEventLocal<TEvObjectsOperation, EEvSubscribe::EvAlterObjects> { private: YDB_READONLY_DEF(NMetadata::IAlterCommand::TPtr, Command); public: - TEvAlterObjects(NMetadata::IAlterCommand::TPtr command) + TEvObjectsOperation(NMetadata::IAlterCommand::TPtr command) : Command(command) { } |