aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/proto_table_writer.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
committermax42 <max42@yandex-team.com>2023-06-30 11:13:34 +0300
commit3e1899838408bbad47622007aa382bc8a2b01f87 (patch)
tree0f21c1e6add187ddb6c3ccc048a7d640ce03fb87 /yt/cpp/mapreduce/io/proto_table_writer.cpp
parent5463eb3f5e72a86f858a3d27c886470a724ede34 (diff)
downloadydb-3e1899838408bbad47622007aa382bc8a2b01f87.tar.gz
Revert "YT-19324: move YT provider to ydb/library/yql"
This reverts commit ca272f12fdd0e8d5c3e957fc87939148f1caaf72, reversing changes made to 49f8acfc8b0b5c0071b804423bcf53fda26c7c12.
Diffstat (limited to 'yt/cpp/mapreduce/io/proto_table_writer.cpp')
-rw-r--r--yt/cpp/mapreduce/io/proto_table_writer.cpp184
1 files changed, 0 insertions, 184 deletions
diff --git a/yt/cpp/mapreduce/io/proto_table_writer.cpp b/yt/cpp/mapreduce/io/proto_table_writer.cpp
deleted file mode 100644
index 1ce7811625..0000000000
--- a/yt/cpp/mapreduce/io/proto_table_writer.cpp
+++ /dev/null
@@ -1,184 +0,0 @@
-#include "proto_table_writer.h"
-
-#include "node_table_writer.h"
-#include "proto_helpers.h"
-
-#include <yt/cpp/mapreduce/common/node_builder.h>
-
-#include <yt/cpp/mapreduce/interface/io.h>
-
-#include <yt/yt_proto/yt/formats/extension.pb.h>
-
-#include <google/protobuf/unknown_field_set.h>
-
-namespace NYT {
-
-using ::google::protobuf::Descriptor;
-using ::google::protobuf::FieldDescriptor;
-
-////////////////////////////////////////////////////////////////////////////////
-
-TNode MakeNodeFromMessage(const Message& row)
-{
- TNode node;
- TNodeBuilder builder(&node);
- builder.OnBeginMap();
-
- auto* descriptor = row.GetDescriptor();
- auto* reflection = row.GetReflection();
-
- int count = descriptor->field_count();
- for (int i = 0; i < count; ++i) {
- auto* fieldDesc = descriptor->field(i);
- if (fieldDesc->is_repeated()) {
- Y_ENSURE(reflection->FieldSize(row, fieldDesc) == 0, "Storing repeated protobuf fields is not supported yet");
- continue;
- } else if (!reflection->HasField(row, fieldDesc)) {
- continue;
- }
-
- TString columnName = fieldDesc->options().GetExtension(column_name);
- if (columnName.empty()) {
- const auto& keyColumnName = fieldDesc->options().GetExtension(key_column_name);
- columnName = keyColumnName.empty() ? fieldDesc->name() : keyColumnName;
- }
-
- builder.OnKeyedItem(columnName);
-
- switch (fieldDesc->type()) {
- case FieldDescriptor::TYPE_STRING:
- case FieldDescriptor::TYPE_BYTES:
- builder.OnStringScalar(reflection->GetString(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_INT64:
- case FieldDescriptor::TYPE_SINT64:
- case FieldDescriptor::TYPE_SFIXED64:
- builder.OnInt64Scalar(reflection->GetInt64(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_INT32:
- case FieldDescriptor::TYPE_SINT32:
- case FieldDescriptor::TYPE_SFIXED32:
- builder.OnInt64Scalar(reflection->GetInt32(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_UINT64:
- case FieldDescriptor::TYPE_FIXED64:
- builder.OnUint64Scalar(reflection->GetUInt64(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_UINT32:
- case FieldDescriptor::TYPE_FIXED32:
- builder.OnUint64Scalar(reflection->GetUInt32(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_DOUBLE:
- builder.OnDoubleScalar(reflection->GetDouble(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_FLOAT:
- builder.OnDoubleScalar(reflection->GetFloat(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_BOOL:
- builder.OnBooleanScalar(reflection->GetBool(row, fieldDesc));
- break;
- case FieldDescriptor::TYPE_ENUM:
- builder.OnStringScalar(reflection->GetEnum(row, fieldDesc)->name());
- break;
- case FieldDescriptor::TYPE_MESSAGE:
- builder.OnStringScalar(reflection->GetMessage(row, fieldDesc).SerializeAsString());
- break;
- default:
- ythrow yexception() << "Invalid field type for column: " << columnName;
- break;
- }
- }
-
- builder.OnEndMap();
- return node;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TProtoTableWriter::TProtoTableWriter(
- THolder<IProxyOutput> output,
- TVector<const Descriptor*>&& descriptors)
- : NodeWriter_(new TNodeTableWriter(std::move(output)))
- , Descriptors_(std::move(descriptors))
-{ }
-
-TProtoTableWriter::~TProtoTableWriter()
-{ }
-
-size_t TProtoTableWriter::GetTableCount() const
-{
- return NodeWriter_->GetTableCount();
-}
-
-void TProtoTableWriter::FinishTable(size_t tableIndex)
-{
- NodeWriter_->FinishTable(tableIndex);
-}
-
-void TProtoTableWriter::AddRow(const Message& row, size_t tableIndex)
-{
- NodeWriter_->AddRow(MakeNodeFromMessage(row), tableIndex);
-}
-
-void TProtoTableWriter::AddRow(Message&& row, size_t tableIndex)
-{
- TProtoTableWriter::AddRow(row, tableIndex);
-}
-
-
-void TProtoTableWriter::Abort()
-{
- NodeWriter_->Abort();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-TLenvalProtoTableWriter::TLenvalProtoTableWriter(
- THolder<IProxyOutput> output,
- TVector<const Descriptor*>&& descriptors)
- : Output_(std::move(output))
- , Descriptors_(std::move(descriptors))
-{ }
-
-TLenvalProtoTableWriter::~TLenvalProtoTableWriter()
-{ }
-
-size_t TLenvalProtoTableWriter::GetTableCount() const
-{
- return Output_->GetStreamCount();
-}
-
-void TLenvalProtoTableWriter::FinishTable(size_t tableIndex)
-{
- Output_->GetStream(tableIndex)->Finish();
-}
-
-void TLenvalProtoTableWriter::AddRow(const Message& row, size_t tableIndex)
-{
- ValidateProtoDescriptor(row, tableIndex, Descriptors_, false);
-
- Y_VERIFY(row.GetReflection()->GetUnknownFields(row).empty(),
- "Message has unknown fields. This probably means bug in client code.\n"
- "Message: %s", row.DebugString().data());
-
- auto* stream = Output_->GetStream(tableIndex);
- i32 size = row.ByteSize();
- stream->Write(&size, sizeof(size));
- bool serializedOk = row.SerializeToArcadiaStream(stream);
- Y_ENSURE(serializedOk, "Failed to serialize protobuf message");
- Output_->OnRowFinished(tableIndex);
-}
-
-void TLenvalProtoTableWriter::AddRow(Message&& row, size_t tableIndex)
-{
- TLenvalProtoTableWriter::AddRow(row, tableIndex);
-}
-
-void TLenvalProtoTableWriter::Abort()
-{
- Output_->Abort();
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT