diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2024-11-12 13:48:46 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 13:48:46 +0300 |
commit | 667a30d8c2d6c3c312cb3f81db1e07790bc99483 (patch) | |
tree | 762ffef6611bee313adb32b61529be6bd5b0522a | |
parent | 6ed35ea072eb8a85a52af57f72f5dcc82169ca89 (diff) | |
download | ydb-667a30d8c2d6c3c312cb3f81db1e07790bc99483.tar.gz |
remove result channel proxies (#11382)
21 files changed, 47 insertions, 391 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 004eddf501..1730d30a78 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -31,10 +31,8 @@ struct TProducerState { ui64 ChannelId = 0; void SendAck(const NActors::TActorIdentity& actor) const { - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(*LastSeqNo); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId); resp->Record.SetFreeSpace(AckedFreeSpaceBytes); - resp->Record.SetChannelId(ChannelId); actor.Send(ActorId, resp.Release()); } diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index 222ee2d7dd..c37dc9b06e 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -266,12 +266,10 @@ private: << ", freeSpace: " << freeSpaceBytes << ", to: " << ExecuterActorId_); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(*LastSeqNo_); + // scan query has single result set, so it's ok to put zero as channelId here. + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); - ctx.Send(ExecuterActorId_, resp.Release()); - AckedFreeSpaceBytes_ = freeSpaceBytes; } } @@ -320,6 +318,7 @@ private: } void Handle(NKqp::TEvKqp::TEvAbortExecution::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; NYql::TIssues issues = ev->Get()->GetIssues(); @@ -355,8 +354,7 @@ private: << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index 9c861b7b1b..bd699a5a50 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -281,8 +281,7 @@ private: << ", to: " << ev->Sender << ", queue: " << FlowControl_.QueueSize()); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); @@ -322,8 +321,7 @@ private: << ", freeSpace: " << freeSpaceBytes << ", to: " << GatewayRequestHandlerActorId_); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(*LastSeqNo_); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo_, 0); resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(GatewayRequestHandlerActorId_, resp.Release()); diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index bda00781bb..5a1d150498 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -2036,10 +2036,6 @@ private: THashMap<ui64, TVector<ui64>> remoteComputeTasks; // shardId -> [task] TVector<ui64> computeTasks; - if (StreamResult) { - InitializeChannelProxies(); - } - for (auto& task : TasksGraph.GetTasks()) { auto& stageInfo = TasksGraph.GetStageInfo(task.StageId); if (task.Meta.ShardId && (task.Meta.Reads || task.Meta.Writes)) { diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index fc127fa3f0..2ae6a86591 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -71,7 +71,19 @@ struct TEvKqpExecuter { TKqpExecuterEvents::EvStreamData> {}; struct TEvStreamDataAck : public TEventPB<TEvStreamDataAck, NKikimrKqp::TEvExecuterStreamDataAck, - TKqpExecuterEvents::EvStreamDataAck> {}; + TKqpExecuterEvents::EvStreamDataAck> + { + friend class TEventPBBase; + explicit TEvStreamDataAck(ui64 seqno, ui64 channelId) + { + Record.SetSeqNo(seqno); + Record.SetChannelId(channelId); + } + + private: + // using a little hack to hide default empty constructor + TEvStreamDataAck() = default; + }; // deprecated event, remove in the future releases. struct TEvExecuterProgress : public TEventPB<TEvExecuterProgress, NKikimrKqp::TEvExecuterProgress, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 29e118432b..bc6bf0708d 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -4,7 +4,6 @@ #include "kqp_executer_stats.h" #include "kqp_planner.h" #include "kqp_partition_helper.h" -#include "kqp_result_channel.h" #include "kqp_table_resolver.h" #include <ydb/core/kqp/common/kqp_ru_calc.h> @@ -1855,22 +1854,6 @@ protected: return true; } - void InitializeChannelProxies() { - // notice: forward all respones to executer if - // trailing results are allowed. - // temporary, will be removed in the next pr. - if (Request.IsTrailingResultsAllowed()) - return; - - for(const auto& channel: TasksGraph.GetChannels()) { - if (channel.DstTask) { - continue; - } - - CreateChannelProxy(channel); - } - } - const IKqpGateway::TKqpSnapshot& GetSnapshot() const { return TasksGraph.GetMeta().Snapshot; } @@ -1879,31 +1862,6 @@ protected: TasksGraph.GetMeta().SetSnapshot(step, txId); } - IActor* CreateChannelProxy(const NYql::NDq::TChannel& channel) { - auto channelIt = ResultChannelProxies.find(channel.Id); - if (channelIt != ResultChannelProxies.end()) { - return channelIt->second; - } - - YQL_ENSURE(channel.DstInputIndex < ResponseEv->ResultsSize()); - const auto& txResult = ResponseEv->TxResults[channel.DstInputIndex]; - - IActor* proxy; - if (txResult.IsStream && txResult.QueryResultIndex.Defined()) { - proxy = CreateResultStreamChannelProxy(TxId, channel.Id, txResult.MkqlItemType, - txResult.ColumnOrder, txResult.ColumnHints, *txResult.QueryResultIndex, Target, this->SelfId(), StatementResultIndex); - } else { - proxy = CreateResultDataChannelProxy(TxId, channel.Id, this->SelfId(), - channel.DstInputIndex, ResponseEv.get()); - } - - this->RegisterWithSameMailbox(proxy); - ResultChannelProxies.emplace(std::make_pair(channel.Id, proxy)); - TasksGraph.GetMeta().ResultChannelProxies.emplace(channel.Id, proxy->SelfId()); - - return proxy; - } - protected: // Introduced separate method from `PassAway()` - to not get confused with expectations from other actors, // that `PassAway()` should kill actor immediately. diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp deleted file mode 100644 index 8555a40676..0000000000 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ /dev/null @@ -1,243 +0,0 @@ -#include "kqp_result_channel.h" - -#include "kqp_executer.h" -#include "kqp_executer_impl.h" -#include "kqp_executer_stats.h" - -#include <ydb/core/base/appdata.h> -#include <ydb/core/kqp/common/kqp.h> -#include <ydb/core/kqp/runtime/kqp_transport.h> - -#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> - -namespace NKikimr { -namespace NKqp { -namespace { - -struct TEvComputeChannelDataOOB { - NYql::NDqProto::TEvComputeChannelData Proto; - TRope Payload; - - size_t Size() const { - return Proto.GetChannelData().GetData().GetRaw().size() + Payload.size(); - } - - ui32 RowCount() const { - return Proto.GetChannelData().GetData().GetRows(); - } -}; - -class TResultCommonChannelProxy : public NActors::TActor<TResultCommonChannelProxy> { -public: - static constexpr NKikimrServices::TActivity::EType ActorActivityType() { - return NKikimrServices::TActivity::KQP_RESULT_CHANNEL_PROXY; - } - - TResultCommonChannelProxy(ui64 txId, ui64 channelId, TActorId executer) - : TActor(&TResultCommonChannelProxy::WorkState) - , TxId(txId) - , ChannelId(channelId) - , Executer(executer) {} - -protected: - virtual void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) = 0; - -private: - STATEFN(WorkState) { - try { - switch (ev->GetTypeRewrite()) { - hFunc(NYql::NDq::TEvDqCompute::TEvChannelData, HandleWork); - hFunc(TEvKqpExecuter::TEvStreamDataAck, HandleWork); - hFunc(TEvents::TEvPoison, HandlePoison); - default: { - InternalError(TStringBuilder() << "TxId: " << TxId << ", channelId: " << ChannelId - << "Handle unexpected event " << ev->GetTypeRewrite()); - } - } - } catch (const yexception& ex) { - InternalError(ex.what()); - } catch (const NKikimr::TMemoryLimitExceededException& ex) { - InternalError("Memory limit exceeded exception", NYql::NDqProto::StatusIds::PRECONDITION_FAILED); - } - } - - void HandleWork(NYql::NDq::TEvDqCompute::TEvChannelData::TPtr& ev) { - TEvComputeChannelDataOOB record; - record.Proto = std::move(ev->Get()->Record); - if (record.Proto.GetChannelData().GetData().HasPayloadId()) { - record.Payload = ev->Get()->GetPayload(record.Proto.GetChannelData().GetData().GetPayloadId()); - } - - const auto& channelData = record.Proto.GetChannelData(); - - ComputeActor = ev->Sender; - - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId << ", got result" - << ", channelId: " << channelData.GetChannelId() - << ", seqNo: " << record.Proto.GetSeqNo() - << ", from: " << ev->Sender); - - SendResults(record, ev->Sender); - } - - void HandleWork(TEvKqpExecuter::TEvStreamDataAck::TPtr& ev) { - ui64 seqNo = ev->Get()->Record.GetSeqNo(); - i64 freeSpace = ev->Get()->Record.GetFreeSpace(); - - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId - << ", send ack to channelId: " << ChannelId - << ", seqNo: " << seqNo - << ", enough: " << ev->Get()->Record.GetEnough() - << ", freeSpace: " << freeSpace - << ", to: " << ComputeActor); - - auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>(); - ackEv->Record.SetSeqNo(seqNo); - ackEv->Record.SetChannelId(ChannelId); - ackEv->Record.SetFreeSpace(freeSpace); - ackEv->Record.SetFinish(ev->Get()->Record.GetEnough()); - Send(ComputeActor, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ ChannelId); - } - - void InternalError(const TString& msg, const NYql::NDqProto::StatusIds_StatusCode& code = NYql::NDqProto::StatusIds::INTERNAL_ERROR) { - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, msg); - - auto evAbort = MakeHolder<TEvKqp::TEvAbortExecution>(code, msg); - Send(Executer, evAbort.Release()); - - Become(&TResultCommonChannelProxy::DeadState); - } - -private: - STATEFN(DeadState) { - try { - switch (ev->GetTypeRewrite()) { - hFunc(TEvents::TEvPoison, HandlePoison); - } - - } catch(const yexception& ex) { - InternalError(ex.what()); - } - } - -private: - void HandlePoison(TEvents::TEvPoison::TPtr&) { - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, "TxId: " << TxId - << ", result channelId: " << ChannelId << ", pass away"); - PassAway(); - } - -private: - const ui64 TxId; - const ui64 ChannelId; - const NActors::TActorId Executer; - NActors::TActorId ComputeActor; -}; - -class TResultStreamChannelProxy : public TResultCommonChannelProxy { -public: - TResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target, - TActorId executer, size_t statementResultIndex) - : TResultCommonChannelProxy(txId, channelId, executer) - , ColumnOrder(columnOrder) - , ColumnHints(columnHints) - , ItemType(itemType) - , QueryResultIndex(queryResultIndex) - , Target(target) - , StatementResultIndex(statementResultIndex) {} - -private: - void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) override { - Y_UNUSED(sender); - - TVector<NYql::NDq::TDqSerializedBatch> batches(1); - auto& batch = batches.front(); - - batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData()); - batch.Payload = std::move(computeData.Payload); - - TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry}; - auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder, ColumnHints); - - auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>(); - streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); - streamEv->Record.SetQueryResultIndex(QueryResultIndex + StatementResultIndex); - streamEv->Record.MutableResultSet()->Swap(&resultSet); - - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, - "Send TEvStreamData to " << Target << ", seqNo: " << streamEv->Record.GetSeqNo() - << ", nRows: " << batch.RowCount() ); - - Send(Target, streamEv.Release()); - } - -private: - const TVector<ui32>* ColumnOrder; - const TVector<TString>* ColumnHints; - NKikimr::NMiniKQL::TType* ItemType; - ui32 QueryResultIndex = 0; - const NActors::TActorId Target; - size_t StatementResultIndex; -}; - -class TResultDataChannelProxy : public TResultCommonChannelProxy { -public: - TResultDataChannelProxy(ui64 txId, ui64 channelId, TActorId executer, - ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultReceiver) - : TResultCommonChannelProxy(txId, channelId, executer) - , InputIndex(inputIndex) - , ResultReceiver(resultReceiver) {} - -private: - virtual void SendResults(TEvComputeChannelDataOOB& computeData, TActorId sender) { - NYql::NDq::TDqSerializedBatch batch; - batch.Proto = std::move(*computeData.Proto.MutableChannelData()->MutableData()); - batch.Payload = std::move(computeData.Payload); - - auto channelId = computeData.Proto.GetChannelData().GetChannelId(); - - ResultReceiver->TakeResult(InputIndex, std::move(batch)); - - auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>(); - - ackEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); - ackEv->Record.SetChannelId(channelId); - ackEv->Record.SetFreeSpace(1_MB); - - Send(sender, ackEv.Release(), /* TODO: undelivery */ 0, /* cookie */ channelId); - } - -private: - ui32 InputIndex; - TEvKqpExecuter::TEvTxResponse* ResultReceiver; -}; - -} // anonymous namespace end - -NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, TActorId target, - TActorId executer, ui32 statementResultIndex) -{ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, - "CreateResultStreamChannelProxy: TxId: " << txId << - ", channelId: " << channelId - ); - - return new TResultStreamChannelProxy(txId, channelId, itemType, columnOrder, columnHints, queryResultIndex, target, - executer, statementResultIndex); -} - -NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, TActorId executer, - ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* resultsReceiver) -{ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_EXECUTER, - "CreateResultDataChannelProxy: TxId: " << txId << - ", channelId: " << channelId - ); - - return new TResultDataChannelProxy(txId, channelId, executer, inputIndex, resultsReceiver); -} - -} // namespace NKqp -} // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.h b/ydb/core/kqp/executer_actor/kqp_result_channel.h deleted file mode 100644 index 57b58fc855..0000000000 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.h +++ /dev/null @@ -1,35 +0,0 @@ -#pragma once - -#include "kqp_tasks_graph.h" -#include "kqp_executer.h" - -#include <ydb/library/actors/core/actor.h> - -namespace NYql { - -class TTypeAnnotationNode; - -namespace NDqProto { - -class TData; - -} // namespace NDqProto -} // namespace NYql - -namespace NKikimrMiniKQL { -class TType; -} // namespace NKikimrMiniKQL - -namespace NKikimr::NKqp { - -struct TQueryExecutionStats; -struct TKqpExecuterTxResult; - -NActors::IActor* CreateResultStreamChannelProxy(ui64 txId, ui64 channelId, NKikimr::NMiniKQL::TType* itemType, - const TVector<ui32>* columnOrder, const TVector<TString>* columnHints, ui32 queryResultIndex, NActors::TActorId target, - NActors::TActorId executer, ui32 statementResultIndex); - -NActors::IActor* CreateResultDataChannelProxy(ui64 txId, ui64 channelId, - NActors::TActorId executer, ui32 inputIndex, TEvKqpExecuter::TEvTxResponse* receiver); - -} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make index 8b6a6f5119..0cd7a98ba1 100644 --- a/ydb/core/kqp/executer_actor/ya.make +++ b/ydb/core/kqp/executer_actor/ya.make @@ -11,7 +11,6 @@ SRCS( kqp_partition_helper.cpp kqp_planner.cpp kqp_planner_strategy.cpp - kqp_result_channel.cpp kqp_table_resolver.cpp kqp_tasks_graph.cpp kqp_tasks_validate.cpp diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index d0c1fa51f2..cc4b12ad79 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -202,9 +202,8 @@ public: ResultSet.set_truncated(true); } - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetEnough(truncated); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); resp->Record.SetFreeSpace(ResultSetBytesLimit); ctx.Send(ev->Sender, resp.Release()); } @@ -473,8 +472,7 @@ public: } } - auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(SizeLimit && SizeLimit < std::numeric_limits<i64>::max() ? SizeLimit : std::numeric_limits<i64>::max()); Send(ev->Sender, response.Release()); } diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 66cc4b2098..a10a257201 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -44,11 +44,8 @@ struct TProducerState { ui64 ChannelId = 0; void SendAck(const NActors::TActorIdentity& actor) const { - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(*LastSeqNo); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(*LastSeqNo, ChannelId); resp->Record.SetFreeSpace(AckedFreeSpaceBytes); - resp->Record.SetChannelId(ChannelId); - actor.Send(ActorId, resp.Release()); } diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index ea70ec1389..29a5db2c73 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -1403,9 +1403,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 1); result = 1; - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1471,9 +1470,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1534,9 +1532,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().size() == 0); hasResult = true; - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -1605,9 +1602,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -2209,7 +2205,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto result = CollectStreamResult(it); auto ast = result.QueryStats->Getquery_ast(); - + pushdown = ast.find("KqpOlapFilter") != std::string::npos; } else { // Error means that predicate not pushed down diff --git a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp index d8c2655ce9..b14c1268a9 100644 --- a/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp @@ -2414,9 +2414,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; Y_ASSERT(record.GetResultSet().rows().size() == 0); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index de741398ad..995f214d7d 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -326,9 +326,8 @@ Y_UNIT_TEST_SUITE(KqpSplit) { collectedKeys->push_back(row.items(0).uint64_value()); } - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); return true; } @@ -401,7 +400,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { } else if (testActorType == ETestActorType::StreamLookup) { InterceptStreamLookupActorPipeCache(MakePipePerNodeCacheID(false)); } - + if (providedServer) { Server = providedServer; } else { @@ -890,7 +889,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true); @@ -937,7 +936,7 @@ Y_UNIT_TEST_SUITE(KqpSplit) { ); shim->ReadsReceived.WaitI(); - + UNIT_ASSERT_EQUAL(shards.size(), 1); auto undelivery = MakeHolder<TEvPipeCache::TEvDeliveryProblem>(shards[0], true); diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index 5db8d66d46..c1e6d39d39 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -86,8 +86,7 @@ public: SendNotification<TEvQueryRunner::TEvExecutionStarted>(); } - auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = std::make_unique<TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(std::numeric_limits<i64>::max()); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex(); diff --git a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp index 89057f4815..5d2c5fa083 100644 --- a/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp +++ b/ydb/core/local_pgwire/pgwire_kqp_proxy.cpp @@ -273,8 +273,7 @@ protected: TBase::Send(EventRequest_->Sender, response.release(), 0, EventRequest_->Cookie); BLOG_D(this->SelfId() << " Send stream data ack to " << ev->Sender); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); resp->Record.SetFreeSpace(std::numeric_limits<i64>::max()); TBase::Send(ev->Sender, resp.Release()); } diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index fd30fb8424..2c22969358 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -117,9 +117,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -214,9 +213,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -317,9 +315,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -435,9 +432,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -565,9 +561,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); @@ -693,9 +688,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { } } - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; @@ -803,9 +797,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(record.GetSeqNo()); resp->Record.SetFreeSpace(100); ctx.Send(ev->Sender, resp.Release()); break; @@ -944,9 +937,8 @@ Y_UNIT_TEST_SUITE(KqpScan) { Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); result = record.GetResultSet().rows().at(0).items().at(0).int64_value(); - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(record.GetSeqNo(), record.GetChannelId()); resp->Record.SetEnough(false); - resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); resp->Record.SetFreeSpace(100); runtime.Send(new IEventHandle(ev->Sender, sender, resp.Release())); return TTestActorRuntime::EEventAction::DROP; diff --git a/ydb/core/tx/datashard/datashard_ut_volatile.cpp b/ydb/core/tx/datashard/datashard_ut_volatile.cpp index 4325b7cdc7..f5af54a545 100644 --- a/ydb/core/tx/datashard/datashard_ut_volatile.cpp +++ b/ydb/core/tx/datashard/datashard_ut_volatile.cpp @@ -1307,8 +1307,7 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) { if (msg->Record.GetResultSet().rows().size()) { observedResults.emplace_back(FormatResult(msg->Record.GetResultSet())); } - auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - resp->Record.SetSeqNo(msg->Record.GetSeqNo()); + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(msg->Record.GetSeqNo(), msg->Record.GetChannelId()); resp->Record.SetFreeSpace(1); ctx.Send(ev->Sender, resp.Release()); break; diff --git a/ydb/core/viewer/viewer_query.h b/ydb/core/viewer/viewer_query.h index 3278819cd2..9bf210e5db 100644 --- a/ydb/core/viewer/viewer_query.h +++ b/ydb/core/viewer/viewer_query.h @@ -572,8 +572,7 @@ private: ResultSets[data.GetQueryResultIndex()].emplace_back() = std::move(*data.MutableResultSet()); } - THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); if (TotalRows >= LimitRows) { ack->Record.SetEnough(true); } diff --git a/ydb/core/viewer/viewer_query_old.h b/ydb/core/viewer/viewer_query_old.h index abeeeb4acd..ff592792d0 100644 --- a/ydb/core/viewer/viewer_query_old.h +++ b/ydb/core/viewer/viewer_query_old.h @@ -435,8 +435,7 @@ private: ResultSets.emplace_back(); ResultSets.back() = std::move(data.GetResultSet()); - THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + THolder<NKqp::TEvKqpExecuter::TEvStreamDataAck> ack = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); Send(ev->Sender, ack.Release()); } diff --git a/ydb/tests/tools/kqprun/src/actors.cpp b/ydb/tests/tools/kqprun/src/actors.cpp index 225271772d..66b28b1093 100644 --- a/ydb/tests/tools/kqprun/src/actors.cpp +++ b/ydb/tests/tools/kqprun/src/actors.cpp @@ -42,8 +42,7 @@ public: ) void Handle(NKikimr::NKqp::TEvKqpExecuter::TEvStreamData::TPtr& ev) { - auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>(); - response->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + auto response = MakeHolder<NKikimr::NKqp::TEvKqpExecuter::TEvStreamDataAck>(ev->Get()->Record.GetSeqNo(), ev->Get()->Record.GetChannelId()); response->Record.SetFreeSpace(ResultSizeLimit_); auto resultSetIndex = ev->Get()->Record.GetQueryResultIndex(); |