diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2024-10-03 08:49:42 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-03 08:49:42 +0300 |
commit | a491b73df338c8211563263223f33cf8439180ab (patch) | |
tree | b21636c03b19e3252b8d50f9c0b4fd4d8467e7d9 | |
parent | 91f8a44b4102ecbc5d62bfc87430117cc5719e89 (diff) | |
download | ydb-a491b73df338c8211563263223f33cf8439180ab.tar.gz |
get rid of mkql results in scripting (#9997)
24 files changed, 121 insertions, 286 deletions
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp index 143101b4a2..452e36874a 100644 --- a/ydb/core/grpc_services/rpc_execute_data_query.cpp +++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp @@ -189,9 +189,8 @@ public: // https://protobuf.dev/reference/cpp/arenas/#swap // Actualy will be copy in case pf remote execution queryResult->mutable_result_sets()->Swap(record.MutableResponse()->MutableYdbResults()); - } else { - NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); } + ConvertQueryStats(kqpResponse, queryResult); if (kqpResponse.HasTxMeta()) { queryResult->mutable_tx_meta()->CopyFrom(kqpResponse.GetTxMeta()); diff --git a/ydb/core/grpc_services/rpc_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_execute_yql_script.cpp index 18cd27d39e..d8e06561c8 100644 --- a/ydb/core/grpc_services/rpc_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_execute_yql_script.cpp @@ -101,7 +101,11 @@ public: auto queryResult = TEvExecuteYqlScriptRequest::AllocateResult<TResult>(Request_); try { - NKqp::ConvertKqpQueryResultsToDbResult(kqpResponse, queryResult); + const auto& results = kqpResponse.GetYdbResults(); + for (const auto& result : results) { + queryResult->add_result_sets()->CopyFrom(result); + } + } catch (const std::exception& ex) { NYql::TIssues issues; issues.AddIssue(NYql::ExceptionToIssue(ex)); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 2b80ebc5d9..0a5c5b260a 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -50,7 +50,7 @@ namespace { {} NKqp::TEvKqp::TEvDataQueryStreamPart::TPtr Handle; - google::protobuf::RepeatedPtrField<NKikimrMiniKQL::TResult>::const_iterator ResultIterator; + google::protobuf::RepeatedPtrField<Ydb::ResultSet>::const_iterator ResultIterator; }; enum EStreamRpcWakeupTag : ui64 { @@ -220,7 +220,7 @@ private: auto result = response.mutable_result(); try { - NKqp::ConvertKqpQueryResultToDbResult(kqpResult, result->mutable_result_set()); + result->mutable_result_set()->CopyFrom(kqpResult); } catch (std::exception ex) { ReplyFinishStream(ex.what()); } diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 9609273320..b180f66549 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -70,7 +70,9 @@ public: const TQueryRequestSettings& querySettings = TQueryRequestSettings(), const TString& poolId = ""); - TEvQueryRequest() = default; + TEvQueryRequest() { + Record.MutableRequest()->SetUsePublicResponseDataFormat(true); + } bool IsSerializable() const override { return true; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index 14785394fa..e0bccb2e50 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -33,18 +33,9 @@ namespace NKikimr::NKqp { -void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to); - TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId); bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId); -template<typename TFrom, typename TTo> -inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { - const auto& results = from.GetResults(); - for (const auto& result : results) { - ConvertKqpQueryResultToDbResult(result, to->add_result_sets()); - } -} class TKqpRequestInfo { public: @@ -80,7 +71,7 @@ public: /// Accepts query text virtual void Collect(const TString& queryData) = 0; - virtual bool IsNull() { return false; } + virtual bool IsNull() { return false; } virtual ~IQueryReplayBackend() {}; diff --git a/ydb/core/kqp/common/kqp_event_impl.cpp b/ydb/core/kqp/common/kqp_event_impl.cpp index ee4e834e6c..a64a05c194 100644 --- a/ydb/core/kqp/common/kqp_event_impl.cpp +++ b/ydb/core/kqp/common/kqp_event_impl.cpp @@ -94,6 +94,7 @@ void TEvKqp::TEvQueryRequest::PrepareRemote() const { Record.MutableRequest()->SetDatabaseId(DatabaseId); } + Record.MutableRequest()->SetUsePublicResponseDataFormat(true); Record.MutableRequest()->SetSessionId(SessionId); Record.MutableRequest()->SetAction(QueryAction); Record.MutableRequest()->SetType(QueryType); diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index ee8698df4a..fe40b75c44 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -227,7 +227,7 @@ public: void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) { auto& response = *ev->Get()->Record.GetRef().MutableResponse(); - NKikimr::ConvertYdbResultToKqpResult(ResultSet,*response.AddResults()); + response.AddYdbResults()->CopyFrom(ResultSet); for (auto& execStats : Executions) { response.MutableQueryStats()->AddExecutions()->Swap(&execStats); } @@ -286,20 +286,18 @@ public: virtual void HandleResponse(typename TResponse::TPtr &ev, const TActorContext &ctx) { auto& record = ev->Get()->Record.GetRef(); if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) { - if (record.MutableResponse()->GetResults().size()) { + if (record.MutableResponse()->GetYdbResults().size()) { // Send result sets to RPC actor TStreamExecuteYqlScriptRPC auto evStreamPart = MakeHolder<NKqp::TEvKqp::TEvDataQueryStreamPart>(); ActorIdToProto(this->SelfId(), evStreamPart->Record.MutableGatewayActorId()); - for (int i = 0; i < record.MutableResponse()->MutableResults()->size(); ++i) { + for (int i = 0; i < record.MutableResponse()->MutableYdbResults()->size(); ++i) { // Workaround to avoid errors on Pull execution stage which would expect some results - Ydb::ResultSet resultSet; - NKikimr::ConvertYdbResultToKqpResult(resultSet, *evStreamPart->Record.AddResults()); + evStreamPart->Record.AddResults(); } - evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableResults()); + evStreamPart->Record.MutableResults()->Swap(record.MutableResponse()->MutableYdbResults()); this->Send(TargetActorId, evStreamPart.Release()); - // Save response without data to send it later ResponseHandle = ev.Release(); } else { @@ -405,7 +403,7 @@ public: auto& response = *ev->Get()->Record.GetRef().MutableResponse(); Ydb::ResultSet resultSet; - NKikimr::ConvertYdbResultToKqpResult(resultSet, *response.AddResults()); + response.AddYdbResults()->CopyFrom(resultSet); for (auto& execStats : Executions) { response.MutableQueryStats()->AddExecutions()->Swap(&execStats); } @@ -511,7 +509,7 @@ public: auto& response = *ev->Get()->Record.GetRef().MutableResponse(); for (auto& resultSet : ResultSets) { - ConvertYdbResultToKqpResult(std::move(resultSet.ResultSet), *response.AddResults()); + response.AddYdbResults()->Swap(&resultSet.ResultSet); } TBase::HandleResponse(ev, ctx); @@ -672,8 +670,8 @@ void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqp queryResult.AddIssue(NYql::IssueFromMessage(issue)); } - for (auto& result : queryResponse.GetResults()) { - auto arenaResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>( + for (auto& result : queryResponse.GetYdbResults()) { + auto arenaResult = google::protobuf::Arena::CreateMessage<Ydb::ResultSet>( queryResult.ProtobufArenaPtr.get()); arenaResult->CopyFrom(result); @@ -1419,11 +1417,11 @@ public: if (!CheckCluster(cluster)) { return InvalidCluster<TGenericResult>(cluster); } - + auto analyzePromise = NewPromise<TGenericResult>(); IActor* analyzeActor = new TAnalyzeActor(settings.TablePath, settings.Columns, analyzePromise); RegisterActor(analyzeActor); - + return analyzePromise.GetFuture(); } catch (yexception& e) { return MakeFuture(ResultFromException<TGenericResult>(e)); @@ -1995,7 +1993,7 @@ public: } TFuture<TQueryResult> ExecDataQueryAst(const TString& cluster, const TString& query, TQueryData::TPtr params, - const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, + const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) override { YQL_ENSURE(cluster == Cluster); @@ -2075,7 +2073,7 @@ public: } TFuture<TQueryResult> ExecGenericQuery(const TString& cluster, const TString& query, TQueryData::TPtr params, - const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, + const TAstQuerySettings& settings, const Ydb::Table::TransactionSettings& txSettings, const TMaybe<TString>& traceId) override { YQL_ENSURE(cluster == Cluster); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index c98dc7453a..5a16801c50 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -288,8 +288,8 @@ public: for (auto& resultStr : ResultProviderConfig.CommittedResults) { queryResult.Results.emplace_back( - google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(queryResult.ProtobufArenaPtr.get())); - NKikimrMiniKQL::TResult* result = queryResult.Results.back(); + google::protobuf::Arena::CreateMessage<Ydb::ResultSet>(queryResult.ProtobufArenaPtr.get())); + Ydb::ResultSet* result = queryResult.Results.back(); if (!result->ParseFromArray(resultStr.data(), resultStr.size())) { queryResult = ResultFromError<TResult>("Failed to parse run result."); diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index c774e42b96..034b3cbe2b 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -117,17 +117,7 @@ public: } void FillResult(TResult& queryResult) const override { - TVector<NKikimrMiniKQL::TResult*> results; - for (auto& phyResult : TransformCtx.PhysicalQueryResults) { - auto result = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>( - queryResult.ProtobufArenaPtr.get()); - - result->CopyFrom(phyResult); - results.push_back(result); - } - queryResult.QueryStats.CopyFrom(TransformCtx.QueryStats); - queryResult.Results = std::move(results); } private: @@ -328,7 +318,7 @@ private: Config), "BuildPhysicalTxs") .Build(false)); - + auto physicalBuildQueryTransformer = TTransformationPipeline(typesCtx) .AddServiceTransformers() .Add(Log("PhysicalBuildQuery"), "LogPhysicalBuildQuery") @@ -403,8 +393,8 @@ private: TKqpProviderContext Pctx; TAutoPtr<IGraphTransformer> Transformer; - - TActorSystem* ActorSystem; + + TActorSystem* ActorSystem; }; } // namespace diff --git a/ydb/core/kqp/host/kqp_transform.h b/ydb/core/kqp/host/kqp_transform.h index bdf95715ba..0004340de5 100644 --- a/ydb/core/kqp/host/kqp_transform.h +++ b/ydb/core/kqp/host/kqp_transform.h @@ -26,18 +26,13 @@ struct TKqlTransformContext : TThrRefBase { NKqpProto::TKqpStatsQuery QueryStats; std::shared_ptr<const NKqpProto::TKqpPhyQuery> PhysicalQuery; - TVector<TSimpleSharedPtr<NKikimrMiniKQL::TResult>> MkqlResults; - TVector<NKikimrMiniKQL::TResult> PhysicalQueryResults; - NYql::TExprNode::TPtr ExplainTransformerInput; // Explain transformer must work after other transformers, but use input before peephole TMaybe<NYql::NNodes::TKiDataQueryBlocks> DataQueryBlocks; void Reset() { ReplyTarget = {}; - MkqlResults.clear(); QueryStats = {}; PhysicalQuery = nullptr; - PhysicalQueryResults.clear(); ExplainTransformerInput = nullptr; DataQueryBlocks = Nothing(); } diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index a1a52017cc..8f580f53c2 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -997,16 +997,12 @@ public: private: static TExprNode::TPtr GetResOrPullResult(const TExprNode& node, const IDataProvider::TFillSettings& fillSettings, - const NKikimrMiniKQL::TResult& resultValue, TExprContext& ctx) + const Ydb::ResultSet& resultValue, TExprContext& ctx) { TColumnOrder columnHints(NCommon::GetResOrPullColumnHints(node)); auto protoValue = &resultValue; YQL_ENSURE(resultValue.GetArena()); - if (IsRawKikimrResult(resultValue)) { - protoValue = KikimrResultToProto(resultValue, columnHints, fillSettings, resultValue.GetArena()); - } - YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Custom); YQL_ENSURE(fillSettings.FormatDetails == KikimrMkqlProtoFormat); diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 6e55cff428..06d7676d22 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -902,7 +902,7 @@ public: struct TQueryResult : public TGenericResult { TString SessionId; - TVector<NKikimrMiniKQL::TResult*> Results; + TVector<Ydb::ResultSet*> Results; NKqpProto::TKqpStatsQuery QueryStats; std::unique_ptr<NKikimrKqp::TPreparedQuery> PreparingQuery; std::shared_ptr<const NKikimrKqp::TPreparedQuery> PreparedQuery; diff --git a/ydb/core/kqp/provider/yql_kikimr_results.cpp b/ydb/core/kqp/provider/yql_kikimr_results.cpp index 275eb767a8..d976cde200 100644 --- a/ydb/core/kqp/provider/yql_kikimr_results.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_results.cpp @@ -338,94 +338,6 @@ void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, fillSettings, truncated, true); } -bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result) { - auto& type = result.GetType(); - if (type.GetKind() != NKikimrMiniKQL::ETypeKind::Struct) { - return true; - } - - auto& structType = type.GetStruct(); - if (structType.MemberSize() != 2) { - return true; - } - - return structType.GetMember(0).GetName() != "Data" || structType.GetMember(1).GetName() != "Truncated"; -} - -NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints, - const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena) -{ - NKikimrMiniKQL::TResult* packedResult = google::protobuf::Arena::CreateMessage<NKikimrMiniKQL::TResult>(arena); - auto* packedType = packedResult->MutableType(); - packedType->SetKind(NKikimrMiniKQL::ETypeKind::Struct); - auto* dataMember = packedType->MutableStruct()->AddMember(); - dataMember->SetName("Data"); - auto* truncatedMember = packedType->MutableStruct()->AddMember(); - truncatedMember->SetName("Truncated"); - truncatedMember->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Data); - truncatedMember->MutableType()->MutableData()->SetScheme(NKikimr::NUdf::TDataType<bool>::Id); - - auto* packedValue = packedResult->MutableValue(); - auto* dataValue = packedValue->AddStruct(); - auto* dataType = dataMember->MutableType(); - auto* truncatedValue = packedValue->AddStruct(); - - bool truncated = false; - if (result.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) { - const auto& itemType = result.GetType().GetList().GetItem(); - - TMap<TString, size_t> memberIndices; - if (itemType.GetKind() == NKikimrMiniKQL::ETypeKind::Struct && columnHints.Size() != 0) { - const auto& structType = itemType.GetStruct(); - - for (size_t i = 0; i < structType.MemberSize(); ++i) { - memberIndices[structType.GetMember(i).GetName()] = i; - } - - dataType->SetKind(NKikimrMiniKQL::ETypeKind::List); - auto* newItem = dataType->MutableList()->MutableItem(); - newItem->SetKind(NKikimrMiniKQL::ETypeKind::Struct); - auto* newStructType = newItem->MutableStruct(); - for (auto& [column, gen_col] : columnHints) { - auto* memberIndex = memberIndices.FindPtr(gen_col); - YQL_ENSURE(memberIndex); - - (*newStructType->AddMember() = structType.GetMember(*memberIndex)).SetName(column); - } - } else { - *dataType = result.GetType(); - } - - ui64 rowsWritten = 0; - ui64 bytesWritten = 0; - for (auto& item : result.GetValue().GetList()) { - if (ResultsOverflow(rowsWritten, bytesWritten, fillSettings)) { - truncated = true; - break; - } - if (!memberIndices.empty()) { - auto* newStruct = dataValue->AddList(); - for (auto& [column, gen_column] : columnHints) { - auto* memberIndex = memberIndices.FindPtr(gen_column); - YQL_ENSURE(memberIndex); - - *newStruct->AddStruct() = item.GetStruct(*memberIndex); - } - } else { - *dataValue->AddList() = item; - } - - bytesWritten += item.ByteSize(); - ++rowsWritten; - } - } else { - dataType->CopyFrom(result.GetType()); - dataValue->CopyFrom(result.GetValue()); - } - - truncatedValue->SetBool(truncated); - return packedResult; -} const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx) { switch (type.GetKind()) { diff --git a/ydb/core/kqp/provider/yql_kikimr_results.h b/ydb/core/kqp/provider/yql_kikimr_results.h index 8eb70ae450..1c5250325a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_results.h +++ b/ydb/core/kqp/provider/yql_kikimr_results.h @@ -7,11 +7,6 @@ namespace NYql { void KikimrResultToYson(const TStringStream& stream, NYson::TYsonWriter& writer, const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints, const IDataProvider::TFillSettings& fillSettings, bool& truncated); -NKikimrMiniKQL::TResult* KikimrResultToProto(const NKikimrMiniKQL::TResult& result, const TColumnOrder& columnHints, - const IDataProvider::TFillSettings& fillSettings, google::protobuf::Arena* arena); - -bool IsRawKikimrResult(const NKikimrMiniKQL::TResult& result); - const TTypeAnnotationNode* ParseTypeFromKikimrProto(const NKikimrMiniKQL::TType& type, TExprContext& ctx); bool ExportTypeToKikimrProto(const TTypeAnnotationNode& type, NKikimrMiniKQL::TType& protoType, TExprContext& ctx); TExprNode::TPtr ParseKikimrProtoValue(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value, diff --git a/ydb/core/kqp/session_actor/kqp_response.cpp b/ydb/core/kqp/session_actor/kqp_response.cpp index e1864d104c..cc90a3031b 100644 --- a/ydb/core/kqp/session_actor/kqp_response.cpp +++ b/ydb/core/kqp/session_actor/kqp_response.cpp @@ -53,36 +53,6 @@ bool HasSchemeOrFatalIssues(const TIssue& issue) { } // namespace -void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to) { - const auto& type = from.GetType(); - TStackVec<NKikimrMiniKQL::TType> columnTypes; - Y_ENSURE(type.GetKind() == NKikimrMiniKQL::ETypeKind::Struct); - for (const auto& member : type.GetStruct().GetMember()) { - if (member.GetType().GetKind() == NKikimrMiniKQL::ETypeKind::List) { - for (const auto& column : member.GetType().GetList().GetItem().GetStruct().GetMember()) { - auto columnMeta = to->add_columns(); - columnMeta->set_name(column.GetName()); - columnTypes.push_back(column.GetType()); - ConvertMiniKQLTypeToYdbType(column.GetType(), *columnMeta->mutable_type()); - } - } - } - for (const auto& responseStruct : from.GetValue().GetStruct()) { - for (const auto& row : responseStruct.GetList()) { - auto newRow = to->add_rows(); - ui32 columnCount = static_cast<ui32>(row.StructSize()); - Y_ENSURE(columnCount == columnTypes.size()); - for (ui32 i = 0; i < columnCount; i++) { - const auto& column = row.GetStruct(i); - ConvertMiniKQLValueToYdbValue(columnTypes[i], column, *newRow->add_items()); - } - } - if (responseStruct.Getvalue_valueCase() == NKikimrMiniKQL::TValue::kBool) { - to->set_truncated(responseStruct.GetBool()); - } - } -} - TMaybe<Ydb::StatusIds::StatusCode> GetYdbStatus(const TIssue& issue) { if (issue.GetSeverity() == TSeverityIds::S_FATAL) { return Ydb::StatusIds::INTERNAL_ERROR; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c97381ba82..ea33835c97 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1788,7 +1788,6 @@ public: // Result for scan query is sent directly to target actor. Y_ABORT_UNLESS(response->GetArena()); if (QueryState->PreparedQuery) { - bool useYdbResponseFormat = QueryState->GetUsePublicResponseDataFormat(); auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); size_t trailingResultsCount = 0; for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { @@ -1805,28 +1804,12 @@ public: continue; } - if (useYdbResponseFormat) { - TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite; - if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) { - effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit(); - } - auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit); - response->AddYdbResults()->Swap(ydbResult); - } else { - auto* protoRes = QueryState->QueryData->GetMkqlTxResult(phyQuery.GetResultBindings(i), response->GetArena()); - std::optional<IDataProvider::TFillSettings> fillSettings; - if (QueryState->PreparedQuery->ResultsSize()) { - YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" - << phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize()); - const auto& result = QueryState->PreparedQuery->GetResults(i); - if (result.GetRowsLimit()) { - fillSettings = FillSettings; - fillSettings->RowsLimitPerWrite = result.GetRowsLimit(); - } - } - auto* finalResult = KikimrResultToProto(*protoRes, {}, fillSettings.value_or(FillSettings), response->GetArena()); - response->AddResults()->Swap(finalResult); + TMaybe<ui64> effectiveRowsLimit = FillSettings.RowsLimitPerWrite; + if (QueryState->PreparedQuery->GetResults(i).GetRowsLimit()) { + effectiveRowsLimit = QueryState->PreparedQuery->GetResults(i).GetRowsLimit(); } + auto* ydbResult = QueryState->QueryData->GetYdbTxResult(phyQuery.GetResultBindings(i), response->GetArena(), effectiveRowsLimit); + response->AddYdbResults()->Swap(ydbResult); } } @@ -1892,10 +1875,10 @@ public: AddTrailingInfo(response->Record.GetRef()); NDataIntegrity::LogIntegrityTrails( - request->Get()->GetTraceId(), - request->Get()->GetAction(), - request->Get()->GetType(), - response, + request->Get()->GetTraceId(), + request->Get()->GetAction(), + request->Get()->GetType(), + response, TlsActivationContext->AsActorContext() ); @@ -1955,7 +1938,7 @@ public: QueryState->UserRequestContext->TraceId, QueryState->GetAction(), QueryState->GetType(), - QueryResponse, + QueryResponse, TlsActivationContext->AsActorContext() ); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 587bf6d010..272d0534a1 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -194,7 +194,7 @@ public: Config->FeatureFlags = AppData(ctx)->FeatureFlags; KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup, - QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, + QueryState->RequestEv->GetUserToken(), GUCSettings, QueryServiceConfig, Settings.ApplicationName, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false, nullptr, nullptr, nullptr, QueryState->RequestEv->GetUserRequestContext()); auto& queryRequest = QueryState->RequestEv; @@ -959,7 +959,7 @@ private: // If we have result it must be allocated on protobuf arena Y_ASSERT(result->GetArena()); Y_ASSERT(resp->GetArena() == result->GetArena()); - resp->AddResults()->Swap(result); + resp->AddYdbResults()->Swap(result); } } else { auto resp = ev.MutableResponse(); diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.cpp b/ydb/core/kqp/session_actor/kqp_worker_common.cpp index 63db80b02d..e7cf7113b0 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_common.cpp @@ -119,7 +119,7 @@ void SlowLogQuery(const TActorContext &ctx, const TKikimrConfiguration* config, << 'b'; ui64 resultsSize = 0; - for (auto& result : record->GetResponse().GetResults()) { + for (auto& result : record->GetResponse().GetYdbResults()) { resultsSize += result.ByteSize(); } diff --git a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp index 6525749d7c..b09e37b794 100644 --- a/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp +++ b/ydb/core/kqp/ut/yql/kqp_scripting_ut.cpp @@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED); )").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - + result = client.ExecuteYqlScript(R"( ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED); COMMIT; @@ -88,7 +88,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 4); )").GetValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); - + result = client.ExecuteYqlScript(R"( ALTER TABLE `/Root/ScriptingCreateAndAlterTableTest` SET (AUTO_PARTITIONING_BY_SIZE = ENABLED); COMMIT; @@ -989,7 +989,36 @@ Y_UNIT_TEST_SUITE(KqpScripting) { [[[101u]];[[201u]];[[301u]];[[401u]];[[501u]];[[601u]];[[701u]];[[801u]]]; [[8u]]; [[8u]]; - [[8u]]])", StreamResultToYson(it)); + [[8u]] + ])", StreamResultToYson(it)); + } + + Y_UNIT_TEST(SelectNullType) { + TKikimrRunner kikimr; + TScriptingClient client(kikimr.GetDriver()); + { + auto result = client.ExecuteYqlScript(R"( + CREATE TABLE demo1(id Text, PRIMARY KEY(id)); + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteYqlScript(R"( + UPSERT INTO demo1(id) VALUES("a"),("b"),("c"); + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteYqlScript(R"( + SELECT NULL auto_proc_ FROM demo1 LIMIT 10; + )").GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [#];[#];[#] + ])", FormatResultSetYson(result.GetResultSet(0))); + } } Y_UNIT_TEST(StreamExecuteYqlScriptLeadingEmptyScan) { @@ -1206,7 +1235,7 @@ Y_UNIT_TEST_SUITE(KqpScripting) { Y_UNIT_TEST(StreamExecuteYqlScriptPg) { TKikimrRunner kikimr; - + auto settings = TExecuteYqlRequestSettings() .Syntax(Ydb::Query::SYNTAX_PG); diff --git a/ydb/core/load_test/aggregated_result.cpp b/ydb/core/load_test/aggregated_result.cpp index 82ae3dee3e..548a5628b4 100644 --- a/ydb/core/load_test/aggregated_result.cpp +++ b/ydb/core/load_test/aggregated_result.cpp @@ -6,6 +6,7 @@ #include <util/string/cast.h> #include <ydb/library/mkql_proto/protos/minikql.pb.h> +#include <ydb/public/sdk/cpp/client/ydb_result/result.h> namespace NKikimr { @@ -77,55 +78,35 @@ IOutputStream& operator<<(IOutputStream& output, const TAggregatedResult& result return output; } -using TColumnPositions = THashMap<TString, ui32>; - -TColumnPositions GetColumnPositionsInResponse(const NKikimrMiniKQL::TType& ttype) { - TColumnPositions columnPositions; - for (const NKikimrMiniKQL::TMember& member : ttype.GetStruct().GetMember()) { - if (member.GetName() == "Data") { - const auto& listStruct = member.GetType().GetList().GetItem().GetStruct(); - for (const NKikimrMiniKQL::TMember& listMember : listStruct.GetMember()) { - columnPositions.emplace(listMember.GetName(), columnPositions.size()); - } - break; - } - } - return columnPositions; -} - -NKikimrMiniKQL::TValue GetOptional(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return listItem.GetStruct(pos).GetOptional(); -} - template<typename T> -T ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - Y_UNUSED(listItem, pos); +T ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + Y_UNUSED(parser, column); Y_ABORT("unimplemented"); } template<> -ui32 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetUint32(); +ui32 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0); } template<> -ui64 ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetUint64(); +ui64 ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalUint64().GetOrElse(0); } template<> -double ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetDouble(); +double ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalDouble().GetOrElse(static_cast<double>(0)); } template<> -TString ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return GetOptional(listItem, pos).GetBytes(); +TString ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return parser.ColumnParser(column).GetOptionalString().GetOrElse(""); } template<> -TInstant ExtractValue(const NKikimrMiniKQL::TValue& listItem, ui32 pos) { - return TInstant::Seconds(GetOptional(listItem, pos).GetUint32()); +TInstant ExtractValue(NYdb::TResultSetParser& parser, const TString& column) { + return TInstant::Seconds(parser.ColumnParser(column).GetOptionalUint32().GetOrElse(0)); } bool GetStatName(TStringBuf columnName, TStringBuf& statName, TStringBuf& suffix) { @@ -161,38 +142,40 @@ void SetInAggregatedField(TStringBuf suffix, T value, TAggregatedField<U>& dst) } } -TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listItem, const TColumnPositions& columnPositions) { +TAggregatedResult GetResultFromValueListItem(NYdb::TResultSetParser& parser, const NYdb::TResultSet& rs) { TAggregatedResult result; TStringBuf statName; TStringBuf suffix; TStringBuf levelSb; - for (const auto& [column, pos] : columnPositions) { + for (const auto& columnMeta : rs.GetColumnsMeta()) { + TString column = columnMeta.Name; + if (column == "id") { - result.Uuid = ExtractValue<TString>(listItem, pos); + result.Uuid = ExtractValue<TString>(parser, column); } else if (column == "start") { - result.Start = ExtractValue<TInstant>(listItem, pos); + result.Start = ExtractValue<TInstant>(parser, column); } else if (column == "finish") { - result.Finish = ExtractValue<TInstant>(listItem, pos); + result.Finish = ExtractValue<TInstant>(parser, column); } else if (column == "total_nodes") { - result.Stats.TotalNodes = ExtractValue<ui32>(listItem, pos); + result.Stats.TotalNodes = ExtractValue<ui32>(parser, column); } else if (column == "success_nodes") { - result.Stats.SuccessNodes = ExtractValue<ui32>(listItem, pos); + result.Stats.SuccessNodes = ExtractValue<ui32>(parser, column); } else if (column == "config") { - result.Config = ExtractValue<TString>(listItem, pos); + result.Config = ExtractValue<TString>(parser, column); } else if (GetStatName(column, statName, suffix)) { if (statName == "transactions") { if (suffix == "_avg") { - SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.Transactions); + SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.Transactions); } else { - SetInAggregatedField(suffix, ExtractValue<ui64>(listItem, pos), result.Stats.Transactions); + SetInAggregatedField(suffix, ExtractValue<ui64>(parser, column), result.Stats.Transactions); } } else if (statName == "transactions_per_sec") { - SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.TransactionsPerSecond); + SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.TransactionsPerSecond); } else if (statName == "errors_per_sec") { - SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.ErrorsPerSecond); + SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.ErrorsPerSecond); } else if (GetPercentileLevel(statName, levelSb)) { auto level = FromString<EPercentileLevel>(levelSb); - SetInAggregatedField(suffix, ExtractValue<double>(listItem, pos), result.Stats.Percentiles[level]); + SetInAggregatedField(suffix, ExtractValue<double>(parser, column), result.Stats.Percentiles[level]); } } } @@ -200,16 +183,16 @@ TAggregatedResult GetResultFromValueListItem(const NKikimrMiniKQL::TValue& listI } bool LoadResultFromResponseProto(const NKikimrKqp::TQueryResponse& response, TVector<TAggregatedResult>& results) { - const auto& ttype = response.GetResults(0).GetType(); - auto columnPositions = GetColumnPositionsInResponse(ttype); - if (columnPositions.empty()) { - return false; - } + Y_ABORT_UNLESS(response.GetYdbResults().size() > 0); + + NYdb::TResultSet rs(response.GetYdbResults(0)); + NYdb::TResultSetParser parser(response.GetYdbResults(0)); results.clear(); - for (const NKikimrMiniKQL::TValue& listItem : response.GetResults(0).GetValue().GetStruct().Get(0).GetList()) { - results.push_back(GetResultFromValueListItem(listItem, columnPositions)); + while(parser.TryNextRow()) { + results.push_back(GetResultFromValueListItem(parser, rs)); } + return true; } diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index a4c6906783..f4dcf7457d 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -251,7 +251,7 @@ message TQueryResponseExtraInfo { message TQueryResponse { optional bytes SessionId = 1; reserved 2; // (deprecated) QueryErrors - repeated NKikimrMiniKQL.TResult Results = 3; + // repeated NKikimrMiniKQL.TResult Results = 3; // optional TQueryProfile Profile = 4; // TODO: Deprecate, use QueryStats reserved 4; optional bytes PreparedQuery = 5; @@ -407,7 +407,7 @@ message TEvPingSessionResponse { message TEvDataQueryStreamPart { optional NActorsProto.TActorId GatewayActorId = 1; - repeated NKikimrMiniKQL.TResult Results = 2; + repeated Ydb.ResultSet Results = 2; }; message TCancelQueryRequest { diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index c875dc0ba1..7529236286 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -546,14 +546,8 @@ private: void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) { const auto& response = record.GetResponse(); - if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) { + if (response.YdbResultsSize() > 0) { try { - for (const auto& result : response.GetResults()) { - Ydb::ResultSet resultSet; - NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet); - ResultSets.emplace_back().emplace_back(std::move(resultSet)); - } - for (const auto& result : response.GetYdbResults()) { ResultSets.emplace_back().emplace_back(result); } diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index 9b130e9be7..a9bc078343 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -493,14 +493,8 @@ private: void MakeOkReply(NJson::TJsonValue& jsonResponse, NKikimrKqp::TEvQueryResponse& record) { const auto& response = record.GetResponse(); - if (response.ResultsSize() > 0 || response.YdbResultsSize() > 0) { + if (response.YdbResultsSize() > 0) { try { - for (const auto& result : response.GetResults()) { - Ydb::ResultSet resultSet; - NKqp::ConvertKqpQueryResultToDbResult(result, &resultSet); - ResultSets.emplace_back(std::move(resultSet)); - } - for (const auto& result : response.GetYdbResults()) { ResultSets.emplace_back(result); } diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index fb200de3fe..5e330ecbc9 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -649,10 +649,9 @@ TRequestResult TYdbSetup::YqlScriptRequest(const TRequestOptions& query, TQueryM FillQueryMeta(meta, responseRecord); - resultSets.reserve(responseRecord.results_size()); - for (const auto& result : responseRecord.results()) { - resultSets.emplace_back(); - NKikimr::NKqp::ConvertKqpQueryResultToDbResult(result, &resultSets.back()); + resultSets.reserve(responseRecord.ydbresults_size()); + for (const auto& result : responseRecord.ydbresults()) { + resultSets.emplace_back(result); } return TRequestResult(yqlQueryOperationResponse.GetYdbStatus(), responseRecord.GetQueryIssues()); |