diff options
author | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-19 17:04:16 +0300 |
---|---|---|
committer | va-kuznecov <va-kuznecov@ydb.tech> | 2022-09-19 17:04:16 +0300 |
commit | 9b15b3d3735d7d14e50cd650ec66674b7ec221ce (patch) | |
tree | f514d026df4c372f98acfba1982327bb214ca0ce | |
parent | 168273e822f2d73c568d8c8ce32220abe497d60b (diff) | |
download | ydb-9b15b3d3735d7d14e50cd650ec66674b7ec221ce.tar.gz |
Fix truncated results in session_actor
-rw-r--r-- | ydb/core/kqp/kqp_session_actor.cpp | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/ydb/core/kqp/kqp_session_actor.cpp b/ydb/core/kqp/kqp_session_actor.cpp index c37fab6f9bb..9cc4a34ca42 100644 --- a/ydb/core/kqp/kqp_session_actor.cpp +++ b/ydb/core/kqp/kqp_session_actor.cpp @@ -160,6 +160,11 @@ public: RequestCounters->Counters = Counters; RequestCounters->DbCounters = Settings.DbCounters; RequestCounters->TxProxyMon = MakeIntrusive<NTxProxy::TTxProxyMon>(AppData()->Counters); + + FillSettings.AllResultsBytesLimit = Nothing(); + FillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef(); + FillSettings.Format = IDataProvider::EResultFormat::Custom; + FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat); } void Bootstrap() { @@ -1187,7 +1192,7 @@ public: auto* response = ev->Get()->Record.MutableResponse(); auto requestInfo = TKqpRequestInfo(QueryState->TraceId, SessionId); LOG_D(SelfId() << " " << requestInfo << " TEvTxResponse, CurrentTx: " << QueryState->CurrentTx - << " response: " << response->DebugString()); + << " response.status: " << response->GetStatus() << " results.size: " << response->GetResult().ResultsSize()); ExecuterId = TActorId{}; if (response->GetStatus() != Ydb::StatusIds::SUCCESS) { @@ -1528,7 +1533,8 @@ public: if (QueryState->PreparedQuery) { auto& phyQuery = QueryState->PreparedQuery->GetPhysicalQuery(); - for (auto& rb : phyQuery.GetResultBindings()) { + for (size_t i = 0; i < phyQuery.ResultBindingsSize(); ++i) { + auto& rb = phyQuery.GetResultBindings(i); auto txIndex = rb.GetTxResultBinding().GetTxIndex(); auto resultIndex = rb.GetTxResultBinding().GetResultIndex(); @@ -1536,10 +1542,19 @@ public: YQL_ENSURE(txIndex < txResults.size()); YQL_ENSURE(resultIndex < txResults[txIndex].size()); - IDataProvider::TFillSettings fillSettings; - //TODO: shoud it be taken from PreparedQuery->GetResults().GetRowsLimit() ? - fillSettings.RowsLimitPerWrite = Config->_ResultRowsLimit.Get().GetRef(); - auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {}, fillSettings, arena.get()); + std::optional<IDataProvider::TFillSettings> fillSettings; + if (QueryState->PreparedQuery->ResultsSize()) { + YQL_ENSURE(phyQuery.ResultBindingsSize() == QueryState->PreparedQuery->ResultsSize(), "" + << phyQuery.ResultBindingsSize() << " != " << QueryState->PreparedQuery->ResultsSize()); + const auto& result = QueryState->PreparedQuery->GetResults(i); + if (result.GetRowsLimit()) { + fillSettings = FillSettings; + fillSettings->RowsLimitPerWrite = result.GetRowsLimit(); + } + } + + auto* protoRes = KikimrResultToProto(txResults[txIndex][resultIndex], {}, + fillSettings.value_or(FillSettings), arena.get()); response->AddResults()->Swap(protoRes); } } @@ -2233,6 +2248,7 @@ private: std::unique_ptr<TKqpCleanupCtx> CleanupCtx; ui32 QueryId = 0; TKikimrConfiguration::TPtr Config; + IDataProvider::TFillSettings FillSettings; TLRUCache<TULID, TIntrusivePtr<TKqpTransactionContext>> ExplicitTransactions; std::vector<TIntrusivePtr<TKqpTransactionContext>> TransactionsToBeAborted; ui64 EvictedTx = 0; |