diff options
author | Alexey Ozeritskiy <[email protected]> | 2022-06-09 20:50:56 +0300 |
---|---|---|
committer | Alexey Ozeritskiy <[email protected]> | 2022-06-09 20:50:56 +0300 |
commit | 2a82ad54ef82e80dc3667f14c02193f98f22086e (patch) | |
tree | 09e8ef80cf80ecadaa22db516fd8c22a7339504c | |
parent | 66aefc6939acaf049ce10d9506310a1b5e05cfa9 (diff) |
YQL-14386: Make ChunkSizeHardLimit configurable (replaced by the ChunkSizeLimit setting, default 48 MB)
ref:3475f5268b56a19d50024e0840fb368b9ef47a99
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel.h | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_tasks_runner.h | 1 |
4 files changed, 14 insertions, 9 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index ac4ca1accf0..88d23c8b38e 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -22,8 +22,6 @@ namespace { using namespace NKikimr; -const ui32 ChunkSizeHardLimit = 48 * 1024 * 1024; // 48 MB - bool IsSafeToEstimateValueSize(const NMiniKQL::TType* type) { if (type->GetKind() == NMiniKQL::TType::EKind::Data) { return true; @@ -68,7 +66,7 @@ class TDqOutputChannelOld : public IDqOutputChannel { public: TDqOutputChannelOld(ui64 channelId, NMiniKQL::TType* outputType, bool collectProfileStats, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, ui64 maxStoredBytes, - ui64 maxChunkBytes, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc) + ui64 maxChunkBytes, ui64 chunkSizeLimit, NDqProto::EDataTransportVersion transportVersion, const TLogFunc& logFunc) : ChannelId(channelId) , OutputType(outputType) , BasicStats(ChannelId) @@ -76,6 +74,7 @@ public: , DataSerializer(typeEnv, holderFactory, transportVersion) , MaxStoredBytes(maxStoredBytes) , MaxChunkBytes(maxChunkBytes) + , ChunkSizeLimit(chunkSizeLimit) , LogFunc(logFunc) {} ui64 GetChannelId() const override { @@ -166,7 +165,7 @@ public: DLOG("Recalc estimated row size: " << EstimatedRowBytes); } - while (data.GetRaw().size() >= ChunkSizeHardLimit && takeRows > 1) { + while (data.GetRaw().size() >= ChunkSizeLimit && takeRows > 1) { ui64 newTakeRows = std::max<ui64>(bytes / EstimatedRowBytes, 1); newTakeRows = std::min<ui64>(newTakeRows, Data.size()); @@ -200,7 +199,7 @@ public: } } - YQL_ENSURE(data.GetRaw().size() < ChunkSizeHardLimit); + YQL_ENSURE(data.GetRaw().size() < ChunkSizeLimit); BasicStats.Chunks++; BasicStats.RowsOut += takeRows; @@ -305,6 +304,7 @@ private: TDqDataSerializer DataSerializer; const ui64 MaxStoredBytes; ui64 MaxChunkBytes; + ui64 ChunkSizeLimit; 
TLogFunc LogFunc; using TDataType = TDeque<NUdf::TUnboxedValue, NKikimr::NMiniKQL::TMKQLAllocator<NUdf::TUnboxedValue>>; @@ -332,6 +332,7 @@ public: , Storage(settings.ChannelStorage) , MaxStoredBytes(settings.MaxStoredBytes) , MaxChunkBytes(settings.MaxChunkBytes) + , ChunkSizeLimit(settings.ChunkSizeLimit) , LogFunc(log) { if (Storage && 3 * MaxChunkBytes > MaxStoredBytes) { @@ -597,7 +598,7 @@ public: if (spilledBlob.SerializedSize <= bytes * 2) { data.Swap(&protoBlob); - YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeHardLimit); + YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit); hasResult = true; // LOG("return loaded blob as-is"); } else { @@ -657,7 +658,7 @@ public: data = DataSerializer.Serialize(firstDataIt, lastDataIt, OutputType); } - YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeHardLimit); + YQL_ENSURE(data.ByteSizeLong() <= ChunkSizeLimit); BasicStats.Chunks++; BasicStats.RowsOut += takeRows; @@ -800,6 +801,7 @@ private: IDqChannelStorage::TPtr Storage; const ui64 MaxStoredBytes; ui64 MaxChunkBytes; + ui64 ChunkSizeLimit; TLogFunc LogFunc; std::optional<ui32> RowFixedSize; @@ -841,7 +843,7 @@ IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL:: if (settings.AllowGeneratorsInUnboxedValues) { YQL_ENSURE(!settings.ChannelStorage); return new TDqOutputChannelOld(channelId, outputType, settings.CollectProfileStats, typeEnv, holderFactory, - settings.MaxStoredBytes, settings.MaxChunkBytes, settings.TransportVersion, logFunc); + settings.MaxStoredBytes, settings.MaxChunkBytes, settings.ChunkSizeLimit, settings.TransportVersion, logFunc); } else { return new TDqOutputChannelNew(channelId, outputType, typeEnv, holderFactory, settings, logFunc); } diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index d87422fef53..0adee34614e 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -26,7 +26,7 @@ struct 
TDqOutputChannelStats : TDqOutputStats { explicit TDqOutputChannelStats(ui64 channelId) : ChannelId(channelId) {} - + template<typename T> void FromProto(const T& f) { @@ -74,6 +74,7 @@ public: struct TDqOutputChannelSettings { ui64 MaxStoredBytes = 8_MB; ui64 MaxChunkBytes = 2_MB; + ui64 ChunkSizeLimit = 48_MB; NDqProto::EDataTransportVersion TransportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; IDqChannelStorage::TPtr ChannelStorage; bool CollectProfileStats = false; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 65bc0bf9b86..bb0a0a5bafd 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -518,6 +518,7 @@ public: TDqOutputChannelSettings settings; settings.MaxStoredBytes = memoryLimits.ChannelBufferSize; settings.MaxChunkBytes = memoryLimits.OutputChunkMaxSize; + settings.ChunkSizeLimit = memoryLimits.ChunkSizeLimit; settings.TransportVersion = outputChannelDesc.GetTransportVersion(); settings.CollectProfileStats = Settings.CollectProfileStats; settings.AllowGeneratorsInUnboxedValues = Settings.AllowGeneratorsInUnboxedValues; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 684e1df889f..aa7c32458dc 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -271,6 +271,7 @@ struct TDqTaskRunnerSettings { struct TDqTaskRunnerMemoryLimits { ui32 ChannelBufferSize = 0; ui32 OutputChunkMaxSize = 0; + ui32 ChunkSizeLimit = 48_MB; }; NUdf::TUnboxedValue DqBuildInputValue(const NDqProto::TTaskInput& inputDesc, const NKikimr::NMiniKQL::TType* type, |