aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2024-11-12 13:48:46 +0300
committerGitHub <noreply@github.com>2024-11-12 13:48:46 +0300
commit667a30d8c2d6c3c312cb3f81db1e07790bc99483 (patch)
tree762ffef6611bee313adb32b61529be6bd5b0522a
parent6ed35ea072eb8a85a52af57f72f5dcc82169ca89 (diff)
downloadydb-667a30d8c2d6c3c312cb3f81db1e07790bc99483.tar.gz
remove result channel proxies (#11382)
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_query.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp6
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h14
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h42
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.cpp243
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.h35
-rw-r--r--ydb/core/kqp/executer_actor/ya.make1
-rw-r--r--ydb/core/kqp/gateway/kqp_ic_gateway.cpp6
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp5
-rw-r--r--ydb/core/kqp/ut/olap/kqp_olap_ut.cpp14
-rw-r--r--ydb/core/kqp/ut/scan/kqp_scan_ut.cpp3
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp9
-rw-r--r--ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp3
-rw-r--r--ydb/core/local_pgwire/pgwire_kqp_proxy.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp24
-rw-r--r--ydb/core/tx/datashard/datashard_ut_volatile.cpp3
-rw-r--r--ydb/core/viewer/viewer_query.h3
-rw-r--r--ydb/core/viewer/viewer_query_old.h3
-rw-r--r--ydb/tests/tools/kqprun/src/actors.cpp3
21 files changed, 47 insertions, 391 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp
index 004eddf501..1730d30a78 100644
--- a/ydb/core/grpc_services/query/rpc_execute_query.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp
@@ -31,10 +31,8 @@ struct TProducerState {
ui64 ChannelId = 0;
void SendAck(const NActors::TActorIdentity& actor) const {
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(*LastSeqNo);
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
- resp->Record.SetChannelId(ChannelId);
actor.Send(ActorId, resp.Release());
}
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 222ee2d7dd..c37dc9b06e 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -266,12 +266,10 @@ private:
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << ExecuterActorId_);
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(*LastSeqNo_);
+ // scan query has single result set, so it's ok to put zero as channelId here.
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);
-
ctx.Send(ExecuterActorId_, resp.Release());
-
AckedFreeSpaceBytes_ = freeSpaceBytes;
}
}
@@ -320,6 +318,7 @@ private:
}
void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) {
+
auto& record = ev->Get()->Record;
NYql::TIssues issues = ev->Get()->GetIssues();
@@ -355,8 +354,7 @@ private:
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);
ctx.Send(ev->Sender, resp.Release());
diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
index 9c861b7b1b..bd699a5a50 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp
@@ -281,8 +281,7 @@ private:
<< ", to: " << ev->Sender
<< ", queue: " << FlowControl_.QueueSize());
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(freeSpaceBytes);
ctx.Send(ev->Sender, resp.Release());
@@ -322,8 +321,7 @@ private:
<< ", freeSpace: " << freeSpaceBytes
<< ", to: " << GatewayRequestHandlerActorId_);
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(*LastSeqNo_);
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0);
resp->Record.SetFreeSpace(freeSpaceBytes);
ctx.Send(GatewayRequestHandlerActorId_, resp.Release());
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index bda00781bb..5a1d150498 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -2036,10 +2036,6 @@ private:
THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task]
TVector<ui64> computeTasks;
- if (StreamResult) {
- InitializeChannelProxies();
- }
-
for (auto& task : TasksGraph.GetTasks()) {
auto& stageInfo = TasksGraph.GetStageInfo(task.StageId);
if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) {
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index fc127fa3f0..2ae6a86591 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -71,7 +71,19 @@ struct TEvKqpExecuter {
TKqpExecuterEvents::EvStreamData> {};
struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck,
- TKqpExecuterEvents::EvStreamDataAck> {};
+ TKqpExecuterEvents::EvStreamDataAck>
+ {
+ friend class TEventPBBase;
+ explicit TEvStreamDataAck(ui64 seqno, ui64 channelId)
+ {
+ Record.SetSeqNo(seqno);
+ Record.SetChannelId(channelId);
+ }
+
+ private:
+ // using a little hack to hide default empty constructor
+ TEvStreamDataAck() = default;
+ };
// deprecated event, remove in the future releases.
struct TEvExecuterProgress : public TEventPB<TEvExecuterProgress, NKikimrKqp::TEvExecuterProgress,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 29e118432b..bc6bf0708d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -4,7 +4,6 @@
#include "kqp_executer_stats.h"
#include "kqp_planner.h"
#include "kqp_partition_helper.h"
-#include "kqp_result_channel.h"
#include "kqp_table_resolver.h"
#include <ydb/core/kqp/common/kqp_ru_calc.h>
@@ -1855,22 +1854,6 @@ protected:
return true;
}
- void InitializeChannelProxies() {
- // notice: forward all respones to executer if
- // trailing results are allowed.
- // temporary, will be removed in the next pr.
- if (Request.IsTrailingResultsAllowed())
- return;
-
- for(const auto& channel: TasksGraph.GetChannels()) {
- if (channel.DstTask) {
- continue;
- }
-
- CreateChannelProxy(channel);
- }
- }
-
const IKqpGateway::TKqpSnapshot& GetSnapshot() const {
return TasksGraph.GetMeta().Snapshot;
}
@@ -1879,31 +1862,6 @@ protected:
TasksGraph.GetMeta().SetSnapshot(step, txId);
}
- IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) {
- auto channelIt = ResultChannelProxies.find(channel.Id);
- if (channelIt != ResultChannelProxies.end()) {
- return channelIt->second;
- }
-
- YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize());
- const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex];
-
- IActor* proxy;
- if (txResult.IsStream && txResult.QueryResultIndex.Defined()) {
- proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType,
- txResult.ColumnOrder, txResult.ColumnHints, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex);
- } else {
- proxy = CreateResultDataChannelProxy(TxId, channel.Id, this->SelfId(),
- channel.DstInputIndex, ResponseEv.get());
- }
-
- this->RegisterWithSameMailbox(proxy);
- ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy));
- TasksGraph.GetMeta().ResultChannelProxies.emplace(channel.Id, proxy->SelfId());
-
- return proxy;
- }
-
protected:
// Introduced separate method from `PassAway()` - to not get confused with expectations from other actors,
// that `PassAway()` should kill actor immediately.
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
deleted file mode 100644
index 8555a40676..0000000000
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
+++ /dev/null
@@ -1,243 +0,0 @@
-#include "kqp_result_channel.h"
-
-#include "kqp_executer.h"
-#include "kqp_executer_impl.h"
-#include "kqp_executer_stats.h"
-
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/kqp/common/kqp.h>
-#include <ydb/core/kqp/runtime/kqp_transport.h>
-
-#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
-
-namespace NKikimr {
-namespace NKqp {
-namespace {
-
-struct TEvComputeChannelDataOOB {
- NYql::NDqProto::TEvComputeChannelData Proto;
- TRope Payload;
-
- size_t Size() const {
- return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size();
- }
-
- ui32 RowCount() const {
- return Proto.GetChannelData().GetData().GetRows();
- }
-};
-
-class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelProxy> {
-public:
- static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
- return NKikimrServices::TActivity::KQP_RESULT_CHANNEL_PROXY;
- }
-
- TResultCommonChannelProxy(ui64 txId, ui64 channelId, TActorId executer)
- : TActor(&TResultCommonChannelProxy::WorkState)
- , TxId(txId)
- , ChannelId(channelId)
- , Executer(executer) {}
-
-protected:
- virtual void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) = 0;
-
-private:
- STATEFN(WorkState) {
- try {
- switch (ev->GetTypeRewrite()) {
- hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleWork);
- hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleWork);
- hFunc(TEvents::TEvPoison, HandlePoison);
- default: {
- InternalError(TStringBuilder() << "TxId: " << TxId << ", channelId: " << ChannelId
- << "Handle unexpected event " << ev->GetTypeRewrite());
- }
- }
- } catch (const yexception& ex) {
- InternalError(ex.what());
- } catch (const NKikimr::TMemoryLimitExceededException& ex) {
- InternalError("Memory limit exceeded exception", NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
- }
- }
-
- void HandleWork(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) {
- TEvComputeChannelDataOOB record;
- record.Proto = std::move(ev->Get()->Record);
- if (record.Proto.GetChannelData().GetData().HasPayloadId()) {
- record.Payload = ev->Get()->GetPayload(record.Proto.GetChannelData().GetData().GetPayloadId());
- }
-
- const auto& channelData = record.Proto.GetChannelData();
-
- ComputeActor = ev->Sender;
-
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ", got result"
- << ", channelId: " << channelData.GetChannelId()
- << ", seqNo: " << record.Proto.GetSeqNo()
- << ", from: " << ev->Sender);
-
- SendResults(record, ev->Sender);
- }
-
- void HandleWork(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) {
- ui64 seqNo = ev->Get()->Record.GetSeqNo();
- i64 freeSpace = ev->Get()->Record.GetFreeSpace();
-
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
- << ", send ack to channelId: " << ChannelId
- << ", seqNo: " << seqNo
- << ", enough: " << ev->Get()->Record.GetEnough()
- << ", freeSpace: " << freeSpace
- << ", to: " << ComputeActor);
-
- auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
- ackEv->Record.SetSeqNo(seqNo);
- ackEv->Record.SetChannelId(ChannelId);
- ackEv->Record.SetFreeSpace(freeSpace);
- ackEv->Record.SetFinish(ev->Get()->Record.GetEnough());
- Send(ComputeActor, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ ChannelId);
- }
-
- void InternalError(const TString& msg, const NYql::NDqProto::StatusIds_StatusCode& code = NYql::NDqProto::StatusIds::INTERNAL_ERROR) {
- LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, msg);
-
- auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(code, msg);
- Send(Executer, evAbort.Release());
-
- Become(&TResultCommonChannelProxy::DeadState);
- }
-
-private:
- STATEFN(DeadState) {
- try {
- switch (ev->GetTypeRewrite()) {
- hFunc(TEvents::TEvPoison, HandlePoison);
- }
-
- } catch(const yexception& ex) {
- InternalError(ex.what());
- }
- }
-
-private:
- void HandlePoison(TEvents::TEvPoison::TPtr&) {
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId
- << ", result channelId: " << ChannelId << ", pass away");
- PassAway();
- }
-
-private:
- const ui64 TxId;
- const ui64 ChannelId;
- const NActors::TActorId Executer;
- NActors::TActorId ComputeActor;
-};
-
-class TResultStreamChannelProxy : public TResultCommonChannelProxy {
-public:
- TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target,
- TActorId executer, size_t statementResultIndex)
- : TResultCommonChannelProxy(txId, channelId, executer)
- , ColumnOrder(columnOrder)
- , ColumnHints(columnHints)
- , ItemType(itemType)
- , QueryResultIndex(queryResultIndex)
- , Target(target)
- , StatementResultIndex(statementResultIndex) {}
-
-private:
- void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) override {
- Y_UNUSED(sender);
-
- TVector<NYql::NDq::TDqSerializedBatch> batches(1);
- auto& batch = batches.front();
-
- batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData());
- batch.Payload = std::move(computeData.Payload);
-
- TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
- auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder, ColumnHints);
-
- auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
- streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
- streamEv->Record.SetQueryResultIndex(QueryResultIndex + StatementResultIndex);
- streamEv->Record.MutableResultSet()->Swap(&resultSet);
-
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
- "Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo()
- << ", nRows: " << batch.RowCount() );
-
- Send(Target, streamEv.Release());
- }
-
-private:
- const TVector<ui32>* ColumnOrder;
- const TVector<TString>* ColumnHints;
- NKikimr::NMiniKQL::TType* ItemType;
- ui32 QueryResultIndex = 0;
- const NActors::TActorId Target;
- size_t StatementResultIndex;
-};
-
-class TResultDataChannelProxy : public TResultCommonChannelProxy {
-public:
- TResultDataChannelProxy(ui64 txId, ui64 channelId, TActorId executer,
- ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultReceiver)
- : TResultCommonChannelProxy(txId, channelId, executer)
- , InputIndex(inputIndex)
- , ResultReceiver(resultReceiver) {}
-
-private:
- virtual void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) {
- NYql::NDq::TDqSerializedBatch batch;
- batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData());
- batch.Payload = std::move(computeData.Payload);
-
- auto channelId = computeData.Proto.GetChannelData().GetChannelId();
-
- ResultReceiver->TakeResult(InputIndex, std::move(batch));
-
- auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
-
- ackEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
- ackEv->Record.SetChannelId(channelId);
- ackEv->Record.SetFreeSpace(1_MB);
-
- Send(sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId);
- }
-
-private:
- ui32 InputIndex;
- TEvKqpExecuter::TEvTxResponse* ResultReceiver;
-};
-
-} // anonymous namespace end
-
-NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target,
- TActorId executer, ui32 statementResultIndex)
-{
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
- "CreateResultStreamChannelProxy: TxId: " << txId <<
- ", channelId: " << channelId
- );
-
- return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, columnHints, queryResultIndex, target,
- executer, statementResultIndex);
-}
-
-NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TActorId executer,
- ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultsReceiver)
-{
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER,
- "CreateResultDataChannelProxy: TxId: " << txId <<
- ", channelId: " << channelId
- );
-
- return new TResultDataChannelProxy(txId, channelId, executer, inputIndex, resultsReceiver);
-}
-
-} // namespace NKqp
-} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h
deleted file mode 100644
index 57b58fc855..0000000000
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#pragma once
-
-#include "kqp_tasks_graph.h"
-#include "kqp_executer.h"
-
-#include <ydb/library/actors/core/actor.h>
-
-namespace NYql {
-
-class TTypeAnnotationNode;
-
-namespace NDqProto {
-
-class TData;
-
-} // namespace NDqProto
-} // namespace NYql
-
-namespace NKikimrMiniKQL {
-class TType;
-} // namespace NKikimrMiniKQL
-
-namespace NKikimr::NKqp {
-
-struct TQueryExecutionStats;
-struct TKqpExecuterTxResult;
-
-NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType,
- const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, NActors::TActorId target,
- NActors::TActorId executer, ui32 statementResultIndex);
-
-NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId,
- NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver);
-
-} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make
index 8b6a6f5119..0cd7a98ba1 100644
--- a/ydb/core/kqp/executer_actor/ya.make
+++ b/ydb/core/kqp/executer_actor/ya.make
@@ -11,7 +11,6 @@ SRCS(
kqp_partition_helper.cpp
kqp_planner.cpp
kqp_planner_strategy.cpp
- kqp_result_channel.cpp
kqp_table_resolver.cpp
kqp_tasks_graph.cpp
kqp_tasks_validate.cpp
diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
index d0c1fa51f2..cc4b12ad79 100644
--- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp
@@ -202,9 +202,8 @@ public:
ResultSet.set_truncated(true);
}
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetEnough(truncated);
- resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
resp->Record.SetFreeSpace(ResultSetBytesLimit);
ctx.Send(ev->Sender, resp.Release());
}
@@ -473,8 +472,7 @@ public:
}
}
- auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max());
Send(ev->Sender, response.Release());
}
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 66cc4b2098..a10a257201 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -44,11 +44,8 @@ struct TProducerState {
ui64 ChannelId = 0;
void SendAck(const NActors::TActorIdentity& actor) const {
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(*LastSeqNo);
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId);
resp->Record.SetFreeSpace(AckedFreeSpaceBytes);
- resp->Record.SetChannelId(ChannelId);
-
actor.Send(ActorId, resp.Release());
}
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index ea70ec1389..29a5db2c73 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -1403,9 +1403,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 1);
result = 1;
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -1471,9 +1470,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -1534,9 +1532,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().size() == 0);
hasResult = true;
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -1605,9 +1602,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -2209,7 +2205,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto result = CollectStreamResult(it);
auto ast = result.QueryStats->Getquery_ast();
-
+
pushdown = ast.find("KqpOlapFilter") != std::string::npos;
} else {
// Error means that predicate not pushed down
diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
index d8c2655ce9..b14c1268a9 100644
--- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
@@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
Y_ASSERT(record.GetResultSet().rows().size() == 0);
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index de741398ad..995f214d7d 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
collectedKeys->push_back(row.items(0).uint64_value());
}
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(record.GetSeqNo());
runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return true;
}
@@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
} else if (testActorType == ETestActorType::StreamLookup) {
InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false));
}
-
+
if (providedServer) {
Server = providedServer;
} else {
@@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);
shim->ReadsReceived.WaitI();
-
+
UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);
@@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
);
shim->ReadsReceived.WaitI();
-
+
UNIT_ASSERT_EQUAL(shards.size(), 1);
auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true);
diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
index 5db8d66d46..c1e6d39d39 100644
--- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
+++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp
@@ -86,8 +86,7 @@ public:
SendNotification<TEvQueryRunner::TEvExecutionStarted>();
}
- auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>();
- response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(std::numeric_limits<i64>::max());
auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();
diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
index 89057f4815..5d2c5fa083 100644
--- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
+++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp
@@ -273,8 +273,7 @@ protected:
TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie);
BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender);
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
resp->Record.SetFreeSpace(std::numeric_limits<i64>::max());
TBase::Send(ev->Sender, resp.Release());
}
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index fd30fb8424..2c22969358 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -117,9 +117,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -214,9 +213,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -317,9 +315,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
@@ -435,9 +432,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
@@ -565,9 +561,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
@@ -693,9 +688,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
}
}
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
@@ -803,9 +797,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(record.GetSeqNo());
resp->Record.SetFreeSpace(100);
ctx.Send(ev->Sender, resp.Release());
break;
@@ -944,9 +937,8 @@ Y_UNIT_TEST_SUITE(KqpScan) {
Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
result = record.GetResultSet().rows().at(0).items().at(0).int64_value();
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId());
resp->Record.SetEnough(false);
- resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
resp->Record.SetFreeSpace(100);
runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release()));
return TTestActorRuntime::EEventAction::DROP;
diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
index 4325b7cdc7..f5af54a545 100644
--- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp
@@ -1307,8 +1307,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
if (msg->Record.GetResultSet().rows().size()) {
observedResults.emplace_back(FormatResult(msg->Record.GetResultSet()));
}
- auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- resp->Record.SetSeqNo(msg->Record.GetSeqNo());
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(msg->Record.GetSeqNo(), msg->Record.GetChannelId());
resp->Record.SetFreeSpace(1);
ctx.Send(ev->Sender, resp.Release());
break;
diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h
index 3278819cd2..9bf210e5db 100644
--- a/ydb/core/viewer/viewer_query.h
+++ b/ydb/core/viewer/viewer_query.h
@@ -572,8 +572,7 @@ private:
ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet());
}
- THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
if (TotalRows >= LimitRows) {
ack->Record.SetEnough(true);
}
diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h
index abeeeb4acd..ff592792d0 100644
--- a/ydb/core/viewer/viewer_query_old.h
+++ b/ydb/core/viewer/viewer_query_old.h
@@ -435,8 +435,7 @@ private:
ResultSets.emplace_back();
ResultSets.back() = std::move(data.GetResultSet());
- THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
Send(ev->Sender, ack.Release());
}
diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp
index 225271772d..66b28b1093 100644
--- a/ydb/tests/tools/kqprun/src/actors.cpp
+++ b/ydb/tests/tools/kqprun/src/actors.cpp
@@ -42,8 +42,7 @@ public:
)
void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) {
- auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>();
- response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId());
response->Record.SetFreeSpace(ResultSizeLimit_);
auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex();