diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-07 16:16:40 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-07 16:51:32 +0300 |
commit | 26b24efc04ba6226f1fe8e7187fcec75b05a4927 (patch) | |
tree | 60c2028c07d35420022229e699f4b8be65175ff1 | |
parent | 17299b814561285008930bd84a014ff4b6fb261b (diff) | |
download | ydb-26b24efc04ba6226f1fe8e7187fcec75b05a4927.tar.gz |
Build outputType for sink in vanilla job
3 files changed, 31 insertions, 69 deletions
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 742f9cabe9..85e01537f4 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -19,7 +19,6 @@ message TCommandHeader { RUN = 5; PREPARE = 6; STOP = 7; - GET_INPUT_TYPE = 8; GET_FREE_SPACE = 9; FINISH_OUTPUT = 10; @@ -35,13 +34,11 @@ message TCommandHeader { GET_FREE_SPACE_SOURCE = 18; GET_STORED_BYTES_SOURCE = 19; PUSH_SOURCE = 20; - GET_SOURCE_TYPE = 21; FINISH_SOURCE = 22; // Sink SINK_POP = 23; // TSinkPopRequest -> TSinkPopResponse SINK_IS_FINISHED = 24; // Header -> TIsFinishedResponse - SINK_OUTPUT_TYPE = 25; // Header -> TGetTypeResponse SINK_STATS = 26; // Header -> TGetSinkStatsResponse GET_METERING_STATS = 27; diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 561e8fcdfb..d3b48bef8b 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -525,34 +525,6 @@ public: break; } - case NDqProto::TCommandHeader::GET_INPUT_TYPE: { - Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion); - auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit - - Y_ENSURE(taskId == Runner->GetTaskId()); - auto channel = Runner->GetInputChannel(channelId); - auto inputType = channel->GetInputType(); - - NDqProto::TGetTypeResponse response; - response.SetResult(NKikimr::NMiniKQL::SerializeNode(inputType, Runner->GetTypeEnv())); - response.Save(&output); - - break; - } - case NDqProto::TCommandHeader::GET_SOURCE_TYPE: { - Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion); - auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit - - Y_ENSURE(taskId == Runner->GetTaskId()); - auto source = Runner->GetSource(channelId); - auto inputType = source->GetInputType(); - - NDqProto::TGetTypeResponse response; - response.SetResult(NKikimr::NMiniKQL::SerializeNode(inputType, Runner->GetTypeEnv())); - response.Save(&output); - - break; - } case NDqProto::TCommandHeader::GET_FREE_SPACE: { Y_ENSURE(header.GetVersion() >= 2); @@ -653,19 +625,6 @@ public: response.Save(&output); break; } - case NDqProto::TCommandHeader::SINK_OUTPUT_TYPE: { - Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion); - auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit - - Y_ENSURE(taskId == Runner->GetTaskId()); - auto outputType = Runner->GetSink(channelId)->GetOutputType(); - - NDqProto::TGetTypeResponse response; - response.SetResult(NKikimr::NMiniKQL::SerializeNode(outputType, Runner->GetTypeEnv())); - response.Save(&output); - - break; - } case NDqProto::TCommandHeader::SINK_STATS: { Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion); Y_ENSURE(taskId == Runner->GetTaskId()); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 182104764a..8932fe43c9 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1029,11 +1029,12 @@ private: class TDqSink : public IStringSink { public: - TDqSink(ui64 taskId, ui64 outputIndex, IPipeTaskRunner* taskRunner) + TDqSink(ui64 taskId, ui64 outputIndex, TType* type, IPipeTaskRunner* taskRunner) : TaskId(taskId) , TaskRunner(taskRunner) , Input(TaskRunner->GetInput()) , Output(TaskRunner->GetOutput()) + , OutputType(type) { PopStats.OutputIndex = outputIndex; } @@ -1128,24 +1129,6 @@ public: } NKikimr::NMiniKQL::TType* GetOutputType() const override { - if (OutputType) { - return OutputType; - } - - NDqProto::TCommandHeader header; - header.SetVersion(5); - header.SetCommand(NDqProto::TCommandHeader::SINK_OUTPUT_TYPE); - header.SetTaskId(TaskId); - header.SetChannelId(PopStats.OutputIndex); - header.Save(&Output); - - NDqProto::TGetTypeResponse response; - response.Load(&Input); - - OutputType = static_cast<NKikimr::NMiniKQL::TType*>( - NKikimr::NMiniKQL::DeserializeNode( - response.GetResult(), - TaskRunner->GetTypeEnv())); return OutputType; } @@ -1209,7 +1192,7 @@ private: IInputStream& Input; IOutputStream& Output; - mutable NKikimr::NMiniKQL::TType* OutputType = nullptr; + NKikimr::NMiniKQL::TType* OutputType; TDqOutputStats PushStats; TDqAsyncOutputBufferStats PopStats; @@ -1323,7 +1306,7 @@ public: } TDqSink::TPtr GetSink(ui64 index) override { - return new TDqSink(Task.GetId(), index, this); + return new TDqSink(Task.GetId(), index, OutputTypes.at(index), this); } const NMiniKQL::TTypeEnvironment& GetTypeEnv() const override { @@ -1453,6 +1436,31 @@ private: InputTypes[i] = inputType; } + + OutputTypes.resize(Task.OutputsSize()); + if (programRoot.GetNode()->GetType()->IsCallable()) { + auto programResultType = static_cast<const TCallableType*>(programRoot.GetNode()->GetType()); + YQL_ENSURE(programResultType->GetReturnType()->IsStream()); + auto programResultItemType = static_cast<const TStreamType*>(programResultType->GetReturnType())->GetItemType(); + + if (programResultItemType->IsVariant()) { + auto variantType = static_cast<const TVariantType*>(programResultItemType); + YQL_ENSURE(variantType->GetUnderlyingType()->IsTuple()); + auto variantTupleType = static_cast<const TTupleType*>(variantType->GetUnderlyingType()); + YQL_ENSURE(Task.OutputsSize() == variantTupleType->GetElementsCount(), + "" << Task.OutputsSize() << " != " << variantTupleType->GetElementsCount()); + for (ui32 i = 0; i < variantTupleType->GetElementsCount(); ++i) { + OutputTypes[i] = variantTupleType->GetElementType(i); + } + } + else { + YQL_ENSURE(Task.OutputsSize() == 1); + OutputTypes[0] = programResultItemType; + } + } else { + YQL_ENSURE(programRoot.GetNode()->GetType()->IsVoid()); + YQL_ENSURE(Task.OutputsSize() == 0); + } } private: @@ -1491,6 +1499,7 @@ private: NKikimr::NMiniKQL::TRuntimeNode ProgramNode; std::vector<TType*> InputTypes; + std::vector<TType*> OutputTypes; }; class TDqTaskRunner: public NDq::IDqTaskRunner { @@ -1578,10 +1587,7 @@ public: IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override { auto& sink = Sinks[outputIndex]; if (!sink) { - sink = new TDqSink( - Task.GetId(), - outputIndex, - Delegate.Get()); + sink = static_cast<TDqSink*>(Delegate->GetSink(outputIndex).Get()); } return sink; } |