aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-12-06 09:25:59 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-12-06 09:25:59 +0300
commit83bee92cb09a4061f43973771afcc67f8b1865df (patch)
tree1943efe03e72a11aa496ab387ec4ad7b60806498
parente857c58e5e995b376b6c1c8a37dbe40df2b736a6 (diff)
downloadydb-83bee92cb09a4061f43973771afcc67f8b1865df.tar.gz
general schemeshard data fetching for alter-tiering-validation
-rw-r--r--ydb/core/protos/flat_tx_scheme.proto33
-rw-r--r--ydb/core/protos/services.proto1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard.h66
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp17
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h2
-rw-r--r--ydb/core/tx/tiering/common.h3
-rw-r--r--ydb/core/tx/tiering/rule/CMakeLists.txt14
-rw-r--r--ydb/core/tx/tiering/rule/checker.cpp42
-rw-r--r--ydb/core/tx/tiering/rule/checker.h7
-rw-r--r--ydb/core/tx/tiering/rule/ss_checker.cpp26
-rw-r--r--ydb/core/tx/tiering/rule/ss_checker.h68
-rw-r--r--ydb/core/tx/tiering/rule/ss_dialog.cpp54
-rw-r--r--ydb/core/tx/tiering/rule/ss_dialog.h38
-rw-r--r--ydb/core/tx/tiering/rule/ss_fetcher.cpp77
-rw-r--r--ydb/core/tx/tiering/rule/ss_fetcher.h78
-rw-r--r--ydb/core/tx/tiering/rule/timeout.cpp5
-rw-r--r--ydb/core/tx/tiering/rule/timeout.h53
-rw-r--r--ydb/core/tx/tiering/snapshot.cpp13
-rw-r--r--ydb/core/tx/tiering/snapshot.h2
-rw-r--r--ydb/core/tx/tiering/tier/CMakeLists.txt8
-rw-r--r--ydb/core/tx/tiering/tier/checker.cpp61
-rw-r--r--ydb/core/tx/tiering/tier/checker.h6
-rw-r--r--ydb/core/tx/tiering/ut/ut_tiers.cpp4
23 files changed, 669 insertions, 9 deletions
diff --git a/ydb/core/protos/flat_tx_scheme.proto b/ydb/core/protos/flat_tx_scheme.proto
index ebced3f665b..3e22ccf2059 100644
--- a/ydb/core/protos/flat_tx_scheme.proto
+++ b/ydb/core/protos/flat_tx_scheme.proto
@@ -54,6 +54,39 @@ message TEvModifySchemeTransaction {
optional string UserToken = 7; // serialized NACLib::TUserToken
}
+message TFetcherCheckUserTieringPermissionsResult {
+ optional bool OperationAllow = 1;
+ optional string DenyReason = 2;
+}
+
+message TFetcherCheckUserTieringPermissions {
+ optional bytes UserToken = 1;
+ repeated string TieringRuleIds = 2;
+ optional string ActivityType = 3;
+}
+
+message TEvProcessingRequest {
+ optional string ClassName = 1;
+ // Fetcher is string serializable object that deserialized after factory constructed object from ClassName
+ optional bytes Data = 2;
+}
+
+message TEvProcessingResponse {
+ message TContent {
+ // Fetched content is string serializable object that determined by general content fetcher by mthod UnpackContent
+ optional bytes Data = 2;
+ }
+
+ message TError {
+ optional string ErrorMessage = 1;
+ }
+
+ oneof Result {
+ TContent Content = 1;
+ TError Error = 2;
+ }
+}
+
message TEvModifySchemeTransactionResult {
optional EStatus Status = 1;
optional string Reason = 2;
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 781516f43d2..c136090061a 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -954,5 +954,6 @@ message TActivity {
HTTP_MON_SERVICE_NODE_PROXY = 595;
AUDIT_WRITER_ACTOR = 596;
SCHEMESHARD_SVP_MIGRATOR = 597;
+ SS_FETCHING_ACTOR = 598;
};
};
diff --git a/ydb/core/tx/schemeshard/schemeshard.h b/ydb/core/tx/schemeshard/schemeshard.h
index 5d68b0a7769..13dc8ac912d 100644
--- a/ydb/core/tx/schemeshard/schemeshard.h
+++ b/ydb/core/tx/schemeshard/schemeshard.h
@@ -11,6 +11,7 @@
#include <ydb/core/scheme/scheme_tablecell.h>
#include <library/cpp/deprecated/enum_codegen/enum_codegen.h>
+#include <library/cpp/object_factory/object_factory.h>
#include "schemeshard_identificators.h"
@@ -20,8 +21,35 @@ namespace NSchemeShard {
static constexpr ui64 RootSchemeShardId = 0;
static constexpr ui64 RootPathId = 1;
+class TSchemeShard;
struct TSchemeLimits;
+class ISSDataProcessor {
+protected:
+ virtual void DoProcess(TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const = 0;
+ virtual bool DoDeserializeFromString(const TString& data) = 0;
+ virtual TString DoSerializeToString() const = 0;
+public:
+ using TPtr = std::shared_ptr<ISSDataProcessor>;
+ using TFactory = NObjectFactory::TObjectFactory<ISSDataProcessor, TString>;
+ virtual ~ISSDataProcessor() = default;
+
+ virtual TString DebugString() const = 0;
+
+ TString SerializeToString() const {
+ return DoSerializeToString();
+ }
+ bool DeserializeFromString(const TString& data) {
+ return DoDeserializeFromString(data);
+ }
+
+ virtual TString GetClassName() const = 0;
+
+ void Process(TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) {
+ return DoProcess(schemeShard, result);
+ }
+};
+
struct TEvSchemeShard {
enum EEv {
EvModifySchemeTransaction = EventSpaceBegin(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD), // 271122432
@@ -60,12 +88,50 @@ struct TEvSchemeShard {
EvBackupDatashardResult,
EvCancelTx,
EvCancelTxResult,
+ EvProcessingRequest,
+ EvProcessingResponse,
EvEnd
};
static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_FLAT_TX_SCHEMESHARD)");
+ struct TEvProcessingResponse: public TEventPB<TEvProcessingResponse,
+ NKikimrScheme::TEvProcessingResponse, EvProcessingResponse> {
+ private:
+ using TBase = TEventPB<TEvProcessingResponse,
+ NKikimrScheme::TEvProcessingResponse, EvProcessingResponse>;
+ public:
+ using TBase::TBase;
+ TEvProcessingResponse(const TString& errorMessage) {
+ Record.MutableError()->SetErrorMessage(errorMessage);
+ }
+ };
+
+ struct TEvProcessingRequest: public TEventPB<TEvProcessingRequest,
+ NKikimrScheme::TEvProcessingRequest, EvProcessingRequest> {
+ private:
+ using TBase = TEventPB<TEvProcessingRequest,
+ NKikimrScheme::TEvProcessingRequest, EvProcessingRequest>;
+ public:
+ using TBase::TBase;
+ TEvProcessingRequest(const ISSDataProcessor& processor) {
+ Record.SetClassName(processor.GetClassName());
+ Record.SetData(processor.SerializeToString());
+ }
+
+ ISSDataProcessor::TPtr RestoreProcessor() const {
+ auto result = ISSDataProcessor::TFactory::MakeHolder(TBase::Record.GetClassName());
+ if (!result) {
+ return nullptr;
+ } else if (!result->DeserializeFromString(TBase::Record.GetData())) {
+ return nullptr;
+ } else {
+ return std::shared_ptr<ISSDataProcessor>(result.Release());
+ }
+ }
+ };
+
struct TEvModifySchemeTransaction : public TEventPB<TEvModifySchemeTransaction,
NKikimrScheme::TEvModifySchemeTransaction,
EvModifySchemeTransaction> {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index f52200b2aef..ace8f317e02 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -4182,6 +4182,8 @@ void TSchemeShard::StateWork(STFUNC_SIG) {
HFuncTraced(TEvDataShard::TEvCompactBorrowedResult, Handle);
HFuncTraced(TEvSchemeShard::TEvSyncTenantSchemeShard, Handle);
+ HFuncTraced(TEvSchemeShard::TEvProcessingRequest, Handle);
+
HFuncTraced(TEvSchemeShard::TEvUpdateTenantSchemeShard, Handle);
HFuncTraced(TSchemeBoardEvents::TEvUpdateAck, Handle);
@@ -4781,6 +4783,21 @@ void TSchemeShard::Handle(TEvSchemeShard::TEvModifySchemeTransaction::TPtr &ev,
Execute(CreateTxOperationPropose(ev), ctx);
}
+void TSchemeShard::Handle(TEvSchemeShard::TEvProcessingRequest::TPtr& ev, const TActorContext& ctx) {
+ const auto processor = ev->Get()->RestoreProcessor();
+ LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "TSchemeShard::Handle"
+ << ", at schemeshard: " << TabletID()
+ << ", processor: " << (processor ? processor->DebugString() : "nullptr"));
+ if (processor) {
+ NKikimrScheme::TEvProcessingResponse result;
+ processor->Process(*this, result);
+ ctx.Send(ev->Sender, new TEvSchemeShard::TEvProcessingResponse(result));
+ } else {
+ ctx.Send(ev->Sender, new TEvSchemeShard::TEvProcessingResponse("cannot restore processor: " + ev->Get()->Record.GetClassName()));
+ }
+}
+
void TSchemeShard::Handle(TEvPrivate::TEvProgressOperation::TPtr &ev, const TActorContext &ctx) {
const auto txId = TTxId(ev->Get()->TxId);
if (!Operations.contains(txId)) {
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index db40fd8d7be..09c89915683 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -958,6 +958,8 @@ public:
void Handle(TEvDataShard::TEvCompactTableResult::TPtr &ev, const TActorContext &ctx);
void Handle(TEvDataShard::TEvCompactBorrowedResult::TPtr &ev, const TActorContext &ctx);
+
+ void Handle(TEvSchemeShard::TEvProcessingRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvSchemeShard::TEvSyncTenantSchemeShard::TPtr& ev, const TActorContext& ctx);
void Handle(TEvSchemeShard::TEvUpdateTenantSchemeShard::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/tx/tiering/common.h b/ydb/core/tx/tiering/common.h
index 906f897dd65..1294252b70f 100644
--- a/ydb/core/tx/tiering/common.h
+++ b/ydb/core/tx/tiering/common.h
@@ -9,6 +9,9 @@ namespace NKikimr::NColumnShard::NTiers {
enum EEvents {
EvTierCleared = EventSpaceBegin(TKikimrEvents::ES_TIERING),
+ EvSSFetchingResult,
+ EvSSFetchingProblem,
+ EvTimeout,
EvEnd
};
diff --git a/ydb/core/tx/tiering/rule/CMakeLists.txt b/ydb/core/tx/tiering/rule/CMakeLists.txt
index 9d9cdb53026..022e8654412 100644
--- a/ydb/core/tx/tiering/rule/CMakeLists.txt
+++ b/ydb/core/tx/tiering/rule/CMakeLists.txt
@@ -8,25 +8,39 @@
add_library(tx-tiering-rule)
+target_compile_options(tx-tiering-rule PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(tx-tiering-rule PUBLIC
contrib-libs-cxxsupp
yutil
services-metadata-initializer
services-metadata-abstract
+ services-bg_tasks-abstract
+ core-tx-schemeshard
)
target_sources(tx-tiering-rule PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/object.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/initializer.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/checker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_checker.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/timeout.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_dialog.cpp
)
add_global_library_for(tx-tiering-rule.global tx-tiering-rule)
+target_compile_options(tx-tiering-rule.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(tx-tiering-rule.global PUBLIC
contrib-libs-cxxsupp
yutil
services-metadata-initializer
services-metadata-abstract
+ services-bg_tasks-abstract
+ core-tx-schemeshard
)
target_sources(tx-tiering-rule.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/manager.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/rule/ss_fetcher.cpp
)
diff --git a/ydb/core/tx/tiering/rule/checker.cpp b/ydb/core/tx/tiering/rule/checker.cpp
index 0111a56e2ee..fc373a0f833 100644
--- a/ydb/core/tx/tiering/rule/checker.cpp
+++ b/ydb/core/tx/tiering/rule/checker.cpp
@@ -1,13 +1,24 @@
#include "checker.h"
+#include "ss_checker.h"
+#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tiering/external_data.h>
+#include <ydb/core/tx/tiering/rule/ss_fetcher.h>
+#include <ydb/services/bg_tasks/abstract/interface.h>
#include <ydb/services/metadata/secret/snapshot.h>
#include <ydb/services/metadata/secret/fetcher.h>
namespace NKikimr::NColumnShard::NTiers {
void TRulePreparationActor::StartChecker() {
+ if (!Tierings || !Secrets || !SSCheckResult) {
+ return;
+ }
auto g = PassAwayGuard();
+ if (!SSCheckResult->GetContent().GetOperationAllow()) {
+ Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason());
+ return;
+ }
for (auto&& tiering : Objects) {
for (auto&& interval : tiering.GetIntervals()) {
@@ -27,6 +38,24 @@ void TRulePreparationActor::StartChecker() {
Controller->PreparationFinished(std::move(Objects));
}
+void TRulePreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
+ auto& proto = ev->Get()->Record;
+ if (proto.HasError()) {
+ Controller->PreparationProblem(proto.GetError().GetErrorMessage());
+ PassAway();
+ } else if (proto.HasContent()) {
+ SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData());
+ if (!SSCheckResult) {
+ Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
+ PassAway();
+ } else {
+ StartChecker();
+ }
+ } else {
+ Y_VERIFY(false);
+ }
+}
+
void TRulePreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
if (auto snapshot = ev->Get()->GetSnapshotPtrAs<TConfigsSnapshot>()) {
Tierings = snapshot;
@@ -35,9 +64,7 @@ void TRulePreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData::
} else {
Y_VERIFY(false);
}
- if (Tierings && Secrets) {
- StartChecker();
- }
+ StartChecker();
}
void TRulePreparationActor::Bootstrap() {
@@ -46,6 +73,15 @@ void TRulePreparationActor::Bootstrap() {
new NMetadataProvider::TEvAskSnapshot(std::make_shared<TSnapshotConstructor>()));
Send(NMetadataProvider::MakeServiceId(SelfId().NodeId()),
new NMetadataProvider::TEvAskSnapshot(std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>()));
+ {
+ SSFetcher = std::make_shared<TFetcherCheckUserTieringPermissions>();
+ SSFetcher->SetUserToken(Context.GetUserToken());
+ SSFetcher->SetActivityType(Context.GetActivityType());
+ for (auto&& i : Objects) {
+ SSFetcher->MutableTieringRuleIds().emplace(i.GetTieringRuleId());
+ }
+ Register(new TSSFetchingActor(SSFetcher, std::make_shared<TSSFetchingController>(SelfId()), TDuration::Seconds(10)));
+ }
}
TRulePreparationActor::TRulePreparationActor(std::vector<TTieringRule>&& objects,
diff --git a/ydb/core/tx/tiering/rule/checker.h b/ydb/core/tx/tiering/rule/checker.h
index 3f96aec9078..e0dfec22144 100644
--- a/ydb/core/tx/tiering/rule/checker.h
+++ b/ydb/core/tx/tiering/rule/checker.h
@@ -1,6 +1,8 @@
#pragma once
#include "object.h"
+#include "ss_fetcher.h"
+#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/tx/tiering/snapshot.h>
#include <ydb/services/metadata/abstract/common.h>
@@ -17,14 +19,17 @@ private:
NMetadata::IOperationsManager::TModificationContext Context;
std::shared_ptr<TConfigsSnapshot> Tierings;
std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
-
+ std::shared_ptr<TFetcherCheckUserTieringPermissions> SSFetcher;
+ std::optional<TFetcherCheckUserTieringPermissions::TResult> SSCheckResult;
void StartChecker();
protected:
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
+ void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev);
public:
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle);
default:
break;
}
diff --git a/ydb/core/tx/tiering/rule/ss_checker.cpp b/ydb/core/tx/tiering/rule/ss_checker.cpp
new file mode 100644
index 00000000000..523ce607f22
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_checker.cpp
@@ -0,0 +1,26 @@
+#include "ss_checker.h"
+
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/protos/services.pb.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+void TSSFetchingActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
+ auto g = PassAwayGuard();
+ Controller->FetchingResult(ev->Get()->Record);
+}
+
+TSSFetchingActor::TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor,
+ ISSFetchingController::TPtr controller, const TDuration livetime)
+ : TBase(livetime)
+ , Processor(processor)
+ , Controller(controller)
+{
+
+}
+
+constexpr NKikimrServices::TActivity::EType TSSFetchingActor::ActorActivityType() {
+ return NKikimrServices::TActivity::SS_FETCHING_ACTOR;
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule/ss_checker.h b/ydb/core/tx/tiering/rule/ss_checker.h
new file mode 100644
index 00000000000..f2932523b4f
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_checker.h
@@ -0,0 +1,68 @@
+#pragma once
+#include "object.h"
+#include "ss_dialog.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class ISSFetchingController {
+public:
+ using TPtr = std::shared_ptr<ISSFetchingController>;
+ virtual ~ISSFetchingController() = default;
+ virtual void FetchingProblem(const TString& errorMessage) const = 0;
+ virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const = 0;
+};
+
+class TSSFetchingController: public ISSFetchingController {
+private:
+ const TActorIdentity ActorId;
+public:
+ TSSFetchingController(const TActorIdentity& actorId)
+ : ActorId(actorId) {
+
+ }
+
+ virtual void FetchingProblem(const TString& errorMessage) const override {
+ ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(errorMessage));
+ }
+ virtual void FetchingResult(const NKikimrScheme::TEvProcessingResponse& result) const override {
+ ActorId.Send(ActorId, new NSchemeShard::TEvSchemeShard::TEvProcessingResponse(result));
+ }
+};
+
+class TSSFetchingActor: public TSSDialogActor {
+private:
+ using TBase = TSSDialogActor;
+ NSchemeShard::ISSDataProcessor::TPtr Processor;
+ ISSFetchingController::TPtr Controller;
+ void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev);
+protected:
+ virtual void OnBootstrap() override {
+ Become(&TSSFetchingActor::StateMain);
+ TBase::OnBootstrap();
+ }
+ virtual void OnFail(const TString& errorMessage) override {
+ Controller->FetchingProblem(errorMessage);
+ }
+ virtual void Execute() override {
+ auto req = std::make_unique<NSchemeShard::TEvSchemeShard::TEvProcessingRequest>(*Processor);
+ Send(SchemeShardPipe, new TEvPipeCache::TEvForward(req.release(), SchemeShardId, false));
+ }
+public:
+ static constexpr NKikimrServices::TActivity::EType ActorActivityType();
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+ TSSFetchingActor(NSchemeShard::ISSDataProcessor::TPtr processor, ISSFetchingController::TPtr controller, const TDuration livetime);
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule/ss_dialog.cpp b/ydb/core/tx/tiering/rule/ss_dialog.cpp
new file mode 100644
index 00000000000..ebd79e2641b
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_dialog.cpp
@@ -0,0 +1,54 @@
+#include "ss_dialog.h"
+
+#include <ydb/core/base/path.h>
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/protos/services.pb.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+void TSSDialogActor::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& /*ev*/) {
+ OnFail("cannot delivery message to schemeshard");
+ PassAway();
+}
+
+void TSSDialogActor::Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
+ auto* info = ev->Get();
+ const auto& request = info->Request;
+
+ if (request->ResultSet.empty()) {
+ OnFail("navigation problems");
+ PassAway();
+ return;
+ }
+ if (request->ResultSet.size() != 1) {
+ OnFail("cannot resolve database path");
+ PassAway();
+ return;
+ }
+ auto& entity = request->ResultSet.front();
+ SchemeShardId = entity.DomainInfo->ExtractSchemeShard();
+ NTabletPipe::TClientConfig clientConfig;
+ SchemeShardPipe = MakePipePeNodeCacheID(false);
+ Execute();
+}
+
+void TSSDialogActor::OnBootstrap() {
+ auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
+ request->DatabaseName = NKikimr::CanonizePath(AppData()->TenantName);
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
+ entry.Path = NKikimr::SplitPath(request->DatabaseName);
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
+}
+
+void TSSDialogActor::Die(const TActorContext& ctx) {
+ NTabletPipe::CloseAndForgetClient(SelfId(), SchemeShardPipe);
+ return IActor::Die(ctx);
+}
+
+void TSSDialogActor::OnTimeout() {
+ OnFail("timeout");
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule/ss_dialog.h b/ydb/core/tx/tiering/rule/ss_dialog.h
new file mode 100644
index 00000000000..bb97e1161a4
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_dialog.h
@@ -0,0 +1,38 @@
+#pragma once
+
+#include "timeout.h"
+
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/base/tablet_pipecache.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TSSDialogActor: public TTimeoutActor<TSSDialogActor> {
+private:
+ using TBase = TTimeoutActor<TSSDialogActor>;
+ void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev);
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
+protected:
+ virtual void OnFail(const TString& errorMessage) = 0;
+ virtual void OnTimeout() override;
+ virtual void OnBootstrap() override;
+ virtual void Execute() = 0;
+ virtual void Die(const TActorContext& ctx) override;
+protected:
+ ui64 SchemeShardId = 0;
+ TActorId SchemeShardPipe;
+public:
+ using TBase::TBase;
+
+ STFUNC(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
+ hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ default:
+ TBase::StateMain(ev, ctx);
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule/ss_fetcher.cpp b/ydb/core/tx/tiering/rule/ss_fetcher.cpp
new file mode 100644
index 00000000000..b825bbccdc1
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_fetcher.cpp
@@ -0,0 +1,77 @@
+#include "ss_fetcher.h"
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+TFetcherCheckUserTieringPermissions::TFactory::TRegistrator<TFetcherCheckUserTieringPermissions>
+ TFetcherCheckUserTieringPermissions::Registrator(TFetcherCheckUserTieringPermissions::GetTypeIdStatic());
+
+void TFetcherCheckUserTieringPermissions::DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const {
+ TResult content;
+ content.MutableContent().SetOperationAllow(true);
+ ui32 access = 0;
+ access |= NACLib::EAccessRights::AlterSchema;
+
+ if (ActivityType == NMetadata::IOperationsManager::EActivityType::Undefined) {
+ content.Deny("undefined activity type");
+ } else {
+ bool denied = false;
+ for (auto&& i : TieringRuleIds) {
+ const std::set<TPathId>& pathIds = schemeShard.ColumnTables.GetTablesWithTiering(i);
+ for (auto&& pathId : pathIds) {
+ auto path = NSchemeShard::TPath::Init(pathId, &schemeShard);
+ if (!path.IsResolved() || path.IsUnderDeleting() || path.IsDeleted()) {
+ continue;
+ }
+ if (ActivityType == NMetadata::IOperationsManager::EActivityType::Drop) {
+ denied = true;
+ content.Deny("tiering in using by table");
+ break;
+ } else if (ActivityType == NMetadata::IOperationsManager::EActivityType::Alter) {
+ if (!UserToken) {
+ continue;
+ }
+ TSecurityObject sObject(path->Owner, path->ACL, path->IsContainer());
+ if (!sObject.CheckAccess(access, *UserToken)) {
+ denied = true;
+ content.Deny("no alter permissions for affected table");
+ break;
+ }
+ }
+ }
+ if (denied) {
+ break;
+ }
+ }
+ }
+ result.MutableContent()->SetData(content.SerializeToString());
+}
+
+bool TFetcherCheckUserTieringPermissions::DoDeserializeFromProto(const TProtoClass& protoData) {
+ if (!TryFromString(protoData.GetActivityType(), ActivityType)) {
+ ALS_ERROR(0) << "Cannot parse activity type: undefined value = " << protoData.GetActivityType();
+ return false;
+ }
+ if (protoData.GetUserToken()) {
+ NACLib::TUserToken uToken(protoData.GetUserToken());
+ UserToken = uToken;
+ }
+ for (auto&& i : protoData.GetTieringRuleIds()) {
+ TieringRuleIds.emplace(i);
+ }
+ return true;
+}
+
+NKikimr::NColumnShard::NTiers::TFetcherCheckUserTieringPermissions::TProtoClass TFetcherCheckUserTieringPermissions::DoSerializeToProto() const {
+ TProtoClass result;
+ result.SetActivityType(::ToString(ActivityType));
+ if (UserToken) {
+ result.SetUserToken(UserToken->SerializeAsString());
+ }
+ for (auto&& i : TieringRuleIds) {
+ *result.AddTieringRuleIds() = i;
+ }
+ return result;
+}
+
+}
diff --git a/ydb/core/tx/tiering/rule/ss_fetcher.h b/ydb/core/tx/tiering/rule/ss_fetcher.h
new file mode 100644
index 00000000000..9782e325b45
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/ss_fetcher.h
@@ -0,0 +1,78 @@
+#pragma once
+#include "object.h"
+
+#include <ydb/core/protos/flat_tx_scheme.pb.h>
+#include <ydb/core/tx/schemeshard/schemeshard_impl.h>
+#include <ydb/core/tx/tiering/common.h>
+
+#include <ydb/services/bg_tasks/abstract/interface.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TFetcherCheckUserTieringPermissionsResult: public NBackgroundTasks::IProtoStringSerializable<
+ NKikimrScheme::TFetcherCheckUserTieringPermissionsResult, NBackgroundTasks::IStringSerializable> {
+private:
+ using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissionsResult;
+ YDB_ACCESSOR_DEF(TProtoClass, Content);
+protected:
+ virtual TProtoClass DoSerializeToProto() const override {
+ return Content;
+ }
+ virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override {
+ Content = protoData;
+ return true;
+ }
+public:
+ void Deny(const TString& reason) {
+ Content.SetOperationAllow(false);
+ Content.SetDenyReason(reason);
+ }
+};
+
+class TFetcherCheckUserTieringPermissions: public NBackgroundTasks::IProtoStringSerializable<
+ NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor> {
+private:
+ using TBase = NBackgroundTasks::IProtoStringSerializable<
+ NKikimrScheme::TFetcherCheckUserTieringPermissions, NSchemeShard::ISSDataProcessor>;
+ using TBase::TFactory;
+ using TProtoClass = NKikimrScheme::TFetcherCheckUserTieringPermissions;
+ static TFactory::TRegistrator<TFetcherCheckUserTieringPermissions> Registrator;
+ YDB_ACCESSOR_DEF(std::set<TString>, TieringRuleIds);
+ YDB_ACCESSOR_DEF(std::optional<NACLib::TUserToken>, UserToken);
+ YDB_ACCESSOR(NMetadata::IOperationsManager::EActivityType, ActivityType, NMetadata::IOperationsManager::EActivityType::Undefined);
+protected:
+ virtual TProtoClass DoSerializeToProto() const override;
+ virtual bool DoDeserializeFromProto(const TProtoClass& protoData) override;
+ virtual void DoProcess(NSchemeShard::TSchemeShard& schemeShard, NKikimrScheme::TEvProcessingResponse& result) const override;
+public:
+ using TResult = TFetcherCheckUserTieringPermissionsResult;
+ std::optional<TFetcherCheckUserTieringPermissionsResult> UnpackResult(const TString& content) const {
+ TFetcherCheckUserTieringPermissionsResult result;
+ if (!result.DeserializeFromString(content)) {
+ return {};
+ } else {
+ return result;
+ }
+ }
+
+ TFetcherCheckUserTieringPermissions() = default;
+
+ virtual TString DebugString() const override {
+ TStringBuilder sb;
+ sb << "USID=" << (UserToken ? UserToken->GetUserSID() : "nobody") << ";";
+ sb << "tierings=";
+ for (auto&& i : TieringRuleIds) {
+ sb << i << ",";
+ }
+ sb << ";";
+ return sb;
+ }
+ virtual TString GetClassName() const override {
+ return GetTypeIdStatic();
+ }
+ static TString GetTypeIdStatic() {
+ return "ss_fetcher_tiering_permissions";
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/rule/timeout.cpp b/ydb/core/tx/tiering/rule/timeout.cpp
new file mode 100644
index 00000000000..b1982635cdb
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/timeout.cpp
@@ -0,0 +1,5 @@
+#include "timeout.h"
+
+namespace NKikimr::NColumnShard::NTiers {
+
+}
diff --git a/ydb/core/tx/tiering/rule/timeout.h b/ydb/core/tx/tiering/rule/timeout.h
new file mode 100644
index 00000000000..437d6fb3285
--- /dev/null
+++ b/ydb/core/tx/tiering/rule/timeout.h
@@ -0,0 +1,53 @@
+#pragma once
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/tx/tiering/common.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/events.h>
+
+namespace NKikimr::NColumnShard::NTiers {
+
+class TEvTimeout: public NActors::TEventLocal<TEvTimeout, EEvents::EvTimeout> {
+
+};
+
+template <class TDerived>
+class TTimeoutActor: public NActors::TActorBootstrapped<TDerived> {
+private:
+ using TBase = NActors::TActorBootstrapped<TDerived>;
+ const TInstant Deadline = TInstant::Max();
+ void HandleTimeout() {
+ OnTimeout();
+ }
+protected:
+ virtual void OnBootstrap() = 0;
+ virtual void OnTimeout() = 0;
+public:
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ sFunc(TEvTimeout, HandleTimeout);
+ default:
+ break;
+ }
+ }
+
+ TTimeoutActor(const TInstant deadline)
+ : Deadline(deadline) {
+
+ }
+
+ TTimeoutActor(const TDuration livetime)
+ : Deadline(TAppData::TimeProvider->Now() + livetime) {
+
+ }
+
+ void Bootstrap() {
+ OnBootstrap();
+ if (Deadline != TInstant::Max()) {
+ TBase::Schedule(Deadline, new TEvTimeout);
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/tiering/snapshot.cpp b/ydb/core/tx/tiering/snapshot.cpp
index 8997364d093..e33421e5a5c 100644
--- a/ydb/core/tx/tiering/snapshot.cpp
+++ b/ydb/core/tx/tiering/snapshot.cpp
@@ -61,6 +61,19 @@ const TTieringRule* TConfigsSnapshot::GetTieringById(const TString& tieringId) c
}
}
+std::set<TString> TConfigsSnapshot::GetTieringIdsForTier(const TString& tierName) const {
+ std::set<TString> result;
+ for (auto&& i : TableTierings) {
+ for (auto&& t : i.second.GetIntervals()) {
+ if (t.GetTierName() == tierName) {
+ result.emplace(i.second.GetTieringRuleId());
+ break;
+ }
+ }
+ }
+ return result;
+}
+
TString NTiers::TConfigsSnapshot::DoSerializeToString() const {
NJson::TJsonValue result = NJson::JSON_MAP;
auto& jsonTiers = result.InsertValue("tiers", NJson::JSON_MAP);
diff --git a/ydb/core/tx/tiering/snapshot.h b/ydb/core/tx/tiering/snapshot.h
index 07d2b773706..dfb4b59a7ce 100644
--- a/ydb/core/tx/tiering/snapshot.h
+++ b/ydb/core/tx/tiering/snapshot.h
@@ -21,6 +21,8 @@ protected:
virtual bool DoDeserializeFromResultSet(const Ydb::Table::ExecuteQueryResult& rawData) override;
virtual TString DoSerializeToString() const override;
public:
+
+ std::set<TString> GetTieringIdsForTier(const TString& tierName) const;
const TTieringRule* GetTieringById(const TString& tieringId) const;
std::optional<TTierConfig> GetTierById(const TString& tierName) const;
using TBase::TBase;
diff --git a/ydb/core/tx/tiering/tier/CMakeLists.txt b/ydb/core/tx/tiering/tier/CMakeLists.txt
index 1985edde91b..e97416ffc71 100644
--- a/ydb/core/tx/tiering/tier/CMakeLists.txt
+++ b/ydb/core/tx/tiering/tier/CMakeLists.txt
@@ -8,12 +8,16 @@
add_library(tx-tiering-tier)
+target_compile_options(tx-tiering-tier PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(tx-tiering-tier PUBLIC
contrib-libs-cxxsupp
yutil
services-metadata-initializer
services-metadata-abstract
services-metadata-secret
+ core-tx-schemeshard
)
target_sources(tx-tiering-tier PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/object.cpp
@@ -22,12 +26,16 @@ target_sources(tx-tiering-tier PRIVATE
)
add_global_library_for(tx-tiering-tier.global tx-tiering-tier)
+target_compile_options(tx-tiering-tier.global PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
target_link_libraries(tx-tiering-tier.global PUBLIC
contrib-libs-cxxsupp
yutil
services-metadata-initializer
services-metadata-abstract
services-metadata-secret
+ core-tx-schemeshard
)
target_sources(tx-tiering-tier.global PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/tiering/tier/manager.cpp
diff --git a/ydb/core/tx/tiering/tier/checker.cpp b/ydb/core/tx/tiering/tier/checker.cpp
index 5667f32e3af..08df47bd2c7 100644
--- a/ydb/core/tx/tiering/tier/checker.cpp
+++ b/ydb/core/tx/tiering/tier/checker.cpp
@@ -1,20 +1,35 @@
#include "checker.h"
#include <ydb/core/tx/tiering/external_data.h>
+#include <ydb/core/tx/tiering/rule/ss_checker.h>
#include <ydb/services/metadata/secret/fetcher.h>
namespace NKikimr::NColumnShard::NTiers {
void TTierPreparationActor::StartChecker() {
+ if (!Tierings || !Secrets || !SSCheckResult) {
+ return;
+ }
auto g = PassAwayGuard();
+ if (!SSCheckResult->GetContent().GetOperationAllow()) {
+ Controller->PreparationProblem(SSCheckResult->GetContent().GetDenyReason());
+ return;
+ }
for (auto&& tier : Objects) {
if (Context.GetActivityType() == NMetadata::IOperationsManager::EActivityType::Drop) {
+ std::set<TString> tieringsWithTiers;
for (auto&& i : Tierings->GetTableTierings()) {
if (i.second.ContainsTier(tier.GetTierName())) {
- Controller->PreparationProblem("tier in usage for tiering " + i.first);
- return;
+ tieringsWithTiers.emplace(i.first);
+ if (tieringsWithTiers.size() > 10) {
+ break;
+ }
}
}
+ if (tieringsWithTiers.size()) {
+ Controller->PreparationProblem("tier in usage for tierings: " + JoinSeq(", ", tieringsWithTiers));
+ return;
+ }
}
if (!Secrets->CheckSecretAccess(tier.GetProtoConfig().GetObjectStorage().GetAccessKey(), Context.GetUserToken())) {
Controller->PreparationProblem("no access for secret: " + tier.GetProtoConfig().GetObjectStorage().GetAccessKey());
@@ -27,17 +42,53 @@ void TTierPreparationActor::StartChecker() {
Controller->PreparationFinished(std::move(Objects));
}
+void TTierPreparationActor::Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev) {
+ auto& proto = ev->Get()->Record;
+ if (proto.HasError()) {
+ Controller->PreparationProblem(proto.GetError().GetErrorMessage());
+ PassAway();
+ } else if (proto.HasContent()) {
+ SSCheckResult = SSFetcher->UnpackResult(ev->Get()->Record.GetContent().GetData());
+ if (!SSCheckResult) {
+ Controller->PreparationProblem("cannot unpack ss-fetcher result for class " + SSFetcher->GetClassName());
+ PassAway();
+ } else {
+ StartChecker();
+ }
+ } else {
+ Y_VERIFY(false);
+ }
+}
+
void TTierPreparationActor::Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev) {
if (auto snapshot = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>()) {
Secrets = snapshot;
} else if (auto snapshot = ev->Get()->GetSnapshotPtrAs<TConfigsSnapshot>()) {
Tierings = snapshot;
+ std::set<TString> tieringIds;
+ std::set<TString> tiersChecked;
+ for (auto&& tier : Objects) {
+ if (!tiersChecked.emplace(tier.GetTierName()).second) {
+ continue;
+ }
+ auto tIds = Tierings->GetTieringIdsForTier(tier.GetTierName());
+ if (tieringIds.empty()) {
+ tieringIds = std::move(tIds);
+ } else {
+ tieringIds.insert(tIds.begin(), tIds.end());
+ }
+ }
+ {
+ SSFetcher = std::make_shared<TFetcherCheckUserTieringPermissions>();
+ SSFetcher->SetUserToken(Context.GetUserToken());
+ SSFetcher->SetActivityType(Context.GetActivityType());
+ SSFetcher->MutableTieringRuleIds() = tieringIds;
+ Register(new TSSFetchingActor(SSFetcher, std::make_shared<TSSFetchingController>(SelfId()), TDuration::Seconds(10)));
+ }
} else {
Y_VERIFY(false);
}
- if (Secrets && Tierings) {
- StartChecker();
- }
+ StartChecker();
}
void TTierPreparationActor::Bootstrap() {
diff --git a/ydb/core/tx/tiering/tier/checker.h b/ydb/core/tx/tiering/tier/checker.h
index 0dd3c247c58..b267ee7c68c 100644
--- a/ydb/core/tx/tiering/tier/checker.h
+++ b/ydb/core/tx/tiering/tier/checker.h
@@ -1,6 +1,8 @@
#pragma once
#include "object.h"
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tiering/rule/ss_fetcher.h>
#include <ydb/core/tx/tiering/snapshot.h>
#include <ydb/services/metadata/abstract/common.h>
@@ -17,13 +19,17 @@ private:
NMetadata::IOperationsManager::TModificationContext Context;
std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
std::shared_ptr<TConfigsSnapshot> Tierings;
+ std::shared_ptr<TFetcherCheckUserTieringPermissions> SSFetcher;
+ std::optional<TFetcherCheckUserTieringPermissions::TResult> SSCheckResult;
void StartChecker();
protected:
void Handle(NMetadataProvider::TEvRefreshSubscriberData::TPtr& ev);
+ void Handle(NSchemeShard::TEvSchemeShard::TEvProcessingResponse::TPtr& ev);
public:
STATEFN(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(NMetadataProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NSchemeShard::TEvSchemeShard::TEvProcessingResponse, Handle);
default:
break;
}
diff --git a/ydb/core/tx/tiering/ut/ut_tiers.cpp b/ydb/core/tx/tiering/ut/ut_tiers.cpp
index 40075877615..6501469c278 100644
--- a/ydb/core/tx/tiering/ut/ut_tiers.cpp
+++ b/ydb/core/tx/tiering/ut/ut_tiers.cpp
@@ -316,6 +316,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
}
lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)", false);
+ lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)", false);
+ lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`");
lHelper.StartSchemaRequest("DROP OBJECT tiering1(TYPE TIERING_RULE)");
lHelper.StartSchemaRequest("DROP OBJECT tier1(TYPE TIER)");
lHelper.StartSchemaRequest("DROP OBJECT tier2(TYPE TIER)");
@@ -392,6 +394,8 @@ Y_UNIT_TEST_SUITE(ColumnShardTiers) {
lHelper.StartSchemaRequest("DROP OBJECT tier2 (TYPE TIER)", false);
lHelper.StartSchemaRequest("DROP OBJECT tier1 (TYPE TIER)", false);
lHelper.StartSchemaRequest("DROP OBJECT tiering2 (TYPE TIERING_RULE)");
+ lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)", false);
+ lHelper.StartSchemaRequest("DROP TABLE `/Root/olapStore/olapTable`");
lHelper.StartSchemaRequest("DROP OBJECT tiering1 (TYPE TIERING_RULE)");
{
TTestCSEmulator emulator;