diff options
author | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-15 14:26:24 +0300 |
---|---|---|
committer | a-romanov <Anton.Romanov@ydb.tech> | 2022-07-15 14:26:24 +0300 |
commit | cd2a5582557397c5c83028723362019d4f1074be (patch) | |
tree | a6d367b7efbd8c79b682d498b1ed5db63da02e12 | |
parent | e542935517ca9d6ea833104b0258cb5df6b9ab27 (diff) | |
download | ydb-cd2a5582557397c5c83028723362019d4f1074be.tar.gz |
SerializeFormat with split on partitions draft.
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp | 146 |
1 files changed, 117 insertions, 29 deletions
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index d10c8148261..2ed24c17a01 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -373,6 +373,10 @@ void ConvertInputValue(const TUnboxedValuePod& value, const NDB::IColumn::Mutabl return col->insert(NDB::Field(value.AsStringRef())); case EDataSlot::Bool: return col->insert(NDB::Field(value.Get<bool>())); + case EDataSlot::Float: + return col->insert(NDB::Field(value.Get<float>())); + case EDataSlot::Double: + return col->insert(NDB::Field(value.Get<double>())); case EDataSlot::Uint8: return col->insert(NDB::Field(value.Get<ui8>())); case EDataSlot::Uint16: @@ -747,46 +751,92 @@ private: class TSerializeFormat : public TBoxedValue { class TStreamValue : public TBoxedValue { + private: + using TPartitionByKey = std::vector<TUnboxedValue, TStdAllocatorForUdf<TUnboxedValue>>; + + struct TPartitionMapStuff { + explicit TPartitionMapStuff(size_t size) : Size(size) {} + + bool operator()(const TPartitionByKey& left, const TPartitionByKey& right) const { + for (auto i = 0U; i < Size; ++i) + if (left[i].AsStringRef() != right[i].AsStringRef()) + return false; + + return true; + } + + size_t operator()(const TPartitionByKey& value) const { + size_t hash = 0ULL; + for (auto i = 0U; i < Size; ++i) + hash = CombineHashes(hash, std::hash<std::string_view>()(value[i].AsStringRef())); + return hash; + } + + const size_t Size; + }; + + using TPartitionsMap = std::unordered_map<TPartitionByKey, NDB::Block, TPartitionMapStuff, TPartitionMapStuff, TStdAllocatorForUdf<std::pair<const TPartitionByKey, NDB::Block>>>; public: - TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) + TStreamValue(const IValueBuilder* valueBuilder, const TUnboxedValue& stream, const std::string& type, const NDB::FormatSettings& settings, const std::vector<ui32>& keysIndexes, const std::vector<ui32>& payloadsIndexes, const std::vector<TColumnMeta>& inMeta, const NDB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos) : ValueBuilder(valueBuilder) , Stream(stream) , InMeta(inMeta) + , KeysIndexes(keysIndexes) + , PayloadsIndexes(payloadsIndexes) , Pos(pos) + , HeaderBlock(columns) , Buffer(std::make_unique<NDB::WriteBufferFromOwnString>()) - , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, NDB::Block(columns), nullptr, {}, settings))) - , CurrentBlock(columns) + , BlockStream(std::make_unique<NDB::OutputStreamToOutputFormat>(NDB::FormatFactory::instance().getOutputFormat(type, *Buffer, HeaderBlock, nullptr, {}, settings))) + , PartitionsMap(0ULL, TPartitionMapStuff(KeysIndexes.size()), TPartitionMapStuff(KeysIndexes.size())) + , Cache(KeysIndexes.size() + 1U) { BlockStream->writePrefix(); } private: EFetchStatus Fetch(TUnboxedValue& result) final try { - if (IsFinished) + if (IsFinished && PartitionsMap.empty()) return EFetchStatus::Finish; - auto columns = CurrentBlock.mutateColumns(); for (TUnboxedValue row; !IsFinished;) { switch (const auto status = Stream.Fetch(row)) { case EFetchStatus::Yield: return EFetchStatus::Yield; - case EFetchStatus::Ok: - for (auto i = 0U; i < columns.size(); ++i) - ConvertInputValue(row.GetElement(i), columns[i], InMeta[i]); + case EFetchStatus::Ok: { + TPartitionByKey keys(KeysIndexes.size()); + for (auto i = 0U; i < keys.size(); ++i) + keys[i] = row.GetElement(KeysIndexes[i]); + + const auto ins = PartitionsMap.emplace(keys, HeaderBlock.cloneEmpty()); + auto columns = ins.first->second.mutateColumns(); + for (auto i = 0U; i < columns.size(); ++i) { + const auto index = PayloadsIndexes[i]; + ConvertInputValue(row.GetElement(index), columns[i], InMeta[index]); + } + ins.first->second.setColumns(std::move(columns)); continue; + } case EFetchStatus::Finish: IsFinished = true; - BlockStream->writeSuffix(); - BlockStream->flush(); break; } } - CurrentBlock.setColumns(std::move(columns)); - BlockStream->write(CurrentBlock); + auto item = PartitionsMap.extract(PartitionsMap.cbegin()); + BlockStream->write(item.mapped()); + + if (IsFinished && PartitionsMap.empty()) + BlockStream->writeSuffix(); BlockStream->flush(); - CurrentBlock = CurrentBlock.cloneEmpty(); - result = ValueBuilder->NewString(Buffer->str()); + if (KeysIndexes.empty()) + result = ValueBuilder->NewString(Buffer->str()); + else { + TUnboxedValue* tupleItems = nullptr; + result = Cache.NewArray(*ValueBuilder, tupleItems); + *tupleItems++ = ValueBuilder->NewString(Buffer->str()); + std::move(item.key().begin(), item.key().end(), tupleItems); + } + Buffer->restart(); return EFetchStatus::Ok; } @@ -800,22 +850,27 @@ class TSerializeFormat : public TBoxedValue { const IValueBuilder* ValueBuilder; const TUnboxedValue Stream; const std::vector<TColumnMeta> InMeta; + const std::vector<ui32> KeysIndexes; + const std::vector<ui32> PayloadsIndexes; const TSourcePosition Pos; + const NDB::Block HeaderBlock; const std::unique_ptr<NDB::WriteBufferFromOwnString> Buffer; const std::unique_ptr<NDB::IBlockOutputStream> BlockStream; - NDB::Block CurrentBlock; + TPartitionsMap PartitionsMap; + + TPlainArrayCache Cache; + bool IsFinished = false; }; - public: - TSerializeFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) - : Type(type), Settings(GetFormatSettings(settings)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) + TSerializeFormat(const std::string_view& type, const std::string_view& settings, std::vector<ui32>&& keysIndexes, std::vector<ui32>&& payloadsIndexes, const TSourcePosition& pos, std::vector<TColumnMeta>&& inMeta, NDB::ColumnsWithTypeAndName&& columns) + : Type(type), Settings(GetFormatSettings(settings)), KeysIndexes(std::move(keysIndexes)), PayloadsIndexes(std::move(payloadsIndexes)), Pos(pos), InMeta(std::move(inMeta)), Columns(std::move(columns)) {} TUnboxedValue Run(const IValueBuilder* valueBuilder, const TUnboxedValuePod* args) const final try { - return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, InMeta, Columns, Pos)); + return TUnboxedValuePod(new TStreamValue(valueBuilder, *args, Type, Settings, KeysIndexes, PayloadsIndexes, InMeta, Columns, Pos)); } catch (const Poco::Exception& e) { UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); @@ -826,6 +881,8 @@ public: private: const std::string Type; const NDB::FormatSettings Settings; + const std::vector<ui32> KeysIndexes; + const std::vector<ui32> PayloadsIndexes; const TSourcePosition Pos; const std::vector<TColumnMeta> InMeta; @@ -1165,18 +1222,51 @@ public: const auto inputType = argsTypeInspector.GetElementType(0); + const std::string_view& typeCfg = typeConfig; + const auto jsonFrom = typeCfg.find('{'); + std::vector<std::string> keys; + const auto& tail = std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom); + if (!tail.empty()) { + const std::string str(tail); + if (const JSON json(str); json.has("keys")) { + const auto& list = json["keys"]; + keys.reserve(list.size()); + std::transform(list.begin(), list.end(), std::back_inserter(keys), std::mem_fun_ref(&JSON::getString)); + } + } + builder.UserType(userType); builder.Args()->Add(inputType).Done(); - builder.Returns(builder.Stream()->Item<char*>()); + if (keys.empty()) + builder.Returns(builder.Stream()->Item<const char*>()); + else { + const auto tuple = builder.Tuple(); + tuple->Add<const char*>(); + for (auto k =0U; k < keys.size(); ++k) + tuple->Add<TUtf8>(); + builder.Returns(builder.Stream()->Item(tuple->Build())); + } if (const auto structType = TStructTypeInspector(*typeHelper, TStreamTypeInspector(*typeHelper, inputType).GetItemType())) { - std::vector<TColumnMeta> outMeta(structType.GetMembersCount()); - NDB::ColumnsWithTypeAndName columns(structType.GetMembersCount()); + std::vector<TColumnMeta> inMeta(structType.GetMembersCount()); + std::vector<ui32> keyIndexes(keys.size()), payloadIndexes(structType.GetMembersCount() - keys.size()); + NDB::ColumnsWithTypeAndName columns; + columns.reserve(payloadIndexes.size()); for (ui32 i = 0U; i < structType.GetMembersCount(); ++i) { - if (auto& meta = outMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) { - auto& colsumn = columns[i]; - colsumn.type = MetaToClickHouse(meta); - colsumn.name = structType.GetMemberName(i); + if (auto& meta = inMeta[i]; GetDataType(*typeHelper, structType.GetMemberType(i), meta)) { + const std::string_view name = structType.GetMemberName(i); + bool payload = true; + for (ui32 k = 0U; k < keys.size(); ++k) { + if (keys[k] == name) { + keyIndexes[k] = i; + payload = false; + } + } + + if (payload) { + payloadIndexes[columns.size()] = i; + columns.emplace_back(MetaToClickHouse(meta), std::string(name)); + } } else { ::TStringBuilder sb; sb << "Incompatible column '" << structType.GetMemberName(i) << "' type: "; @@ -1186,9 +1276,7 @@ public: } if (!(flags & TFlags::TypesOnly)) { - const std::string_view& typeCfg = typeConfig; - const auto jsonFrom = typeCfg.find('{'); - builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns))); + builder.Implementation(new TSerializeFormat(typeCfg.substr(0U, jsonFrom), tail, std::move(keyIndexes), std::move(payloadIndexes), builder.GetSourcePosition(), std::move(inMeta), std::move(columns))); } return; } else { |