aboutsummaryrefslogtreecommitdiffstats
path: root/yt/cpp/mapreduce/io/proto_table_reader.cpp
diff options
context:
space:
mode:
authormax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
committermax42 <max42@yandex-team.com>2023-07-29 00:02:16 +0300
commit73b89de71748a21e102d27b9f3ed1bf658766cb5 (patch)
tree188bbd2d622fa91cdcbb1b6d6d77fbc84a0646f5 /yt/cpp/mapreduce/io/proto_table_reader.cpp
parent528e321bcc2a2b67b53aeba58c3bd88305a141ee (diff)
downloadydb-73b89de71748a21e102d27b9f3ed1bf658766cb5.tar.gz
YT-19210: expose YQL shared library for YT.
After this, a new target libyqlplugin.so appears in the open-source CMake build. The diff in the open-source YDB repo looks like the following: https://paste.yandex-team.ru/f302bdb4-7ef2-4362-91c7-6ca45f329264
Diffstat (limited to 'yt/cpp/mapreduce/io/proto_table_reader.cpp')
-rw-r--r--yt/cpp/mapreduce/io/proto_table_reader.cpp305
1 files changed, 305 insertions, 0 deletions
diff --git a/yt/cpp/mapreduce/io/proto_table_reader.cpp b/yt/cpp/mapreduce/io/proto_table_reader.cpp
new file mode 100644
index 0000000000..28a4bc8719
--- /dev/null
+++ b/yt/cpp/mapreduce/io/proto_table_reader.cpp
@@ -0,0 +1,305 @@
+#include "proto_table_reader.h"
+
+#include "node_table_reader.h"
+
+#include "proto_helpers.h"
+
+#include <yt/yt_proto/yt/formats/extension.pb.h>
+
+#include <util/string/escape.h>
+#include <util/string/printf.h>
+
+namespace NYT {
+
+using ::google::protobuf::Descriptor;
+using ::google::protobuf::FieldDescriptor;
+using ::google::protobuf::EnumValueDescriptor;
+
+// Returns the name of the table column that backs the given protobuf field.
+//
+// The `column_name` field option wins when it is non-empty, then the
+// `key_column_name` option; if both are empty the field's own name is used.
+const TString& GetFieldColumnName(const FieldDescriptor* fieldDesc) {
+    const auto& options = fieldDesc->options();
+
+    const auto& explicitName = options.GetExtension(column_name);
+    if (explicitName.empty()) {
+        const auto& keyName = options.GetExtension(key_column_name);
+        if (keyName.empty()) {
+            return fieldDesc->name();
+        }
+        return keyName;
+    }
+    return explicitName;
+}
+
+// Fills the protobuf message `row` from the map node `node`.
+//
+// For each field of the message descriptor, the column named by
+// GetFieldColumnName() is looked up in the node's map. Fields whose column is
+// absent or holds a null node are left untouched. Otherwise the node type is
+// checked against the type expected for the field and the value is stored
+// through the message's reflection interface.
+//
+// Enum fields are read by value name from a string node, or by number from an
+// int64 node when the field carries the ENUM_INT flag. Message fields are
+// parsed from a string node holding the serialized message.
+//
+// Throws TNode::TTypeError when a node's type differs from the expected one,
+// and yexception when an enum value is unknown, a nested message fails to
+// parse, or the field type is not handled by the switch below.
+void ReadMessageFromNode(const TNode& node, Message* row)
+{
+    auto* descriptor = row->GetDescriptor();
+    auto* reflection = row->GetReflection();
+
+    int count = descriptor->field_count();
+    for (int i = 0; i < count; ++i) {
+        auto* fieldDesc = descriptor->field(i);
+
+        const auto& columnName = GetFieldColumnName(fieldDesc);
+
+        const auto& nodeMap = node.AsMap();
+        auto it = nodeMap.find(columnName);
+        if (it == nodeMap.end()) {
+            continue; // no such column
+        }
+        auto actualType = it->second.GetType();
+        if (actualType == TNode::Null) {
+            continue; // null field
+        }
+
+        auto checkType = [&columnName] (TNode::EType expected, TNode::EType actual) {
+            if (expected != actual) {
+                ythrow TNode::TTypeError() << "expected node type " << expected
+                    << ", actual " << actual << " for node " << columnName.data();
+            }
+        };
+
+        switch (fieldDesc->type()) {
+            case FieldDescriptor::TYPE_STRING:
+            case FieldDescriptor::TYPE_BYTES:
+                checkType(TNode::String, actualType);
+                reflection->SetString(row, fieldDesc, it->second.AsString());
+                break;
+            case FieldDescriptor::TYPE_INT64:
+            case FieldDescriptor::TYPE_SINT64:
+            case FieldDescriptor::TYPE_SFIXED64:
+                checkType(TNode::Int64, actualType);
+                reflection->SetInt64(row, fieldDesc, it->second.AsInt64());
+                break;
+            case FieldDescriptor::TYPE_INT32:
+            case FieldDescriptor::TYPE_SINT32:
+            case FieldDescriptor::TYPE_SFIXED32:
+                checkType(TNode::Int64, actualType);
+                // NOTE(review): the int64 node value is narrowed to 32 bits
+                // here without a range check.
+                reflection->SetInt32(row, fieldDesc, it->second.AsInt64());
+                break;
+            case FieldDescriptor::TYPE_UINT64:
+            case FieldDescriptor::TYPE_FIXED64:
+                checkType(TNode::Uint64, actualType);
+                reflection->SetUInt64(row, fieldDesc, it->second.AsUint64());
+                break;
+            case FieldDescriptor::TYPE_UINT32:
+            case FieldDescriptor::TYPE_FIXED32:
+                checkType(TNode::Uint64, actualType);
+                // NOTE(review): the uint64 node value is narrowed to 32 bits
+                // here without a range check.
+                reflection->SetUInt32(row, fieldDesc, it->second.AsUint64());
+                break;
+            case FieldDescriptor::TYPE_DOUBLE:
+                checkType(TNode::Double, actualType);
+                reflection->SetDouble(row, fieldDesc, it->second.AsDouble());
+                break;
+            case FieldDescriptor::TYPE_FLOAT:
+                checkType(TNode::Double, actualType);
+                reflection->SetFloat(row, fieldDesc, it->second.AsDouble());
+                break;
+            case FieldDescriptor::TYPE_BOOL:
+                checkType(TNode::Bool, actualType);
+                reflection->SetBool(row, fieldDesc, it->second.AsBool());
+                break;
+            case FieldDescriptor::TYPE_ENUM: {
+                // Enums are stored as strings unless the field is flagged ENUM_INT.
+                TNode::EType columnType = TNode::String;
+                for (const auto& flag : fieldDesc->options().GetRepeatedExtension(flags)) {
+                    if (flag == EWrapperFieldFlag::ENUM_INT) {
+                        columnType = TNode::Int64;
+                        break;
+                    }
+                }
+                checkType(columnType, actualType);
+
+                const EnumValueDescriptor* valueDesc = nullptr;
+                TString stringValue;
+                if (columnType == TNode::String) {
+                    const auto& value = it->second.AsString();
+                    valueDesc = fieldDesc->enum_type()->FindValueByName(value);
+                    stringValue = value;
+                } else if (columnType == TNode::Int64) {
+                    const auto value = it->second.AsInt64();
+                    // FindValueByNumber() takes an int. Only look the number up
+                    // when it survives the narrowing; otherwise a 64-bit value
+                    // could wrap around and match an unrelated enum value.
+                    const auto number = static_cast<int>(value);
+                    if (static_cast<i64>(number) == value) {
+                        valueDesc = fieldDesc->enum_type()->FindValueByNumber(number);
+                    }
+                    stringValue = ToString(value);
+                } else {
+                    Y_FAIL();
+                }
+
+                if (valueDesc == nullptr) {
+                    ythrow yexception() << "Failed to parse value '" << EscapeC(stringValue) << "' as " << fieldDesc->enum_type()->full_name();
+                }
+
+                reflection->SetEnum(row, fieldDesc, valueDesc);
+
+                break;
+            }
+            case FieldDescriptor::TYPE_MESSAGE: {
+                checkType(TNode::String, actualType);
+                // The nested message arrives serialized inside a string node.
+                const auto& serialized = it->second.AsString();
+                Message* message = reflection->MutableMessage(row, fieldDesc);
+                if (!message->ParseFromArray(serialized.data(), serialized.size())) {
+                    ythrow yexception() << "Failed to parse protobuf message";
+                }
+                break;
+            }
+            default:
+                ythrow yexception() << "Incorrect protobuf type";
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Wraps the raw input in a newly created TNodeTableReader and takes over the
+// list of protobuf descriptors passed by the caller.
+TProtoTableReader::TProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const Descriptor*>&& descriptors)
+ : NodeReader_(new TNodeTableReader(std::move(input)))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+// Nothing to release by hand; the members clean up after themselves.
+TProtoTableReader::~TProtoTableReader() = default;
+
+// Converts the node reader's current row into the protobuf message `row`
+// via ReadMessageFromNode().
+void TProtoTableReader::ReadRow(Message* row)
+{
+    ReadMessageFromNode(NodeReader_->GetRow(), row);
+}
+
+// Forwards to the wrapped node reader's IsValid().
+bool TProtoTableReader::IsValid() const
+{
+    auto& reader = *NodeReader_;
+    return reader.IsValid();
+}
+
+// Forwards to the wrapped node reader's Next().
+void TProtoTableReader::Next()
+{
+    auto& reader = *NodeReader_;
+    reader.Next();
+}
+
+// Forwards to the wrapped node reader's GetTableIndex().
+ui32 TProtoTableReader::GetTableIndex() const
+{
+    auto& reader = *NodeReader_;
+    return reader.GetTableIndex();
+}
+
+// Forwards to the wrapped node reader's GetRangeIndex().
+ui32 TProtoTableReader::GetRangeIndex() const
+{
+    auto& reader = *NodeReader_;
+    return reader.GetRangeIndex();
+}
+
+// Forwards to the wrapped node reader's GetRowIndex().
+ui64 TProtoTableReader::GetRowIndex() const
+{
+    auto& reader = *NodeReader_;
+    return reader.GetRowIndex();
+}
+
+// Forwards to the wrapped node reader's NextKey().
+void TProtoTableReader::NextKey()
+{
+    auto& reader = *NodeReader_;
+    reader.NextKey();
+}
+
+// Forwards to the wrapped node reader's GetReadByteCount().
+TMaybe<size_t> TProtoTableReader::GetReadByteCount() const
+{
+    auto& reader = *NodeReader_;
+    return reader.GetReadByteCount();
+}
+
+// Forwards to the wrapped node reader's IsEndOfStream().
+bool TProtoTableReader::IsEndOfStream() const
+{
+    auto& reader = *NodeReader_;
+    return reader.IsEndOfStream();
+}
+
+// Forwards to the wrapped node reader's IsRawReaderExhausted().
+bool TProtoTableReader::IsRawReaderExhausted() const
+{
+    auto& reader = *NodeReader_;
+    return reader.IsRawReaderExhausted();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Hands the raw input to the TLenvalTableReader base and takes over the list
+// of protobuf descriptors passed by the caller.
+TLenvalProtoTableReader::TLenvalProtoTableReader(
+ ::TIntrusivePtr<TRawTableReader> input,
+ TVector<const Descriptor*>&& descriptors)
+ : TLenvalTableReader(std::move(input))
+ , Descriptors_(std::move(descriptors))
+{ }
+
+// Nothing to release by hand; the base and members clean up after themselves.
+TLenvalProtoTableReader::~TLenvalProtoTableReader() = default;
+
+// Parses the current row (Length_ bytes of Input_) into the protobuf message
+// `row`.
+//
+// The message is first checked with ValidateProtoDescriptor() against
+// Descriptors_ for the current table index. If parsing throws a
+// std::exception, TLenvalTableReader::Retry() is consulted: the parse is
+// attempted again when it returns true, and the exception is rethrown
+// otherwise.
+void TLenvalProtoTableReader::ReadRow(Message* row)
+{
+    ValidateProtoDescriptor(*row, GetTableIndex(), Descriptors_, true);
+
+    for (;;) {
+        try {
+            ParseFromArcadiaStream(&Input_, *row, Length_);
+            RowTaken_ = true;
+
+            // One more row was parsed successfully, so the retry counter
+            // goes back to its initial value.
+            Input_.ResetRetries();
+
+            return;
+        } catch (const std::exception& ) {
+            if (TLenvalTableReader::Retry()) {
+                continue;
+            }
+            throw;
+        }
+    }
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+bool TLenvalProtoTableReader::IsValid() const
+{
+    return this->TLenvalTableReader::IsValid();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+void TLenvalProtoTableReader::Next()
+{
+    this->TLenvalTableReader::Next();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+ui32 TLenvalProtoTableReader::GetTableIndex() const
+{
+    return this->TLenvalTableReader::GetTableIndex();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+ui32 TLenvalProtoTableReader::GetRangeIndex() const
+{
+    return this->TLenvalTableReader::GetRangeIndex();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+ui64 TLenvalProtoTableReader::GetRowIndex() const
+{
+    return this->TLenvalTableReader::GetRowIndex();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+void TLenvalProtoTableReader::NextKey()
+{
+    this->TLenvalTableReader::NextKey();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+TMaybe<size_t> TLenvalProtoTableReader::GetReadByteCount() const
+{
+    return this->TLenvalTableReader::GetReadByteCount();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+bool TLenvalProtoTableReader::IsEndOfStream() const
+{
+    return this->TLenvalTableReader::IsEndOfStream();
+}
+
+// Forwards to the TLenvalTableReader base implementation.
+bool TLenvalProtoTableReader::IsRawReaderExhausted() const
+{
+    return this->TLenvalTableReader::IsRawReaderExhausted();
+}
+
+// Skips the current row by discarding Length_ bytes from Input_.
+//
+// Throws yexception("Premature end of stream") when fewer bytes could be
+// skipped. Any std::exception raised while skipping (including that one) is
+// retried as long as TLenvalTableReader::Retry() returns true; once it
+// returns false the exception is rethrown.
+void TLenvalProtoTableReader::SkipRow()
+{
+    for (;;) {
+        try {
+            const size_t skippedBytes = Input_.Skip(Length_);
+            if (skippedBytes != Length_) {
+                ythrow yexception() << "Premature end of stream";
+            }
+            return;
+        } catch (const std::exception& ) {
+            if (TLenvalTableReader::Retry()) {
+                continue;
+            }
+            throw;
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT