summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <[email protected]>2023-08-15 12:38:32 +0300
committergrigoriypisar <[email protected]>2023-08-15 15:11:12 +0300
commiteea430045a4ff81c4e59b342127de25e24439990 (patch)
tree824a022977d455f43f812fec648aa2f9d38a1fe6
parent621fda51a62b159d046b3ca185238a9ccacb0c21 (diff)
insert into private S3
Refactor SinkName. Fixed secureParams passing. Pass secure params to tasks. Updated kqp_physical.proto
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp22
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h26
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp4
-rw-r--r--ydb/core/kqp/query_compiler/kqp_query_compiler.cpp11
-rw-r--r--ydb/core/protos/kqp_physical.proto3
-rw-r--r--ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp2
6 files changed, 45 insertions, 23 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 155f08800ab..ea06f6e3cf0 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1346,7 +1346,7 @@ private:
taskMeta.ReadInfo.Reverse = reverse;
};
- void BuildDatashardTasks(TStageInfo& stageInfo) {
+ void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
THashMap<ui64, ui64> shardTasks; // shardId -> taskId
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -1361,7 +1361,7 @@ private:
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);
- BuildSinks(stage, task);
+ BuildSinks(stage, task, secureParams);
return task;
};
@@ -1490,7 +1490,7 @@ private:
}
}
- void BuildComputeTasks(TStageInfo& stageInfo) {
+ void BuildComputeTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui32 partitionsCount = 1;
@@ -1544,7 +1544,7 @@ private:
task.Meta.ExecuterId = SelfId();
task.Meta.Type = TTaskMeta::TTaskType::Compute;
- BuildSinks(stage, task);
+ BuildSinks(stage, task, secureParams);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
@@ -1668,8 +1668,7 @@ private:
size_t readActors = 0;
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
auto& tx = Request.Transactions[txIdx];
- const auto& secretNames = tx.Body->GetSecretNames();
- TMap<TString, TString> secureParams;
+ TMap<TString, TString> secureParams = ResolveSecretNames(tx.Body->GetSecretNames());
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
auto& stage = tx.Body->GetStages(stageIdx);
@@ -1707,27 +1706,24 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- if (auto actors = BuildScanTasksFromSource(stageInfo)) {
+ if (auto actors = BuildScanTasksFromSource(stageInfo, secureParams)) {
readActors += *actors;
} else {
UnknownAffectedShardCount = true;
}
break;
case NKqpProto::TKqpSource::kExternalSource:
- if (!secretNames.empty() && secureParams.empty()) {
- ResolveSecretNames(secretNames, secureParams);
- }
BuildReadTasksFromSource(stageInfo, secureParams);
break;
default:
YQL_ENSURE(false, "unknown source type");
}
} else if (stageInfo.Meta.ShardOperations.empty()) {
- BuildComputeTasks(stageInfo);
+ BuildComputeTasks(stageInfo, secureParams);
} else if (stageInfo.Meta.IsSysView()) {
- BuildSysViewScanTasks(stageInfo);
+ BuildSysViewScanTasks(stageInfo, secureParams);
} else {
- BuildDatashardTasks(stageInfo);
+ BuildDatashardTasks(stageInfo, secureParams);
}
if (stage.GetIsSinglePartition()) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index b299c593d46..be19b632a93 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -396,7 +396,8 @@ protected:
virtual void OnSecretsFetched() {}
- void ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames, TMap<TString, TString>& secureParams) {
+ TMap<TString, TString> ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames) {
+ TMap<TString, TString> secureParams;
for (const auto& secretName : secretNames) {
auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName);
@@ -405,6 +406,8 @@ protected:
secureParams[secretName] = secretValue;
}
+
+ return secureParams;
}
protected:
@@ -691,7 +694,7 @@ protected:
protected:
- void BuildSysViewScanTasks(TStageInfo& stageInfo) {
+ void BuildSysViewScanTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
Y_VERIFY_DEBUG(stageInfo.Meta.IsSysView());
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -734,19 +737,26 @@ protected:
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
task.Meta.Type = TTaskMeta::TTaskType::Compute;
- BuildSinks(stage, task);
+ BuildSinks(stage, task, secureParams);
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
}
}
- void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
+ void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task, const TMap<TString, TString>& secureParams) {
if (stage.SinksSize() > 0) {
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
const auto& sink = stage.GetSinks(0);
YQL_ENSURE(sink.HasExternalSink(), "only external sinks are supported");
const auto& extSink = sink.GetExternalSink();
YQL_ENSURE(sink.GetOutputIndex() < task.Outputs.size());
+
+ auto sinkName = extSink.GetSinkName();
+ if (sinkName) {
+ auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ task.Meta.SecureParams.emplace(sinkName, structuredToken);
+ }
+
auto& output = task.Outputs[sink.GetOutputIndex()];
output.Type = TTaskOutputType::Sink;
output.SinkType = extSink.GetType();
@@ -754,7 +764,7 @@ protected:
}
}
- void BuildReadTasksFromSource(TStageInfo& stageInfo, TMap<TString, TString> secureParams) {
+ void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
@@ -780,11 +790,11 @@ protected:
task.Meta.Type = TTaskMeta::TTaskType::Compute;
- BuildSinks(stage, task);
+ BuildSinks(stage, task, secureParams);
}
}
- TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo) {
+ TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -893,7 +903,7 @@ protected:
settings->SetLockNodeId(self.NodeId());
}
- BuildSinks(stage, task);
+ BuildSinks(stage, task, secureParams);
};
if (source.GetSequentialInFlightShards()) {
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 1a397ce4b35..0528fd9919e 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -532,7 +532,7 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo);
+ BuildScanTasksFromSource(stageInfo, {});
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -540,7 +540,7 @@ private:
} else if (stageInfo.Meta.ShardOperations.empty()) {
BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
- BuildSysViewScanTasks(stageInfo);
+ BuildSysViewScanTasks(stageInfo, {});
} else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) {
BuildScanTasks(stageInfo);
} else {
diff --git a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
index 4e541e0d68d..b7da08f0106 100644
--- a/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
+++ b/ydb/core/kqp/query_compiler/kqp_query_compiler.cpp
@@ -913,6 +913,17 @@ private:
dqIntegration->FillSinkSettings(sink.Ref(), settings, sinkType);
YQL_ENSURE(!settings.type_url().empty(), "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings for its dq sink node");
YQL_ENSURE(sinkType, "Data sink provider \"" << dataSinkCategory << "\" did't fill dq sink settings type for its dq sink node");
+
+ THashMap<TString, TString> secureParams;
+ NYql::NCommon::FillSecureParams(sink.Ptr(), TypesCtx, secureParams);
+ if (!secureParams.empty()) {
+ YQL_ENSURE(secureParams.size() == 1, "Only one SecureParams per sink allowed");
+ auto it = secureParams.begin();
+ externalSink.SetSinkName(it->first);
+ auto token = it->second;
+ externalSink.SetAuthInfo(CreateStructuredTokenParser(token).ToBuilder().RemoveSecrets().ToJson());
+ CreateStructuredTokenParser(token).ListReferences(SecretNames);
+ }
}
void FillConnection(const TDqConnection& connection, const TMap<ui64, ui32>& stagesMap,
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index 56a50bca969..fd208e72a30 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -327,6 +327,9 @@ message TKqpSource {
message TKqpExternalSink {
string Type = 1;
google.protobuf.Any Settings = 2;
+
+ string SinkName = 3;
+ string AuthInfo = 4;
}
message TKqpSink {
diff --git a/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp b/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp
index ea929f1c67b..e56bd0263ec 100644
--- a/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp
+++ b/ydb/library/yql/providers/common/structured_token/ut/yql_token_builder_ut.cpp
@@ -89,6 +89,8 @@ Y_UNIT_TEST_SUITE(TokenBuilderTest) {
UNIT_ASSERT(references.contains("my_passw_reference"));
b.ReplaceReferences({{"my_passw_reference", "my_passw_value"}});
UNIT_ASSERT_VALUES_EQUAL(R"({"basic_login":"my_login","basic_password":"my_passw_value"})", b.ToJson());
+ b.RemoveSecrets();
+ UNIT_ASSERT_VALUES_EQUAL(R"({"basic_login":"my_login"})", b.ToJson());
}
Y_UNIT_TEST(IAMToken) {