about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author a-romanov <a-romanov@yandex-team.ru> 2022-03-23 19:30:58 +0300
committer a-romanov <a-romanov@yandex-team.ru> 2022-03-23 19:30:58 +0300
commit 67d78dad74c70c4aae51292e6a9b2a15f48a07a7 (patch)
tree 6a1172698be2bce38344a0e42acb2b59c5f00607
parent cb49d99ecdc9c83f0970b0778458ba7b9131c573 (diff)
download ydb-67d78dad74c70c4aae51292e6a9b2a15f48a07a7.tar.gz
YQ-727 CH method for parse from source under coroutine.
ref:7a5d23054ce0bcfbbf0495da0e6bf163ef015df7
-rw-r--r-- ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp 2
-rw-r--r-- ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp 204
2 files changed, 143 insertions, 63 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 86fbfec2772..f52ef28d860 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -329,7 +329,7 @@ private:
const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr);
const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType});
- const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream});
+ const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseSource", {}, userType, Format), {stream});
TExploringNodeVisitor explorer;
explorer.Walk(root.GetNode(), TypeEnv);
diff --git a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
index c28539ebe99..33c966d9568 100644
--- a/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
+++ b/ydb/library/yql/udfs/common/clickhouse/client/clickhouse_client_udf.cpp
@@ -34,6 +34,7 @@
#include "src/Formats/InputStreamFromInputFormat.h"
#include "src/Formats/registerFormats.h"
+#include <util/generic/yexception.h>
#include <util/string/split.h>
using namespace NYql;
@@ -443,79 +444,154 @@ private:
const std::vector<TColumnMeta> OutMeta; // in struct order
};
-class TParseFormat : public TBoxedValue {
-public:
- class TStreamValue : public TBoxedValue {
+template<bool WithYields> // Row stream over parsed input; the constructor and Fetch are explicitly specialized for WithYields == true / false below.
+class TStreamValue : public TBoxedValue {
+ class TReadBufferFromStream : public DB::ReadBuffer { // DB::ReadBuffer adapter that pulls string chunks from an upstream value stream.
public:
- TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
- const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId)
- : ValueBuilder(valueBuilder)
- , Stream(stream)
- , OutMeta(outMeta)
- , Columns(columns)
- , Pos(pos)
- , TzId(tzId)
- , Cache(OutMeta.size())
- , Type(type)
- , Settings(settings)
+ TReadBufferFromStream(TUnboxedValue stream)
+ : DB::ReadBuffer(nullptr, 0ULL), Stream(std::move(stream)) // starts with an empty buffer; data arrives through nextImpl()
{}
+ private:
+ bool nextImpl() final { // Fetches one item from Stream and exposes its bytes as working_buffer; returns false on Finish.
+ switch (Stream.Fetch(Value)) {
+ case EFetchStatus::Finish:
+ return false;
+ case EFetchStatus::Yield:
+ throw yexception() << "Unexpected yield."; // Yield is not supported here: it is turned into an exception
+ case EFetchStatus::Ok:
+ break;
+ }
+ const std::string_view view(Value.AsStringRef());
+ working_buffer = DB::BufferBase::Buffer(const_cast<char*>(view.cbegin()), const_cast<char*>(view.cend())); // a view of Value's data, not a copy; Value is a member so it outlives this call
+ return true; // NOTE(review): an empty chunk gives an empty working_buffer with a true result - confirm DB::ReadBuffer tolerates that
+ }
- EFetchStatus Fetch(TUnboxedValue& result) final try {
- for (;;) {
- if (!BlockStream) {
- if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status)
- return status;
+ const TUnboxedValue Stream;
+ TUnboxedValue Value; // last fetched chunk; backs working_buffer
+ };
+public:
+ TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
+ const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId);
+private:
+ EFetchStatus Fetch(TUnboxedValue& result) final; // defined per specialization after the class
- const std::string_view buffer = Input.AsStringRef();
- Buffer = std::make_unique<DB::ReadBufferFromMemory>(buffer.data(), buffer.size());
- BlockStream = std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, buffer.size(), Settings));
- }
+ const IValueBuilder* ValueBuilder;
+ const TUnboxedValue Stream;
+ const std::vector<TColumnMeta> OutMeta;
+ const DB::ColumnsWithTypeAndName Columns;
+ const TSourcePosition Pos;
+ const ui32 TzId;
- if (CurrentRow >= CurrentBlock.rows()) {
- CurrentRow = 0;
- if (CurrentBlock = BlockStream->read(); !CurrentBlock) {
- BlockStream.reset();
- Buffer.reset();
- continue;
- }
- }
+ TPlainArrayCache Cache; // constructed with OutMeta.size(); Fetch takes each result row array from it
- TUnboxedValue* items = nullptr;
- result = Cache.NewArray(*ValueBuilder, items);
- for (ui32 i = 0; i < OutMeta.size(); ++i) {
- *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
- }
+ TUnboxedValue Input; // current upstream chunk; only the <true> Fetch assigns it
+ const TString Type;
+ const DB::FormatSettings Settings;
- ++CurrentRow;
- return EFetchStatus::Ok;
+ std::unique_ptr<DB::ReadBuffer> Buffer; // must stay alive while BlockStream reads from it
+ std::unique_ptr<DB::IBlockInputStream> BlockStream;
+ DB::Block CurrentBlock;
+ size_t CurrentRow = 0U; // index of the next row to emit from CurrentBlock
+};
+
+template<> // Constructor for WithYields == true: only stores its arguments.
+TStreamValue<true>::TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
+ const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , OutMeta(outMeta)
+ , Columns(columns)
+ , Pos(pos)
+ , TzId(tzId)
+ , Cache(OutMeta.size()) // uses the member OutMeta, which is declared (and so initialized) before Cache
+ , Type(type)
+ , Settings(settings)
+{} // Buffer and BlockStream stay null here; the <true> Fetch creates them for each input chunk.
+
+template<> // Constructor for WithYields == false: builds the read buffer and block stream up front.
+TStreamValue<false>::TStreamValue(const std::string& type, const DB::FormatSettings& settings, const IValueBuilder* valueBuilder, const TUnboxedValue& stream,
+ const std::vector<TColumnMeta> outMeta, const DB::ColumnsWithTypeAndName& columns, const TSourcePosition& pos, ui32 tzId)
+ : ValueBuilder(valueBuilder)
+ , Stream(stream)
+ , OutMeta(outMeta)
+ , Columns(columns)
+ , Pos(pos)
+ , TzId(tzId)
+ , Cache(OutMeta.size())
+ , Type(type)
+ , Settings(settings)
+ , Buffer(std::make_unique<TReadBufferFromStream>(stream)) // `stream` is a const reference, so it is copied; std::move on it would copy as well
+ , BlockStream(std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, 1024 * 1024, Settings))) // fixed 1024 * 1024 where the <true> Fetch passes buffer.size() (presumably the max block size - confirm against FormatFactory)
+ , CurrentBlock(BlockStream->read()) // the first block is read eagerly, inside the constructor
+{}
+
+template<> // Fetch for WithYields == true: every upstream string gets its own read buffer and format parser.
+EFetchStatus TStreamValue<true>::Fetch(TUnboxedValue& result) try {
+ for (;;) {
+ if (!BlockStream) { // no parser active: pull the next chunk from upstream
+ if (const auto status = Stream.Fetch(Input); EFetchStatus::Ok != status)
+ return status; // any non-Ok status (Finish, Yield) is returned to the caller unchanged
+
+ const std::string_view buffer = Input.AsStringRef();
+ Buffer = std::make_unique<DB::ReadBufferFromMemory>(buffer.data(), buffer.size());
+ BlockStream = std::make_unique<DB::InputStreamFromInputFormat>(DB::FormatFactory::instance().getInputFormat(Type, *Buffer, DB::Block(Columns), nullptr, buffer.size(), Settings));
+ }
+
+ if (CurrentRow >= CurrentBlock.rows()) { // current block exhausted: read the next one
+ CurrentRow = 0;
+ if (CurrentBlock = BlockStream->read(); !CurrentBlock) {
+ BlockStream.reset(); // this chunk is fully parsed: drop its parser and loop to fetch another chunk
+ Buffer.reset();
+ continue;
}
}
- catch (const Poco::Exception& e) {
- UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
- }
- catch (const std::exception& e) {
- UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+
+ TUnboxedValue* items = nullptr;
+ result = Cache.NewArray(*ValueBuilder, items); // output row: one slot per OutMeta entry
+ for (ui32 i = 0; i < OutMeta.size(); ++i) {
+ *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
}
- private:
- const IValueBuilder* ValueBuilder;
- const TUnboxedValue Stream;
- const std::vector<TColumnMeta> OutMeta;
- const DB::ColumnsWithTypeAndName Columns;
- const TSourcePosition Pos;
- const ui32 TzId;
- TPlainArrayCache Cache;
+ ++CurrentRow;
+ return EFetchStatus::Ok; // exactly one row is produced per successful call
+ }
+}
+catch (const Poco::Exception& e) { // Poco errors are reported through UdfTerminate using displayText()
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+}
+catch (const std::exception& e) { // other standard exceptions are reported the same way using what()
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+}
- TUnboxedValue Input;
- const TString Type;
- const DB::FormatSettings Settings;
+template<> // Fetch for WithYields == false: rows come from the single BlockStream built in the constructor.
+EFetchStatus TStreamValue<false>::Fetch(TUnboxedValue& result) try {
+ if (CurrentRow >= CurrentBlock.rows()) { // current block exhausted: read the next one
+ CurrentRow = 0;
+ CurrentBlock = BlockStream->read();
- std::unique_ptr<DB::ReadBuffer> Buffer;
- std::unique_ptr<DB::IBlockInputStream> BlockStream;
- DB::Block CurrentBlock;
- size_t CurrentRow = 0;
- };
+ if (!CurrentBlock) // an empty block is treated as end of data
+ return EFetchStatus::Finish;
+ }
+
+ TUnboxedValue* items = nullptr;
+ result = Cache.NewArray(*ValueBuilder, items); // output row: one slot per OutMeta entry
+ for (ui32 i = 0; i < OutMeta.size(); ++i) {
+ *items++ = ConvertOutputValue(CurrentBlock.getByPosition(i).column.get(), OutMeta[i], TzId, ValueBuilder, CurrentRow);
+ }
+ ++CurrentRow;
+ return EFetchStatus::Ok; // this specialization itself only returns Ok or Finish
+}
+catch (const Poco::Exception& e) { // Poco errors are reported through UdfTerminate using displayText()
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
+}
+catch (const std::exception& e) { // other standard exceptions (including the yexception thrown on Yield, assuming it derives from std::exception) use what()
+ UdfTerminate((TStringBuilder() << ValueBuilder->WithCalleePosition(Pos) << " " << e.what()).data());
+}
+
+template<bool WithYields>
+class TParseFormat : public TBoxedValue {
+public:
TParseFormat(const std::string_view& type, const std::string_view& settings, const TSourcePosition& pos, std::vector<TColumnMeta>&& outMeta, DB::ColumnsWithTypeAndName&& columns)
: Type(type), Settings(GetFormatSettings(settings)), Pos(pos), OutMeta(std::move(outMeta)), Columns(std::move(columns))
{}
@@ -528,7 +604,7 @@ public:
}
}
- return TUnboxedValuePod(new TStreamValue(Type, Settings, valueBuilder, *args, OutMeta, Columns, Pos, tzId));
+ return TUnboxedValuePod(new TStreamValue<WithYields>(Type, Settings, valueBuilder, *args, OutMeta, Columns, Pos, tzId));
}
catch (const Poco::Exception& e) {
UdfTerminate((TStringBuilder() << valueBuilder->WithCalleePosition(Pos) << " " << e.displayText()).data());
@@ -716,6 +792,7 @@ public:
void GetAllFunctions(IFunctionsSink& sink) const final { // Adds every function name handled here to the sink.
sink.Add(TStringRef::Of("ToYqlType"));
sink.Add(TStringRef::Of("ParseFormat"))->SetTypeAwareness();
+ sink.Add(TStringRef::Of("ParseSource"))->SetTypeAwareness(); // registered the same way as ParseFormat
sink.Add(TStringRef::Of("ParseFromYdb"))->SetTypeAwareness();
}
@@ -741,7 +818,7 @@ public:
}
const auto typeHelper = builder.TypeInfoHelper();
- if (name == "ParseFormat") {
+ if (name == "ParseFormat" || name == "ParseSource") {
const auto userTypeInspector = TTupleTypeInspector(*typeHelper, userType);
if (!userTypeInspector || userTypeInspector.GetElementsCount() < 3) {
return builder.SetError("Invalid user type.");
@@ -785,7 +862,10 @@ public:
if (!(flags & TFlags::TypesOnly)) {
const std::string_view& typeCfg = typeConfig;
const auto jsonFrom = typeCfg.find('{');
- builder.Implementation(new TParseFormat(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns)));
+ if (name == "ParseFormat")
+ builder.Implementation(new TParseFormat<true>(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns)));
+ else
+ builder.Implementation(new TParseFormat<false>(typeCfg.substr(0U, jsonFrom), std::string_view::npos == jsonFrom ? "" : typeCfg.substr(jsonFrom), builder.GetSourcePosition(), std::move(outMeta), std::move(columns)));
}
return;
} else {