diff options
author | whatsername <whatsername@yandex-team.com> | 2023-12-01 03:12:29 +0300 |
---|---|---|
committer | whatsername <whatsername@yandex-team.com> | 2023-12-01 03:43:03 +0300 |
commit | daa13a041fb3abb4eb601846282c2398523e39d2 (patch) | |
tree | f2a0b3da8af96c1d948c49721a80dc36745c96a9 | |
parent | d45f28ae1b15b45f5026ae1add338558570a49d9 (diff) | |
download | ydb-daa13a041fb3abb4eb601846282c2398523e39d2.tar.gz |
ROREN-48: Use single table writer in TWriteProtoParDo
-rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.cpp | 26 | ||||
-rw-r--r-- | yt/cpp/mapreduce/io/proto_table_writer.h | 16 |
2 files changed, 41 insertions, 1 deletions
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp index 955bba26e1..62593ff21e 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.cpp +++ b/yt/cpp/mapreduce/io/proto_table_writer.cpp @@ -7,6 +7,8 @@ #include <yt/cpp/mapreduce/interface/io.h> +#include <yt/cpp/mapreduce/io/job_writer.h> + #include <yt/yt_proto/yt/formats/extension.pb.h> #include <google/protobuf/unknown_field_set.h> @@ -191,4 +193,28 @@ void TLenvalProtoTableWriter::Abort() //////////////////////////////////////////////////////////////////////////////// +TLenvalProtoSingleTableWriter::TLenvalProtoSingleTableWriter( + THolder<IProxyOutput> output, + const Descriptor* descriptor) + : TLenvalProtoTableWriter(std::move(output), {descriptor}) +{ } + +void TLenvalProtoSingleTableWriter::AddRow(const Message& row, size_t tableIndex) +{ + ValidateProtoDescriptor(row, 0, Descriptors_, false); + + Y_ABORT_UNLESS(row.GetReflection()->GetUnknownFields(row).empty(), + "Message has unknown fields. This probably means bug in client code.\n" + "Message: %s", row.DebugString().data()); + + auto* stream = Output_->GetStream(tableIndex); + i32 size = row.ByteSize(); + stream->Write(&size, sizeof(size)); + bool serializedOk = row.SerializeToArcadiaStream(stream); + Y_ENSURE(serializedOk, "Failed to serialize protobuf message"); + Output_->OnRowFinished(tableIndex); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT diff --git a/yt/cpp/mapreduce/io/proto_table_writer.h b/yt/cpp/mapreduce/io/proto_table_writer.h index 1bf43d2f98..532cf3302b 100644 --- a/yt/cpp/mapreduce/io/proto_table_writer.h +++ b/yt/cpp/mapreduce/io/proto_table_writer.h @@ -50,7 +50,7 @@ public: void FinishTable(size_t) override; void Abort() override; -private: +protected: THolder<IProxyOutput> Output_; TVector<const ::google::protobuf::Descriptor*> Descriptors_; }; @@ -65,4 +65,18 @@ TNode MakeNodeFromMessage(const ::google::protobuf::Message& row); //////////////////////////////////////////////////////////////////////////////// +class TLenvalProtoSingleTableWriter + : public TLenvalProtoTableWriter +{ +public: + TLenvalProtoSingleTableWriter( + THolder<IProxyOutput> output, + const ::google::protobuf::Descriptor* descriptor); + ~TLenvalProtoSingleTableWriter() override = default; + + void AddRow(const Message& row, size_t tableIndex) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT |