diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-20 19:36:43 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-20 19:36:43 +0300 |
commit | d07e0eac7e959cb56fc316686c9ab6af3d898f7e (patch) | |
tree | 2514dd672542cbf64d0e1b8d167ee2155f1f4471 | |
parent | ff4f4c811479c0d1ff38f2b8b950056775bcb44c (diff) | |
download | ydb-d07e0eac7e959cb56fc316686c9ab6af3d898f7e.tar.gz |
Flush blocks.
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp | 100 |
1 files changed, 69 insertions, 31 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index 2ed24c17a01..2657da002e5 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -36,6 +36,7 @@ #include "src/Processors/Formats/InputStreamFromInputFormat.h" #include "src/Processors/Formats/OutputStreamToOutputFormat.h" +#include <util/generic/size_literals.h> #include <util/generic/yexception.h> #include <util/string/split.h> @@ -775,24 +776,45 @@ class TSerializeFormat : public TBoxedValue { const size_t Size; }; - using TPartitionsMap = std::unordered_map<TPartitionByKey, NDB::Block, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, NDB::Block>>>; + using TPartitionByPayload = std::pair<NDB::Block, bool>; //+ flag of first block on key. + using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>; public: - TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) + TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) : ValueBuilder(valueBuilder) , Stream(stream) , InMeta(inMeta) , KeysIndexes(keysIndexes) , PayloadsIndexes(payloadsIndexes) + , BlockSizeLimit(blockSizeLimit) + , KeysCountLimit(keysCountLimit) , Pos(pos) , HeaderBlock(columns) , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>()) , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, HeaderBlock, nullptr, {}, settings))) , PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size())) , Cache(KeysIndexes.size() + 1U) - { - BlockStream->writePrefix(); - } + {} private: + void FlushKey(const TPartitionsMap::const_iterator it, TUnboxedValue& result) { + if (it->second.second) + BlockStream->writePrefix(); + BlockStream->write(it->second.first); + if (IsFinished) + BlockStream->writeSuffix(); + BlockStream->flush(); + + if (KeysIndexes.empty()) + result = ValueBuilder->NewString(Buffer->str()); + else { + TUnboxedValue* tupleItems = nullptr; + result = Cache.NewArray(*ValueBuilder, tupleItems); + *tupleItems++ = ValueBuilder->NewString(Buffer->str()); + std::copy(it->first.cbegin(), it->first.cend(), tupleItems); + } + + Buffer->restart(); + } + EFetchStatus Fetch(TUnboxedValue& result) final try { if (IsFinished && PartitionsMap.empty()) return EFetchStatus::Finish; @@ -806,14 +828,29 @@ class TSerializeFormat : public TBoxedValue { for (auto i = 0U; i < keys.size(); ++i) keys[i] = row.GetElement(KeysIndexes[i]); - const auto ins = PartitionsMap.emplace(keys, HeaderBlock.cloneEmpty()); - auto columns = ins.first->second.mutateColumns(); + const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), true)); + + if (ins.second && PartitionsMap.size() > KeysCountLimit) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data()); + } + + const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit; + if (flush) { + FlushKey(ins.first, result); + ins.first->second = std::make_pair(HeaderBlock.cloneEmpty(), false); + } + + auto columns = ins.first->second.first.mutateColumns(); for (auto i = 0U; i < columns.size(); ++i) { const auto index = PayloadsIndexes[i]; ConvertInputValue(row.GetElement(index), columns[i], InMeta[index]); } - ins.first->second.setColumns(std::move(columns)); - continue; + ins.first->second.first.setColumns(std::move(columns)); + + if (flush) + return EFetchStatus::Ok; + else + continue; } case EFetchStatus::Finish: IsFinished = true; @@ -821,23 +858,11 @@ class TSerializeFormat : public TBoxedValue { } } - auto item = PartitionsMap.extract(PartitionsMap.cbegin()); - BlockStream->write(item.mapped()); - - if (IsFinished && PartitionsMap.empty()) - BlockStream->writeSuffix(); - BlockStream->flush(); - - if (KeysIndexes.empty()) - result = ValueBuilder->NewString(Buffer->str()); - else { - TUnboxedValue* tupleItems = nullptr; - result = Cache.NewArray(*ValueBuilder, tupleItems); - *tupleItems++ = ValueBuilder->NewString(Buffer->str()); - std::move(item.key().begin(), item.key().end(), tupleItems); - } + if (PartitionsMap.empty()) + return EFetchStatus::Finish; - Buffer->restart(); + FlushKey(PartitionsMap.cbegin(), result); + PartitionsMap.erase(PartitionsMap.cbegin()); return EFetchStatus::Ok; } catch (const Poco::Exception& e) { @@ -852,6 +877,8 @@ class TSerializeFormat : public TBoxedValue { const std::vector<TColumnMeta> InMeta; const std::vector<ui32> KeysIndexes; const std::vector<ui32> PayloadsIndexes; + const size_t BlockSizeLimit; + const size_t KeysCountLimit; const TSourcePosition Pos; const NDB::Block HeaderBlock; @@ -865,12 +892,12 @@ class TSerializeFormat : public TBoxedValue { bool IsFinished = false; }; public: - TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) - : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) + TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizzeLimit, size_t keysCountLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) + : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizzeLimit), KeysCountLimit(keysCountLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) {} TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, InMeta, Columns, Pos)); + return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, InMeta, Columns, Pos)); } catch (const Poco::Exception& e) { UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); @@ -883,8 +910,10 @@ private: const NDB::FormatSettings Settings; const std::vector<ui32> KeysIndexes; const std::vector<ui32> PayloadsIndexes; - + const size_t BlockSizeLimit; + const size_t KeysCountLimit; const TSourcePosition Pos; + const std::vector<TColumnMeta> InMeta; const NDB::ColumnsWithTypeAndName Columns; }; @@ -1226,13 +1255,22 @@ public: const auto jsonFrom = typeCfg.find('{'); std::vector<std::string> keys; const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom); + size_t blockSizeLimit = 256_MB; + size_t keysCountLimit = 4096; if (!tail.empty()) { const std::string str(tail); - if (const JSON json(str); json.has("keys")) { + const JSON json(str); + if (json.has("keys")) { const auto& list = json["keys"]; keys.reserve(list.size()); std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString)); } + if (json.has("block_size_limit")) { + blockSizeLimit = json["block_size_limit"].getUInt(); + } + if (json.has("keys_count_limit")) { + keysCountLimit = json["keys_count_limit"].getUInt(); + } } builder.UserType(userType); @@ -1276,7 +1314,7 @@ public: } if (!(flags & TFlags::TypesOnly)) { - builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), builder.GetSourcePosition(), std::move(inMeta), std::move(columns))); + builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns))); } return; } else { |