summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Alexey Ozeritskiy <[email protected]> 2022-06-09 20:50:56 +0300
committer Alexey Ozeritskiy <[email protected]> 2022-06-09 20:50:56 +0300
commit 2a82ad54ef82e80dc3667f14c02193f98f22086e (patch)
tree 09e8ef80cf80ecadaa22db516fd8c22a7339504c
parent 66aefc6939acaf049ce10d9506310a1b5e05cfa9 (diff)
YQL-14386: Can change ChunkSizeHardLimit
ref:3475f5268b56a19d50024e0840fb368b9ef47a99
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel.cpp 18
-rw-r--r-- ydb/library/yql/dq/runtime/dq_output_channel.h 3
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.cpp 1
-rw-r--r-- ydb/library/yql/dq/runtime/dq_tasks_runner.h 1
4 files changed, 14 insertions, 9 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index ac4ca1accf0..88d23c8b38e 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -22,8 +22,6 @@ namespace {
using namespace NKikimr;
-const ui32 ChunkSizeHardLimit = 48 * 1024 * 1024; // 48 MB
-
bool IsSafeToEstimateValueSize(const NMiniKQL::TType* type) {
if (type->GetKind() == NMiniKQL::TType::EKind::Data) {
return true;
@@ -68,7 +66,7 @@ class TDqOutputChannelOld : public IDqOutputChannel {
public:
TDqOutputChannelOld(ui64 channelId, NMiniKQL::TType* outputType, bool collectProfileStats,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, ui64 maxStoredBytes,
- ui64 maxChunkBytes, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc)
+ ui64 maxChunkBytes, ui64 chunkSizeLimit, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc)
: ChannelId(channelId)
, OutputType(outputType)
, BasicStats(ChannelId)
@@ -76,6 +74,7 @@ public:
, DataSerializer(typeEnv, holderFactory, transportVersion)
, MaxStoredBytes(maxStoredBytes)
, MaxChunkBytes(maxChunkBytes)
+ , ChunkSizeLimit(chunkSizeLimit)
, LogFunc(logFunc) {}
ui64 GetChannelId() const override {
@@ -166,7 +165,7 @@ public:
DLOG("Recalc estimated row size: " << EstimatedRowBytes);
}
- while (data.GetRaw().size() >= ChunkSizeHardLimit && takeRows > 1) {
+ while (data.GetRaw().size() >= ChunkSizeLimit && takeRows > 1) {
ui64 newTakeRows = std::max<ui64>(bytes / EstimatedRowBytes, 1);
newTakeRows = std::min<ui64>(newTakeRows, Data.size());
@@ -200,7 +199,7 @@ public:
}
}
- YQL_ENSURE(data.GetRaw().size() < ChunkSizeHardLimit);
+ YQL_ENSURE(data.GetRaw().size() < ChunkSizeLimit);
BasicStats.Chunks++;
BasicStats.RowsOut += takeRows;
@@ -305,6 +304,7 @@ private:
TDqDataSerializer DataSerializer;
const ui64 MaxStoredBytes;
ui64 MaxChunkBytes;
+ ui64 ChunkSizeLimit;
TLogFunc LogFunc;
using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>;
@@ -332,6 +332,7 @@ public:
, Storage(settings.ChannelStorage)
, MaxStoredBytes(settings.MaxStoredBytes)
, MaxChunkBytes(settings.MaxChunkBytes)
+ , ChunkSizeLimit(settings.ChunkSizeLimit)
, LogFunc(log)
{
if (Storage && 3 * MaxChunkBytes > MaxStoredBytes) {
@@ -597,7 +598,7 @@ public:
if (spilledBlob.SerializedSize <= bytes * 2) {
data.Swap(&protoBlob);
- YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeHardLimit);
+ YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit);
hasResult = true;
// LOG("return loaded blob as-is");
} else {
@@ -657,7 +658,7 @@ public:
data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType);
}
- YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeHardLimit);
+ YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit);
BasicStats.Chunks++;
BasicStats.RowsOut += takeRows;
@@ -800,6 +801,7 @@ private:
IDqChannelStorage::TPtr Storage;
const ui64 MaxStoredBytes;
ui64 MaxChunkBytes;
+ ui64 ChunkSizeLimit;
TLogFunc LogFunc;
std::optional<ui32> RowFixedSize;
@@ -841,7 +843,7 @@ IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::
if (settings.AllowGeneratorsInUnboxedValues) {
YQL_ENSURE(!settings.ChannelStorage);
return new TDqOutputChannelOld(channelId, outputType, settings.CollectProfileStats, typeEnv, holderFactory,
- settings.MaxStoredBytes, settings.MaxChunkBytes, settings.TransportVersion, logFunc);
+ settings.MaxStoredBytes, settings.MaxChunkBytes, settings.ChunkSizeLimit, settings.TransportVersion, logFunc);
} else {
return new TDqOutputChannelNew(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index d87422fef53..0adee34614e 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -26,7 +26,7 @@ struct TDqOutputChannelStats : TDqOutputStats {
explicit TDqOutputChannelStats(ui64 channelId)
: ChannelId(channelId) {}
-
+
template<typename T>
void FromProto(const T& f)
{
@@ -74,6 +74,7 @@ public:
struct TDqOutputChannelSettings {
ui64 MaxStoredBytes = 8_MB;
ui64 MaxChunkBytes = 2_MB;
+ ui64 ChunkSizeLimit = 48_MB;
NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
IDqChannelStorage::TPtr ChannelStorage;
bool CollectProfileStats = false;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 65bc0bf9b86..bb0a0a5bafd 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -518,6 +518,7 @@ public:
TDqOutputChannelSettings settings;
settings.MaxStoredBytes = memoryLimits.ChannelBufferSize;
settings.MaxChunkBytes = memoryLimits.OutputChunkMaxSize;
+ settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit;
settings.TransportVersion = outputChannelDesc.GetTransportVersion();
settings.CollectProfileStats = Settings.CollectProfileStats;
settings.AllowGeneratorsInUnboxedValues = Settings.AllowGeneratorsInUnboxedValues;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 684e1df889f..aa7c32458dc 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -271,6 +271,7 @@ struct TDqTaskRunnerSettings {
struct TDqTaskRunnerMemoryLimits {
ui32 ChannelBufferSize = 0;
ui32 OutputChunkMaxSize = 0;
+ ui32 ChunkSizeLimit = 48_MB;
};
NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type,