diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-19 15:18:06 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-19 15:18:06 +0300 |
commit | 2dd2cadc3b3fedfde3ac08fd6eec73f5f8025abe (patch) | |
tree | 07b5a78ee314ee6b937ec9a6abfae37b46104dd5 | |
parent | 473ee8e351e58c3e5a2a0487471e3e219359a7fa (diff) | |
download | ydb-2dd2cadc3b3fedfde3ac08fd6eec73f5f8025abe.tar.gz |
Bound SqueezeToList output list size for json_list s3 output format
4 files changed, 8 insertions, 2 deletions
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 0e2076e798..24ec38a1d0 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -493,7 +493,7 @@ public: const TNarrowLambda& payloadSelector, bool isCompact = false, ui64 itemsCountHint = 0); TRuntimeNode NarrowSqueezeToHashedDict(TRuntimeNode stream, bool all, const TNarrowLambda& keySelector, const TNarrowLambda& payloadSelector, bool isCompact = false, ui64 itemsCountHint = 0); - TRuntimeNode SqueezeToList(TRuntimeNode flow, TRuntimeNode sizeHint); + TRuntimeNode SqueezeToList(TRuntimeNode flow, TRuntimeNode limit); // return list of 2-item tuples with key and payload TRuntimeNode DictItems(TRuntimeNode dict); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index 7a2ca387e0..c736ccaf55 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -34,7 +34,11 @@ TRuntimeNode BuildSerializeCall( } ); } else if (format == "json_list") { - return ctx.ProgramBuilder.FlatMap(ctx.ProgramBuilder.SqueezeToList(input, ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui64>::Id)), + ui64 jsonListSizeLimit = 10'000; + if (const auto userLimit = config->JsonListSizeLimit.Get()) { + jsonListSizeLimit = *userLimit; + } + return ctx.ProgramBuilder.FlatMap(ctx.ProgramBuilder.SqueezeToList(input, ctx.ProgramBuilder.NewDataLiteral<ui64>(jsonListSizeLimit)), [&ctx] (TRuntimeNode list) { TRuntimeNode listNotEmpty = ctx.ProgramBuilder.HasItems(list); const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({list.GetStaticType()})}); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index bad4670f41..ab10097afd 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -14,6 +14,7 @@ TS3Configuration::TS3Configuration() REGISTER_SETTING(*this, BlockSizeMemoryLimit); REGISTER_SETTING(*this, SerializeMemoryLimit); REGISTER_SETTING(*this, InFlightMemoryLimit); + REGISTER_SETTING(*this, JsonListSizeLimit).Upper(100'000); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index bfb8cfae57..437715fd5a 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -16,6 +16,7 @@ struct TS3Settings { NCommon::TConfSetting<ui64, false> BlockSizeMemoryLimit; NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions. NCommon::TConfSetting<ui64, false> InFlightMemoryLimit; // Maximum memory used by one sink. + NCommon::TConfSetting<ui64, false> JsonListSizeLimit; // Limit of elements count in json list written to S3 file. Default: 10'000. Max: 100'000. }; struct TS3ClusterSettings { |