aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2024-10-03 08:49:42 +0300
committerGitHub <noreply@github.com>2024-10-03 08:49:42 +0300
commita491b73df338c8211563263223f33cf8439180ab (patch)
treeb21636c03b19e3252b8d50f9c0b4fd4d8467e7d9
parent91f8a44b4102ecbc5d62bfc87430117cc5719e89 (diff)
downloadydb-a491b73df338c8211563263223f33cf8439180ab.tar.gz
get rid of mkql results in scripting (#9997)
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp3
-rw-r--r--ydb/core/grpc_services/rpc_execute_yql_script.cpp6
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp4
-rw-r--r--ydb/core/kqp/common/events/query.h4
-rw-r--r--ydb/core/kqp/common/kqp.h11
-rw-r--r--ydb/core/kqp/common/kqp_event_impl.cpp1
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp28
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp4
-rw-r--r--ydb/core/kqp/host/kqp_runner.cpp16
-rw-r--r--ydb/core/kqp/host/kqp_transform.h5
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_exec.cpp6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway.h2
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_results.cpp88
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_results.h5
-rw-r--r--ydb/core/kqp/session_actor/kqp_response.cpp30
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp37
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp4
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.cpp2
-rw-r--r--ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp37
-rw-r--r--ydb/core/load_test/aggregated_result.cpp87
-rw-r--r--ydb/core/protos/kqp.proto4
-rw-r--r--ydb/core/viewer/viewer_query.h8
-rw-r--r--ydb/core/viewer/viewer_query_old.h8
-rw-r--r--ydb/tests/tools/kqprun/src/ydb_setup.cpp7
24 files changed, 121 insertions, 286 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 143101b4a2..452e36874a 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -189,9 +189,8 @@ public:
// https://protobuf.dev/reference/cpp/arenas/#swap
// Actualy will be copy in case pf remote execution
queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults());
- } else {
- NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
}
+
ConvertQueryStats(kqpResponse, queryResult);
if (kqpResponse.HasTxMeta()) {
queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta());
diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
index 18cd27d39e..d8e06561c8 100644
--- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp
@@ -101,7 +101,11 @@ public:
auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_);
try {
- NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult);
+ const auto& results = kqpResponse.GetYdbResults();
+ for (const auto& result : results) {
+ queryResult->add_result_sets()->CopyFrom(result);
+ }
+
} catch (const std::exception& ex) {
NYql::TIssues issues;
issues.AddIssue(NYql::ExceptionToIssue(ex));
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index 2b80ebc5d9..0a5c5b260a 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -50,7 +50,7 @@ namespace {
{}
NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle;
- google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator;
+ google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator;
};
enum EStreamRpcWakeupTag : ui64 {
@@ -220,7 +220,7 @@ private:
auto result = response.mutable_result();
try {
- NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set());
+ result->mutable_result_set()->CopyFrom(kqpResult);
} catch (std::exception ex) {
ReplyFinishStream(ex.what());
}
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 9609273320..b180f66549 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -70,7 +70,9 @@ public:
const TQueryRequestSettings& querySettings = TQueryRequestSettings(),
const TString& poolId = "");
- TEvQueryRequest() = default;
+ TEvQueryRequest() {
+ Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
+ }
bool IsSerializable() const override {
return true;
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index 14785394fa..e0bccb2e50 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -33,18 +33,9 @@
namespace NKikimr::NKqp {
-void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
-
TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);
-template<typename TFrom, typename TTo>
-inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
- const auto& results = from.GetResults();
- for (const auto& result : results) {
- ConvertKqpQueryResultToDbResult(result, to->add_result_sets());
- }
-}
class TKqpRequestInfo {
public:
@@ -80,7 +71,7 @@ public:
/// Accepts query text
virtual void Collect(const TString& queryData) = 0;
- virtual bool IsNull() { return false; }
+ virtual bool IsNull() { return false; }
virtual ~IQueryReplayBackend() {};
diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp
index ee4e834e6c..a64a05c194 100644
--- a/ydb/core/kqp/common/kqp_event_impl.cpp
+++ b/ydb/core/kqp/common/kqp_event_impl.cpp
@@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const {
Record.MutableRequest()->SetDatabaseId(DatabaseId);
}
+ Record.MutableRequest()->SetUsePublicResponseDataFormat(true);
Record.MutableRequest()->SetSessionId(SessionId);
Record.MutableRequest()->SetAction(QueryAction);
Record.MutableRequest()->SetType(QueryType);
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index ee8698df4a..fe40b75c44 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -227,7 +227,7 @@ public:
void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
- NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults());
+ response.AddYdbResults()->CopyFrom(ResultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
@@ -286,20 +286,18 @@ public:
virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) {
auto& record = ev->Get()->Record.GetRef();
if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
- if (record.MutableResponse()->GetResults().size()) {
+ if (record.MutableResponse()->GetYdbResults().size()) {
// Send result sets to RPC actor TStreamExecuteYqlScriptRPC
auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>();
ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId());
- for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) {
+ for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) {
// Workaround to avoid errors on Pull execution stage which would expect some results
- Ydb::ResultSet resultSet;
- NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults());
+ evStreamPart->Record.AddResults();
}
- evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults());
+ evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults());
this->Send(TargetActorId, evStreamPart.Release());
-
// Save response without data to send it later
ResponseHandle = ev.Release();
} else {
@@ -405,7 +403,7 @@ public:
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
Ydb::ResultSet resultSet;
- NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults());
+ response.AddYdbResults()->CopyFrom(resultSet);
for (auto& execStats : Executions) {
response.MutableQueryStats()->AddExecutions()->Swap(&execStats);
}
@@ -511,7 +509,7 @@ public:
auto& response = *ev->Get()->Record.GetRef().MutableResponse();
for (auto& resultSet : ResultSets) {
- ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults());
+ response.AddYdbResults()->Swap(&resultSet.ResultSet);
}
TBase::HandleResponse(ev, ctx);
@@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp
queryResult.AddIssue(NYql::IssueFromMessage(issue));
}
- for (auto& result : queryResponse.GetResults()) {
- auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
+ for (auto& result : queryResponse.GetYdbResults()) {
+ auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(
queryResult.ProtobufArenaPtr.get());
arenaResult->CopyFrom(result);
@@ -1419,11 +1417,11 @@ public:
if (!CheckCluster(cluster)) {
return InvalidCluster<TGenericResult>(cluster);
}
-
+
auto analyzePromise = NewPromise<TGenericResult>();
IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise);
RegisterActor(analyzeActor);
-
+
return analyzePromise.GetFuture();
} catch (yexception& e) {
return MakeFuture(ResultFromException<TGenericResult>(e));
@@ -1995,7 +1993,7 @@ public:
}
TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params,
- const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
+ const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
@@ -2075,7 +2073,7 @@ public:
}
TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params,
- const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
+ const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings,
const TMaybe<TString>& traceId) override
{
YQL_ENSURE(cluster == Cluster);
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index c98dc7453a..5a16801c50 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -288,8 +288,8 @@ public:
for (auto& resultStr : ResultProviderConfig.CommittedResults) {
queryResult.Results.emplace_back(
- google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get()));
- NKikimrMiniKQL::TResult* result = queryResult.Results.back();
+ google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get()));
+ Ydb::ResultSet* result = queryResult.Results.back();
if (!result->ParseFromArray(resultStr.data(), resultStr.size())) {
queryResult = ResultFromError<TResult>("Failed to parse run result.");
diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp
index c774e42b96..034b3cbe2b 100644
--- a/ydb/core/kqp/host/kqp_runner.cpp
+++ b/ydb/core/kqp/host/kqp_runner.cpp
@@ -117,17 +117,7 @@ public:
}
void FillResult(TResult& queryResult) const override {
- TVector<NKikimrMiniKQL::TResult*> results;
- for (auto& phyResult : TransformCtx.PhysicalQueryResults) {
- auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(
- queryResult.ProtobufArenaPtr.get());
-
- result->CopyFrom(phyResult);
- results.push_back(result);
- }
-
queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats);
- queryResult.Results = std::move(results);
}
private:
@@ -328,7 +318,7 @@ private:
Config),
"BuildPhysicalTxs")
.Build(false));
-
+
auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx)
.AddServiceTransformers()
.Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery")
@@ -403,8 +393,8 @@ private:
TKqpProviderContext Pctx;
TAutoPtr<IGraphTransformer> Transformer;
-
- TActorSystem* ActorSystem;
+
+ TActorSystem* ActorSystem;
};
} // namespace
diff --git a/ydb/core/kqp/host/kqp_transform.h b/ydb/core/kqp/host/kqp_transform.h
index bdf95715ba..0004340de5 100644
--- a/ydb/core/kqp/host/kqp_transform.h
+++ b/ydb/core/kqp/host/kqp_transform.h
@@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase {
NKqpProto::TKqpStatsQuery QueryStats;
std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery;
- TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults;
- TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults;
-
NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole
TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks;
void Reset() {
ReplyTarget = {};
- MkqlResults.clear();
QueryStats = {};
PhysicalQuery = nullptr;
- PhysicalQueryResults.clear();
ExplainTransformerInput = nullptr;
DataQueryBlocks = Nothing();
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
index a1a52017cc..8f580f53c2 100644
--- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp
@@ -997,16 +997,12 @@ public:
private:
static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings,
- const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx)
+ const Ydb::ResultSet& resultValue, TExprContext& ctx)
{
TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node));
auto protoValue = &resultValue;
YQL_ENSURE(resultValue.GetArena());
- if (IsRawKikimrResult(resultValue)) {
- protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena());
- }
-
YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom);
YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat);
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h
index 6e55cff428..06d7676d22 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway.h
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h
@@ -902,7 +902,7 @@ public:
struct TQueryResult : public TGenericResult {
TString SessionId;
- TVector<NKikimrMiniKQL::TResult*> Results;
+ TVector<Ydb::ResultSet*> Results;
NKqpProto::TKqpStatsQuery QueryStats;
std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery;
std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery;
diff --git a/ydb/core/kqp/provider/yql_kikimr_results.cpp b/ydb/core/kqp/provider/yql_kikimr_results.cpp
index 275eb767a8..d976cde200 100644
--- a/ydb/core/kqp/provider/yql_kikimr_results.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_results.cpp
@@ -338,94 +338,6 @@ void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer,
fillSettings, truncated, true);
}
-bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) {
- auto& type = result.GetType();
- if (type.GetKind() != NKikimrMiniKQL::ETypeKind::Struct) {
- return true;
- }
-
- auto& structType = type.GetStruct();
- if (structType.MemberSize() != 2) {
- return true;
- }
-
- return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated";
-}
-
-NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
- const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena)
-{
- NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena);
- auto* packedType = packedResult->MutableType();
- packedType->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
- auto* dataMember = packedType->MutableStruct()->AddMember();
- dataMember->SetName("Data");
- auto* truncatedMember = packedType->MutableStruct()->AddMember();
- truncatedMember->SetName("Truncated");
- truncatedMember->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data);
- truncatedMember->MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id);
-
- auto* packedValue = packedResult->MutableValue();
- auto* dataValue = packedValue->AddStruct();
- auto* dataType = dataMember->MutableType();
- auto* truncatedValue = packedValue->AddStruct();
-
- bool truncated = false;
- if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
- const auto& itemType = result.GetType().GetList().GetItem();
-
- TMap<TString, size_t> memberIndices;
- if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) {
- const auto& structType = itemType.GetStruct();
-
- for (size_t i = 0; i < structType.MemberSize(); ++i) {
- memberIndices[structType.GetMember(i).GetName()] = i;
- }
-
- dataType->SetKind(NKikimrMiniKQL::ETypeKind::List);
- auto* newItem = dataType->MutableList()->MutableItem();
- newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
- auto* newStructType = newItem->MutableStruct();
- for (auto& [column, gen_col] : columnHints) {
- auto* memberIndex = memberIndices.FindPtr(gen_col);
- YQL_ENSURE(memberIndex);
-
- (*newStructType->AddMember() = structType.GetMember(*memberIndex)).SetName(column);
- }
- } else {
- *dataType = result.GetType();
- }
-
- ui64 rowsWritten = 0;
- ui64 bytesWritten = 0;
- for (auto& item : result.GetValue().GetList()) {
- if (ResultsOverflow(rowsWritten, bytesWritten, fillSettings)) {
- truncated = true;
- break;
- }
- if (!memberIndices.empty()) {
- auto* newStruct = dataValue->AddList();
- for (auto& [column, gen_column] : columnHints) {
- auto* memberIndex = memberIndices.FindPtr(gen_column);
- YQL_ENSURE(memberIndex);
-
- *newStruct->AddStruct() = item.GetStruct(*memberIndex);
- }
- } else {
- *dataValue->AddList() = item;
- }
-
- bytesWritten += item.ByteSize();
- ++rowsWritten;
- }
- } else {
- dataType->CopyFrom(result.GetType());
- dataValue->CopyFrom(result.GetValue());
- }
-
- truncatedValue->SetBool(truncated);
- return packedResult;
-}
const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx) {
switch (type.GetKind()) {
diff --git a/ydb/core/kqp/provider/yql_kikimr_results.h b/ydb/core/kqp/provider/yql_kikimr_results.h
index 8eb70ae450..1c5250325a 100644
--- a/ydb/core/kqp/provider/yql_kikimr_results.h
+++ b/ydb/core/kqp/provider/yql_kikimr_results.h
@@ -7,11 +7,6 @@ namespace NYql {
void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result,
const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated);
-NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints,
- const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena);
-
-bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result);
-
const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx);
bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TType& protoType, TExprContext& ctx);
TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,
diff --git a/ydb/core/kqp/session_actor/kqp_response.cpp b/ydb/core/kqp/session_actor/kqp_response.cpp
index e1864d104c..cc90a3031b 100644
--- a/ydb/core/kqp/session_actor/kqp_response.cpp
+++ b/ydb/core/kqp/session_actor/kqp_response.cpp
@@ -53,36 +53,6 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) {
} // namespace
-void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) {
- const auto& type = from.GetType();
- TStackVec<NKikimrMiniKQL::TType> columnTypes;
- Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct);
- for (const auto& member : type.GetStruct().GetMember()) {
- if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) {
- for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) {
- auto columnMeta = to->add_columns();
- columnMeta->set_name(column.GetName());
- columnTypes.push_back(column.GetType());
- ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type());
- }
- }
- }
- for (const auto& responseStruct : from.GetValue().GetStruct()) {
- for (const auto& row : responseStruct.GetList()) {
- auto newRow = to->add_rows();
- ui32 columnCount = static_cast<ui32>(row.StructSize());
- Y_ENSURE(columnCount == columnTypes.size());
- for (ui32 i = 0; i < columnCount; i++) {
- const auto& column = row.GetStruct(i);
- ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items());
- }
- }
- if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) {
- to->set_truncated(responseStruct.GetBool());
- }
- }
-}
-
TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) {
if (issue.GetSeverity() == TSeverityIds::S_FATAL) {
return Ydb::StatusIds::INTERNAL_ERROR;
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c97381ba82..ea33835c97 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1788,7 +1788,6 @@ public:
// Result for scan query is sent directly to target actor.
Y_ABORT_UNLESS(response->GetArena());
if (QueryState->PreparedQuery) {
- bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat();
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
size_t trailingResultsCount = 0;
for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
@@ -1805,28 +1804,12 @@ public:
continue;
}
- if (useYdbResponseFormat) {
- TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
- if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
- effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
- }
- auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
- response->AddYdbResults()->Swap(ydbResult);
- } else {
- auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena());
- std::optional<IDataProvider::TFillSettings> fillSettings;
- if (QueryState->PreparedQuery->ResultsSize()) {
- YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
- << phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize());
- const auto& result = QueryState->PreparedQuery->GetResults(i);
- if (result.GetRowsLimit()) {
- fillSettings = FillSettings;
- fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
- }
- }
- auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena());
- response->AddResults()->Swap(finalResult);
+ TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite;
+ if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) {
+ effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit();
}
+ auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit);
+ response->AddYdbResults()->Swap(ydbResult);
}
}
@@ -1892,10 +1875,10 @@ public:
AddTrailingInfo(response->Record.GetRef());
NDataIntegrity::LogIntegrityTrails(
- request->Get()->GetTraceId(),
- request->Get()->GetAction(),
- request->Get()->GetType(),
- response,
+ request->Get()->GetTraceId(),
+ request->Get()->GetAction(),
+ request->Get()->GetType(),
+ response,
TlsActivationContext->AsActorContext()
);
@@ -1955,7 +1938,7 @@ public:
QueryState->UserRequestContext->TraceId,
QueryState->GetAction(),
QueryState->GetType(),
- QueryResponse,
+ QueryResponse,
TlsActivationContext->AsActorContext()
);
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 587bf6d010..272d0534a1 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -194,7 +194,7 @@ public:
Config->FeatureFlags = AppData(ctx)->FeatureFlags;
KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup,
- QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
+ QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry,
!Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext());
auto& queryRequest = QueryState->RequestEv;
@@ -959,7 +959,7 @@ private:
// If we have result it must be allocated on protobuf arena
Y_ASSERT(result->GetArena());
Y_ASSERT(resp->GetArena() == result->GetArena());
- resp->AddResults()->Swap(result);
+ resp->AddYdbResults()->Swap(result);
}
} else {
auto resp = ev.MutableResponse();
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
index 63db80b02d..e7cf7113b0 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp
@@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config,
<< 'b';
ui64 resultsSize = 0;
- for (auto& result : record->GetResponse().GetResults()) {
+ for (auto& result : record->GetResponse().GetYdbResults()) {
resultsSize += result.ByteSize();
}
diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
index 6525749d7c..b09e37b794 100644
--- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
+++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp
@@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
+
result = client.ExecuteYqlScript(R"(
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
COMMIT;
@@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
-
+
result = client.ExecuteYqlScript(R"(
ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED);
COMMIT;
@@ -989,7 +989,36 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
[[[101u]];[[201u]];[[301u]];[[401u]];[[501u]];[[601u]];[[701u]];[[801u]]];
[[8u]];
[[8u]];
- [[8u]]])", StreamResultToYson(it));
+ [[8u]]
+ ])", StreamResultToYson(it));
+ }
+
+ Y_UNIT_TEST(SelectNullType) {
+ TKikimrRunner kikimr;
+ TScriptingClient client(kikimr.GetDriver());
+ {
+ auto result = client.ExecuteYqlScript(R"(
+ CREATE TABLE demo1(id Text, PRIMARY KEY(id));
+ )").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = client.ExecuteYqlScript(R"(
+ UPSERT INTO demo1(id) VALUES("a"),("b"),("c");
+ )").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = client.ExecuteYqlScript(R"(
+ SELECT NULL auto_proc_ FROM demo1 LIMIT 10;
+ )").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [#];[#];[#]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+ }
}
Y_UNIT_TEST(StreamExecuteYqlScriptLeadingEmptyScan) {
@@ -1206,7 +1235,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) {
Y_UNIT_TEST(StreamExecuteYqlScriptPg) {
TKikimrRunner kikimr;
-
+
auto settings = TExecuteYqlRequestSettings()
.Syntax(Ydb::Query::SYNTAX_PG);
diff --git a/ydb/core/load_test/aggregated_result.cpp b/ydb/core/load_test/aggregated_result.cpp
index 82ae3dee3e..548a5628b4 100644
--- a/ydb/core/load_test/aggregated_result.cpp
+++ b/ydb/core/load_test/aggregated_result.cpp
@@ -6,6 +6,7 @@
#include <util/string/cast.h>
#include <ydb/library/mkql_proto/protos/minikql.pb.h>
+#include <ydb/public/sdk/cpp/client/ydb_result/result.h>
namespace NKikimr {
@@ -77,55 +78,35 @@ IOutputStream& operator<<(IOutputStream& output, const TAggregatedResult& result
return output;
}
-using TColumnPositions = THashMap<TString, ui32>;
-
-TColumnPositions GetColumnPositionsInResponse(const NKikimrMiniKQL::TType& ttype) {
- TColumnPositions columnPositions;
- for (const NKikimrMiniKQL::TMember& member : ttype.GetStruct().GetMember()) {
- if (member.GetName() == "Data") {
- const auto& listStruct = member.GetType().GetList().GetItem().GetStruct();
- for (const NKikimrMiniKQL::TMember& listMember : listStruct.GetMember()) {
- columnPositions.emplace(listMember.GetName(), columnPositions.size());
- }
- break;
- }
- }
- return columnPositions;
-}
-
-NKikimrMiniKQL::TValue GetOptional(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return listItem.GetStruct(pos).GetOptional();
-}
-
template<typename T>
-T ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- Y_UNUSED(listItem, pos);
+T ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ Y_UNUSED(parser, column);
Y_ABORT("unimplemented");
}
template<>
-ui32 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return GetOptional(listItem, pos).GetUint32();
+ui32 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ return parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0);
}
template<>
-ui64 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return GetOptional(listItem, pos).GetUint64();
+ui64 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ return parser.ColumnParser(column).GetOptionalUint64().GetOrElse(0);
}
template<>
-double ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return GetOptional(listItem, pos).GetDouble();
+double ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ return parser.ColumnParser(column).GetOptionalDouble().GetOrElse(static_cast<double>(0));
}
template<>
-TString ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return GetOptional(listItem, pos).GetBytes();
+TString ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ return parser.ColumnParser(column).GetOptionalString().GetOrElse("");
}
template<>
-TInstant ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) {
- return TInstant::Seconds(GetOptional(listItem, pos).GetUint32());
+TInstant ExtractValue(NYdb::TResultSetParser& parser, const TString& column) {
+ return TInstant::Seconds(parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0));
}
bool GetStatName(TStringBuf columnName, TStringBuf& statName, TStringBuf& suffix) {
@@ -161,38 +142,40 @@ void SetInAggregatedField(TStringBuf suffix, T value, TAggregatedField<U>& dst)
}
}
-TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listItem, const TColumnPositions& columnPositions) {
+TAggregatedResult GetResultFromValueListItem(NYdb::TResultSetParser& parser, const NYdb::TResultSet& rs) {
TAggregatedResult result;
TStringBuf statName;
TStringBuf suffix;
TStringBuf levelSb;
- for (const auto& [column, pos] : columnPositions) {
+ for (const auto& columnMeta : rs.GetColumnsMeta()) {
+ TString column = columnMeta.Name;
+
if (column == "id") {
- result.Uuid = ExtractValue<TString>(listItem, pos);
+ result.Uuid = ExtractValue<TString>(parser, column);
} else if (column == "start") {
- result.Start = ExtractValue<TInstant>(listItem, pos);
+ result.Start = ExtractValue<TInstant>(parser, column);
} else if (column == "finish") {
- result.Finish = ExtractValue<TInstant>(listItem, pos);
+ result.Finish = ExtractValue<TInstant>(parser, column);
} else if (column == "total_nodes") {
- result.Stats.TotalNodes = ExtractValue<ui32>(listItem, pos);
+ result.Stats.TotalNodes = ExtractValue<ui32>(parser, column);
} else if (column == "success_nodes") {
- result.Stats.SuccessNodes = ExtractValue<ui32>(listItem, pos);
+ result.Stats.SuccessNodes = ExtractValue<ui32>(parser, column);
} else if (column == "config") {
- result.Config = ExtractValue<TString>(listItem, pos);
+ result.Config = ExtractValue<TString>(parser, column);
} else if (GetStatName(column, statName, suffix)) {
if (statName == "transactions") {
if (suffix == "_avg") {
- SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.Transactions);
+ SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.Transactions);
} else {
- SetInAggregatedField(suffix, ExtractValue<ui64>(listItem, pos), result.Stats.Transactions);
+ SetInAggregatedField(suffix, ExtractValue<ui64>(parser, column), result.Stats.Transactions);
}
} else if (statName == "transactions_per_sec") {
- SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.TransactionsPerSecond);
+ SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.TransactionsPerSecond);
} else if (statName == "errors_per_sec") {
- SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.ErrorsPerSecond);
+ SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.ErrorsPerSecond);
} else if (GetPercentileLevel(statName, levelSb)) {
auto level = FromString<EPercentileLevel>(levelSb);
- SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.Percentiles[level]);
+ SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.Percentiles[level]);
}
}
}
@@ -200,16 +183,16 @@ TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listI
}
bool LoadResultFromResponseProto(const NKikimrKqp::TQueryResponse& response, TVector<TAggregatedResult>& results) {
- const auto& ttype = response.GetResults(0).GetType();
- auto columnPositions = GetColumnPositionsInResponse(ttype);
- if (columnPositions.empty()) {
- return false;
- }
+ Y_ABORT_UNLESS(response.GetYdbResults().size() > 0);
+
+ NYdb::TResultSet rs(response.GetYdbResults(0));
+ NYdb::TResultSetParser parser(response.GetYdbResults(0));
results.clear();
- for (const NKikimrMiniKQL::TValue& listItem : response.GetResults(0).GetValue().GetStruct().Get(0).GetList()) {
- results.push_back(GetResultFromValueListItem(listItem, columnPositions));
+ while(parser.TryNextRow()) {
+ results.push_back(GetResultFromValueListItem(parser, rs));
}
+
return true;
}
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index a4c6906783..f4dcf7457d 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -251,7 +251,7 @@ message TQueryResponseExtraInfo {
message TQueryResponse {
optional bytes SessionId = 1;
reserved 2; // (deprecated) QueryErrors
- repeated NKikimrMiniKQL.TResult Results = 3;
+ // repeated NKikimrMiniKQL.TResult Results = 3;
// optional TQueryProfile Profile = 4; // TODO: Deprecate, use QueryStats
reserved 4;
optional bytes PreparedQuery = 5;
@@ -407,7 +407,7 @@ message TEvPingSessionResponse {
message TEvDataQueryStreamPart {
optional NActorsProto.TActorId GatewayActorId = 1;
- repeated NKikimrMiniKQL.TResult Results = 2;
+ repeated Ydb.ResultSet Results = 2;
};
message TCancelQueryRequest {
diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h
index c875dc0ba1..7529236286 100644
--- a/ydb/core/viewer/viewer_query.h
+++ b/ydb/core/viewer/viewer_query.h
@@ -546,14 +546,8 @@ private:
void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) {
const auto& response = record.GetResponse();
- if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) {
+ if (response.YdbResultsSize() > 0) {
try {
- for (const auto& result : response.GetResults()) {
- Ydb::ResultSet resultSet;
- NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet);
- ResultSets.emplace_back().emplace_back(std::move(resultSet));
- }
-
for (const auto& result : response.GetYdbResults()) {
ResultSets.emplace_back().emplace_back(result);
}
diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h
index 9b130e9be7..a9bc078343 100644
--- a/ydb/core/viewer/viewer_query_old.h
+++ b/ydb/core/viewer/viewer_query_old.h
@@ -493,14 +493,8 @@ private:
void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) {
const auto& response = record.GetResponse();
- if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) {
+ if (response.YdbResultsSize() > 0) {
try {
- for (const auto& result : response.GetResults()) {
- Ydb::ResultSet resultSet;
- NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet);
- ResultSets.emplace_back(std::move(resultSet));
- }
-
for (const auto& result : response.GetYdbResults()) {
ResultSets.emplace_back(result);
}
diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
index fb200de3fe..5e330ecbc9 100644
--- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp
+++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
@@ -649,10 +649,9 @@ TRequestResult TYdbSetup::YqlScriptRequest(const TRequestOptions& query, TQueryM
FillQueryMeta(meta, responseRecord);
- resultSets.reserve(responseRecord.results_size());
- for (const auto& result : responseRecord.results()) {
- resultSets.emplace_back();
- NKikimr::NKqp::ConvertKqpQueryResultToDbResult(result, &resultSets.back());
+ resultSets.reserve(responseRecord.ydbresults_size());
+ for (const auto& result : responseRecord.ydbresults()) {
+ resultSets.emplace_back(result);
}
return TRequestResult(yqlQueryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues());