about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aozeritsky <aozeritsky@ydb.tech> 2023-11-07 16:16:40 +0300
committer aozeritsky <aozeritsky@ydb.tech> 2023-11-07 16:51:32 +0300
commit 26b24efc04ba6226f1fe8e7187fcec75b05a4927 (patch)
tree 60c2028c07d35420022229e699f4b8be65175ff1
parent 17299b814561285008930bd84a014ff4b6fb261b (diff)
download ydb-26b24efc04ba6226f1fe8e7187fcec75b05a4927.tar.gz
Build outputType for sink in vanilla job
-rw-r--r-- ydb/library/yql/providers/dq/api/protos/task_command_executor.proto 3
-rw-r--r-- ydb/library/yql/providers/dq/runtime/task_command_executor.cpp 41
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp 56
3 files changed, 31 insertions, 69 deletions
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 742f9cabe9..85e01537f4 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -19,7 +19,6 @@ message TCommandHeader {
RUN = 5;
PREPARE = 6;
STOP = 7;
- GET_INPUT_TYPE = 8;
GET_FREE_SPACE = 9;
FINISH_OUTPUT = 10;
@@ -35,13 +34,11 @@ message TCommandHeader {
GET_FREE_SPACE_SOURCE = 18;
GET_STORED_BYTES_SOURCE = 19;
PUSH_SOURCE = 20;
- GET_SOURCE_TYPE = 21;
FINISH_SOURCE = 22;
// Sink
SINK_POP = 23; // TSinkPopRequest -> TSinkPopResponse
SINK_IS_FINISHED = 24; // Header -> TIsFinishedResponse
- SINK_OUTPUT_TYPE = 25; // Header -> TGetTypeResponse
SINK_STATS = 26; // Header -> TGetSinkStatsResponse
GET_METERING_STATS = 27;
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 561e8fcdfb..d3b48bef8b 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -525,34 +525,6 @@ public:
break;
}
- case NDqProto::TCommandHeader::GET_INPUT_TYPE: {
- Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion);
- auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
-
- Y_ENSURE(taskId == Runner->GetTaskId());
- auto channel = Runner->GetInputChannel(channelId);
- auto inputType = channel->GetInputType();
-
- NDqProto::TGetTypeResponse response;
- response.SetResult(NKikimr::NMiniKQL::SerializeNode(inputType, Runner->GetTypeEnv()));
- response.Save(&output);
-
- break;
- }
- case NDqProto::TCommandHeader::GET_SOURCE_TYPE: {
- Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion);
- auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
-
- Y_ENSURE(taskId == Runner->GetTaskId());
- auto source = Runner->GetSource(channelId);
- auto inputType = source->GetInputType();
-
- NDqProto::TGetTypeResponse response;
- response.SetResult(NKikimr::NMiniKQL::SerializeNode(inputType, Runner->GetTypeEnv()));
- response.Save(&output);
-
- break;
- }
case NDqProto::TCommandHeader::GET_FREE_SPACE: {
Y_ENSURE(header.GetVersion() >= 2);
@@ -653,19 +625,6 @@ public:
response.Save(&output);
break;
}
- case NDqProto::TCommandHeader::SINK_OUTPUT_TYPE: {
- Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion);
- auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
-
- Y_ENSURE(taskId == Runner->GetTaskId());
- auto outputType = Runner->GetSink(channelId)->GetOutputType();
-
- NDqProto::TGetTypeResponse response;
- response.SetResult(NKikimr::NMiniKQL::SerializeNode(outputType, Runner->GetTypeEnv()));
- response.Save(&output);
-
- break;
- }
case NDqProto::TCommandHeader::SINK_STATS: {
Y_ENSURE(header.GetVersion() <= CurrentProtocolVersion);
Y_ENSURE(taskId == Runner->GetTaskId());
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 182104764a..8932fe43c9 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1029,11 +1029,12 @@ private:
class TDqSink : public IStringSink {
public:
- TDqSink(ui64 taskId, ui64 outputIndex, IPipeTaskRunner* taskRunner)
+ TDqSink(ui64 taskId, ui64 outputIndex, TType* type, IPipeTaskRunner* taskRunner)
: TaskId(taskId)
, TaskRunner(taskRunner)
, Input(TaskRunner->GetInput())
, Output(TaskRunner->GetOutput())
+ , OutputType(type)
{
PopStats.OutputIndex = outputIndex;
}
@@ -1128,24 +1129,6 @@ public:
}
NKikimr::NMiniKQL::TType* GetOutputType() const override {
- if (OutputType) {
- return OutputType;
- }
-
- NDqProto::TCommandHeader header;
- header.SetVersion(5);
- header.SetCommand(NDqProto::TCommandHeader::SINK_OUTPUT_TYPE);
- header.SetTaskId(TaskId);
- header.SetChannelId(PopStats.OutputIndex);
- header.Save(&Output);
-
- NDqProto::TGetTypeResponse response;
- response.Load(&Input);
-
- OutputType = static_cast<NKikimr::NMiniKQL::TType*>(
- NKikimr::NMiniKQL::DeserializeNode(
- response.GetResult(),
- TaskRunner->GetTypeEnv()));
return OutputType;
}
@@ -1209,7 +1192,7 @@ private:
IInputStream& Input;
IOutputStream& Output;
- mutable NKikimr::NMiniKQL::TType* OutputType = nullptr;
+ NKikimr::NMiniKQL::TType* OutputType;
TDqOutputStats PushStats;
TDqAsyncOutputBufferStats PopStats;
@@ -1323,7 +1306,7 @@ public:
}
TDqSink::TPtr GetSink(ui64 index) override {
- return new TDqSink(Task.GetId(), index, this);
+ return new TDqSink(Task.GetId(), index, OutputTypes.at(index), this);
}
const NMiniKQL::TTypeEnvironment& GetTypeEnv() const override {
@@ -1453,6 +1436,31 @@ private:
InputTypes[i] = inputType;
}
+
+ OutputTypes.resize(Task.OutputsSize());
+ if (programRoot.GetNode()->GetType()->IsCallable()) {
+ auto programResultType = static_cast<const TCallableType*>(programRoot.GetNode()->GetType());
+ YQL_ENSURE(programResultType->GetReturnType()->IsStream());
+ auto programResultItemType = static_cast<const TStreamType*>(programResultType->GetReturnType())->GetItemType();
+
+ if (programResultItemType->IsVariant()) {
+ auto variantType = static_cast<const TVariantType*>(programResultItemType);
+ YQL_ENSURE(variantType->GetUnderlyingType()->IsTuple());
+ auto variantTupleType = static_cast<const TTupleType*>(variantType->GetUnderlyingType());
+ YQL_ENSURE(Task.OutputsSize() == variantTupleType->GetElementsCount(),
+ "" << Task.OutputsSize() << " != " << variantTupleType->GetElementsCount());
+ for (ui32 i = 0; i < variantTupleType->GetElementsCount(); ++i) {
+ OutputTypes[i] = variantTupleType->GetElementType(i);
+ }
+ }
+ else {
+ YQL_ENSURE(Task.OutputsSize() == 1);
+ OutputTypes[0] = programResultItemType;
+ }
+ } else {
+ YQL_ENSURE(programRoot.GetNode()->GetType()->IsVoid());
+ YQL_ENSURE(Task.OutputsSize() == 0);
+ }
}
private:
@@ -1491,6 +1499,7 @@ private:
NKikimr::NMiniKQL::TRuntimeNode ProgramNode;
std::vector<TType*> InputTypes;
+ std::vector<TType*> OutputTypes;
};
class TDqTaskRunner: public NDq::IDqTaskRunner {
@@ -1578,10 +1587,7 @@ public:
IDqAsyncOutputBuffer::TPtr GetSink(ui64 outputIndex) override {
auto& sink = Sinks[outputIndex];
if (!sink) {
- sink = new TDqSink(
- Task.GetId(),
- outputIndex,
- Delegate.Get());
+ sink = static_cast<TDqSink*>(Delegate->GetSink(outputIndex).Get());
}
return sink;
}