aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormakostrov <makostrov@yandex-team.com>2023-09-30 15:44:52 +0300
committermakostrov <makostrov@yandex-team.com>2023-09-30 15:59:59 +0300
commitd0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1 (patch)
tree762dcb05c900f47d1b36ab5fab8d4013d66c1988
parent8fdc2f37e43f846ca5405525508074333bae94b1 (diff)
downloadydb-d0df3655f8a3f6ffb9a6658f3918c40c5d28c6b1.tar.gz
Generate TraceId if RequestCtx don't have one
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h2
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h14
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp10
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp50
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.h6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp27
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner_strategy.h2
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp2
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp8
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp8
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.h2
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp33
13 files changed, 122 insertions, 44 deletions
diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h
index 3cf4559ab63..3fccabda861 100644
--- a/ydb/core/kqp/common/kqp_user_request_context.h
+++ b/ydb/core/kqp/common/kqp_user_request_context.h
@@ -10,6 +10,8 @@ namespace NKikimr::NKqp {
TString Database;
TString SessionId;
+ TUserRequestContext() = default;
+
TUserRequestContext(const TString& traceId, const TString& database, const TString& sessionId)
: TraceId(traceId)
, Database(database)
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 67a94ed6e63..c8bd2dfc9a1 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -99,7 +99,7 @@ IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId
bool temporary, TString SessionId);
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
- IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner);
+ IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 4dc36f49e47..1a1aeddaf95 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -53,13 +53,13 @@ LWTRACE_USING(KQP_PROVIDER);
namespace NKikimr {
namespace NKqp {
-#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << stream)
+#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "ActorId: " << SelfId() << " TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
enum class EExecType {
Data,
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 2ada01f805f..8774fc02443 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -73,11 +73,13 @@ TDqTaskRunnerExecutionContext CreateTaskRunnerExecutionContext() {
class TKqpLiteralExecuter {
public:
- TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner)
+ TKqpLiteralExecuter(IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner,
+ const TIntrusivePtr<TUserRequestContext>& userRequestContext)
: Request(std::move(request))
, Counters(counters)
, OwnerActor(owner)
, LiteralExecuterSpan(TWilsonKqp::LiteralExecuter, std::move(Request.TraceId), "LiteralExecuter")
+ , UserRequestContext(userRequestContext)
{
ResponseEv = std::make_unique<TEvKqpExecuter::TEvTxResponse>(Request.TxAlloc);
ResponseEv->Orbit = std::move(Request.Orbit);
@@ -406,15 +408,17 @@ private:
std::unique_ptr<NKikimr::NMiniKQL::TKqpComputeContextBase> ComputeCtx;
std::unique_ptr<NYql::NDq::TDqTaskRunnerContext> RunnerContext;
NWilson::TSpan LiteralExecuterSpan;
+
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
};
} // anonymous namespace
std::unique_ptr<TEvKqpExecuter::TEvTxResponse> ExecuteLiteral(
- IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner)
+ IKqpGateway::TExecPhysicalRequest&& request, TKqpRequestCounters::TPtr counters, TActorId owner, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
std::unique_ptr<TKqpLiteralExecuter> executer = std::make_unique<TKqpLiteralExecuter>(
- std::move(request), counters, owner);
+ std::move(request), counters, owner, userRequestContext);
return executer->ExecuteLiteral();
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index f4516628803..7178e37ee05 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -15,10 +15,10 @@ using namespace NActors;
namespace NKikimr::NKqp {
-#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
-#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
-#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
-#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << stream)
+#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
+#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ". " << "UserRequestContext: " << *UserRequestContext << ". " << stream)
using namespace NYql;
@@ -27,7 +27,7 @@ namespace {
const ui64 MaxTaskSize = 48_MB;
template <class TCollection>
-std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TCollection& tasks) {
+std::unique_ptr<TEvKqp::TEvAbortExecution> CheckTaskSize(ui64 TxId, const TIntrusivePtr<TUserRequestContext>& UserRequestContext, const TCollection& tasks) {
for (const auto& task : tasks) {
if (ui32 size = task.ByteSize(); size > MaxTaskSize) {
LOG_E("Abort execution. Task #" << task.GetId() << " size is too big: " << size << " > " << MaxTaskSize);
@@ -51,6 +51,8 @@ void BuildInitialTaskResources(const TKqpTasksGraph& graph, ui64 taskId, TTaskRe
}
+bool TKqpPlanner::UseMockEmptyPlanner = false;
+
// Task can allocate extra memory during execution.
// So, we estimate total memory amount required for task as apriori task size multiplied by this constant.
constexpr ui32 MEMORY_ESTIMATION_OVERFLOW = 2;
@@ -92,6 +94,28 @@ TKqpPlanner::TKqpPlanner(TKqpTasksGraph& graph, ui64 txId, const TActorId& execu
}
}
+// ResourcesSnapshot, ResourceEstimations
+
+void TKqpPlanner::LogMemoryStatistics(const TLogFunc& logFunc) {
+ uint64_t totalMemory = 0;
+ uint32_t totalComputeActors = 0;
+ for (auto& node : ResourcesSnapshot) {
+ logFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId()
+ << " memory: " << (node.GetTotalMemory() - node.GetUsedMemory())
+ << ", ca: " << node.GetAvailableComputeActors());
+ totalMemory += (node.GetTotalMemory() - node.GetUsedMemory());
+ totalComputeActors += node.GetAvailableComputeActors();
+ }
+ logFunc(TStringBuilder() << "Total nodes: " << ResourcesSnapshot.size() << ", total memory: " << totalMemory << ", total CA:" << totalComputeActors);
+
+ totalMemory = 0;
+ for (const auto& task : ResourceEstimations) {
+ logFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit);
+ totalMemory += task.TotalMemoryLimit;
+ }
+ logFunc(TStringBuilder() << "Total tasks: " << ResourceEstimations.size() << ", total memory: " << totalMemory);
+}
+
bool TKqpPlanner::SendStartKqpTasksRequest(ui32 requestId, const TActorId& target) {
YQL_ENSURE(requestId < Requests.size());
@@ -248,11 +272,11 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}
- auto planner = CreateKqpGreedyPlanner();
+ auto planner = (UseMockEmptyPlanner ? CreateKqpMockEmptyPlanner() : CreateKqpGreedyPlanner()); // KqpMockEmptyPlanner is a mock planner for tests
auto ctx = TlsActivationContext->AsActorContext();
if (ctx.LoggerSettings() && ctx.LoggerSettings()->Satisfies(NActors::NLog::PRI_DEBUG, NKikimrServices::KQP_EXECUTER)) {
- planner->SetLogFunc([TxId = TxId](TStringBuf msg) { LOG_D(msg); });
+ planner->SetLogFunc([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
}
THashMap<ui64, size_t> nodeIdtoIdx;
@@ -260,6 +284,8 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
nodeIdtoIdx[ResourcesSnapshot[idx].nodeid()] = idx;
}
+ LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_D(msg); });
+
auto plan = planner->Plan(ResourcesSnapshot, ResourceEstimations);
THashMap<ui64, ui64> alreadyAssigned;
@@ -281,12 +307,10 @@ std::unique_ptr<IEventHandle> TKqpPlanner::AssignTasksToNodes() {
return nullptr;
} else {
- // what is here?
- Cerr << (*UserRequestContext);
- Y_FAIL_S("Fail point 123#!");
- // UserRequestContext->Out(Cerr);
+ LogMemoryStatistics([TxId = TxId, &UserRequestContext = UserRequestContext](TStringBuf msg) { LOG_E(msg); });
+
auto ev = MakeHolder<TEvKqp::TEvAbortExecution>(NYql::NDqProto::StatusIds::PRECONDITION_FAILED,
- "Not enough resources to execute query");
+ TStringBuilder() << "Not enough resources to execute query. " << "TraceId: " << UserRequestContext->TraceId);
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.Release());
}
}
@@ -414,7 +438,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
SortUnique(tasks);
auto& request = Requests.emplace_back(std::move(tasks), CalcSendMessageFlagsForNode(nodeId), nodeId);
request.SerializedRequest = SerializeRequest(request);
- auto ev = CheckTaskSize(TxId, request.SerializedRequest->Record.GetTasks());
+ auto ev = CheckTaskSize(TxId, UserRequestContext, request.SerializedRequest->Record.GetTasks());
if (ev != nullptr) {
return std::make_unique<IEventHandle>(ExecuterId, ExecuterId, ev.release());
}
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.h b/ydb/core/kqp/executer_actor/kqp_planner.h
index d8f5da48ad7..2c9cf4d7d71 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner.h
@@ -38,6 +38,8 @@ class TKqpPlanner {
{}
};
+ using TLogFunc = std::function<void(TStringBuf message)>;
+
public:
TKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer, const IKqpGateway::TKqpSnapshot& snapshot,
const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TInstant deadline,
@@ -70,7 +72,7 @@ private:
std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> SerializeRequest(const TRequestData& requestData);
ui32 CalcSendMessageFlagsForNode(ui32 nodeId);
-
+ void LogMemoryStatistics(const TLogFunc& logFunc);
private:
const ui64 TxId;
@@ -104,6 +106,8 @@ private:
TIntrusivePtr<TUserRequestContext> UserRequestContext;
+public:
+ static bool UseMockEmptyPlanner; // for tests: if true then use TKqpMockEmptyPlanner that leads to the error
};
std::unique_ptr<TKqpPlanner> CreateKqpPlanner(TKqpTasksGraph& tasksGraph, ui64 txId, const TActorId& executer,
diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
index 5cd5bced3d2..6fbc46b0657 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.cpp
@@ -116,18 +116,6 @@ public:
{
TVector<TResult> result;
TNodesManager nodes(nodeResources);
- for (auto& node : nodeResources) {
- if (LogFunc) {
- LogFunc(TStringBuilder() << "[AvailableResources] node #" << node.GetNodeId()
- << " memory: " << (node.GetTotalMemory() - node.GetUsedMemory())
- << ", ca: " << node.GetAvailableComputeActors());
- }
- }
- if (LogFunc) {
- for (const auto& task : tasks) {
- LogFunc(TStringBuilder() << "[TaskResources] task: " << task.TaskId << ", memory: " << task.TotalMemoryLimit);
- }
- }
for (const auto& taskEstimation : tasks) {
auto node = nodes.PopOptimalNodeWithLimits(taskEstimation.TotalMemoryLimit * TASK_MEMORY_ESTIMATION_OVERFLOW, 1);
@@ -169,11 +157,26 @@ public:
}
};
+class TKqpMockEmptyPlanner : public IKqpPlannerStrategy {
+public:
+ ~TKqpMockEmptyPlanner() override {}
+
+ TVector<TResult> Plan(const TVector<NKikimrKqp::TKqpNodeResources>&,
+ const TVector<TTaskResourceEstimation>&) override
+ {
+ return {};
+ }
+};
+
} // anonymous namespace
THolder<IKqpPlannerStrategy> CreateKqpGreedyPlanner() {
return MakeHolder<TKqpGreedyPlanner>();
}
+THolder<IKqpPlannerStrategy> CreateKqpMockEmptyPlanner() {
+ return MakeHolder<TKqpMockEmptyPlanner>();
+}
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
index a0733fd4116..41a550259f0 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
+++ b/ydb/core/kqp/executer_actor/kqp_planner_strategy.h
@@ -32,4 +32,6 @@ protected:
THolder<IKqpPlannerStrategy> CreateKqpGreedyPlanner();
+THolder<IKqpPlannerStrategy> CreateKqpMockEmptyPlanner();
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index 3b5728a6dbb..478d9bb239a 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -486,7 +486,7 @@ public:
{}
void Bootstrap() {
- auto result = ::NKikimr::NKqp::ExecuteLiteral(std::move(Request), Counters, SelfId());
+ auto result = ::NKikimr::NKqp::ExecuteLiteral(std::move(Request), Counters, SelfId(), MakeIntrusive<TUserRequestContext>());
ProcessPureExecution(result);
Become(&TThis::DieState);
Send(SelfId(), new TEvents::TEvPoisonPill());
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 6276b9d6a40..6390be1c31d 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -217,6 +217,9 @@ public:
QueryState = std::make_shared<TKqpQueryState>(
ev, QueryId, Settings.Database, Settings.Cluster, Settings.DbCounters, Settings.LongSession,
Settings.TableService, Settings.QueryService, std::move(id), SessionId);
+ if (QueryState->UserRequestContext->TraceId == "") {
+ QueryState->UserRequestContext->TraceId = UlidGen.Next().ToString();
+ }
}
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
@@ -990,7 +993,8 @@ public:
request.Orbit = std::move(QueryState->Orbit);
}
request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
- auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId());
+ Y_ENSURE(QueryState);
+ auto response = ExecuteLiteral(std::move(request), RequestCounters, SelfId(), QueryState->UserRequestContext);
++QueryState->CurrentTx;
ProcessExecuterResult(response.get());
return true;
@@ -1065,7 +1069,7 @@ public:
request.MaxShardCount = RequestControls.MaxShardCount;
request.TraceId = QueryState ? QueryState->KqpSessionSpan.GetTraceId() : NWilson::TTraceId();
LOG_D("Sending to Executer TraceId: " << request.TraceId.GetTraceId() << " " << request.TraceId.GetSpanIdSize());
-
+
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index 3c15feee940..122f7dd2867 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -608,8 +608,9 @@ bool IsTimeoutError(NYdb::EStatus status) {
return status == NYdb::EStatus::CLIENT_DEADLINE_EXCEEDED || status == NYdb::EStatus::TIMEOUT || status == NYdb::EStatus::CANCELLED;
}
+// IssueMessageSubString - uses only in case if !streamPart.IsSuccess()
template<typename TIterator>
-TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS) {
+TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool throwOnTimeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = "") {
TStringStream out;
NYson::TYsonWriter writer(&out, NYson::EYsonFormat::Text, ::NYson::EYsonType::Node, true);
writer.OnBeginList();
@@ -621,6 +622,7 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t
if (!streamPart.IsSuccess()) {
if (opStatus != NYdb::EStatus::SUCCESS) {
UNIT_ASSERT_VALUES_EQUAL_C(streamPart.GetStatus(), opStatus, streamPart.GetIssues().ToString());
+ UNIT_ASSERT_C(streamPart.GetIssues().ToString().Contains(issueMessageSubString), TStringBuilder() << "Issue should contain '" << issueMessageSubString << "'. " << streamPart.GetIssues().ToString());
break;
}
if (throwOnTimeout && IsTimeoutError(streamPart.GetStatus())) {
@@ -644,8 +646,8 @@ TString StreamResultToYsonImpl(TIterator& it, TVector<TString>* profiles, bool t
return out.Str();
}
-TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus) {
- return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus);
+TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus, const TString& issueMessageSubString) {
+ return StreamResultToYsonImpl(it, nullptr, throwOnTimeout, opStatus, issueMessageSubString);
}
TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTimeout, const NYdb::EStatus& opStatus) {
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.h b/ydb/core/kqp/ut/common/kqp_ut_common.h
index 44bd0035f38..afa9ac01508 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.h
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.h
@@ -260,7 +260,7 @@ public:
NYdb::EStatus Status;
};
-TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS);
+TString StreamResultToYson(NYdb::NTable::TScanQueryPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS, const TString& issueMessageSubString = "");
TString StreamResultToYson(NYdb::NScripting::TYqlResultPartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS);
TString StreamResultToYson(NYdb::NTable::TTablePartIterator& it, bool throwOnTImeout = false, const NYdb::EStatus& opStatus = NYdb::EStatus::SUCCESS);
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index 6b3e52a63be..eca49356bb9 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -8,6 +8,7 @@
#include <util/generic/size_literals.h>
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/kqp/executer_actor/kqp_planner.h>
namespace NKikimr {
namespace NKqp {
@@ -2572,5 +2573,37 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
}
+Y_UNIT_TEST_SUITE(KqpRequestContext) {
+
+ Y_UNIT_TEST(TraceIdInErrorMessage) {
+ auto settings = TKikimrSettings()
+ .SetAppConfig(AppCfg())
+ .SetEnableScriptExecutionOperations(true)
+ .SetNodeCount(4);
+ TKikimrRunner kikimr{settings};
+ auto db = kikimr.GetTableClient();
+
+ NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = true;
+ Y_DEFER {
+ NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = false; // just in case if test fails
+ };
+
+ auto it = db.StreamExecuteScanQuery(R"(
+ SELECT Text, SUM(Key) AS Total FROM `/Root/EightShard`
+ GROUP BY Text
+ ORDER BY Total DESC;
+ )").GetValueSync();
+
+ UNIT_ASSERT(it.IsSuccess());
+ try {
+ auto yson = StreamResultToYson(it, true, NYdb::EStatus::PRECONDITION_FAILED, "TraceId");
+ } catch (const std::exception& ex) {
+ UNIT_ASSERT_C(false, "Exception NYdb::EStatus::PRECONDITION_FAILED not found or IssueMessage doesn't contain 'TraceId'");
+ }
+
+ NKikimr::NKqp::TKqpPlanner::UseMockEmptyPlanner = false;
+ }
+}
+
} // namespace NKqp
} // namespace NKikimr