aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <Anton.Romanov@ydb.tech>2022-07-15 14:26:24 +0300
committera-romanov <Anton.Romanov@ydb.tech>2022-07-15 14:26:24 +0300
commitcd2a5582557397c5c83028723362019d4f1074be (patch)
treea6d367b7efbd8c79b682d498b1ed5db63da02e12
parente542935517ca9d6ea833104b0258cb5df6b9ab27 (diff)
downloadydb-cd2a5582557397c5c83028723362019d4f1074be.tar.gz
SerializeFormat with split on partitions draft.
-rw-r--r--ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp146
1 files changed, 117 insertions, 29 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index d10c8148261..2ed24c17a01 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -373,6 +373,10 @@ void ConvertInputValue(const TUnboxedValuePod& value, const NDB::IColumn::Mutabl
return col->insert(NDB::Field(value.AsStringRef()));
case EDataSlot::Bool:
return col->insert(NDB::Field(value.Get<bool>()));
+ case EDataSlot::Float:
+ return col->insert(NDB::Field(value.Get<float>()));
+ case EDataSlot::Double:
+ return col->insert(NDB::Field(value.Get<double>()));
case EDataSlot::Uint8:
return col->insert(NDB::Field(value.Get<ui8>()));
case EDataSlot::Uint16:
@@ -747,46 +751,92 @@ private:
class TSerializeFormat : public TBoxedValue {
class TStreamValue : public TBoxedValue {
+ private:
+ using TPartitionByKey = std::vector<TUnboxedValue, TStdAllocatorForUdf<TUnboxedValue>>;
+
+ struct TPartitionMapStuff {
+ explicit TPartitionMapStuff(size_t size) : Size(size) {}
+
+ bool operator()(const TPartitionByKey& left, const TPartitionByKey& right) const {
+ for (auto i = 0U; i < Size; ++i)
+ if (left[i].AsStringRef() != right[i].AsStringRef())
+ return false;
+
+ return true;
+ }
+
+ size_t operator()(const TPartitionByKey& value) const {
+ size_t hash = 0ULL;
+ for (auto i = 0U; i < Size; ++i)
+ hash = CombineHashes(hash, std::hash<std::string_view>()(value[i].AsStringRef()));
+ return hash;
+ }
+
+ const size_t Size;
+ };
+
+ using TPartitionsMap = std::unordered_map<TPartitionByKey, NDB::Block, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, NDB::Block>>>;
public:
- TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
+ TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos)
: ValueBuilder(valueBuilder)
, Stream(stream)
, InMeta(inMeta)
+ , KeysIndexes(keysIndexes)
+ , PayloadsIndexes(payloadsIndexes)
, Pos(pos)
+ , HeaderBlock(columns)
, Buffer(std::make_unique<NDB::WriteBufferFromOwnString>())
- , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, NDB::Block(columns), nullptr, {}, settings)))
- , CurrentBlock(columns)
+ , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, HeaderBlock, nullptr, {}, settings)))
+ , PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size()))
+ , Cache(KeysIndexes.size() + 1U)
{
BlockStream->writePrefix();
}
private:
EFetchStatus Fetch(TUnboxedValue& result) final try {
- if (IsFinished)
+ if (IsFinished && PartitionsMap.empty())
return EFetchStatus::Finish;
- auto columns = CurrentBlock.mutateColumns();
for (TUnboxedValue row; !IsFinished;) {
switch (const auto status = Stream.Fetch(row)) {
case EFetchStatus::Yield:
return EFetchStatus::Yield;
- case EFetchStatus::Ok:
- for (auto i = 0U; i < columns.size(); ++i)
- ConvertInputValue(row.GetElement(i), columns[i], InMeta[i]);
+ case EFetchStatus::Ok: {
+ TPartitionByKey keys(KeysIndexes.size());
+ for (auto i = 0U; i < keys.size(); ++i)
+ keys[i] = row.GetElement(KeysIndexes[i]);
+
+ const auto ins = PartitionsMap.emplace(keys, HeaderBlock.cloneEmpty());
+ auto columns = ins.first->second.mutateColumns();
+ for (auto i = 0U; i < columns.size(); ++i) {
+ const auto index = PayloadsIndexes[i];
+ ConvertInputValue(row.GetElement(index), columns[i], InMeta[index]);
+ }
+ ins.first->second.setColumns(std::move(columns));
continue;
+ }
case EFetchStatus::Finish:
IsFinished = true;
- BlockStream->writeSuffix();
- BlockStream->flush();
break;
}
}
- CurrentBlock.setColumns(std::move(columns));
- BlockStream->write(CurrentBlock);
+ auto item = PartitionsMap.extract(PartitionsMap.cbegin());
+ BlockStream->write(item.mapped());
+
+ if (IsFinished && PartitionsMap.empty())
+ BlockStream->writeSuffix();
BlockStream->flush();
- CurrentBlock = CurrentBlock.cloneEmpty();
- result = ValueBuilder->NewString(Buffer->str());
+ if (KeysIndexes.empty())
+ result = ValueBuilder->NewString(Buffer->str());
+ else {
+ TUnboxedValue* tupleItems = nullptr;
+ result = Cache.NewArray(*ValueBuilder, tupleItems);
+ *tupleItems++ = ValueBuilder->NewString(Buffer->str());
+ std::move(item.key().begin(), item.key().end(), tupleItems);
+ }
+
Buffer->restart();
return EFetchStatus::Ok;
}
@@ -800,22 +850,27 @@ class TSerializeFormat : public TBoxedValue {
const IValueBuilder* ValueBuilder;
const TUnboxedValue Stream;
const std::vector<TColumnMeta> InMeta;
+ const std::vector<ui32> KeysIndexes;
+ const std::vector<ui32> PayloadsIndexes;
const TSourcePosition Pos;
+ const NDB::Block HeaderBlock;
const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer;
const std::unique_ptr<NDB::IBlockOutputStream> BlockStream;
- NDB::Block CurrentBlock;
+ TPartitionsMap PartitionsMap;
+
+ TPlainArrayCache Cache;
+
bool IsFinished = false;
};
-
public:
- TSerializeFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
- : Type(type), Settings(GetFormatSettings(settings)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
+ TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns)
+ : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns))
{}
TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try {
- return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, InMeta, Columns, Pos));
+ return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, InMeta, Columns, Pos));
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -826,6 +881,8 @@ public:
private:
const std::string Type;
const NDB::FormatSettings Settings;
+ const std::vector<ui32> KeysIndexes;
+ const std::vector<ui32> PayloadsIndexes;
const TSourcePosition Pos;
const std::vector<TColumnMeta> InMeta;
@@ -1165,18 +1222,51 @@ public:
const auto inputType = argsTypeInspector.GetElementType(0);
+ const std::string_view& typeCfg = typeConfig;
+ const auto jsonFrom = typeCfg.find('{');
+ std::vector<std::string> keys;
+ const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom);
+ if (!tail.empty()) {
+ const std::string str(tail);
+ if (const JSON json(str); json.has("keys")) {
+ const auto& list = json["keys"];
+ keys.reserve(list.size());
+ std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString));
+ }
+ }
+
builder.UserType(userType);
builder.Args()->Add(inputType).Done();
- builder.Returns(builder.Stream()->Item<char*>());
+ if (keys.empty())
+ builder.Returns(builder.Stream()->Item<const char*>());
+ else {
+ const auto tuple = builder.Tuple();
+ tuple->Add<const char*>();
+ for (auto k =0U; k < keys.size(); ++k)
+ tuple->Add<TUtf8>();
+ builder.Returns(builder.Stream()->Item(tuple->Build()));
+ }
if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) {
- std::vector<TColumnMeta> outMeta(structType.GetMembersCount());
- NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount());
+ std::vector<TColumnMeta> inMeta(structType.GetMembersCount());
+ std::vector<ui32> keyIndexes(keys.size()), payloadIndexes(structType.GetMembersCount() - keys.size());
+ NDB::ColumnsWithTypeAndName columns;
+ columns.reserve(payloadIndexes.size());
for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) {
- if (auto& meta = outMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) {
- auto& colsumn = columns[i];
- colsumn.type = MetaToClickHouse(meta);
- colsumn.name = structType.GetMemberName(i);
+ if (auto& meta = inMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) {
+ const std::string_view name = structType.GetMemberName(i);
+ bool payload = true;
+ for (ui32 k = 0U; k < keys.size(); ++k) {
+ if (keys[k] == name) {
+ keyIndexes[k] = i;
+ payload = false;
+ }
+ }
+
+ if (payload) {
+ payloadIndexes[columns.size()] = i;
+ columns.emplace_back(MetaToClickHouse(meta), std::string(name));
+ }
} else {
::TStringBuilder sb;
sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: ";
@@ -1186,9 +1276,7 @@ public:
}
if (!(flags & TFlags::TypesOnly)) {
- const std::string_view& typeCfg = typeConfig;
- const auto jsonFrom = typeCfg.find('{');
- builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns)));
+ builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), builder.GetSourcePosition(), std::move(inMeta), std::move(columns)));
}
return;
} else {