diff options
author | makostrov <makostrov@yandex-team.com> | 2023-09-30 15:44:52 +0300 |
---|---|---|
committer | makostrov <makostrov@yandex-team.com> | 2023-09-30 15:59:59 +0300 |
commit | d0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1 (patch) | |
tree | 762dcb05c900f47d1b36ab5fab8d4013d66c1988 | |
parent | 8fdc2f37e43f846ca5405525508074333bae94b1 (diff) | |
download | ydb-d0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1.tar.gz |
Generate TraceId if RequestCtx don't have one
-rw-r--r-- | ydb/core/kqp/common/kqp_user_request_context.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_impl.h | 14 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_literal_executer.cpp | 10 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.cpp | 50 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner.h | 6 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp | 27 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_planner_strategy.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/gateway/kqp_ic_gateway.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/session_actor/kqp_session_actor.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 8 | ||||
-rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_scan_ut.cpp | 33 |
13 files changed, 122 insertions, 44 deletions
diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 3cf4559ab63..3fccabda861 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -10,6 +10,8 @@ namespace NKikimr::NKqp { TString Database; TString SessionId; + TUserRequestContext() = default; + TUserRequestContext(const TString& traceId, const TString& database, const TString& sessionId) : TraceId(traceId) , Database(database) diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 67a94ed6e63..c8bd2dfc9a1 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -99,7 +99,7 @@ IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId bool temporary, TString SessionId); std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( - IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner); + IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 4dc36f49e47..1a1aeddaf95 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -53,13 +53,13 @@ LWTRACE_USING(KQP_PROVIDER); namespace NKikimr { namespace NKqp { -#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) -#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream) +#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) enum class EExecType { Data, diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 2ada01f805f..8774fc02443 100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -73,11 +73,13 @@ TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() { class TKqpLiteralExecuter { public: - TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner) + TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, + const TIntrusivePtr<TUserRequestContext>& userRequestContext) : Request(std::move(request)) , Counters(counters) , OwnerActor(owner) , LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter") + , UserRequestContext(userRequestContext) { ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc); ResponseEv->Orbit = std::move(Request.Orbit); @@ -406,15 +408,17 @@ private: std::unique_ptr<NKikimr::NMiniKQL::TKqpComputeContextBase> ComputeCtx; std::unique_ptr<NYql::NDq::TDqTaskRunnerContext> RunnerContext; NWilson::TSpan LiteralExecuterSpan; + + TIntrusivePtr<TUserRequestContext> UserRequestContext; }; } // anonymous namespace std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral( - IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner) + IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext) { std::unique_ptr<TKqpLiteralExecuter> executer = std::make_unique<TKqpLiteralExecuter>( - std::move(request), counters, owner); + std::move(request), counters, owner, userRequestContext); return executer->ExecuteLiteral(); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index f4516628803..7178e37ee05 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -15,10 +15,10 @@ using namespace NActors; namespace NKikimr::NKqp { -#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream) -#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream) -#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream) -#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream) +#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) +#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream) using namespace NYql; @@ -27,7 +27,7 @@ namespace { const ui64 MaxTaskSize = 48_MB; template <class TCollection> -std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TCollection& tasks) { +std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TIntrusivePtr<TUserRequestContext>& UserRequestContext, const TCollection& tasks) { for (const auto& task : tasks) { if (ui32 size = task.ByteSize(); size > MaxTaskSize) { LOG_E("Abort execution. Task #" << task.GetId() << " size is too big: " << size << " > " << MaxTaskSize); @@ -51,6 +51,8 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe } +bool TKqpPlanner::UseMockEmptyPlanner = false; + // Task can allocate extra memory during execution. // So, we estimate total memory amount required for task as apriori task size multiplied by this constant. constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2; @@ -92,6 +94,28 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu } } +// ResourcesSnapshot, ResourceEstimations + +void TKqpPlanner::LogMemoryStatistics(const TLogFunc& logFunc) { + uint64_t totalMemory = 0; + uint32_t totalComputeActors = 0; + for (auto& node : ResourcesSnapshot) { + logFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId() + << " memory: " << (node.GetTotalMemory() - node.GetUsedMemory()) + << ", ca: " << node.GetAvailableComputeActors()); + totalMemory += (node.GetTotalMemory() - node.GetUsedMemory()); + totalComputeActors += node.GetAvailableComputeActors(); + } + logFunc(TStringBuilder() << "Total nodes: " << ResourcesSnapshot.size() << ", total memory: " << totalMemory << ", total CA:" << totalComputeActors); + + totalMemory = 0; + for (const auto& task : ResourceEstimations) { + logFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit); + totalMemory += task.TotalMemoryLimit; + } + logFunc(TStringBuilder() << "Total tasks: " << ResourceEstimations.size() << ", total memory: " << totalMemory); +} + bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) { YQL_ENSURE(requestId < Requests.size()); @@ -248,11 +272,11 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()); } - auto planner = CreateKqpGreedyPlanner(); + auto planner = (UseMockEmptyPlanner ? CreateKqpMockEmptyPlanner() : CreateKqpGreedyPlanner()); // KqpMockEmptyPlanner is a mock planner for tests auto ctx = TlsActivationContext->AsActorContext(); if (ctx.LoggerSettings() && ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER)) { - planner->SetLogFunc([TxId = TxId](TStringBuf msg) { LOG_D(msg); }); + planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); }); } THashMap<ui64, size_t> nodeIdtoIdx; @@ -260,6 +284,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx; } + LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); }); + auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations); THashMap<ui64, ui64> alreadyAssigned; @@ -281,12 +307,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() { return nullptr; } else { - // what is here? - Cerr << (*UserRequestContext); - Y_FAIL_S("Fail point 123#!"); - // UserRequestContext->Out(Cerr); + LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); }); + auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, - "Not enough resources to execute query"); + TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId); return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release()); } } @@ -414,7 +438,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { SortUnique(tasks); auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId); request.SerializedRequest = SerializeRequest(request); - auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks()); + auto ev = CheckTaskSize(TxId, UserRequestContext, request.SerializedRequest->Record.GetTasks()); if (ev != nullptr) { return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release()); } diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h index d8f5da48ad7..2c9cf4d7d71 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.h +++ b/ydb/core/kqp/executer_actor/kqp_planner.h @@ -38,6 +38,8 @@ class TKqpPlanner { {} }; + using TLogFunc = std::function<void(TStringBuf message)>; + public: TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline, @@ -70,7 +72,7 @@ private: std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData); ui32 CalcSendMessageFlagsForNode(ui32 nodeId); - + void LogMemoryStatistics(const TLogFunc& logFunc); private: const ui64 TxId; @@ -104,6 +106,8 @@ private: TIntrusivePtr<TUserRequestContext> UserRequestContext; +public: + static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error }; std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp index 5cd5bced3d2..6fbc46b0657 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp @@ -116,18 +116,6 @@ public: { TVector<TResult> result; TNodesManager nodes(nodeResources); - for (auto& node : nodeResources) { - if (LogFunc) { - LogFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId() - << " memory: " << (node.GetTotalMemory() - node.GetUsedMemory()) - << ", ca: " << node.GetAvailableComputeActors()); - } - } - if (LogFunc) { - for (const auto& task : tasks) { - LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit); - } - } for (const auto& taskEstimation : tasks) { auto node = nodes.PopOptimalNodeWithLimits(taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW, 1); @@ -169,11 +157,26 @@ public: } }; +class TKqpMockEmptyPlanner : public IKqpPlannerStrategy { +public: + ~TKqpMockEmptyPlanner() override {} + + TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&, + const TVector<TTaskResourceEstimation>&) override + { + return {}; + } +}; + } // anonymous namespace THolder<IKqpPlannerStrategy> CreateKqpGreedyPlanner() { return MakeHolder<TKqpGreedyPlanner>(); } +THolder<IKqpPlannerStrategy> CreateKqpMockEmptyPlanner() { + return MakeHolder<TKqpMockEmptyPlanner>(); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h index a0733fd4116..41a550259f0 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h +++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h @@ -32,4 +32,6 @@ protected: THolder<IKqpPlannerStrategy> CreateKqpGreedyPlanner(); +THolder<IKqpPlannerStrategy> CreateKqpMockEmptyPlanner(); + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 3b5728a6dbb..478d9bb239a 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -486,7 +486,7 @@ public: {} void Bootstrap() { - auto result = ::NKikimr::NKqp::ExecuteLiteral(std::move(Request), Counters, SelfId()); + auto result = ::NKikimr::NKqp::ExecuteLiteral(std::move(Request), Counters, SelfId(), MakeIntrusive<TUserRequestContext>()); ProcessPureExecution(result); Become(&TThis::DieState); Send(SelfId(), new TEvents::TEvPoisonPill()); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 6276b9d6a40..6390be1c31d 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -217,6 +217,9 @@ public: QueryState = std::make_shared<TKqpQueryState>( ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession, Settings.TableService, Settings.QueryService, std::move(id), SessionId); + if (QueryState->UserRequestContext->TraceId == "") { + QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString(); + } } void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { @@ -990,7 +993,8 @@ public: request.Orbit = std::move(QueryState->Orbit); } request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); - auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId()); + Y_ENSURE(QueryState); + auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId(), QueryState->UserRequestContext); ++QueryState->CurrentTx; ProcessExecuterResult(response.get()); return true; @@ -1065,7 +1069,7 @@ public: request.MaxShardCount = RequestControls.MaxShardCount; request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId(); LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize()); - + auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index 3c15feee940..122f7dd2867 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -608,8 +608,9 @@ bool IsTimeoutError(NYdb::EStatus status) { return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT || status == NYdb::EStatus::CANCELLED; } +// IssueMessageSubString - uses only in case if !streamPart.IsSuccess() template<typename TIterator> -TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS) { +TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = "") { TStringStream out; NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true); writer.OnBeginList(); @@ -621,6 +622,7 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t if (!streamPart.IsSuccess()) { if (opStatus != NYdb::EStatus::SUCCESS) { UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), opStatus, streamPart.GetIssues().ToString()); + UNIT_ASSERT_C(streamPart.GetIssues().ToString().Contains(issueMessageSubString), TStringBuilder() << "Issue should contain '" << issueMessageSubString << "'. " << streamPart.GetIssues().ToString()); break; } if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) { @@ -644,8 +646,8 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t return out.Str(); } -TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus) { - return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus); +TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) { + return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString); } TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus) { diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h index 44bd0035f38..afa9ac01508 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.h +++ b/ydb/core/kqp/ut/common/kqp_ut_common.h @@ -260,7 +260,7 @@ public: NYdb::EStatus Status; }; -TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS); +TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = ""); TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS); TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS); diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index 6b3e52a63be..eca49356bb9 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -8,6 +8,7 @@ #include <util/generic/size_literals.h> #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/executer_actor/kqp_executer.h> +#include <ydb/core/kqp/executer_actor/kqp_planner.h> namespace NKikimr { namespace NKqp { @@ -2572,5 +2573,37 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } +Y_UNIT_TEST_SUITE(KqpRequestContext) { + + Y_UNIT_TEST(TraceIdInErrorMessage) { + auto settings = TKikimrSettings() + .SetAppConfig(AppCfg()) + .SetEnableScriptExecutionOperations(true) + .SetNodeCount(4); + TKikimrRunner kikimr{settings}; + auto db = kikimr.GetTableClient(); + + NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = true; + Y_DEFER { + NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = false; // just in case if test fails + }; + + auto it = db.StreamExecuteScanQuery(R"( + SELECT Text, SUM(Key) AS Total FROM `/Root/EightShard` + GROUP BY Text + ORDER BY Total DESC; + )").GetValueSync(); + + UNIT_ASSERT(it.IsSuccess()); + try { + auto yson = StreamResultToYson(it, true, NYdb::EStatus::PRECONDITION_FAILED, "TraceId"); + } catch (const std::exception& ex) { + UNIT_ASSERT_C(false, "Exception NYdb::EStatus::PRECONDITION_FAILED not found or IssueMessage doesn't contain 'TraceId'"); + } + + NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = false; + } +} + } // namespace NKqp } // namespace NKikimr |