about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author aozeritsky <aozeritsky@ydb.tech> 2023-11-21 10:55:04 +0300
committer aozeritsky <aozeritsky@ydb.tech> 2023-11-21 11:39:21 +0300
commit 90255fbb9d46ed371b9a85c4123f04f7037a406f (patch)
tree 59ba325a6e1be2d4d083e5589fa2645407e75a1b
parent 652423f5101c754a20ca924201513b5fc2cdd78d (diff)
download ydb-90255fbb9d46ed371b9a85c4123f04f7037a406f.tar.gz
Cache free space in channels
-rw-r--r-- ydb/library/yql/providers/dq/api/protos/task_command_executor.proto 7
-rw-r--r-- ydb/library/yql/providers/dq/runtime/task_command_executor.cpp 28
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp 158
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h 2
4 files changed, 146 insertions, 49 deletions
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 3532c5f58e..6c26bc38c7 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -87,10 +87,17 @@ message TIsFinishedResponse {
bool Result = 1;
}
+message TFreeSpace {
+ uint64 Id = 1;
+ int64 Space = 2;
+}
+
message TRunResponse {
int32 Result = 1;
repeated TMetric Metric = 2;
TRusage Rusage = 3;
+ repeated TFreeSpace ChannelFreeSpace = 4;
+ repeated TFreeSpace SourceFreeSpace = 5;
}
message TPrepareResponse {
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index acccf5f5c3..2445da2e8a 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -39,7 +39,8 @@ namespace NTaskRunnerProxy {
// static const int CurrentProtocolVersion = 2; // GetFreeSpace
// static const int CurrentProtocolVersion = 3; // Calls for ComputeActor
// static const int CurrentProtocolVersion = 4; // Calls for Sources
-static const int CurrentProtocolVersion = 5; // Calls for Sinks
+// static const int CurrentProtocolVersion = 5; // Calls for Sinks
+static const int CurrentProtocolVersion = 6; // Respond free space after run
template<typename T>
void ToProto(T& proto, const NDq::TDqAsyncStats& stats)
@@ -482,6 +483,17 @@ public:
if (status == NDq::ERunStatus::Finished) {
UpdateStats(response);
}
+ for (auto id : InputChannels) {
+ auto* space = response.AddChannelFreeSpace();
+ space->SetId(id);
+ space->SetSpace(Runner->GetInputChannel(id)->GetFreeSpace());
+ }
+
+ for (auto id : Sources) {
+ auto* space = response.AddSourceFreeSpace();
+ space->SetId(id);
+ space->SetSpace(Runner->GetSource(id)->GetFreeSpace());
+ }
response.Save(&output);
} catch (const NKikimr::TMemoryLimitExceededException& ex) {
throw yexception() << "DQ computation exceeds the memory limit " << DqConfiguration->MemoryLimit.Get().GetOrElse(0) << ". Try to increase the limit using PRAGMA dq.MemoryLimit";
@@ -725,6 +737,18 @@ public:
NDq::TDqTaskRunnerExecutionContextDefault execCtx;
Runner->Prepare(task, limits, execCtx);
+
+ for (ui32 i = 0; i < task.InputsSize(); ++i) {
+ auto& inputDesc = task.GetInputs(i);
+ if (inputDesc.HasSource()) {
+ Sources.emplace(i);
+ } else {
+ for (auto& inputChannelDesc : inputDesc.GetChannels()) {
+ ui64 channelId = inputChannelDesc.GetId();
+ InputChannels.emplace(channelId);
+ }
+ }
+ }
});
result.Save(&output);
@@ -736,6 +760,8 @@ public:
NKikimr::NMiniKQL::IStatsRegistry* JobStats;
bool TerminateOnError;
TIntrusivePtr<NDq::IDqTaskRunner> Runner;
+ THashSet<ui64> Sources;
+ THashSet<ui64> InputChannels;
ui32 StageId = 0;
TTaskCounters QueryStat;
TTaskCounters PrevStat;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 3990ee8676..35f36fc55a 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -25,6 +25,7 @@
#include <util/system/fs.h>
#include <util/stream/file.h>
#include <util/stream/pipe.h>
+#include <util/stream/length.h>
#include <util/generic/size_literals.h>
#include <util/generic/maybe.h>
#include <util/string/cast.h>
@@ -51,16 +52,6 @@ extern "C" int kill(int pid, int sig);
extern "C" int waitpid(int pid, int* status, int options);
#endif
-void SaveRopeToPipe(IOutputStream& output, const TRope& rope) {
- for (const auto& [data, size] : rope) {
- output.Write(&size, sizeof(size_t));
- YQL_ENSURE(size != 0);
- output.Write(data, size);
- }
- size_t zero = 0;
- output.Write(&zero, sizeof(size_t));
-}
-
namespace {
void Load(IInputStream& input, void* buf, size_t size) {
@@ -73,6 +64,19 @@ void Load(IInputStream& input, void* buf, size_t size) {
}
}
+} // namespace {
+
+i64 SaveRopeToPipe(IOutputStream& output, const TRope& rope) {
+ i64 total = 0;
+ for (const auto& [data, size] : rope) {
+ output.Write(&size, sizeof(size_t));
+ YQL_ENSURE(size != 0);
+ output.Write(data, size);
+ total += size;
+ }
+ size_t zero = 0;
+ output.Write(&zero, sizeof(size_t));
+ return total;
}
void LoadRopeFromPipe(IInputStream& input, TRope& rope) {
@@ -529,29 +533,38 @@ public:
class TInputChannel : public IInputChannel {
public:
- TInputChannel(const ITaskRunner::TPtr& taskRunner, ui64 taskId, ui64 channelId, IInputStream& input, IOutputStream& output)
+ TInputChannel(const ITaskRunner::TPtr& taskRunner, ui64 taskId, ui64 channelId, IInputStream& input, IOutputStream& output, i64 channelBufferSize)
: TaskId(taskId)
, ChannelId(channelId)
, Input(input)
, Output(output)
, ProtocolVersion(taskRunner->GetProtocolVersion())
+ , FreeSpace(channelBufferSize)
{ }
i64 GetFreeSpace() override {
if (ProtocolVersion <= 1) {
return std::numeric_limits<i64>::max();
}
+
+ if (ProtocolVersion < 6) {
+ NDqProto::TCommandHeader header;
+ header.SetVersion(2);
+ header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE);
+ header.SetTaskId(TaskId);
+ header.SetChannelId(ChannelId);
+ header.Save(&Output);
- NDqProto::TCommandHeader header;
- header.SetVersion(2);
- header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE);
- header.SetTaskId(TaskId);
- header.SetChannelId(ChannelId);
- header.Save(&Output);
+ NDqProto::TGetFreeSpaceResponse response;
+ response.Load(&Input);
+ return response.GetFreeSpace();
+ }
- NDqProto::TGetFreeSpaceResponse response;
- response.Load(&Input);
- return response.GetFreeSpace();
+ return FreeSpace;
+ }
+
+ void SetFreeSpace(i64 space) {
+ FreeSpace = space;
}
void Push(TDqSerializedBatch&& data) override {
@@ -562,9 +575,18 @@ public:
header.SetChannelId(ChannelId);
header.Save(&Output);
- data.Proto.Save(&Output);
+ i64 written = 0;
+ TCountingOutput countingOutput(&Output);
+ data.Proto.Save(&countingOutput);
if (data.IsOOB()) {
- SaveRopeToPipe(Output, data.Payload);
+ written = SaveRopeToPipe(Output, data.Payload);
+ } else {
+ written = countingOutput.Counter();
+ }
+
+ if (ProtocolVersion >= 6) {
+ // estimate free space
+ FreeSpace -= written;
}
}
@@ -584,9 +606,8 @@ private:
IInputStream& Input;
IOutputStream& Output;
- TString SerializedInputType;
-
i32 ProtocolVersion;
+ i64 FreeSpace;
};
class TDqInputChannel: public IDqInputChannel {
@@ -711,41 +732,56 @@ private:
class TDqSource: public IDqAsyncInputBuffer {
public:
- TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, IPipeTaskRunner* taskRunner)
+ TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, i64 channelBufferSize, IPipeTaskRunner* taskRunner)
: TaskId(taskId)
, TaskRunner(taskRunner)
, Input(TaskRunner->GetInput())
, Output(TaskRunner->GetOutput())
, InputType(inputType)
+ , BufferSize(channelBufferSize)
+ , FreeSpace(channelBufferSize)
+ , ProtocolVersion(TaskRunner->GetProtocolVersion())
{
PushStats.InputIndex = inputIndex;
}
i64 GetFreeSpace() const override {
- NDqProto::TCommandHeader header;
- header.SetVersion(4);
- header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE_SOURCE);
- header.SetTaskId(TaskId);
- header.SetChannelId(PushStats.InputIndex);
- header.Save(&Output);
+ if (ProtocolVersion < 6) {
+ NDqProto::TCommandHeader header;
+ header.SetVersion(4);
+ header.SetCommand(NDqProto::TCommandHeader::GET_FREE_SPACE_SOURCE);
+ header.SetTaskId(TaskId);
+ header.SetChannelId(PushStats.InputIndex);
+ header.Save(&Output);
- NDqProto::TGetFreeSpaceResponse response;
- response.Load(&Input);
- return response.GetFreeSpace();
+ NDqProto::TGetFreeSpaceResponse response;
+ response.Load(&Input);
+ return response.GetFreeSpace();
+ }
+
+ return FreeSpace;
+ }
+
+ void SetFreeSpace(i64 space) {
+ FreeSpace = space;
}
ui64 GetStoredBytes() const override {
- NDqProto::TCommandHeader header;
- header.SetVersion(4);
- header.SetCommand(NDqProto::TCommandHeader::GET_STORED_BYTES_SOURCE);
- header.SetTaskId(TaskId);
- header.SetChannelId(PushStats.InputIndex);
- header.Save(&Output);
+ if (ProtocolVersion < 6) {
+ NDqProto::TCommandHeader header;
+ header.SetVersion(4);
+ header.SetCommand(NDqProto::TCommandHeader::GET_STORED_BYTES_SOURCE);
+ header.SetTaskId(TaskId);
+ header.SetChannelId(PushStats.InputIndex);
+ header.Save(&Output);
- NDqProto::TGetStoredBytesResponse response;
- response.Load(&Input);
+ NDqProto::TGetStoredBytesResponse response;
+ response.Load(&Input);
- return response.GetResult();
+ return response.GetResult();
+ }
+
+ return BufferSize - FreeSpace;
}
const TDqAsyncInputBufferStats& GetPushStats() const override {
@@ -776,6 +812,10 @@ public:
if (isOOB) {
SaveRopeToPipe(Output, serialized.Payload);
}
+
+ if (ProtocolVersion >= 6) {
+ FreeSpace -= space;
+ }
}
void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) override {
@@ -842,6 +882,9 @@ private:
mutable NKikimr::NMiniKQL::TType* InputType = nullptr;
TDqAsyncInputBufferStats PushStats;
TDqInputStats PopStats;
+ i64 BufferSize;
+ i64 FreeSpace;
+ i32 ProtocolVersion;
};
/*______________________________________________________________________________________________*/
@@ -1285,7 +1328,7 @@ public:
}
NYql::NDqProto::TPrepareResponse Prepare(const NDq::TDqTaskRunnerMemoryLimits& limits) override {
- Y_UNUSED(limits);
+ ChannelBufferSize = limits.ChannelBufferSize;
NDqProto::TCommandHeader header;
header.SetVersion(1);
header.SetCommand(NDqProto::TCommandHeader::PREPARE);
@@ -1302,18 +1345,32 @@ public:
NYql::NDqProto::TRunResponse Run() override {
NDqProto::TCommandHeader header;
- header.SetVersion(1);
+ header.SetVersion(GetProtocolVersion() >= 6 ? 6 : 1);
header.SetCommand(NDqProto::TCommandHeader::RUN);
header.SetTaskId(Task.GetId());
header.Save(&Output);
NDqProto::TRunResponse response;
response.Load(&Input);
+ if (GetProtocolVersion() >= 6) {
+ for (auto& space : response.GetChannelFreeSpace()) {
+ auto* channel = static_cast<TInputChannel*>(GetInputChannel(space.GetId()).Get());
+ channel->SetFreeSpace(space.GetSpace());
+ }
+ for (auto& space : response.GetSourceFreeSpace()) {
+ auto* source = static_cast<TDqSource*>(GetSource(space.GetId()).Get());
+ source->SetFreeSpace(space.GetSpace());
+ }
+ }
return response;
}
IInputChannel::TPtr GetInputChannel(ui64 channelId) override {
- return new TInputChannel(this, Task.GetId(), channelId, Input, Output);
+ auto& channel = InputChannels[channelId];
+ if (channel == nullptr) {
+ channel = new TInputChannel(this, Task.GetId(), channelId, Input, Output, ChannelBufferSize);
+ }
+ return channel;
}
IOutputChannel::TPtr GetOutputChannel(ui64 channelId) override {
@@ -1321,7 +1378,11 @@ public:
}
IDqAsyncInputBuffer::TPtr GetSource(ui64 index) override {
- return new TDqSource(Task.GetId(), index, InputTypes.at(index), this);
+ auto& source = Sources[index];
+ if (source == nullptr) {
+ source = new TDqSource(Task.GetId(), index, InputTypes.at(index), ChannelBufferSize, this);
+ }
+ return source;
}
TDqSink::TPtr GetSink(ui64 index) override {
@@ -1488,6 +1549,9 @@ private:
THashMap<TString, TString> SecureParams;
THashMap<TString, TString> TaskParams;
TVector<TString> ReadRanges;
+ THashMap<ui64, IInputChannel::TPtr> InputChannels;
+ THashMap<ui64, IDqAsyncInputBuffer::TPtr> Sources;
+ i64 ChannelBufferSize = 0;
std::shared_ptr <NKikimr::NMiniKQL::TScopedAlloc> Alloc;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index ec3b9dd728..c99c3f197f 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -12,7 +12,7 @@ extern const TString WorkingDirectoryParamName;
extern const TString WorkingDirectoryDontInitParamName; // COMPAT(aozeritsky)
extern const TString UseMetaParamName; // COMPAT(aozeritsky)
-void SaveRopeToPipe(IOutputStream& output, const TRope& rope);
+i64 SaveRopeToPipe(IOutputStream& output, const TRope& rope);
void LoadRopeFromPipe(IInputStream& input, TRope& rope);
NDq::TDqTaskRunnerMemoryLimits DefaultMemoryLimits();