diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-21 10:55:04 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-21 11:39:21 +0300 |
commit | 90255fbb9d46ed371b9a85c4123f04f7037a406f (patch) | |
tree | 59ba325a6e1be2d4d083e5589fa2645407e75a1b | |
parent | 652423f5101c754a20ca924201513b5fc2cdd78d (diff) | |
download | ydb-90255fbb9d46ed371b9a85c4123f04f7037a406f.tar.gz |
Cache free space in channels
4 files changed, 146 insertions, 49 deletions
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 3532c5f58e..6c26bc38c7 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -87,10 +87,17 @@ message TIsFinishedResponse { bool Result = 1; } +message TFreeSpace { + uint64 Id = 1; + int64 Space = 2; +} + message TRunResponse { int32 Result = 1; repeated TMetric Metric = 2; TRusage Rusage = 3; + repeated TFreeSpace ChannelFreeSpace = 4; + repeated TFreeSpace SourceFreeSpace = 5; } message TPrepareResponse { diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index acccf5f5c3..2445da2e8a 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -39,7 +39,8 @@ namespace NTaskRunnerProxy { // static const int CurrentProtocolVersion = 2; // GetFreeSpace // static const int CurrentProtocolVersion = 3; // Calls for ComputeActor // static const int CurrentProtocolVersion = 4; // Calls for Sources -static const int CurrentProtocolVersion = 5; // Calls for Sinks +// static const int CurrentProtocolVersion = 5; // Calls for Sinks +static const int CurrentProtocolVersion = 6; // Respond free space after run template<typename T> void ToProto(T& proto, const NDq::TDqAsyncStats& stats) @@ -482,6 +483,17 @@ public: if (status == NDq::ERunStatus::Finished) { UpdateStats(response); } + for (auto id : InputChannels) { + auto* space = response.AddChannelFreeSpace(); + space->SetId(id); + space->SetSpace(Runner->GetInputChannel(id)->GetFreeSpace()); + } + + for (auto id : Sources) { + auto* space = response.AddSourceFreeSpace(); + space->SetId(id); + space->SetSpace(Runner->GetSource(id)->GetFreeSpace()); + } response.Save(&output); } catch (const NKikimr::TMemoryLimitExceededException& ex) { throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit"; @@ -725,6 +737,18 @@ public: NDq::TDqTaskRunnerExecutionContextDefault execCtx; Runner->Prepare(task, limits, execCtx); + + for (ui32 i = 0; i < task.InputsSize(); ++i) { + auto& inputDesc = task.GetInputs(i); + if (inputDesc.HasSource()) { + Sources.emplace(i); + } else { + for (auto& inputChannelDesc : inputDesc.GetChannels()) { + ui64 channelId = inputChannelDesc.GetId(); + InputChannels.emplace(channelId); + } + } + } }); result.Save(&output); @@ -736,6 +760,8 @@ public: NKikimr::NMiniKQL::IStatsRegistry* JobStats; bool TerminateOnError; TIntrusivePtr<NDq::IDqTaskRunner> Runner; + THashSet<ui64> Sources; + THashSet<ui64> InputChannels; ui32 StageId = 0; TTaskCounters QueryStat; TTaskCounters PrevStat; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 3990ee8676..35f36fc55a 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -25,6 +25,7 @@ #include <util/system/fs.h> #include <util/stream/file.h> #include <util/stream/pipe.h> +#include <util/stream/length.h> #include <util/generic/size_literals.h> #include <util/generic/maybe.h> #include <util/string/cast.h> @@ -51,16 +52,6 @@ extern "C" int kill(int pid, int sig); extern "C" int waitpid(int pid, int* status, int options); #endif -void SaveRopeToPipe(IOutputStream& output, const TRope& rope) { - for (const auto& [data, size] : rope) { - output.Write(&size, sizeof(size_t)); - YQL_ENSURE(size != 0); - output.Write(data, size); - } - size_t zero = 0; - output.Write(&zero, sizeof(size_t)); -} - namespace { void Load(IInputStream& input, void* buf, size_t size) { @@ -73,6 +64,19 @@ void Load(IInputStream& input, void* buf, size_t size) { } } +} // namespace { + +i64 SaveRopeToPipe(IOutputStream& output, const TRope& rope) { + i64 total = 0; + for (const auto& [data, size] : rope) { + output.Write(&size, sizeof(size_t)); + YQL_ENSURE(size != 0); + output.Write(data, size); + total += size; + } + size_t zero = 0; + output.Write(&zero, sizeof(size_t)); + return total; } void LoadRopeFromPipe(IInputStream& input, TRope& rope) { @@ -529,29 +533,38 @@ public: class TInputChannel : public IInputChannel { public: - TInputChannel(const ITaskRunner::TPtr& taskRunner, ui64 taskId, ui64 channelId, IInputStream& input, IOutputStream& output) + TInputChannel(const ITaskRunner::TPtr& taskRunner, ui64 taskId, ui64 channelId, IInputStream& input, IOutputStream& output, i64 channelBufferSize) : TaskId(taskId) , ChannelId(channelId) , Input(input) , Output(output) , ProtocolVersion(taskRunner->GetProtocolVersion()) + , FreeSpace(channelBufferSize) { } i64 GetFreeSpace() override { if (ProtocolVersion <= 1) { return std::numeric_limits<i64>::max(); } + + if (ProtocolVersion < 6) { + NDqProto::TCommandHeader header; + header.SetVersion(2); + header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE); + header.SetTaskId(TaskId); + header.SetChannelId(ChannelId); + header.Save(&Output); - NDqProto::TCommandHeader header; - header.SetVersion(2); - header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE); - header.SetTaskId(TaskId); - header.SetChannelId(ChannelId); - header.Save(&Output); + NDqProto::TGetFreeSpaceResponse response; + response.Load(&Input); + return response.GetFreeSpace(); + } - NDqProto::TGetFreeSpaceResponse response; - response.Load(&Input); - return response.GetFreeSpace(); + return FreeSpace; + } + + void SetFreeSpace(i64 space) { + FreeSpace = space; } void Push(TDqSerializedBatch&& data) override { @@ -562,9 +575,18 @@ public: header.SetChannelId(ChannelId); header.Save(&Output); - data.Proto.Save(&Output); + i64 written = 0; + TCountingOutput countingOutput(&Output); + data.Proto.Save(&countingOutput); if (data.IsOOB()) { - SaveRopeToPipe(Output, data.Payload); + written = SaveRopeToPipe(Output, data.Payload); + } else { + written = countingOutput.Counter(); + } + + if (ProtocolVersion >= 6) { + // estimate free space + FreeSpace -= written; } } @@ -584,9 +606,8 @@ private: IInputStream& Input; IOutputStream& Output; - TString SerializedInputType; - i32 ProtocolVersion; + i64 FreeSpace; }; class TDqInputChannel: public IDqInputChannel { @@ -711,41 +732,56 @@ private: class TDqSource: public IDqAsyncInputBuffer { public: - TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, IPipeTaskRunner* taskRunner) + TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, i64 channelBufferSize, IPipeTaskRunner* taskRunner) : TaskId(taskId) , TaskRunner(taskRunner) , Input(TaskRunner->GetInput()) , Output(TaskRunner->GetOutput()) , InputType(inputType) + , BufferSize(channelBufferSize) + , FreeSpace(channelBufferSize) + , ProtocolVersion(TaskRunner->GetProtocolVersion()) { PushStats.InputIndex = inputIndex; } i64 GetFreeSpace() const override { - NDqProto::TCommandHeader header; - header.SetVersion(4); - header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE_SOURCE); - header.SetTaskId(TaskId); - header.SetChannelId(PushStats.InputIndex); - header.Save(&Output); + if (ProtocolVersion < 6) { + NDqProto::TCommandHeader header; + header.SetVersion(4); + header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE_SOURCE); + header.SetTaskId(TaskId); + header.SetChannelId(PushStats.InputIndex); + header.Save(&Output); - NDqProto::TGetFreeSpaceResponse response; - response.Load(&Input); - return response.GetFreeSpace(); + NDqProto::TGetFreeSpaceResponse response; + response.Load(&Input); + return response.GetFreeSpace(); + } + + return FreeSpace; + } + + void SetFreeSpace(i64 space) { + FreeSpace = space; } ui64 GetStoredBytes() const override { - NDqProto::TCommandHeader header; - header.SetVersion(4); - header.SetCommand(NDqProto::TCommandHeader::GET_STORED_BYTES_SOURCE); - header.SetTaskId(TaskId); - header.SetChannelId(PushStats.InputIndex); - header.Save(&Output); + if (ProtocolVersion < 6) { + NDqProto::TCommandHeader header; + header.SetVersion(4); + header.SetCommand(NDqProto::TCommandHeader::GET_STORED_BYTES_SOURCE); + header.SetTaskId(TaskId); + header.SetChannelId(PushStats.InputIndex); + header.Save(&Output); - NDqProto::TGetStoredBytesResponse response; - response.Load(&Input); + NDqProto::TGetStoredBytesResponse response; + response.Load(&Input); - return response.GetResult(); + return response.GetResult(); + } + + return BufferSize - FreeSpace; } const TDqAsyncInputBufferStats& GetPushStats() const override { @@ -776,6 +812,10 @@ public: if (isOOB) { SaveRopeToPipe(Output, serialized.Payload); } + + if (ProtocolVersion >= 6) { + FreeSpace -= space; + } } void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override { @@ -842,6 +882,9 @@ private: mutable NKikimr::NMiniKQL::TType* InputType = nullptr; TDqAsyncInputBufferStats PushStats; TDqInputStats PopStats; + i64 BufferSize; + i64 FreeSpace; + i32 ProtocolVersion; }; /*______________________________________________________________________________________________*/ @@ -1285,7 +1328,7 @@ public: } NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override { - Y_UNUSED(limits); + ChannelBufferSize = limits.ChannelBufferSize; NDqProto::TCommandHeader header; header.SetVersion(1); header.SetCommand(NDqProto::TCommandHeader::PREPARE); @@ -1302,18 +1345,32 @@ public: NYql::NDqProto::TRunResponse Run() override { NDqProto::TCommandHeader header; - header.SetVersion(1); + header.SetVersion(GetProtocolVersion() >= 6 ? 6 : 1); header.SetCommand(NDqProto::TCommandHeader::RUN); header.SetTaskId(Task.GetId()); header.Save(&Output); NDqProto::TRunResponse response; response.Load(&Input); + if (GetProtocolVersion() >= 6) { + for (auto& space : response.GetChannelFreeSpace()) { + auto* channel = static_cast<TInputChannel*>(GetInputChannel(space.GetId()).Get()); + channel->SetFreeSpace(space.GetSpace()); + } + for (auto& space : response.GetSourceFreeSpace()) { + auto* source = static_cast<TDqSource*>(GetSource(space.GetId()).Get()); + source->SetFreeSpace(space.GetSpace()); + } + } return response; } IInputChannel::TPtr GetInputChannel(ui64 channelId) override { - return new TInputChannel(this, Task.GetId(), channelId, Input, Output); + auto& channel = InputChannels[channelId]; + if (channel == nullptr) { + channel = new TInputChannel(this, Task.GetId(), channelId, Input, Output, ChannelBufferSize); + } + return channel; } IOutputChannel::TPtr GetOutputChannel(ui64 channelId) override { @@ -1321,7 +1378,11 @@ public: } IDqAsyncInputBuffer::TPtr GetSource(ui64 index) override { - return new TDqSource(Task.GetId(), index, InputTypes.at(index), this); + auto& source = Sources[index]; + if (source == nullptr) { + source = new TDqSource(Task.GetId(), index, InputTypes.at(index), ChannelBufferSize, this); + } + return source; } TDqSink::TPtr GetSink(ui64 index) override { @@ -1488,6 +1549,9 @@ private: THashMap<TString, TString> SecureParams; THashMap<TString, TString> TaskParams; TVector<TString> ReadRanges; + THashMap<ui64, IInputChannel::TPtr> InputChannels; + THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources; + i64 ChannelBufferSize = 0; std::shared_ptr <NKikimr::NMiniKQL::TScopedAlloc> Alloc; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index ec3b9dd728..c99c3f197f 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -12,7 +12,7 @@ extern const TString WorkingDirectoryParamName; extern const TString WorkingDirectoryDontInitParamName; // COMPAT(aozeritsky) extern const TString UseMetaParamName; // COMPAT(aozeritsky) -void SaveRopeToPipe(IOutputStream& output, const TRope& rope); +i64 SaveRopeToPipe(IOutputStream& output, const TRope& rope); void LoadRopeFromPipe(IInputStream& input, TRope& rope); NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits(); |