aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-28 18:45:35 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-28 18:45:35 +0300
commitd5d775a01feed14fd597f72586b5bf81c9de9fde (patch)
tree20d6c0d76c0fa92c493218173b2d0f56c445b143
parentb9a10e543a5624c857f7c598b2605e161e01d688 (diff)
downloadydb-d5d775a01feed14fd597f72586b5bf81c9de9fde.tar.gz
Add some serialization settings.
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp40
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp4
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h4
3 files changed, 34 insertions, 14 deletions
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
index 3956e84a99..3a8cab235c 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_mkql_compiler.cpp
@@ -18,6 +18,7 @@ TRuntimeNode BuildSerializeCall(
const std::vector<std::string_view>& keys,
const std::string_view& format,
TType* inputType,
+ const TS3Configuration::TPtr& config,
NCommon::TMkqlBuildContext& ctx)
{
const auto inputItemType = AS_TYPE(TFlowType, inputType)->GetItemType();
@@ -41,6 +42,9 @@ TRuntimeNode BuildSerializeCall(
}
TString settings;
+ TStringOutput stream(settings);
+ NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
+ writer.OpenMap();
if (!keys.empty()) {
const std::unordered_set<std::string_view> set(keys.cbegin(), keys.cend());
const auto structType = AS_TYPE(TStructType, inputItemType);
@@ -73,34 +77,42 @@ TRuntimeNode BuildSerializeCall(
);
}
- TStringOutput stream(settings);
- NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig());
- writer.OpenMap();
- writer.WriteKey("keys");
- writer.OpenArray();
- std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); });
- writer.CloseArray();
- writer.CloseMap();
- writer.Flush();
+ writer.WriteKey("keys");
+ writer.OpenArray();
+ std::for_each(keys.cbegin(), keys.cend(), [&writer](const std::string_view& key){ writer.Write(key); });
+ writer.CloseArray();
+
+ if (const auto keysCount = config->UniqueKeysCountLimit.Get())
+ writer.Write("keys_count_limit", *keysCount);
}
+ if (const auto totalSize = config->SerializeMemoryLimit.Get())
+ writer.Write("total_size_limit", *totalSize);
+ if (const auto blockSize = config->BlockSizeMemoryLimit.Get())
+ writer.Write("block_size_limit", *blockSize);
+
+ writer.CloseMap();
+ writer.Flush();
+ if (settings == "{}")
+ settings.clear();
+
input = ctx.ProgramBuilder.FromFlow(input);
const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({input.GetStaticType()})});
return ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.SerializeFormat", {}, userType, format + settings), {input}));
}
-TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, NCommon::TMkqlBuildContext& ctx) {
+TRuntimeNode SerializeForS3(const TS3SinkOutput& wrapper, const TS3Configuration::TPtr& config, NCommon::TMkqlBuildContext& ctx) {
const auto input = MkqlBuildExpr(wrapper.Input().Ref(), ctx);
const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *wrapper.Input().Ref().GetTypeAnn(), ctx.ProgramBuilder);
std::vector<std::string_view> keys;
keys.reserve(wrapper.KeyColumns().Size());
wrapper.KeyColumns().Ref().ForEachChild([&](const TExprNode& key){ keys.emplace_back(key.Content()); });
- return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, ctx);
+ return BuildSerializeCall(input, keys, wrapper.Format().Value(), inputItemType, config, ctx);
}
}
-void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr&) {
+void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TS3State::TPtr& state) {
compiler.ChainCallable(TDqSourceWideWrap::CallableName(),
[](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == S3ProviderName) {
@@ -115,8 +127,8 @@ void RegisterDqS3MkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, con
if (!compiler.HasCallable(TS3SinkOutput::CallableName()))
compiler.AddCallable(TS3SinkOutput::CallableName(),
- [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
- return SerializeForS3(TS3SinkOutput(&node), ctx);
+ [state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) {
+ return SerializeForS3(TS3SinkOutput(&node), state->Configuration, ctx);
});
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
index d7b4b0bfa0..993dd90925 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.cpp
@@ -9,6 +9,10 @@ using namespace NCommon;
TS3Configuration::TS3Configuration()
{
REGISTER_SETTING(*this, SourceCoroActor);
+ REGISTER_SETTING(*this, MaxOutputObjectSize);
+ REGISTER_SETTING(*this, UniqueKeysCountLimit);
+ REGISTER_SETTING(*this, BlockSizeMemoryLimit);
+ REGISTER_SETTING(*this, SerializeMemoryLimit);
}
TS3Settings::TConstPtr TS3Configuration::Snapshot() const {
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 3c800540e6..8a557d1509 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -11,6 +11,10 @@ struct TS3Settings {
using TConstPtr = std::shared_ptr<const TS3Settings>;
NCommon::TConfSetting<bool, false> SourceCoroActor;
+ NCommon::TConfSetting<ui64, false> MaxOutputObjectSize;
+ NCommon::TConfSetting<ui64, false> UniqueKeysCountLimit;
+ NCommon::TConfSetting<ui64, false> BlockSizeMemoryLimit;
+ NCommon::TConfSetting<ui64, false> SerializeMemoryLimit; // Total serialization memory limit for all current blocks for all patition keys. Reachable in case of many but small partitions.
};
struct TS3ClusterSettings {