aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorva-kuznecov <va-kuznecov@ydb.tech>2022-09-19 17:04:16 +0300
committerva-kuznecov <va-kuznecov@ydb.tech>2022-09-19 17:04:16 +0300
commit9b15b3d3735d7d14e50cd650ec66674b7ec221ce (patch)
treef514d026df4c372f98acfba1982327bb214ca0ce
parent168273e822f2d73c568d8c8ce32220abe497d60b (diff)
downloadydb-9b15b3d3735d7d14e50cd650ec66674b7ec221ce.tar.gz
Fix truncated results in session_actor
-rw-r--r--ydb/core/kqp/kqp_session_actor.cpp28
1 files changed, 22 insertions, 6 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp
index c37fab6f9bb..9cc4a34ca42 100644
--- a/ydb/core/kqp/kqp_session_actor.cpp
+++ b/ydb/core/kqp/kqp_session_actor.cpp
@@ -160,6 +160,11 @@ public:
RequestCounters->Counters = Counters;
RequestCounters->DbCounters = Settings.DbCounters;
RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters);
+
+ FillSettings.AllResultsBytesLimit = Nothing();
+ FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef();
+ FillSettings.Format = IDataProvider::EResultFormat::Custom;
+ FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat);
}
void Bootstrap() {
@@ -1187,7 +1192,7 @@ public:
auto* response = ev->Get()->Record.MutableResponse();
auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId);
LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx
- << " response: " << response->DebugString());
+ << " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize());
ExecuterId = TActorId{};
if (response->GetStatus() != Ydb::StatusIds::SUCCESS) {
@@ -1528,7 +1533,8 @@ public:
if (QueryState->PreparedQuery) {
auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery();
- for (auto& rb : phyQuery.GetResultBindings()) {
+ for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) {
+ auto& rb = phyQuery.GetResultBindings(i);
auto txIndex = rb.GetTxResultBinding().GetTxIndex();
auto resultIndex = rb.GetTxResultBinding().GetResultIndex();
@@ -1536,10 +1542,19 @@ public:
YQL_ENSURE(txIndex < txResults.size());
YQL_ENSURE(resultIndex < txResults[txIndex].size());
- IDataProvider::TFillSettings fillSettings;
- //TODO: shoud it be taken from PreparedQuery->GetResults().GetRowsLimit() ?
- fillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef();
- auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {}, fillSettings, arena.get());
+ std::optional<IDataProvider::TFillSettings> fillSettings;
+ if (QueryState->PreparedQuery->ResultsSize()) {
+ YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), ""
+ << phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize());
+ const auto& result = QueryState->PreparedQuery->GetResults(i);
+ if (result.GetRowsLimit()) {
+ fillSettings = FillSettings;
+ fillSettings->RowsLimitPerWrite = result.GetRowsLimit();
+ }
+ }
+
+ auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {},
+ fillSettings.value_or(FillSettings), arena.get());
response->AddResults()->Swap(protoRes);
}
}
@@ -2233,6 +2248,7 @@ private:
std::unique_ptr<TKqpCleanupCtx> CleanupCtx;
ui32 QueryId = 0;
TKikimrConfiguration::TPtr Config;
+ IDataProvider::TFillSettings FillSettings;
TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions;
std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted;
ui64 EvictedTx = 0;