| author | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-29 15:50:19 +0300 |
|---|---|---|
| committer | galaxycrab <UgnineSirdis@ydb.tech> | 2022-12-29 15:50:19 +0300 |
| commit | bebb9f9cc8ac7bd04eb1aba2531c5dd5bb5bc5f2 (patch) | |
| tree | 9f1b8ca129efdbd4f1899ef7b8321972afb2d32a | |
| parent | 7d154606d1d5e5ef2a0ee327e515df9099474cfb (diff) | |
| download | ydb-bebb9f9cc8ac7bd04eb1aba2531c5dd5bb5bc5f2.tar.gz | |
Fix block size in clickhouse udf. Fix data loss in clickhouse udf
The batched data size in the clickhouse udf was too large. Because of that, when a large amount of data is being transferred, several big chunks of 500-700 MB each end up sitting in the buffer for a long time (at every stage of the chain where backpressure is controlled). Parallelism is also lost: until the clickhouse udf has assembled a huge chunk of data, nothing is written to S3. The buffer size is now reduced to roughly match the channel buffer sizes in DQ.
Also found and fixed a bug where the stream builds output data but returns Yield from it, so the prepared data is thrown away.
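The data-loss part is easiest to see as a pattern. Below is a minimal, hypothetical C++ sketch of a pull-based serializing stream (`TSerializingStream`, `FetchUpstream` and `BlockSizeLimit` are illustrative names, not the actual UDF types; only the `EFetchStatus` values come from the code in the diff): if the stream has already formed an output chunk, it must return it with Ok instead of propagating Yield from its input, otherwise the prepared chunk is dropped.

```cpp
// Illustrative sketch only, assuming a pull-based stream API similar to the one
// described in the commit message; not the actual ClickHouse UDF code.
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

enum class EFetchStatus { Ok, Yield, Finish };

struct TSerializingStream {
    std::function<EFetchStatus(std::string&)> FetchUpstream; // upstream row source
    std::string Pending;                                     // already-serialized output
    size_t BlockSizeLimit = 1 << 20;                         // ~1 MB, mirroring the reduced default

    EFetchStatus Fetch(std::string& out) {
        for (;;) {
            std::string row;
            switch (FetchUpstream(row)) {
            case EFetchStatus::Yield:
                if (!Pending.empty()) {       // the fix: hand out the prepared chunk...
                    out = std::move(Pending);
                    Pending.clear();
                    return EFetchStatus::Ok;
                }
                return EFetchStatus::Yield;   // ...instead of returning Yield and losing it
            case EFetchStatus::Ok:
                Pending += row;               // keep batching rows into one chunk
                Pending += '\n';
                if (Pending.size() >= BlockSizeLimit) {
                    out = std::move(Pending);
                    Pending.clear();
                    return EFetchStatus::Ok;
                }
                break;
            case EFetchStatus::Finish:
                if (!Pending.empty()) {       // flush the tail before reporting Finish
                    out = std::move(Pending);
                    Pending.clear();
                    return EFetchStatus::Ok;
                }
                return EFetchStatus::Finish;
            }
        }
    }
};
```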
6 files changed, 202 insertions, 79 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 9f2e7787c8..086169fb95 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -120,16 +120,14 @@ struct TEvPrivate {
using namespace NKikimr::NMiniKQL;
class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
- friend class TS3WriteActor;
- using TActorBootstrapped<TS3FileWriteActor>::PassAway;
public:
TS3FileWriteActor(
const TTxId& txId,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
- const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression,
+ const TString& key, const TString& url, const std::string_view& compression,
const IRetryPolicy<long>::TPtr& retryPolicy)
: TxId(txId)
, Gateway(std::move(gateway))
@@ -139,7 +137,6 @@ public:
, Key(key)
, Url(url)
, RequestId(CreateGuidAsString())
- , SizeLimit(sizeLimit)
, Parts(MakeCompressorQueue(compression))
{
YQL_ENSURE(Parts, "Compression '" << compression << "' is not supported.");
@@ -170,13 +167,15 @@ public:
TActorBootstrapped<TS3FileWriteActor>::PassAway();
}
- void SendData(TString&& data) {
+ void AddData(TString&& data) {
Parts->Push(std::move(data));
+ }
- Y_UNUSED(SizeLimit);
-//TODO: if (10000U == Tags.size() + Parts->Size() || SizeLimit <= SentSize + Parts->Volume())
- Parts->Seal();
+ void Seal() {
+ Parts->Seal();
+ }
+ void Go() {
if (!UploadId.empty())
StartUploadParts();
}
@@ -367,7 +366,6 @@ private:
const TString Key;
const TString Url;
const TString RequestId;
- const size_t SizeLimit;
IOutputQueue::TPtr Parts;
std::vector<TString> Tags;
@@ -388,8 +386,8 @@ public:
const TString& extension,
const std::vector<TString>& keys,
const size_t memoryLimit,
- const size_t maxFileSize,
const TString& compression,
+ bool multipart,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const IRetryPolicy<long>::TPtr& retryPolicy)
: Gateway(std::move(gateway))
@@ -405,8 +403,8 @@ public:
, Extension(extension)
, Keys(keys)
, MemoryLimit(memoryLimit)
- , MaxFileSize(maxFileSize)
, Compression(compression)
+ , Multipart(multipart)
{
if (!RandomProvider) {
DefaultRandomProvider = CreateDefaultRandomProvider();
@@ -456,16 +454,29 @@ private:
)
void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+ std::unordered_set<TS3FileWriteActor*> processedActors;
for (const auto& v : data) {
const auto& key = MakePartitionKey(v);
- const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
- if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, MaxFileSize, Compression, RetryPolicy);
- ins.first->second.emplace_back(fileWrite.get());
+ const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
+ if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) {
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, Compression, RetryPolicy);
+ keyIt->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
- ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
+ const NUdf::TUnboxedValue& value = Keys.empty() ? v : *v.GetElements();
+ TS3FileWriteActor* actor = keyIt->second.back();
+ if (value) {
+ actor->AddData(TString(value.AsStringRef()));
+ }
+ if (!Multipart || !value) {
+ actor->Seal();
+ }
+ processedActors.insert(actor);
+ }
+
+ for (TS3FileWriteActor* actor : processedActors) {
+ actor->Go();
}
if (finished) {
@@ -555,8 +566,8 @@ private:
const std::vector<TString> Keys;
const size_t MemoryLimit;
- const size_t MaxFileSize;
const TString Compression;
+ const bool Multipart;
bool Finished = false;
ui64 EgressBytes = 0;
@@ -592,8 +603,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
params.HasExtension() ? params.GetExtension() : "",
std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()),
params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
- params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB,
params.HasCompression() ? params.GetCompression() : "",
+ params.GetMultipart(),
callbacks,
retryPolicy);
return {actor, actor};
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index 7ee6610cd2..688478376e 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -10,6 +10,7 @@ message TSink {
repeated string Keys = 4;
optional string Compression = 5;
optional uint64 MemoryLimit = 6;
- optional uint64 MaxFileSize = 7;
+ optional uint64 MaxFileSize_deprecated = 7;
optional string Extension = 8;
+ optional bool Multipart = 9;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index cf888e5e8d..cbbfdd55a7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -278,8 +278,11 @@ private:
const auto structType = itemType->Cast<TStructExprType>();
const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize();
- if (const auto format = input->Child(TS3SinkOutput::idx_Format); keysCount) {
- if (format->IsAtom({"raw","json_list"})) {
+ const auto format = input->Child(TS3SinkOutput::idx_Format);
+ const bool isSingleRowPerFileFormat = format->IsAtom({"raw","json_list"}); // One string from ClikhouseUdf ClickHouseClient.SerializeFormat results in one S3 file => no delimiters between files.
+
+ if (keysCount) {
+ if (isSingleRowPerFileFormat) {
ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Partitioned isn't supporter for " << format->Content() << " output format."));
return TStatus::Error;
}
@@ -295,7 +298,7 @@ private:
}
TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
- itemTypes.front() = ctx.MakeType<TDataExprType>(EDataSlot::String);
+ itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String));
input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes)));
}
} else {
@@ -310,7 +313,11 @@ private:
}
}
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ if (isSingleRowPerFileFormat) {
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ } else {
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))));
+ }
}
return TStatus::Ok;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index bd610534c1..56919944b9 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -41,6 +41,16 @@ std::string_view GetCompression(const TExprNode& settings) {
return {};
}
+bool GetMultipart(const TExprNode& settings) {
+ for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
+ if (settings.Child(i)->Head().IsAtom("multipart")) {
+ return FromString(settings.Child(i)->Tail().Content());
+ }
+ }
+
+ return false;
+}
+
using namespace NYql::NS3Details;
class TS3DqIntegration: public TDqIntegrationBase {
@@ -298,8 +308,8 @@ public:
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override {
const TDqSink sink(&node);
- if (const auto maySettings = sink.Settings().Maybe<TS3SinkSettings>()) {
- const auto settings = maySettings.Cast();
+ if (const auto maybeSettings = sink.Settings().Maybe<TS3SinkSettings>()) {
+ const auto settings = maybeSettings.Cast();
const auto& cluster = sink.DataSink().Cast<TS3DataSink>().Cluster().StringValue();
const auto& connect = State_->Configuration->Clusters.at(cluster);
@@ -311,15 +321,14 @@ public:
for (const auto& key : GetKeys(settings.Settings().Ref()))
sinkDesc.MutableKeys()->Add(TString(key->Content()));
- if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get())
- sinkDesc.SetMaxFileSize(*maxObjectSize);
-
if (const auto& memoryLimit = State_->Configuration->InFlightMemoryLimit.Get())
sinkDesc.SetMemoryLimit(*memoryLimit);
if (const auto& compression = GetCompression(settings.Settings().Ref()); !compression.empty())
sinkDesc.SetCompression(TString(compression));
+ sinkDesc.SetMultipart(GetMultipart(settings.Settings().Ref()));
+
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 290bb60cbd..970d49feea 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -8,6 +8,8 @@
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <util/generic/size_literals.h>
+
namespace NYql {
namespace {
@@ -165,6 +167,26 @@ public:
sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
}
+ const TStringBuf format = targetNode.Format();
+ if (format != "raw" || format != "json_list") { // multipart
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart"));
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "true"));
+ sinkSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "file_size_limit"));
+ size_t fileSize = 50_MB;
+ if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) {
+ fileSize = *maxObjectSize;
+ }
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), ToString(fileSize)));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+ }
+
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
return keys.empty() ?
@@ -346,4 +368,3 @@ THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr
}
} // namespace NYql
-
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index 556648d691..8a684a8a8a 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -1037,10 +1037,20 @@ class TSerializeFormat : public TBoxedValue {
const size_t Size;
};
- using TPartitionByPayload = std::pair<NDB::Block, std::unique_ptr<NDB::IBlockOutputStream>>;
+ struct TPartitionByPayload {
+ explicit TPartitionByPayload(NDB::Block&& block)
+ : Block(std::move(block))
+ {}
+
+ NDB::Block Block;
+ std::unique_ptr<NDB::IBlockOutputStream> OutputStream;
+ size_t CurrentFileSize = 0; // Sum of size of blocks that were written to output with this key before closing footer
+ bool hasDataInBlock = false; // We wrote something to block at least once.
+ };
+
using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>;
public:
- TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, size_t fileSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
: ValueBuilder(valueBuilder)
, Stream(stream)
, InMeta(inMeta)
@@ -1049,6 +1059,7 @@ class TSerializeFormat : public TBoxedValue {
, BlockSizeLimit(blockSizeLimit)
, KeysCountLimit(keysCountLimit)
, TotalSizeLimit(totalSizeLimit)
+ , FileSizeLimit(fileSizeLimit)
, Pos(pos)
, HeaderBlock(columns)
, Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
@@ -1058,31 +1069,76 @@ class TSerializeFormat : public TBoxedValue {
, Cache(KeysIndexes.size() + 1U)
{}
private:
- void FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result) {
- TotalSize -= it->second.first.bytes();
- if (!it->second.second) {
- it->second.second = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings));
- it->second.second->writePrefix();
+ bool FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result, bool finishFile = false) {
+ const size_t currentBlockSize = it->second.Block.bytes();
+ TotalSize -= currentBlockSize;
+ const bool hasDataInFile = it->second.hasDataInBlock || it->second.OutputStream;
+ if (!hasDataInFile) {
+ return false;
}
- it->second.second->write(it->second.first);
- it->second.second->writeSuffix();
- it->second.second->flush();
- it->second.second.reset();
- it->second.first = HeaderBlock.cloneEmpty();
-
- if (KeysIndexes.empty())
- result = ValueBuilder->NewString(Buffer->str());
- else {
- TUnboxedValue* tupleItems = nullptr;
- result = Cache.NewArray(*ValueBuilder, tupleItems);
- *tupleItems++ = ValueBuilder->NewString(Buffer->str());
- std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ if (!it->second.OutputStream) {
+ it->second.OutputStream = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings));
+ it->second.OutputStream->writePrefix();
+ }
+ if (it->second.hasDataInBlock) {
+ it->second.OutputStream->write(it->second.Block);
+ }
+ it->second.CurrentFileSize += Buffer->stringRef().size;
+ if (it->second.CurrentFileSize >= FileSizeLimit) { // Finish current file.
+ finishFile = true;
+ }
+
+ if (finishFile && hasDataInFile) {
+ it->second.OutputStream->writeSuffix(); // Finish current file with optional footer.
+ }
+ it->second.OutputStream->flush();
+
+ if (finishFile) {
+ it->second.OutputStream.reset();
+ it->second.CurrentFileSize = 0;
+ }
+ it->second.Block = HeaderBlock.cloneEmpty();
+ it->second.hasDataInBlock = false;
+
+ std::string& resultString = Buffer->str();
+ bool hasResult = resultString.size() > 0;
+ if (hasResult) {
+ if (KeysIndexes.empty())
+ result = ValueBuilder->NewString(resultString);
+ else {
+ TUnboxedValue* tupleItems = nullptr;
+ result = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = ValueBuilder->NewString(resultString);
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+ }
+
+ if (finishFile) {
+ FinishCurrentFileFlag = true; // The corresponding S3 write actor will finish current file and begin a new one.
+ if (!KeysIndexes.empty()) {
+ TUnboxedValue* tupleItems = nullptr;
+ FinishCurrentFileValue = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = TUnboxedValue(); // Empty optional as finishing mark of current file.
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+ if (!hasResult) {
+ FinishCurrentFileFlag = false;
+ result = FinishCurrentFileValue;
+ hasResult = true;
+ }
}
Buffer->restart();
+ return hasResult;
}
EFetchStatus Fetch(TUnboxedValue& result) final try {
+ if (FinishCurrentFileFlag) {
+ FinishCurrentFileFlag = false;
+ result = FinishCurrentFileValue;
+ return EFetchStatus::Ok;
+ }
+
if (IsFinished && PartitionsMap.empty())
return EFetchStatus::Finish;
@@ -1091,34 +1147,38 @@ class TSerializeFormat : public TBoxedValue {
case EFetchStatus::Yield:
return EFetchStatus::Yield;
case EFetchStatus::Ok: {
- if (TotalSizeLimit && TotalSizeLimit < TotalSize) {
- const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(),
- [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.first.bytes(), r.second.first.bytes(); });
- FlushKey(top, result);
- }
-
TPartitionByKey keys(KeysIndexes.size());
for (auto i = 0U; i < keys.size(); ++i)
keys[i] = row.GetElement(KeysIndexes[i]);
- const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), nullptr));
+ const auto [partIt, insertedNew] = PartitionsMap.emplace(std::move(keys), HeaderBlock.cloneEmpty());
- if (ins.second && PartitionsMap.size() > KeysCountLimit) {
+ if (insertedNew && PartitionsMap.size() > KeysCountLimit) {
UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data());
}
- const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit;
- if (flush)
- FlushKey(ins.first, result);
+ bool flush = !insertedNew && partIt->second.Block.bytes() >= BlockSizeLimit;
+ if (flush) {
+ Y_VERIFY(FlushKey(partIt, result));
+ }
- TotalSize -= ins.first->second.first.bytes();
- auto columns = ins.first->second.first.mutateColumns();
+ if (!flush && TotalSizeLimit && TotalSizeLimit <= TotalSize) {
+ const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(),
+ [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.Block.bytes() < r.second.Block.bytes(); });
+ if (FlushKey(top, result)) {
+ flush = true;
+ }
+ }
+
+ TotalSize -= partIt->second.Block.bytes();
+ auto columns = partIt->second.Block.mutateColumns();
+ partIt->second.hasDataInBlock = true;
for (auto i = 0U; i < columns.size(); ++i) {
const auto index = PayloadsIndexes[i];
ConvertInputValue(row.GetElement(index), columns[i].get(), InMeta[index]);
}
- ins.first->second.first.setColumns(std::move(columns));
- TotalSize += ins.first->second.first.bytes();
+ partIt->second.Block.setColumns(std::move(columns));
+ TotalSize += partIt->second.Block.bytes();
if (flush)
return EFetchStatus::Ok;
@@ -1131,12 +1191,17 @@ class TSerializeFormat : public TBoxedValue {
}
}
- if (PartitionsMap.empty())
+ if (PartitionsMap.empty() && !FinishCurrentFileFlag)
return EFetchStatus::Finish;
- FlushKey(PartitionsMap.begin(), result);
- PartitionsMap.erase(PartitionsMap.cbegin());
- return EFetchStatus::Ok;
+ while (!PartitionsMap.empty()) {
+ const bool hasResult = FlushKey(PartitionsMap.begin(), result, true);
+ PartitionsMap.erase(PartitionsMap.cbegin());
+ if (hasResult) {
+ return EFetchStatus::Ok;
+ }
+ }
+ return EFetchStatus::Finish;
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -1153,7 +1218,10 @@ class TSerializeFormat : public TBoxedValue {
const size_t BlockSizeLimit;
const size_t KeysCountLimit;
const size_t TotalSizeLimit;
+ const size_t FileSizeLimit;
const TSourcePosition Pos;
+ TUnboxedValue FinishCurrentFileValue;
+ bool FinishCurrentFileFlag = false;
const NDB::Block HeaderBlock;
const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer;
@@ -1168,12 +1236,12 @@ class TSerializeFormat : public TBoxedValue {
size_t TotalSize = 0ULL;
};
public:
- TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
- : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+ TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, size_t fileSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+ : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), FileSizeLimit(fileSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
{}
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
- return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, InMeta, Columns, Pos));
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, FileSizeLimit, InMeta, Columns, Pos));
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -1189,6 +1257,7 @@ private:
const size_t BlockSizeLimit;
const size_t KeysCountLimit;
const size_t TotalSizeLimit;
+ const size_t FileSizeLimit;
const TSourcePosition Pos;
const std::vector<TColumnMeta> InMeta;
@@ -1532,9 +1601,10 @@ public:
const auto jsonFrom = typeCfg.find('{');
std::vector<std::string> keys;
const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom);
- size_t blockSizeLimit = 256_MB;
+ size_t blockSizeLimit = 1_MB;
size_t keysCountLimit = 4096;
- size_t totalSizeLimit = 1_GB;
+ size_t totalSizeLimit = 10_MB;
+ size_t fileSizeLimit = 50_MB;
if (!tail.empty()) {
const std::string str(tail);
const JSON json(str);
@@ -1544,23 +1614,27 @@ public:
std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString));
}
if (json.has("block_size_limit")) {
- blockSizeLimit = json["block_size_limit"].getUInt();
+ blockSizeLimit = FromString(TStringBuf(json["block_size_limit"].getString()));
}
if (json.has("keys_count_limit")) {
- keysCountLimit = json["keys_count_limit"].getUInt();
+ keysCountLimit = FromString(TStringBuf(json["keys_count_limit"].getString()));
}
if (json.has("total_size_limit")) {
- totalSizeLimit = json["total_size_limit"].getUInt();
+ totalSizeLimit = FromString(TStringBuf(json["total_size_limit"].getString()));
+ }
+ if (json.has("file_size_limit")) {
+ fileSizeLimit = FromString(TStringBuf(json["file_size_limit"].getString()));
}
}
+ blockSizeLimit = Min(blockSizeLimit, fileSizeLimit);
builder.UserType(userType);
builder.Args()->Add(inputType).Done();
if (keys.empty())
- builder.Returns(builder.Stream()->Item<const char*>());
+ builder.Returns(builder.Stream()->Item(builder.Optional()->Item<const char*>()));
else {
const auto tuple = builder.Tuple();
- tuple->Add<const char*>();
+ tuple->Add(builder.Optional()->Item<const char*>());
for (auto k =0U; k < keys.size(); ++k)
tuple->Add<TUtf8>();
builder.Returns(builder.Stream()->Item(tuple->Build()));
@@ -1595,7 +1669,7 @@ public:
}
if (!(flags & TFlags::TypesOnly)) {
- builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, fileSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
}
return;
} else {