diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-06 09:25:59 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-12-06 09:25:59 +0300 |
commit | 83bee92cb09a4061f43973771afcc67f8b1865df (patch) | |
tree | 1943efe03e72a11aa496ab387ec4ad7b60806498 | |
parent | e857c58e5e995b376b6c1c8a37dbe40df2b736a6 (diff) | |
download | ydb-83bee92cb09a4061f43973771afcc67f8b1865df.tar.gz |
general schemeshard data fetching for alter-tiering-validation
23 files changed, 669 insertions, 9 deletions
diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto index ebced3f665b..3e22ccf2059 100644 --- a/ydb/core/protos/flat_tx_scheme.proto +++ b/ydb/core/protos/flat_tx_scheme.proto @@ -54,6 +54,39 @@ message TEvModifySchemeTransaction { optional string UserToken = 7; // serialized NACLib::TUserToken } +message TFetcherCheckUserTieringPermissionsResult { + optional bool OperationAllow = 1; + optional string DenyReason = 2; +} + +message TFetcherCheckUserTieringPermissions { + optional bytes UserToken = 1; + repeated string TieringRuleIds = 2; + optional string ActivityType = 3; +} + +message TEvProcessingRequest { + optional string ClassName = 1; + // Fetcher is string serializable object that deserialized after factory constructed object from ClassName + optional bytes Data = 2; +} + +message TEvProcessingResponse { + message TContent { + // Fetched content is string serializable object that determined by general content fetcher by mthod UnpackContent + optional bytes Data = 2; + } + + message TError { + optional string ErrorMessage = 1; + } + + oneof Result { + TContent Content = 1; + TError Error = 2; + } +} + message TEvModifySchemeTransactionResult { optional EStatus Status = 1; optional string Reason = 2; diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 781516f43d2..c136090061a 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -954,5 +954,6 @@ message TActivity { HTTP_MON_SERVICE_NODE_PROXY = 595; AUDIT_WRITER_ACTOR = 596; SCHEMESHARD_SVP_MIGRATOR = 597; + SS_FETCHING_ACTOR = 598; }; }; diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h index 5d68b0a7769..13dc8ac912d 100644 --- a/ydb/core/tx/schemeshard/schemeshard.h +++ b/ydb/core/tx/schemeshard/schemeshard.h @@ -11,6 +11,7 @@ #include <ydb/core/scheme/scheme_tablecell.h> #include <library/cpp/deprecated/enum_codegen/enum_codegen.h> +#include <library/cpp/object_factory/object_factory.h> #include "schemeshard_identificators.h" @@ -20,8 +21,35 @@ namespace NSchemeShard { static constexpr ui64 RootSchemeShardId = 0; static constexpr ui64 RootPathId = 1; +class TSchemeShard; struct TSchemeLimits; +class ISSDataProcessor { +protected: + virtual void DoProcess(TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const = 0; + virtual bool DoDeserializeFromString(const TString& data) = 0; + virtual TString DoSerializeToString() const = 0; +public: + using TPtr = std::shared_ptr<ISSDataProcessor>; + using TFactory = NObjectFactory::TObjectFactory<ISSDataProcessor, TString>; + virtual ~ISSDataProcessor() = default; + + virtual TString DebugString() const = 0; + + TString SerializeToString() const { + return DoSerializeToString(); + } + bool DeserializeFromString(const TString& data) { + return DoDeserializeFromString(data); + } + + virtual TString GetClassName() const = 0; + + void Process(TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) { + return DoProcess(schemeShard, result); + } +}; + struct TEvSchemeShard { enum EEv { EvModifySchemeTransaction = EventSpaceBegin(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD), // 271122432 @@ -60,12 +88,50 @@ struct TEvSchemeShard { EvBackupDatashardResult, EvCancelTx, EvCancelTxResult, + EvProcessingRequest, + EvProcessingResponse, EvEnd }; static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD)"); + struct TEvProcessingResponse: public TEventPB<TEvProcessingResponse, + NKikimrScheme::TEvProcessingResponse, EvProcessingResponse> { + private: + using TBase = TEventPB<TEvProcessingResponse, + NKikimrScheme::TEvProcessingResponse, EvProcessingResponse>; + public: + using TBase::TBase; + TEvProcessingResponse(const TString& errorMessage) { + Record.MutableError()->SetErrorMessage(errorMessage); + } + }; + + struct TEvProcessingRequest: public TEventPB<TEvProcessingRequest, + NKikimrScheme::TEvProcessingRequest, EvProcessingRequest> { + private: + using TBase = TEventPB<TEvProcessingRequest, + NKikimrScheme::TEvProcessingRequest, EvProcessingRequest>; + public: + using TBase::TBase; + TEvProcessingRequest(const ISSDataProcessor& processor) { + Record.SetClassName(processor.GetClassName()); + Record.SetData(processor.SerializeToString()); + } + + ISSDataProcessor::TPtr RestoreProcessor() const { + auto result = ISSDataProcessor::TFactory::MakeHolder(TBase::Record.GetClassName()); + if (!result) { + return nullptr; + } else if (!result->DeserializeFromString(TBase::Record.GetData())) { + return nullptr; + } else { + return std::shared_ptr<ISSDataProcessor>(result.Release()); + } + } + }; + struct TEvModifySchemeTransaction : public TEventPB<TEvModifySchemeTransaction, NKikimrScheme::TEvModifySchemeTransaction, EvModifySchemeTransaction> { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index f52200b2aef..ace8f317e02 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4182,6 +4182,8 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvDataShard::TEvCompactBorrowedResult, Handle); HFuncTraced(TEvSchemeShard::TEvSyncTenantSchemeShard, Handle); + HFuncTraced(TEvSchemeShard::TEvProcessingRequest, Handle); + HFuncTraced(TEvSchemeShard::TEvUpdateTenantSchemeShard, Handle); HFuncTraced(TSchemeBoardEvents::TEvUpdateAck, Handle); @@ -4781,6 +4783,21 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvModifySchemeTransaction::TPtr &ev, Execute(CreateTxOperationPropose(ev), ctx); } +void TSchemeShard::Handle(TEvSchemeShard::TEvProcessingRequest::TPtr& ev, const TActorContext& ctx) { + const auto processor = ev->Get()->RestoreProcessor(); + LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "TSchemeShard::Handle" + << ", at schemeshard: " << TabletID() + << ", processor: " << (processor ? processor->DebugString() : "nullptr")); + if (processor) { + NKikimrScheme::TEvProcessingResponse result; + processor->Process(*this, result); + ctx.Send(ev->Sender, new TEvSchemeShard::TEvProcessingResponse(result)); + } else { + ctx.Send(ev->Sender, new TEvSchemeShard::TEvProcessingResponse("cannot restore processor: " + ev->Get()->Record.GetClassName())); + } +} + void TSchemeShard::Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TActorContext &ctx) { const auto txId = TTxId(ev->Get()->TxId); if (!Operations.contains(txId)) { diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index db40fd8d7be..09c89915683 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -958,6 +958,8 @@ public: void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx); void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx); + + void Handle(TEvSchemeShard::TEvProcessingRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvSyncTenantSchemeShard::TPtr& ev, const TActorContext& ctx); void Handle(TEvSchemeShard::TEvUpdateTenantSchemeShard::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h index 906f897dd65..1294252b70f 100644 --- a/ydb/core/tx/tiering/common.h +++ b/ydb/core/tx/tiering/common.h @@ -9,6 +9,9 @@ namespace NKikimr::NColumnShard::NTiers { enum EEvents { EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING), + EvSSFetchingResult, + EvSSFetchingProblem, + EvTimeout, EvEnd }; diff --git a/ydb/core/tx/tiering/rule/CMakeLists.txt b/ydb/core/tx/tiering/rule/CMakeLists.txt index 9d9cdb53026..022e8654412 100644 --- a/ydb/core/tx/tiering/rule/CMakeLists.txt +++ b/ydb/core/tx/tiering/rule/CMakeLists.txt @@ -8,25 +8,39 @@ add_library(tx-tiering-rule) +target_compile_options(tx-tiering-rule PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(tx-tiering-rule PUBLIC contrib-libs-cxxsupp yutil services-metadata-initializer services-metadata-abstract + services-bg_tasks-abstract + core-tx-schemeshard ) target_sources(tx-tiering-rule PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/object.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/initializer.cpp ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/checker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_checker.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/timeout.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_dialog.cpp ) add_global_library_for(tx-tiering-rule.global tx-tiering-rule) +target_compile_options(tx-tiering-rule.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(tx-tiering-rule.global PUBLIC contrib-libs-cxxsupp yutil services-metadata-initializer services-metadata-abstract + services-bg_tasks-abstract + core-tx-schemeshard ) target_sources(tx-tiering-rule.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/manager.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_fetcher.cpp ) diff --git a/ydb/core/tx/tiering/rule/checker.cpp b/ydb/core/tx/tiering/rule/checker.cpp index 0111a56e2ee..fc373a0f833 100644 --- a/ydb/core/tx/tiering/rule/checker.cpp +++ b/ydb/core/tx/tiering/rule/checker.cpp @@ -1,13 +1,24 @@ #include "checker.h" +#include "ss_checker.h" +#include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/tiering/external_data.h> +#include <ydb/core/tx/tiering/rule/ss_fetcher.h> +#include <ydb/services/bg_tasks/abstract/interface.h> #include <ydb/services/metadata/secret/snapshot.h> #include <ydb/services/metadata/secret/fetcher.h> namespace NKikimr::NColumnShard::NTiers { void TRulePreparationActor::StartChecker() { + if (!Tierings || !Secrets || !SSCheckResult) { + return; + } auto g = PassAwayGuard(); + if (!SSCheckResult->GetContent().GetOperationAllow()) { + Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason()); + return; + } for (auto&& tiering : Objects) { for (auto&& interval : tiering.GetIntervals()) { @@ -27,6 +38,24 @@ void TRulePreparationActor::StartChecker() { Controller->PreparationFinished(std::move(Objects)); } +void TRulePreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) { + auto& proto = ev->Get()->Record; + if (proto.HasError()) { + Controller->PreparationProblem(proto.GetError().GetErrorMessage()); + PassAway(); + } else if (proto.HasContent()) { + SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData()); + if (!SSCheckResult) { + Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); + PassAway(); + } else { + StartChecker(); + } + } else { + Y_VERIFY(false); + } +} + void TRulePreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { if (auto snapshot = ev->Get()->GetSnapshotPtrAs<TConfigsSnapshot>()) { Tierings = snapshot; @@ -35,9 +64,7 @@ void TRulePreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData:: } else { Y_VERIFY(false); } - if (Tierings && Secrets) { - StartChecker(); - } + StartChecker(); } void TRulePreparationActor::Bootstrap() { @@ -46,6 +73,15 @@ void TRulePreparationActor::Bootstrap() { new NMetadataProvider::TEvAskSnapshot(std::make_shared<TSnapshotConstructor>())); Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()), new NMetadataProvider::TEvAskSnapshot(std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>())); + { + SSFetcher = std::make_shared<TFetcherCheckUserTieringPermissions>(); + SSFetcher->SetUserToken(Context.GetUserToken()); + SSFetcher->SetActivityType(Context.GetActivityType()); + for (auto&& i : Objects) { + SSFetcher->MutableTieringRuleIds().emplace(i.GetTieringRuleId()); + } + Register(new TSSFetchingActor(SSFetcher, std::make_shared<TSSFetchingController>(SelfId()), TDuration::Seconds(10))); + } } TRulePreparationActor::TRulePreparationActor(std::vector<TTieringRule>&& objects, diff --git a/ydb/core/tx/tiering/rule/checker.h b/ydb/core/tx/tiering/rule/checker.h index 3f96aec9078..e0dfec22144 100644 --- a/ydb/core/tx/tiering/rule/checker.h +++ b/ydb/core/tx/tiering/rule/checker.h @@ -1,6 +1,8 @@ #pragma once #include "object.h" +#include "ss_fetcher.h" +#include <ydb/core/tx/schemeshard/schemeshard.h> #include <ydb/core/tx/tiering/snapshot.h> #include <ydb/services/metadata/abstract/common.h> @@ -17,14 +19,17 @@ private: NMetadata::IOperationsManager::TModificationContext Context; std::shared_ptr<TConfigsSnapshot> Tierings; std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets; - + std::shared_ptr<TFetcherCheckUserTieringPermissions> SSFetcher; + std::optional<TFetcherCheckUserTieringPermissions::TResult> SSCheckResult; void StartChecker(); protected: void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); + void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev); public: STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle); default: break; } diff --git a/ydb/core/tx/tiering/rule/ss_checker.cpp b/ydb/core/tx/tiering/rule/ss_checker.cpp new file mode 100644 index 00000000000..523ce607f22 --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_checker.cpp @@ -0,0 +1,26 @@ +#include "ss_checker.h" + +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/protos/services.pb.h> + +namespace NKikimr::NColumnShard::NTiers { + +void TSSFetchingActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) { + auto g = PassAwayGuard(); + Controller->FetchingResult(ev->Get()->Record); +} + +TSSFetchingActor::TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor, + ISSFetchingController::TPtr controller, const TDuration livetime) + : TBase(livetime) + , Processor(processor) + , Controller(controller) +{ + +} + +constexpr NKikimrServices::TActivity::EType TSSFetchingActor::ActorActivityType() { + return NKikimrServices::TActivity::SS_FETCHING_ACTOR; +} + +} diff --git a/ydb/core/tx/tiering/rule/ss_checker.h b/ydb/core/tx/tiering/rule/ss_checker.h new file mode 100644 index 00000000000..f2932523b4f --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_checker.h @@ -0,0 +1,68 @@ +#pragma once +#include "object.h" +#include "ss_dialog.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/tx/tx_proxy/proxy.h> + +namespace NKikimr::NColumnShard::NTiers { + +class ISSFetchingController { +public: + using TPtr = std::shared_ptr<ISSFetchingController>; + virtual ~ISSFetchingController() = default; + virtual void FetchingProblem(const TString& errorMessage) const = 0; + virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const = 0; +}; + +class TSSFetchingController: public ISSFetchingController { +private: + const TActorIdentity ActorId; +public: + TSSFetchingController(const TActorIdentity& actorId) + : ActorId(actorId) { + + } + + virtual void FetchingProblem(const TString& errorMessage) const override { + ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(errorMessage)); + } + virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const override { + ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(result)); + } +}; + +class TSSFetchingActor: public TSSDialogActor { +private: + using TBase = TSSDialogActor; + NSchemeShard::ISSDataProcessor::TPtr Processor; + ISSFetchingController::TPtr Controller; + void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev); +protected: + virtual void OnBootstrap() override { + Become(&TSSFetchingActor::StateMain); + TBase::OnBootstrap(); + } + virtual void OnFail(const TString& errorMessage) override { + Controller->FetchingProblem(errorMessage); + } + virtual void Execute() override { + auto req = std::make_unique<NSchemeShard::TEvSchemeShard::TEvProcessingRequest>(*Processor); + Send(SchemeShardPipe, new TEvPipeCache::TEvForward(req.release(), SchemeShardId, false)); + } +public: + static constexpr NKikimrServices::TActivity::EType ActorActivityType(); + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle); + default: + TBase::StateMain(ev, ctx); + } + } + TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor, ISSFetchingController::TPtr controller, const TDuration livetime); +}; + +} diff --git a/ydb/core/tx/tiering/rule/ss_dialog.cpp b/ydb/core/tx/tiering/rule/ss_dialog.cpp new file mode 100644 index 00000000000..ebd79e2641b --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_dialog.cpp @@ -0,0 +1,54 @@ +#include "ss_dialog.h" + +#include <ydb/core/base/path.h> +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/protos/services.pb.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +namespace NKikimr::NColumnShard::NTiers { + +void TSSDialogActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& /*ev*/) { + OnFail("cannot delivery message to schemeshard"); + PassAway(); +} + +void TSSDialogActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) { + auto* info = ev->Get(); + const auto& request = info->Request; + + if (request->ResultSet.empty()) { + OnFail("navigation problems"); + PassAway(); + return; + } + if (request->ResultSet.size() != 1) { + OnFail("cannot resolve database path"); + PassAway(); + return; + } + auto& entity = request->ResultSet.front(); + SchemeShardId = entity.DomainInfo->ExtractSchemeShard(); + NTabletPipe::TClientConfig clientConfig; + SchemeShardPipe = MakePipePeNodeCacheID(false); + Execute(); +} + +void TSSDialogActor::OnBootstrap() { + auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); + request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName); + auto& entry = request->ResultSet.emplace_back(); + entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; + entry.Path = NKikimr::SplitPath(request->DatabaseName); + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); +} + +void TSSDialogActor::Die(const TActorContext& ctx) { + NTabletPipe::CloseAndForgetClient(SelfId(), SchemeShardPipe); + return IActor::Die(ctx); +} + +void TSSDialogActor::OnTimeout() { + OnFail("timeout"); +} + +} diff --git a/ydb/core/tx/tiering/rule/ss_dialog.h b/ydb/core/tx/tiering/rule/ss_dialog.h new file mode 100644 index 00000000000..bb97e1161a4 --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_dialog.h @@ -0,0 +1,38 @@ +#pragma once + +#include "timeout.h" + +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> +#include <ydb/core/base/tablet_pipecache.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TSSDialogActor: public TTimeoutActor<TSSDialogActor> { +private: + using TBase = TTimeoutActor<TSSDialogActor>; + void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev); + void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev); +protected: + virtual void OnFail(const TString& errorMessage) = 0; + virtual void OnTimeout() override; + virtual void OnBootstrap() override; + virtual void Execute() = 0; + virtual void Die(const TActorContext& ctx) override; +protected: + ui64 SchemeShardId = 0; + TActorId SchemeShardPipe; +public: + using TBase::TBase; + + STFUNC(StateMain) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); + hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + default: + TBase::StateMain(ev, ctx); + } + } +}; + +} diff --git a/ydb/core/tx/tiering/rule/ss_fetcher.cpp b/ydb/core/tx/tiering/rule/ss_fetcher.cpp new file mode 100644 index 00000000000..b825bbccdc1 --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_fetcher.cpp @@ -0,0 +1,77 @@ +#include "ss_fetcher.h" +#include <ydb/core/tx/schemeshard/schemeshard.h> + +namespace NKikimr::NColumnShard::NTiers { + +TFetcherCheckUserTieringPermissions::TFactory::TRegistrator<TFetcherCheckUserTieringPermissions> + TFetcherCheckUserTieringPermissions::Registrator(TFetcherCheckUserTieringPermissions::GetTypeIdStatic()); + +void TFetcherCheckUserTieringPermissions::DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const { + TResult content; + content.MutableContent().SetOperationAllow(true); + ui32 access = 0; + access |= NACLib::EAccessRights::AlterSchema; + + if (ActivityType == NMetadata::IOperationsManager::EActivityType::Undefined) { + content.Deny("undefined activity type"); + } else { + bool denied = false; + for (auto&& i : TieringRuleIds) { + const std::set<TPathId>& pathIds = schemeShard.ColumnTables.GetTablesWithTiering(i); + for (auto&& pathId : pathIds) { + auto path = NSchemeShard::TPath::Init(pathId, &schemeShard); + if (!path.IsResolved() || path.IsUnderDeleting() || path.IsDeleted()) { + continue; + } + if (ActivityType == NMetadata::IOperationsManager::EActivityType::Drop) { + denied = true; + content.Deny("tiering in using by table"); + break; + } else if (ActivityType == NMetadata::IOperationsManager::EActivityType::Alter) { + if (!UserToken) { + continue; + } + TSecurityObject sObject(path->Owner, path->ACL, path->IsContainer()); + if (!sObject.CheckAccess(access, *UserToken)) { + denied = true; + content.Deny("no alter permissions for affected table"); + break; + } + } + } + if (denied) { + break; + } + } + } + result.MutableContent()->SetData(content.SerializeToString()); +} + +bool TFetcherCheckUserTieringPermissions::DoDeserializeFromProto(const TProtoClass& protoData) { + if (!TryFromString(protoData.GetActivityType(), ActivityType)) { + ALS_ERROR(0) << "Cannot parse activity type: undefined value = " << protoData.GetActivityType(); + return false; + } + if (protoData.GetUserToken()) { + NACLib::TUserToken uToken(protoData.GetUserToken()); + UserToken = uToken; + } + for (auto&& i : protoData.GetTieringRuleIds()) { + TieringRuleIds.emplace(i); + } + return true; +} + +NKikimr::NColumnShard::NTiers::TFetcherCheckUserTieringPermissions::TProtoClass TFetcherCheckUserTieringPermissions::DoSerializeToProto() const { + TProtoClass result; + result.SetActivityType(::ToString(ActivityType)); + if (UserToken) { + result.SetUserToken(UserToken->SerializeAsString()); + } + for (auto&& i : TieringRuleIds) { + *result.AddTieringRuleIds() = i; + } + return result; +} + +} diff --git a/ydb/core/tx/tiering/rule/ss_fetcher.h b/ydb/core/tx/tiering/rule/ss_fetcher.h new file mode 100644 index 00000000000..9782e325b45 --- /dev/null +++ b/ydb/core/tx/tiering/rule/ss_fetcher.h @@ -0,0 +1,78 @@ +#pragma once +#include "object.h" + +#include <ydb/core/protos/flat_tx_scheme.pb.h> +#include <ydb/core/tx/schemeshard/schemeshard_impl.h> +#include <ydb/core/tx/tiering/common.h> + +#include <ydb/services/bg_tasks/abstract/interface.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TFetcherCheckUserTieringPermissionsResult: public NBackgroundTasks::IProtoStringSerializable< + NKikimrScheme::TFetcherCheckUserTieringPermissionsResult, NBackgroundTasks::IStringSerializable> { +private: + using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissionsResult; + YDB_ACCESSOR_DEF(TProtoClass, Content); +protected: + virtual TProtoClass DoSerializeToProto() const override { + return Content; + } + virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override { + Content = protoData; + return true; + } +public: + void Deny(const TString& reason) { + Content.SetOperationAllow(false); + Content.SetDenyReason(reason); + } +}; + +class TFetcherCheckUserTieringPermissions: public NBackgroundTasks::IProtoStringSerializable< + NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor> { +private: + using TBase = NBackgroundTasks::IProtoStringSerializable< + NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor>; + using TBase::TFactory; + using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissions; + static TFactory::TRegistrator<TFetcherCheckUserTieringPermissions> Registrator; + YDB_ACCESSOR_DEF(std::set<TString>, TieringRuleIds); + YDB_ACCESSOR_DEF(std::optional<NACLib::TUserToken>, UserToken); + YDB_ACCESSOR(NMetadata::IOperationsManager::EActivityType, ActivityType, NMetadata::IOperationsManager::EActivityType::Undefined); +protected: + virtual TProtoClass DoSerializeToProto() const override; + virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override; + virtual void DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const override; +public: + using TResult = TFetcherCheckUserTieringPermissionsResult; + std::optional<TFetcherCheckUserTieringPermissionsResult> UnpackResult(const TString& content) const { + TFetcherCheckUserTieringPermissionsResult result; + if (!result.DeserializeFromString(content)) { + return {}; + } else { + return result; + } + } + + TFetcherCheckUserTieringPermissions() = default; + + virtual TString DebugString() const override { + TStringBuilder sb; + sb << "USID=" << (UserToken ? UserToken->GetUserSID() : "nobody") << ";"; + sb << "tierings="; + for (auto&& i : TieringRuleIds) { + sb << i << ","; + } + sb << ";"; + return sb; + } + virtual TString GetClassName() const override { + return GetTypeIdStatic(); + } + static TString GetTypeIdStatic() { + return "ss_fetcher_tiering_permissions"; + } +}; + +} diff --git a/ydb/core/tx/tiering/rule/timeout.cpp b/ydb/core/tx/tiering/rule/timeout.cpp new file mode 100644 index 00000000000..b1982635cdb --- /dev/null +++ b/ydb/core/tx/tiering/rule/timeout.cpp @@ -0,0 +1,5 @@ +#include "timeout.h" + +namespace NKikimr::NColumnShard::NTiers { + +} diff --git a/ydb/core/tx/tiering/rule/timeout.h b/ydb/core/tx/tiering/rule/timeout.h new file mode 100644 index 00000000000..437d6fb3285 --- /dev/null +++ b/ydb/core/tx/tiering/rule/timeout.h @@ -0,0 +1,53 @@ +#pragma once + +#include <ydb/core/base/appdata.h> +#include <ydb/core/tx/tiering/common.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/events.h> + +namespace NKikimr::NColumnShard::NTiers { + +class TEvTimeout: public NActors::TEventLocal<TEvTimeout, EEvents::EvTimeout> { + +}; + +template <class TDerived> +class TTimeoutActor: public NActors::TActorBootstrapped<TDerived> { +private: + using TBase = NActors::TActorBootstrapped<TDerived>; + const TInstant Deadline = TInstant::Max(); + void HandleTimeout() { + OnTimeout(); + } +protected: + virtual void OnBootstrap() = 0; + virtual void OnTimeout() = 0; +public: + STATEFN(StateMain) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvTimeout, HandleTimeout); + default: + break; + } + } + + TTimeoutActor(const TInstant deadline) + : Deadline(deadline) { + + } + + TTimeoutActor(const TDuration livetime) + : Deadline(TAppData::TimeProvider->Now() + livetime) { + + } + + void Bootstrap() { + OnBootstrap(); + if (Deadline != TInstant::Max()) { + TBase::Schedule(Deadline, new TEvTimeout); + } + } +}; + +} diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp index 8997364d093..e33421e5a5c 100644 --- a/ydb/core/tx/tiering/snapshot.cpp +++ b/ydb/core/tx/tiering/snapshot.cpp @@ -61,6 +61,19 @@ const TTieringRule* TConfigsSnapshot::GetTieringById(const TString& tieringId) c } } +std::set<TString> TConfigsSnapshot::GetTieringIdsForTier(const TString& tierName) const { + std::set<TString> result; + for (auto&& i : TableTierings) { + for (auto&& t : i.second.GetIntervals()) { + if (t.GetTierName() == tierName) { + result.emplace(i.second.GetTieringRuleId()); + break; + } + } + } + return result; +} + TString NTiers::TConfigsSnapshot::DoSerializeToString() const { NJson::TJsonValue result = NJson::JSON_MAP; auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP); diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h index 07d2b773706..dfb4b59a7ce 100644 --- a/ydb/core/tx/tiering/snapshot.h +++ b/ydb/core/tx/tiering/snapshot.h @@ -21,6 +21,8 @@ protected: virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override; virtual TString DoSerializeToString() const override; public: + + std::set<TString> GetTieringIdsForTier(const TString& tierName) const; const TTieringRule* GetTieringById(const TString& tieringId) const; std::optional<TTierConfig> GetTierById(const TString& tierName) const; using TBase::TBase; diff --git a/ydb/core/tx/tiering/tier/CMakeLists.txt b/ydb/core/tx/tiering/tier/CMakeLists.txt index 1985edde91b..e97416ffc71 100644 --- a/ydb/core/tx/tiering/tier/CMakeLists.txt +++ b/ydb/core/tx/tiering/tier/CMakeLists.txt @@ -8,12 +8,16 @@ add_library(tx-tiering-tier) +target_compile_options(tx-tiering-tier PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(tx-tiering-tier PUBLIC contrib-libs-cxxsupp yutil services-metadata-initializer services-metadata-abstract services-metadata-secret + core-tx-schemeshard ) target_sources(tx-tiering-tier PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/object.cpp @@ -22,12 +26,16 @@ target_sources(tx-tiering-tier PRIVATE ) add_global_library_for(tx-tiering-tier.global tx-tiering-tier) +target_compile_options(tx-tiering-tier.global PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) target_link_libraries(tx-tiering-tier.global PUBLIC contrib-libs-cxxsupp yutil services-metadata-initializer services-metadata-abstract services-metadata-secret + core-tx-schemeshard ) target_sources(tx-tiering-tier.global PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/manager.cpp diff --git a/ydb/core/tx/tiering/tier/checker.cpp b/ydb/core/tx/tiering/tier/checker.cpp index 5667f32e3af..08df47bd2c7 100644 --- a/ydb/core/tx/tiering/tier/checker.cpp +++ b/ydb/core/tx/tiering/tier/checker.cpp @@ -1,20 +1,35 @@ #include "checker.h" #include <ydb/core/tx/tiering/external_data.h> +#include <ydb/core/tx/tiering/rule/ss_checker.h> #include <ydb/services/metadata/secret/fetcher.h> namespace NKikimr::NColumnShard::NTiers { void TTierPreparationActor::StartChecker() { + if (!Tierings || !Secrets || !SSCheckResult) { + return; + } auto g = PassAwayGuard(); + if (!SSCheckResult->GetContent().GetOperationAllow()) { + Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason()); + return; + } for (auto&& tier : Objects) { if (Context.GetActivityType() == NMetadata::IOperationsManager::EActivityType::Drop) { + std::set<TString> tieringsWithTiers; for (auto&& i : Tierings->GetTableTierings()) { if (i.second.ContainsTier(tier.GetTierName())) { - Controller->PreparationProblem("tier in usage for tiering " + i.first); - return; + tieringsWithTiers.emplace(i.first); + if (tieringsWithTiers.size() > 10) { + break; + } } } + if (tieringsWithTiers.size()) { + Controller->PreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers)); + return; + } } if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) { Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey()); @@ -27,17 +42,53 @@ void TTierPreparationActor::StartChecker() { Controller->PreparationFinished(std::move(Objects)); } +void TTierPreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) { + auto& proto = ev->Get()->Record; + if (proto.HasError()) { + Controller->PreparationProblem(proto.GetError().GetErrorMessage()); + PassAway(); + } else if (proto.HasContent()) { + SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData()); + if (!SSCheckResult) { + Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName()); + PassAway(); + } else { + StartChecker(); + } + } else { + Y_VERIFY(false); + } +} + void TTierPreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) { if (auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>()) { Secrets = snapshot; } else if (auto snapshot = ev->Get()->GetSnapshotPtrAs<TConfigsSnapshot>()) { Tierings = snapshot; + std::set<TString> tieringIds; + std::set<TString> tiersChecked; + for (auto&& tier : Objects) { + if (!tiersChecked.emplace(tier.GetTierName()).second) { + continue; + } + auto tIds = Tierings->GetTieringIdsForTier(tier.GetTierName()); + if (tieringIds.empty()) { + tieringIds = std::move(tIds); + } else { + tieringIds.insert(tIds.begin(), tIds.end()); + } + } + { + SSFetcher = std::make_shared<TFetcherCheckUserTieringPermissions>(); + SSFetcher->SetUserToken(Context.GetUserToken()); + SSFetcher->SetActivityType(Context.GetActivityType()); + SSFetcher->MutableTieringRuleIds() = tieringIds; + Register(new TSSFetchingActor(SSFetcher, std::make_shared<TSSFetchingController>(SelfId()), TDuration::Seconds(10))); + } } else { Y_VERIFY(false); } - if (Secrets && Tierings) { - StartChecker(); - } + StartChecker(); } void TTierPreparationActor::Bootstrap() { diff --git a/ydb/core/tx/tiering/tier/checker.h b/ydb/core/tx/tiering/tier/checker.h index 0dd3c247c58..b267ee7c68c 100644 --- a/ydb/core/tx/tiering/tier/checker.h +++ b/ydb/core/tx/tiering/tier/checker.h @@ -1,6 +1,8 @@ #pragma once #include "object.h" +#include <ydb/core/tx/schemeshard/schemeshard.h> +#include <ydb/core/tx/tiering/rule/ss_fetcher.h> #include <ydb/core/tx/tiering/snapshot.h> #include <ydb/services/metadata/abstract/common.h> @@ -17,13 +19,17 @@ private: NMetadata::IOperationsManager::TModificationContext Context; std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets; std::shared_ptr<TConfigsSnapshot> Tierings; + std::shared_ptr<TFetcherCheckUserTieringPermissions> SSFetcher; + std::optional<TFetcherCheckUserTieringPermissions::TResult> SSCheckResult; void StartChecker(); protected: void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev); + void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev); public: STATEFN(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle); + hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle); default: break; } diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp index 40075877615..6501469c278 100644 --- a/ydb/core/tx/tiering/ut/ut_tiers.cpp +++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp @@ -316,6 +316,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { } lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)", false); + lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)", false); + lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`"); lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)"); lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)"); lHelper.StartSchemaRequest("DROP OBJECT tier2(TYPE TIER)"); @@ -392,6 +394,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) { lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)", false); lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", false); lHelper.StartSchemaRequest("DROP OBJECT tiering2 (TYPE TIERING_RULE)"); + lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", false); + lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`"); lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)"); { TTestCSEmulator emulator; |