diff options
author | grigoriypisar <[email protected]> | 2023-08-15 12:38:32 +0300 |
---|---|---|
committer | grigoriypisar <[email protected]> | 2023-08-15 15:11:12 +0300 |
commit | eea430045a4ff81c4e59b342127de25e24439990 (patch) | |
tree | 824a022977d455f43f812fec648aa2f9d38a1fe6 | |
parent | 621fda51a62b159d046b3ca185238a9ccacb0c21 (diff) |
insert into private S3
Refactor SinkName.
Fixed secureParams passing.
Pass secure params to tasks.
Updated kqp_physical.proto
6 files changed, 45 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 155f08800ab..ea06f6e3cf0 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -1346,7 +1346,7 @@ private: taskMeta.ReadInfo.Reverse = reverse; }; - void BuildDatashardTasks(TStageInfo& stageInfo) { + void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { THashMap<ui64, ui64> shardTasks; // shardId -> taskId auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -1361,7 +1361,7 @@ private: task.Meta.ShardId = shardId; shardTasks.emplace(shardId, task.Id); - BuildSinks(stage, task); + BuildSinks(stage, task, secureParams); return task; }; @@ -1490,7 +1490,7 @@ private: } } - void BuildComputeTasks(TStageInfo& stageInfo) { + void BuildComputeTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; @@ -1544,7 +1544,7 @@ private: task.Meta.ExecuterId = SelfId(); task.Meta.Type = TTaskMeta::TTaskType::Compute; - BuildSinks(stage, task); + BuildSinks(stage, task, secureParams); LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); } @@ -1668,8 +1668,7 @@ private: size_t readActors = 0; for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) { auto& tx = Request.Transactions[txIdx]; - const auto& secretNames = tx.Body->GetSecretNames(); - TMap<TString, TString> secureParams; + TMap<TString, TString> secureParams = ResolveSecretNames(tx.Body->GetSecretNames()); for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) { auto& stage = tx.Body->GetStages(stageIdx); @@ -1707,27 +1706,24 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - if (auto actors = BuildScanTasksFromSource(stageInfo)) { + if (auto actors = BuildScanTasksFromSource(stageInfo, secureParams)) { readActors += *actors; } else { UnknownAffectedShardCount = true; } break; case NKqpProto::TKqpSource::kExternalSource: - if (!secretNames.empty() && secureParams.empty()) { - ResolveSecretNames(secretNames, secureParams); - } BuildReadTasksFromSource(stageInfo, secureParams); break; default: YQL_ENSURE(false, "unknown source type"); } } else if (stageInfo.Meta.ShardOperations.empty()) { - BuildComputeTasks(stageInfo); + BuildComputeTasks(stageInfo, secureParams); } else if (stageInfo.Meta.IsSysView()) { - BuildSysViewScanTasks(stageInfo); + BuildSysViewScanTasks(stageInfo, secureParams); } else { - BuildDatashardTasks(stageInfo); + BuildDatashardTasks(stageInfo, secureParams); } if (stage.GetIsSinglePartition()) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index b299c593d46..be19b632a93 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -396,7 +396,8 @@ protected: virtual void OnSecretsFetched() {} - void ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames, TMap<TString, TString>& secureParams) { + TMap<TString, TString> ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames) { + TMap<TString, TString> secureParams; for (const auto& secretName : secretNames) { auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName); @@ -405,6 +406,8 @@ protected: secureParams[secretName] = secretValue; } + + return secureParams; } protected: @@ -691,7 +694,7 @@ protected: protected: - void BuildSysViewScanTasks(TStageInfo& stageInfo) { + void BuildSysViewScanTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { Y_VERIFY_DEBUG(stageInfo.Meta.IsSysView()); auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -734,19 +737,26 @@ protected: task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse(); task.Meta.Type = TTaskMeta::TTaskType::Compute; - BuildSinks(stage, task); + BuildSinks(stage, task, secureParams); LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id); } } - void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) { + void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task, const TMap<TString, TString>& secureParams) { if (stage.SinksSize() > 0) { YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported"); const auto& sink = stage.GetSinks(0); YQL_ENSURE(sink.HasExternalSink(), "only external sinks are supported"); const auto& extSink = sink.GetExternalSink(); YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size()); + + auto sinkName = extSink.GetSinkName(); + if (sinkName) { + auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + task.Meta.SecureParams.emplace(sinkName, structuredToken); + } + auto& output = task.Outputs[sink.GetOutputIndex()]; output.Type = TTaskOutputType::Sink; output.SinkType = extSink.GetType(); @@ -754,7 +764,7 @@ protected: } } - void BuildReadTasksFromSource(TStageInfo& stageInfo, TMap<TString, TString> secureParams) { + void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); @@ -780,11 +790,11 @@ protected: task.Meta.Type = TTaskMeta::TTaskType::Compute; - BuildSinks(stage, task); + BuildSinks(stage, task, secureParams); } } - TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo) { + TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -893,7 +903,7 @@ protected: settings->SetLockNodeId(self.NodeId()); } - BuildSinks(stage, task); + BuildSinks(stage, task, secureParams); }; if (source.GetSequentialInFlightShards()) { diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 1a397ce4b35..0528fd9919e 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -532,7 +532,7 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo); + BuildScanTasksFromSource(stageInfo, {}); break; default: YQL_ENSURE(false, "unknown source type"); @@ -540,7 +540,7 @@ private: } else if (stageInfo.Meta.ShardOperations.empty()) { BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { - BuildSysViewScanTasks(stageInfo); + BuildSysViewScanTasks(stageInfo, {}); } else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) { BuildScanTasks(stageInfo); } else { diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp index 4e541e0d68d..b7da08f0106 100644 --- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp @@ -913,6 +913,17 @@ private: dqIntegration->FillSinkSettings(sink.Ref(), settings, sinkType); YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node"); YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node"); + + THashMap<TString, TString> secureParams; + NYql::NCommon::FillSecureParams(sink.Ptr(), TypesCtx, secureParams); + if (!secureParams.empty()) { + YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per sink allowed"); + auto it = secureParams.begin(); + externalSink.SetSinkName(it->first); + auto token = it->second; + externalSink.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson()); + CreateStructuredTokenParser(token).ListReferences(SecretNames); + } } void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap, diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 56a50bca969..fd208e72a30 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -327,6 +327,9 @@ message TKqpSource { message TKqpExternalSink { string Type = 1; google.protobuf.Any Settings = 2; + + string SinkName = 3; + string AuthInfo = 4; } message TKqpSink { diff --git a/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp b/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp index ea929f1c67b..e56bd0263ec 100644 --- a/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp +++ b/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp @@ -89,6 +89,8 @@ Y_UNIT_TEST_SUITE(TokenBuilderTest) { UNIT_ASSERT(references.contains("my_passw_reference")); b.ReplaceReferences({{"my_passw_reference", "my_passw_value"}}); UNIT_ASSERT_VALUES_EQUAL(R"({"basic_login":"my_login","basic_password":"my_passw_value"})", b.ToJson()); + b.RemoveSecrets(); + UNIT_ASSERT_VALUES_EQUAL(R"({"basic_login":"my_login"})", b.ToJson()); } Y_UNIT_TEST(IAMToken) { |