diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-28 18:45:35 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-28 18:45:35 +0300 |
commit | d5d775a01feed14fd597f72586b5bf81c9de9fde (patch) | |
tree | 20d6c0d76c0fa92c493218173b2d0f56c445b143 | |
parent | b9a10e543a5624c857f7c598b2605e161e01d688 (diff) | |
download | ydb-d5d775a01feed14fd597f72586b5bf81c9de9fde.tar.gz |
Add some serialization settings.
3 files changed, 34 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp index 3956e84a99..3a8cab235c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp @@ -18,6 +18,7 @@ TRuntimeNode BuildSerializeCall( const std::vector<std::string_view>& keys, const std::string_view& format, TType* inputType, + const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) { const auto inputItemType = AS_TYPE(TFlowType, inputType)->GetItemType(); @@ -41,6 +42,9 @@ TRuntimeNode BuildSerializeCall( } TString settings; + TStringOutput stream(settings); + NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); + writer.OpenMap(); if (!keys.empty()) { const std::unordered_set<std::string_view> set(keys.cbegin(), keys.cend()); const auto structType = AS_TYPE(TStructType, inputItemType); @@ -73,34 +77,42 @@ TRuntimeNode BuildSerializeCall( ); } - TStringOutput stream(settings); - NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); - writer.OpenMap(); - writer.WriteKey("keys"); - writer.OpenArray(); - std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); }); - writer.CloseArray(); - writer.CloseMap(); - writer.Flush(); + writer.WriteKey("keys"); + writer.OpenArray(); + std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); }); + writer.CloseArray(); + + if (const auto keysCount = config->UniqueKeysCountLimit.Get()) + writer.Write("keys_count_limit", *keysCount); } + if (const auto totalSize = config->SerializeMemoryLimit.Get()) + writer.Write("total_size_limit", *totalSize); + if (const auto blockSize = config->BlockSizeMemoryLimit.Get()) + writer.Write("block_size_limit", *blockSize); + + writer.CloseMap(); + writer.Flush(); + if (settings == "{}") + settings.clear(); + input = ctx.ProgramBuilder.FromFlow(input); const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})}); return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input})); } -TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) { +TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) { const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx); const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder); std::vector<std::string_view> keys; keys.reserve(wrapper.KeyColumns().Size()); wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); }); - return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, ctx); + return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, config, ctx); } } -void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr&) { +void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr& state) { compiler.ChainCallable(TDqSourceWideWrap::CallableName(), [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == S3ProviderName) { @@ -115,8 +127,8 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con if (!compiler.HasCallable(TS3SinkOutput::CallableName())) compiler.AddCallable(TS3SinkOutput::CallableName(), - [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { - return SerializeForS3(TS3SinkOutput(&node), ctx); + [state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { + return SerializeForS3(TS3SinkOutput(&node), state->Configuration, ctx); }); } diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp index d7b4b0bfa0..993dd90925 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp @@ -9,6 +9,10 @@ using namespace NCommon; TS3Configuration::TS3Configuration() { REGISTER_SETTING(*this, SourceCoroActor); + REGISTER_SETTING(*this, MaxOutputObjectSize); + REGISTER_SETTING(*this, UniqueKeysCountLimit); + REGISTER_SETTING(*this, BlockSizeMemoryLimit); + REGISTER_SETTING(*this, SerializeMemoryLimit); } TS3Settings::TConstPtr TS3Configuration::Snapshot() const { diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 3c800540e6..8a557d1509 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -11,6 +11,10 @@ struct TS3Settings { using TConstPtr = std::shared_ptr<const TS3Settings>; NCommon::TConfSetting<bool, false> SourceCoroActor; + NCommon::TConfSetting<ui64, false> MaxOutputObjectSize; + NCommon::TConfSetting<ui64, false> UniqueKeysCountLimit; + NCommon::TConfSetting<ui64, false> BlockSizeMemoryLimit; + NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions. }; struct TS3ClusterSettings { |