aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-20 19:36:43 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-20 19:36:43 +0300
commitd07e0eac7e959cb56fc316686c9ab6af3d898f7e (patch)
tree2514dd672542cbf64d0e1b8d167ee2155f1f4471
parentff4f4c811479c0d1ff38f2b8b950056775bcb44c (diff)
downloadydb-d07e0eac7e959cb56fc316686c9ab6af3d898f7e.tar.gz
Flush blocks.
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp100
1 files changed, 69 insertions, 31 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index 2ed24c17a01..2657da002e5 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -36,6 +36,7 @@
#include "src/Processors/Formats/InputStreamFromInputFormat.h"
#include "src/Processors/Formats/OutputStreamToOutputFormat.h"
+#include <util/generic/size_literals.h>
#include <util/generic/yexception.h>
#include <util/string/split.h>
@@ -775,24 +776,45 @@ class TSerializeFormat : public TBoxedValue {
const size_t Size;
};
- using TPartitionsMap = std::unordered_map<TPartitionByKey, NDB::Block, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, NDB::Block>>>;
+ using TPartitionByPayload = std::pair<NDB::Block, bool>; //+ flag of first block on key.
+ using TPartitionsMap = std::unordered_map<TPartitionByKey, TPartitionByPayload, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, TPartitionByPayload>>>;
public:
- TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, size_t blockSizeLimit, size_t keysCountLimit, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
: ValueBuilder(valueBuilder)
, Stream(stream)
, InMeta(inMeta)
, KeysIndexes(keysIndexes)
, PayloadsIndexes(payloadsIndexes)
+ , BlockSizeLimit(blockSizeLimit)
+ , KeysCountLimit(keysCountLimit)
, Pos(pos)
, HeaderBlock(columns)
, Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
, BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, HeaderBlock, nullptr, {}, settings)))
, PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size()))
, Cache(KeysIndexes.size() + 1U)
- {
- BlockStream->writePrefix();
- }
+ {}
private:
+ void FlushKey(const TPartitionsMap::const_iterator it, TUnboxedValue& result) {
+ if (it->second.second)
+ BlockStream->writePrefix();
+ BlockStream->write(it->second.first);
+ if (IsFinished)
+ BlockStream->writeSuffix();
+ BlockStream->flush();
+
+ if (KeysIndexes.empty())
+ result = ValueBuilder->NewString(Buffer->str());
+ else {
+ TUnboxedValue* tupleItems = nullptr;
+ result = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = ValueBuilder->NewString(Buffer->str());
+ std::copy(it->first.cbegin(), it->first.cend(), tupleItems);
+ }
+
+ Buffer->restart();
+ }
+
EFetchStatus Fetch(TUnboxedValue& result) final try {
if (IsFinished && PartitionsMap.empty())
return EFetchStatus::Finish;
@@ -806,14 +828,29 @@ class TSerializeFormat : public TBoxedValue {
for (auto i = 0U; i < keys.size(); ++i)
keys[i] = row.GetElement(KeysIndexes[i]);
- const auto ins = PartitionsMap.emplace(keys, HeaderBlock.cloneEmpty());
- auto columns = ins.first->second.mutateColumns();
+ const auto ins = PartitionsMap.emplace(keys, std::make_pair(HeaderBlock.cloneEmpty(), true));
+
+ if (ins.second && PartitionsMap.size() > KeysCountLimit) {
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " Too many unique keys: " << PartitionsMap.size()).data());
+ }
+
+ const bool flush = !ins.second && ins.first->second.first.bytes() >= BlockSizeLimit;
+ if (flush) {
+ FlushKey(ins.first, result);
+ ins.first->second = std::make_pair(HeaderBlock.cloneEmpty(), false);
+ }
+
+ auto columns = ins.first->second.first.mutateColumns();
for (auto i = 0U; i < columns.size(); ++i) {
const auto index = PayloadsIndexes[i];
ConvertInputValue(row.GetElement(index), columns[i], InMeta[index]);
}
- ins.first->second.setColumns(std::move(columns));
- continue;
+ ins.first->second.first.setColumns(std::move(columns));
+
+ if (flush)
+ return EFetchStatus::Ok;
+ else
+ continue;
}
case EFetchStatus::Finish:
IsFinished = true;
@@ -821,23 +858,11 @@ class TSerializeFormat : public TBoxedValue {
}
}
- auto item = PartitionsMap.extract(PartitionsMap.cbegin());
- BlockStream->write(item.mapped());
-
- if (IsFinished && PartitionsMap.empty())
- BlockStream->writeSuffix();
- BlockStream->flush();
-
- if (KeysIndexes.empty())
- result = ValueBuilder->NewString(Buffer->str());
- else {
- TUnboxedValue* tupleItems = nullptr;
- result = Cache.NewArray(*ValueBuilder, tupleItems);
- *tupleItems++ = ValueBuilder->NewString(Buffer->str());
- std::move(item.key().begin(), item.key().end(), tupleItems);
- }
+ if (PartitionsMap.empty())
+ return EFetchStatus::Finish;
- Buffer->restart();
+ FlushKey(PartitionsMap.cbegin(), result);
+ PartitionsMap.erase(PartitionsMap.cbegin());
return EFetchStatus::Ok;
}
catch (const Poco::Exception& e) {
@@ -852,6 +877,8 @@ class TSerializeFormat : public TBoxedValue {
const std::vector<TColumnMeta> InMeta;
const std::vector<ui32> KeysIndexes;
const std::vector<ui32> PayloadsIndexes;
+ const size_t BlockSizeLimit;
+ const size_t KeysCountLimit;
const TSourcePosition Pos;
const NDB::Block HeaderBlock;
@@ -865,12 +892,12 @@ class TSerializeFormat : public TBoxedValue {
bool IsFinished = false;
};
public:
- TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
- : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+ TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, size_t blockSizzeLimit, size_t keysCountLimit, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+ : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), BlockSizeLimit(blockSizzeLimit), KeysCountLimit(keysCountLimit), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
{}
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
- return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, InMeta, Columns, Pos));
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, BlockSizeLimit, KeysCountLimit, InMeta, Columns, Pos));
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -883,8 +910,10 @@ private:
const NDB::FormatSettings Settings;
const std::vector<ui32> KeysIndexes;
const std::vector<ui32> PayloadsIndexes;
-
+ const size_t BlockSizeLimit;
+ const size_t KeysCountLimit;
const TSourcePosition Pos;
+
const std::vector<TColumnMeta> InMeta;
const NDB::ColumnsWithTypeAndName Columns;
};
@@ -1226,13 +1255,22 @@ public:
const auto jsonFrom = typeCfg.find('{');
std::vector<std::string> keys;
const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom);
+ size_t blockSizeLimit = 256_MB;
+ size_t keysCountLimit = 4096;
if (!tail.empty()) {
const std::string str(tail);
- if (const JSON json(str); json.has("keys")) {
+ const JSON json(str);
+ if (json.has("keys")) {
const auto& list = json["keys"];
keys.reserve(list.size());
std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString));
}
+ if (json.has("block_size_limit")) {
+ blockSizeLimit = json["block_size_limit"].getUInt();
+ }
+ if (json.has("keys_count_limit")) {
+ keysCountLimit = json["keys_count_limit"].getUInt();
+ }
}
builder.UserType(userType);
@@ -1276,7 +1314,7 @@ public:
}
if (!(flags & TFlags::TypesOnly)) {
- builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), blockSizeLimit, keysCountLimit, builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
}
return;
} else {