diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-03-23 19:30:58 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-03-23 19:30:58 +0300 |
commit | 67d78dad74c70c4aae51292e6a9b2a15f48a07a7 (patch) | |
tree | 6a1172698be2bce38344a0e42acb2b59c5f00607 | |
parent | cb49d99ecdc9c83f0970b0778458ba7b9131c573 (diff) | |
download | ydb-67d78dad74c70c4aae51292e6a9b2a15f48a07a7.tar.gz |
YQ-727 CH method for parse from source under coroutine.
ref:7a5d23054ce0bcfbbf0495da0e6bf163ef015df7
-rw-r--r-- | ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp | 204 |
2 files changed, 143 insertions, 63 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 86fbfec2772..f52ef28d860 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -329,7 +329,7 @@ private: const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr); const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType}); - const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream}); + const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseSource", {}, userType, Format), {stream}); TExploringNodeVisitor explorer; explorer.Walk(root.GetNode(), TypeEnv); diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp index c28539ebe99..33c966d9568 100644 --- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp +++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp @@ -34,6 +34,7 @@ #include "src/Formats/InputStreamFromInputFormat.h" #include "src/Formats/registerFormats.h" +#include <util/generic/yexception.h> #include <util/string/split.h> using namespace NYql; @@ -443,79 +444,154 @@ private: const std::vector<TColumnMeta> OutMeta; // in struct order }; -class TParseFormat : public TBoxedValue { -public: - class TStreamValue : public TBoxedValue { +template<bool WithYields> +class TStreamValue : public TBoxedValue { + class TReadBufferFromStream : public DB::ReadBuffer { public: - TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream, - const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId) - : ValueBuilder(valueBuilder) - , Stream(stream) - , OutMeta(outMeta) - , Columns(columns) - , Pos(pos) - , TzId(tzId) - , Cache(OutMeta.size()) - , Type(type) - , Settings(settings) + TReadBufferFromStream(TUnboxedValue stream) + : DB::ReadBuffer(nullptr, 0ULL), Stream(std::move(stream)) {} + private: + bool nextImpl() final { + switch (Stream.Fetch(Value)) { + case EFetchStatus::Finish: + return false; + case EFetchStatus::Yield: + throw yexception() << "Enexpected yield."; + case EFetchStatus::Ok: + break; + } + const std::string_view view(Value.AsStringRef()); + working_buffer = DB::BufferBase::Buffer(const_cast<char*>(view.cbegin()), const_cast<char*>(view.cend())); + return true; + } - EFetchStatus Fetch(TUnboxedValue& result) final try { - for (;;) { - if (!BlockStream) { - if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status) - return status; + const TUnboxedValue Stream; + TUnboxedValue Value; + }; +public: + TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream, + const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId); +private: + EFetchStatus Fetch(TUnboxedValue& result) final; - const std::string_view buffer = Input.AsStringRef(); - Buffer = std::make_unique<DB::ReadBufferFromMemory>(buffer.data(), buffer.size()); - BlockStream = std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, buffer.size(), Settings)); - } + const IValueBuilder* ValueBuilder; + const TUnboxedValue Stream; + const std::vector<TColumnMeta> OutMeta; + const DB::ColumnsWithTypeAndName Columns; + const TSourcePosition Pos; + const ui32 TzId; - if (CurrentRow >= CurrentBlock.rows()) { - CurrentRow = 0; - if (CurrentBlock = BlockStream->read(); !CurrentBlock) { - BlockStream.reset(); - Buffer.reset(); - continue; - } - } + TPlainArrayCache Cache; - TUnboxedValue* items = nullptr; - result = Cache.NewArray(*ValueBuilder, items); - for (ui32 i = 0; i < OutMeta.size(); ++i) { - *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); - } + TUnboxedValue Input; + const TString Type; + const DB::FormatSettings Settings; - ++CurrentRow; - return EFetchStatus::Ok; + std::unique_ptr<DB::ReadBuffer> Buffer; + std::unique_ptr<DB::IBlockInputStream> BlockStream; + DB::Block CurrentBlock; + size_t CurrentRow = 0U; +}; + +template<> +TStreamValue<true>::TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream, + const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId) + : ValueBuilder(valueBuilder) + , Stream(stream) + , OutMeta(outMeta) + , Columns(columns) + , Pos(pos) + , TzId(tzId) + , Cache(OutMeta.size()) + , Type(type) + , Settings(settings) +{} + +template<> +TStreamValue<false>::TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream, + const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId) + : ValueBuilder(valueBuilder) + , Stream(stream) + , OutMeta(outMeta) + , Columns(columns) + , Pos(pos) + , TzId(tzId) + , Cache(OutMeta.size()) + , Type(type) + , Settings(settings) + , Buffer(std::make_unique<TReadBufferFromStream>(std::move(stream))) + , BlockStream(std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, 1024 * 1024, Settings))) + , CurrentBlock(BlockStream->read()) +{} + +template<> +EFetchStatus TStreamValue<true>::Fetch(TUnboxedValue& result) try { + for (;;) { + if (!BlockStream) { + if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status) + return status; + + const std::string_view buffer = Input.AsStringRef(); + Buffer = std::make_unique<DB::ReadBufferFromMemory>(buffer.data(), buffer.size()); + BlockStream = std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, buffer.size(), Settings)); + } + + if (CurrentRow >= CurrentBlock.rows()) { + CurrentRow = 0; + if (CurrentBlock = BlockStream->read(); !CurrentBlock) { + BlockStream.reset(); + Buffer.reset(); + continue; } } - catch (const Poco::Exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); - } - catch (const std::exception& e) { - UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); + + TUnboxedValue* items = nullptr; + result = Cache.NewArray(*ValueBuilder, items); + for (ui32 i = 0; i < OutMeta.size(); ++i) { + *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); } - private: - const IValueBuilder* ValueBuilder; - const TUnboxedValue Stream; - const std::vector<TColumnMeta> OutMeta; - const DB::ColumnsWithTypeAndName Columns; - const TSourcePosition Pos; - const ui32 TzId; - TPlainArrayCache Cache; + ++CurrentRow; + return EFetchStatus::Ok; + } +} +catch (const Poco::Exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); +} +catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); +} - TUnboxedValue Input; - const TString Type; - const DB::FormatSettings Settings; +template<> +EFetchStatus TStreamValue<false>::Fetch(TUnboxedValue& result) try { + if (CurrentRow >= CurrentBlock.rows()) { + CurrentRow = 0; + CurrentBlock = BlockStream->read(); - std::unique_ptr<DB::ReadBuffer> Buffer; - std::unique_ptr<DB::IBlockInputStream> BlockStream; - DB::Block CurrentBlock; - size_t CurrentRow = 0; - }; + if (!CurrentBlock) + return EFetchStatus::Finish; + } + + TUnboxedValue* items = nullptr; + result = Cache.NewArray(*ValueBuilder, items); + for (ui32 i = 0; i < OutMeta.size(); ++i) { + *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow); + } + ++CurrentRow; + return EFetchStatus::Ok; +} +catch (const Poco::Exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); +} +catch (const std::exception& e) { + UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data()); +} + +template<bool WithYields> +class TParseFormat : public TBoxedValue { +public: TParseFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta, DB::ColumnsWithTypeAndName&& columns) : Type(type), Settings(GetFormatSettings(settings)), Pos(pos), OutMeta(std::move(outMeta)), Columns(std::move(columns)) {} @@ -528,7 +604,7 @@ public: } } - return TUnboxedValuePod(new TStreamValue(Type, Settings, valueBuilder, *args, OutMeta, Columns, Pos, tzId)); + return TUnboxedValuePod(new TStreamValue<WithYields>(Type, Settings, valueBuilder, *args, OutMeta, Columns, Pos, tzId)); } catch (const Poco::Exception& e) { UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data()); @@ -716,6 +792,7 @@ public: void GetAllFunctions(IFunctionsSink& sink) const final { sink.Add(TStringRef::Of("ToYqlType")); sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness(); + sink.Add(TStringRef::Of("ParseSource"))->SetTypeAwareness(); sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness(); } @@ -741,7 +818,7 @@ public: } const auto typeHelper = builder.TypeInfoHelper(); - if (name == "ParseFormat") { + if (name == "ParseFormat" || name == "ParseSource") { const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType); if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) { return builder.SetError("Invalid user type."); @@ -785,7 +862,10 @@ public: if (!(flags & TFlags::TypesOnly)) { const std::string_view& typeCfg = typeConfig; const auto jsonFrom = typeCfg.find('{'); - builder.Implementation(new TParseFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns))); + if (name == "ParseFormat") + builder.Implementation(new TParseFormat<true>(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns))); + else + builder.Implementation(new TParseFormat<false>(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns))); } return; } else { |