author     galaxycrab <UgnineSirdis@ydb.tech>  2022-12-29 15:50:19 +0300
committer  galaxycrab <UgnineSirdis@ydb.tech>  2022-12-29 15:50:19 +0300
commit     bebb9f9cc8ac7bd04eb1aba2531c5dd5bb5bc5f2 (patch)
tree       9f1b8ca129efdbd4f1899ef7b8321972afb2d32a
parent     7d154606d1d5e5ef2a0ee327e515df9099474cfb (diff)
download   ydb-bebb9f9cc8ac7bd04eb1aba2531c5dd5bb5bc5f2.tar.gz
Fix block size in clickhouse udf. Fix data loss in clickhouse udf
The size of the batched data in the ClickHouse UDF was too large. Because of this, when a large volume of data is being moved, the buffer ends up holding several large chunks of 500-700 MB each, and holding them for a long time (at every stage of the chain where backpressure is controlled). Processing also loses its parallelism: until the ClickHouse UDF has assembled a huge chunk of data, nothing is written to S3. Reduced the buffer size to roughly match the size of the channel buffers in DQ. Also found and fixed a bug where the stream builds output data but then returns Yield, so that data is simply dropped.
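A minimal sketch of the stream contract the old code violated (names and types below are simplified illustrations, not the actual YQL UDF stream API): once Fetch() has materialized output into result, it must not return Yield, because the caller treats Yield as "no value produced" and discards result.

#include <deque>
#include <string>
#include <utility>

enum class EFetchStatus { Ok, Yield, Finish };

// Hypothetical stream that holds already-serialized chunks waiting to be emitted.
struct TSerializingStream {
    std::deque<std::string> Pending;    // chunks serialized and ready to hand out
    bool UpstreamWouldBlock = false;    // upstream reported Yield
    bool UpstreamExhausted = false;     // upstream reported Finish

    // Buggy shape: a chunk is written into `result`, but the upstream Yield is
    // propagated anyway, so the caller ignores `result` and the chunk is lost.
    EFetchStatus FetchBuggy(std::string& result) {
        if (!Pending.empty()) {
            result = std::move(Pending.front());
            Pending.pop_front();
        }
        if (UpstreamWouldBlock)
            return EFetchStatus::Yield;  // drops the chunk already placed in `result`
        return UpstreamExhausted && Pending.empty() ? EFetchStatus::Finish : EFetchStatus::Ok;
    }

    // Fixed shape: a ready chunk is returned with Ok immediately; Yield or Finish
    // is reported only when nothing has been written into `result`.
    EFetchStatus FetchFixed(std::string& result) {
        if (!Pending.empty()) {
            result = std::move(Pending.front());
            Pending.pop_front();
            return EFetchStatus::Ok;
        }
        if (UpstreamWouldBlock)
            return EFetchStatus::Yield;
        return UpstreamExhausted ? EFetchStatus::Finish : EFetchStatus::Ok;
    }
};

The change to TStreamValue::Fetch in the diff below follows the same rule: FlushKey() now reports whether it produced output, and Fetch() returns Ok as soon as it did, instead of continuing the loop (or propagating Yield) after data was already serialized into the buffer.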
-rw-r--r--  ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp               47
-rw-r--r--  ydb/library/yql/providers/s3/proto/sink.proto                              3
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp        15
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp           19
-rw-r--r--  ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp                  23
-rw-r--r--  ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp  174
6 files changed, 202 insertions(+), 79 deletions(-)
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
index 9f2e7787c8..086169fb95 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_write_actor.cpp
@@ -120,16 +120,14 @@ struct TEvPrivate {
using namespace NKikimr::NMiniKQL;
class TS3FileWriteActor : public TActorBootstrapped<TS3FileWriteActor> {
-
friend class TS3WriteActor;
- using TActorBootstrapped<TS3FileWriteActor>::PassAway;
public:
TS3FileWriteActor(
const TTxId& txId,
IHTTPGateway::TPtr gateway,
NYdb::TCredentialsProviderPtr credProvider,
- const TString& key, const TString& url, size_t sizeLimit, const std::string_view& compression,
+ const TString& key, const TString& url, const std::string_view& compression,
const IRetryPolicy<long>::TPtr& retryPolicy)
: TxId(txId)
, Gateway(std::move(gateway))
@@ -139,7 +137,6 @@ public:
, Key(key)
, Url(url)
, RequestId(CreateGuidAsString())
- , SizeLimit(sizeLimit)
, Parts(MakeCompressorQueue(compression))
{
YQL_ENSURE(Parts, "Compression '" << compression << "' is not supported.");
@@ -170,13 +167,15 @@ public:
TActorBootstrapped<TS3FileWriteActor>::PassAway();
}
- void SendData(TString&& data) {
+ void AddData(TString&& data) {
Parts->Push(std::move(data));
+ }
- Y_UNUSED(SizeLimit);
-//TODO: if (10000U == Tags.size() + Parts->Size() || SizeLimit <= SentSize + Parts->Volume())
- Parts->Seal();
+ void Seal() {
+ Parts->Seal();
+ }
+ void Go() {
if (!UploadId.empty())
StartUploadParts();
}
@@ -367,7 +366,6 @@ private:
const TString Key;
const TString Url;
const TString RequestId;
- const size_t SizeLimit;
IOutputQueue::TPtr Parts;
std::vector<TString> Tags;
@@ -388,8 +386,8 @@ public:
const TString& extension,
const std::vector<TString>& keys,
const size_t memoryLimit,
- const size_t maxFileSize,
const TString& compression,
+ bool multipart,
IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const IRetryPolicy<long>::TPtr& retryPolicy)
: Gateway(std::move(gateway))
@@ -405,8 +403,8 @@ public:
, Extension(extension)
, Keys(keys)
, MemoryLimit(memoryLimit)
- , MaxFileSize(maxFileSize)
, Compression(compression)
+ , Multipart(multipart)
{
if (!RandomProvider) {
DefaultRandomProvider = CreateDefaultRandomProvider();
@@ -456,16 +454,29 @@ private:
)
void SendData(TUnboxedValueVector&& data, i64, const TMaybe<NDqProto::TCheckpoint>&, bool finished) final {
+ std::unordered_set<TS3FileWriteActor*> processedActors;
for (const auto& v : data) {
const auto& key = MakePartitionKey(v);
- const auto ins = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
- if (ins.second || ins.first->second.empty() || ins.first->second.back()->IsFinishing()) {
- auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, MaxFileSize, Compression, RetryPolicy);
- ins.first->second.emplace_back(fileWrite.get());
+ const auto [keyIt, insertedNew] = FileWriteActors.emplace(key, std::vector<TS3FileWriteActor*>());
+ if (insertedNew || keyIt->second.empty() || keyIt->second.back()->IsFinishing()) {
+ auto fileWrite = std::make_unique<TS3FileWriteActor>(TxId, Gateway, CredProvider, key, Url + Path + key + MakeOutputName() + Extension, Compression, RetryPolicy);
+ keyIt->second.emplace_back(fileWrite.get());
RegisterWithSameMailbox(fileWrite.release());
}
- ins.first->second.back()->SendData(TString((Keys.empty() ? v : *v.GetElements()).AsStringRef()));
+ const NUdf::TUnboxedValue& value = Keys.empty() ? v : *v.GetElements();
+ TS3FileWriteActor* actor = keyIt->second.back();
+ if (value) {
+ actor->AddData(TString(value.AsStringRef()));
+ }
+ if (!Multipart || !value) {
+ actor->Seal();
+ }
+ processedActors.insert(actor);
+ }
+
+ for (TS3FileWriteActor* actor : processedActors) {
+ actor->Go();
}
if (finished) {
@@ -555,8 +566,8 @@ private:
const std::vector<TString> Keys;
const size_t MemoryLimit;
- const size_t MaxFileSize;
const TString Compression;
+ const bool Multipart;
bool Finished = false;
ui64 EgressBytes = 0;
@@ -592,8 +603,8 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateS3WriteActor(
params.HasExtension() ? params.GetExtension() : "",
std::vector<TString>(params.GetKeys().cbegin(), params.GetKeys().cend()),
params.HasMemoryLimit() ? params.GetMemoryLimit() : 1_GB,
- params.HasMaxFileSize() ? params.GetMaxFileSize() : 50_MB,
params.HasCompression() ? params.GetCompression() : "",
+ params.GetMultipart(),
callbacks,
retryPolicy);
return {actor, actor};
diff --git a/ydb/library/yql/providers/s3/proto/sink.proto b/ydb/library/yql/providers/s3/proto/sink.proto
index 7ee6610cd2..688478376e 100644
--- a/ydb/library/yql/providers/s3/proto/sink.proto
+++ b/ydb/library/yql/providers/s3/proto/sink.proto
@@ -10,6 +10,7 @@ message TSink {
repeated string Keys = 4;
optional string Compression = 5;
optional uint64 MemoryLimit = 6;
- optional uint64 MaxFileSize = 7;
+ optional uint64 MaxFileSize_deprecated = 7;
optional string Extension = 8;
+ optional bool Multipart = 9;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
index cf888e5e8d..cbbfdd55a7 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_datasink_type_ann.cpp
@@ -278,8 +278,11 @@ private:
const auto structType = itemType->Cast<TStructExprType>();
const auto keysCount = input->Child(TS3SinkOutput::idx_KeyColumns)->ChildrenSize();
- if (const auto format = input->Child(TS3SinkOutput::idx_Format); keysCount) {
- if (format->IsAtom({"raw","json_list"})) {
+ const auto format = input->Child(TS3SinkOutput::idx_Format);
+ const bool isSingleRowPerFileFormat = format->IsAtom({"raw","json_list"}); // One string from the ClickHouse UDF's ClickHouseClient.SerializeFormat results in one S3 file => no delimiters between files.
+
+ if (keysCount) {
+ if (isSingleRowPerFileFormat) {
ctx.AddError(TIssue(ctx.GetPosition(format->Pos()), TStringBuilder() << "Partitioned isn't supporter for " << format->Content() << " output format."));
return TStatus::Error;
}
@@ -295,7 +298,7 @@ private:
}
TTypeAnnotationNode::TListType itemTypes(keysCount + 1U, ctx.MakeType<TDataExprType>(EDataSlot::Utf8));
- itemTypes.front() = ctx.MakeType<TDataExprType>(EDataSlot::String);
+ itemTypes.front() = ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String));
input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TTupleExprType>(itemTypes)));
}
} else {
@@ -310,7 +313,11 @@ private:
}
}
- input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ if (isSingleRowPerFileFormat) {
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String)));
+ } else {
+ input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TOptionalExprType>(ctx.MakeType<TDataExprType>(EDataSlot::String))));
+ }
}
return TStatus::Ok;
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index bd610534c1..56919944b9 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -41,6 +41,16 @@ std::string_view GetCompression(const TExprNode& settings) {
return {};
}
+bool GetMultipart(const TExprNode& settings) {
+ for (auto i = 0U; i < settings.ChildrenSize(); ++i) {
+ if (settings.Child(i)->Head().IsAtom("multipart")) {
+ return FromString(settings.Child(i)->Tail().Content());
+ }
+ }
+
+ return false;
+}
+
using namespace NYql::NS3Details;
class TS3DqIntegration: public TDqIntegrationBase {
@@ -298,8 +308,8 @@ public:
void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& protoSettings, TString& sinkType) override {
const TDqSink sink(&node);
- if (const auto maySettings = sink.Settings().Maybe<TS3SinkSettings>()) {
- const auto settings = maySettings.Cast();
+ if (const auto maybeSettings = sink.Settings().Maybe<TS3SinkSettings>()) {
+ const auto settings = maybeSettings.Cast();
const auto& cluster = sink.DataSink().Cast<TS3DataSink>().Cluster().StringValue();
const auto& connect = State_->Configuration->Clusters.at(cluster);
@@ -311,15 +321,14 @@ public:
for (const auto& key : GetKeys(settings.Settings().Ref()))
sinkDesc.MutableKeys()->Add(TString(key->Content()));
- if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get())
- sinkDesc.SetMaxFileSize(*maxObjectSize);
-
if (const auto& memoryLimit = State_->Configuration->InFlightMemoryLimit.Get())
sinkDesc.SetMemoryLimit(*memoryLimit);
if (const auto& compression = GetCompression(settings.Settings().Ref()); !compression.empty())
sinkDesc.SetCompression(TString(compression));
+ sinkDesc.SetMultipart(GetMultipart(settings.Settings().Ref()));
+
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
index 290bb60cbd..970d49feea 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_phy_opt.cpp
@@ -8,6 +8,8 @@
#include <ydb/library/yql/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
+#include <util/generic/size_literals.h>
+
namespace NYql {
namespace {
@@ -165,6 +167,26 @@ public:
sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
}
+ const TStringBuf format = targetNode.Format();
+ if (format != "raw" && format != "json_list") { // multipart
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "multipart"));
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "true"));
+ sinkSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+ {
+ TExprNode::TListType pair;
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), "file_size_limit"));
+ size_t fileSize = 50_MB;
+ if (const auto& maxObjectSize = State_->Configuration->MaxOutputObjectSize.Get()) {
+ fileSize = *maxObjectSize;
+ }
+ pair.push_back(ctx.NewAtom(targetNode.Pos(), ToString(fileSize)));
+ sinkOutputSettingsBuilder.Add(ctx.NewList(targetNode.Pos(), std::move(pair)));
+ }
+ }
+
if (!FindNode(write.Input().Ptr(), [] (const TExprNode::TPtr& node) { return node->IsCallable(TCoDataSource::CallableName()); })) {
YQL_CLOG(INFO, ProviderS3) << "Rewrite pure S3WriteObject `" << cluster << "`.`" << targetNode.Path().StringValue() << "` as stage with sink.";
return keys.empty() ?
@@ -346,4 +368,3 @@ THolder<IGraphTransformer> CreateS3PhysicalOptProposalTransformer(TS3State::TPtr
}
} // namespace NYql
-
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index 556648d691..8a684a8a8a 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -1037,10 +1037,20 @@ class TSerializeFormat : public TBoxedValue {
const size_t Size;
};
- using TPartitionByPayload = std::pair<NDB::Block, std::unique_ptr<NDB::IBlockOutputStream>>;
+ struct TPartitionByPayload {
+ explicit TPartitionByPayload(NDB::Block&& block)
+ : Block(std::move(block))
+ {}
+
+ NDB::Block Block;
+ std::unique_ptr<NDB::IBlockOutputStream> OutputStream;
+ size_t CurrentFileSize = 0; // Sum of the sizes of blocks written to the output for this key since the last closing footer.
+ bool hasDataInBlock = false; // Something was written into the block at least once.
+ };
+
using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>;
public:
- TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, size_t fileSizeLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
: ValueBuilder(valueBuilder)
, Stream(stream)
, InMeta(inMeta)
@@ -1049,6 +1059,7 @@ class TSerializeFormat : public TBoxedValue {
, BlockSizeLimit(blockSizeLimit)
, KeysCountLimit(keysCountLimit)
, TotalSizeLimit(totalSizeLimit)
+ , FileSizeLimit(fileSizeLimit)
, Pos(pos)
, HeaderBlock(columns)
, Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
@@ -1058,31 +1069,76 @@ class TSerializeFormat : public TBoxedValue {
, Cache(KeysIndexes.size() + 1U)
{}
private:
- void FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result) {
- TotalSize -= it->second.first.bytes();
- if (!it->second.second) {
- it->second.second = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings));
- it->second.second->writePrefix();
+ bool FlushKey(const TPartitionsMap::iterator it, TUnboxedValue& result, bool finishFile = false) {
+ const size_t currentBlockSize = it->second.Block.bytes();
+ TotalSize -= currentBlockSize;
+ const bool hasDataInFile = it->second.hasDataInBlock || it->second.OutputStream;
+ if (!hasDataInFile) {
+ return false;
}
- it->second.second->write(it->second.first);
- it->second.second->writeSuffix();
- it->second.second->flush();
- it->second.second.reset();
- it->second.first = HeaderBlock.cloneEmpty();
-
- if (KeysIndexes.empty())
- result = ValueBuilder->NewString(Buffer->str());
- else {
- TUnboxedValue* tupleItems = nullptr;
- result = Cache.NewArray(*ValueBuilder, tupleItems);
- *tupleItems++ = ValueBuilder->NewString(Buffer->str());
- std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ if (!it->second.OutputStream) {
+ it->second.OutputStream = std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(Type, *Buffer, HeaderBlock, nullptr, {}, Settings));
+ it->second.OutputStream->writePrefix();
+ }
+ if (it->second.hasDataInBlock) {
+ it->second.OutputStream->write(it->second.Block);
+ }
+ it->second.CurrentFileSize += Buffer->stringRef().size;
+ if (it->second.CurrentFileSize >= FileSizeLimit) { // Finish current file.
+ finishFile = true;
+ }
+
+ if (finishFile && hasDataInFile) {
+ it->second.OutputStream->writeSuffix(); // Finish current file with optional footer.
+ }
+ it->second.OutputStream->flush();
+
+ if (finishFile) {
+ it->second.OutputStream.reset();
+ it->second.CurrentFileSize = 0;
+ }
+ it->second.Block = HeaderBlock.cloneEmpty();
+ it->second.hasDataInBlock = false;
+
+ std::string& resultString = Buffer->str();
+ bool hasResult = resultString.size() > 0;
+ if (hasResult) {
+ if (KeysIndexes.empty())
+ result = ValueBuilder->NewString(resultString);
+ else {
+ TUnboxedValue* tupleItems = nullptr;
+ result = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = ValueBuilder->NewString(resultString);
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+ }
+
+ if (finishFile) {
+ FinishCurrentFileFlag = true; // The corresponding S3 write actor will finish current file and begin a new one.
+ if (!KeysIndexes.empty()) {
+ TUnboxedValue* tupleItems = nullptr;
+ FinishCurrentFileValue = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = TUnboxedValue(); // Empty optional as finishing mark of current file.
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+ if (!hasResult) {
+ FinishCurrentFileFlag = false;
+ result = FinishCurrentFileValue;
+ hasResult = true;
+ }
}
Buffer->restart();
+ return hasResult;
}
EFetchStatus Fetch(TUnboxedValue& result) final try {
+ if (FinishCurrentFileFlag) {
+ FinishCurrentFileFlag = false;
+ result = FinishCurrentFileValue;
+ return EFetchStatus::Ok;
+ }
+
if (IsFinished && PartitionsMap.empty())
return EFetchStatus::Finish;
@@ -1091,34 +1147,38 @@ class TSerializeFormat : public TBoxedValue {
case EFetchStatus::Yield:
return EFetchStatus::Yield;
case EFetchStatus::Ok: {
- if (TotalSizeLimit && TotalSizeLimit < TotalSize) {
- const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(),
- [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.first.bytes(), r.second.first.bytes(); });
- FlushKey(top, result);
- }
-
TPartitionByKey keys(KeysIndexes.size());
for (auto i = 0U; i < keys.size(); ++i)
keys[i] = row.GetElement(KeysIndexes[i]);
- const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), nullptr));
+ const auto [partIt, insertedNew] = PartitionsMap.emplace(std::move(keys), HeaderBlock.cloneEmpty());
- if (ins.second && PartitionsMap.size() > KeysCountLimit) {
+ if (insertedNew && PartitionsMap.size() > KeysCountLimit) {
UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data());
}
- const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit;
- if (flush)
- FlushKey(ins.first, result);
+ bool flush = !insertedNew && partIt->second.Block.bytes() >= BlockSizeLimit;
+ if (flush) {
+ Y_VERIFY(FlushKey(partIt, result));
+ }
- TotalSize -= ins.first->second.first.bytes();
- auto columns = ins.first->second.first.mutateColumns();
+ if (!flush && TotalSizeLimit && TotalSizeLimit <= TotalSize) {
+ const auto top = std::max_element(PartitionsMap.begin(), PartitionsMap.end(),
+ [](const TPartitionsMap::value_type& l, const TPartitionsMap::value_type& r){ return l.second.Block.bytes() < r.second.Block.bytes(); });
+ if (FlushKey(top, result)) {
+ flush = true;
+ }
+ }
+
+ TotalSize -= partIt->second.Block.bytes();
+ auto columns = partIt->second.Block.mutateColumns();
+ partIt->second.hasDataInBlock = true;
for (auto i = 0U; i < columns.size(); ++i) {
const auto index = PayloadsIndexes[i];
ConvertInputValue(row.GetElement(index), columns[i].get(), InMeta[index]);
}
- ins.first->second.first.setColumns(std::move(columns));
- TotalSize += ins.first->second.first.bytes();
+ partIt->second.Block.setColumns(std::move(columns));
+ TotalSize += partIt->second.Block.bytes();
if (flush)
return EFetchStatus::Ok;
@@ -1131,12 +1191,17 @@ class TSerializeFormat : public TBoxedValue {
}
}
- if (PartitionsMap.empty())
+ if (PartitionsMap.empty() && !FinishCurrentFileFlag)
return EFetchStatus::Finish;
- FlushKey(PartitionsMap.begin(), result);
- PartitionsMap.erase(PartitionsMap.cbegin());
- return EFetchStatus::Ok;
+ while (!PartitionsMap.empty()) {
+ const bool hasResult = FlushKey(PartitionsMap.begin(), result, true);
+ PartitionsMap.erase(PartitionsMap.cbegin());
+ if (hasResult) {
+ return EFetchStatus::Ok;
+ }
+ }
+ return EFetchStatus::Finish;
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -1153,7 +1218,10 @@ class TSerializeFormat : public TBoxedValue {
const size_t BlockSizeLimit;
const size_t KeysCountLimit;
const size_t TotalSizeLimit;
+ const size_t FileSizeLimit;
const TSourcePosition Pos;
+ TUnboxedValue FinishCurrentFileValue;
+ bool FinishCurrentFileFlag = false;
const NDB::Block HeaderBlock;
const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer;
@@ -1168,12 +1236,12 @@ class TSerializeFormat : public TBoxedValue {
size_t TotalSize = 0ULL;
};
public:
- TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
- : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+ TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, size_t totalSizeLimit, size_t fileSizeLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+ : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizeLimit), KeysCountLimit(keysCountLimit), TotalSizeLimit(totalSizeLimit), FileSizeLimit(fileSizeLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
{}
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
- return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, InMeta, Columns, Pos));
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, TotalSizeLimit, FileSizeLimit, InMeta, Columns, Pos));
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -1189,6 +1257,7 @@ private:
const size_t BlockSizeLimit;
const size_t KeysCountLimit;
const size_t TotalSizeLimit;
+ const size_t FileSizeLimit;
const TSourcePosition Pos;
const std::vector<TColumnMeta> InMeta;
@@ -1532,9 +1601,10 @@ public:
const auto jsonFrom = typeCfg.find('{');
std::vector<std::string> keys;
const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom);
- size_t blockSizeLimit = 256_MB;
+ size_t blockSizeLimit = 1_MB;
size_t keysCountLimit = 4096;
- size_t totalSizeLimit = 1_GB;
+ size_t totalSizeLimit = 10_MB;
+ size_t fileSizeLimit = 50_MB;
if (!tail.empty()) {
const std::string str(tail);
const JSON json(str);
@@ -1544,23 +1614,27 @@ public:
std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString));
}
if (json.has("block_size_limit")) {
- blockSizeLimit = json["block_size_limit"].getUInt();
+ blockSizeLimit = FromString(TStringBuf(json["block_size_limit"].getString()));
}
if (json.has("keys_count_limit")) {
- keysCountLimit = json["keys_count_limit"].getUInt();
+ keysCountLimit = FromString(TStringBuf(json["keys_count_limit"].getString()));
}
if (json.has("total_size_limit")) {
- totalSizeLimit = json["total_size_limit"].getUInt();
+ totalSizeLimit = FromString(TStringBuf(json["total_size_limit"].getString()));
+ }
+ if (json.has("file_size_limit")) {
+ fileSizeLimit = FromString(TStringBuf(json["file_size_limit"].getString()));
}
}
+ blockSizeLimit = Min(blockSizeLimit, fileSizeLimit);
builder.UserType(userType);
builder.Args()->Add(inputType).Done();
if (keys.empty())
- builder.Returns(builder.Stream()->Item<const char*>());
+ builder.Returns(builder.Stream()->Item(builder.Optional()->Item<const char*>()));
else {
const auto tuple = builder.Tuple();
- tuple->Add<const char*>();
+ tuple->Add(builder.Optional()->Item<const char*>());
for (auto k =0U; k < keys.size(); ++k)
tuple->Add<TUtf8>();
builder.Returns(builder.Stream()->Item(tuple->Build()));
@@ -1595,7 +1669,7 @@ public:
}
if (!(flags & TFlags::TypesOnly)) {
- builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, totalSizeLimit, fileSizeLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
}
return;
} else {