aboutsummaryrefslogtreecommitdiffstats
path: root/yt
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-11-21 00:02:41 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-11-21 00:12:18 +0300
commit97e1725f1ab6026dfbe6b8d26955ab3f75279f0d (patch)
tree14e88bf66563f2cc514ed1b1fe8af098f2a0469e /yt
parent23af03dc8b791ccb81bd4ac9f671b1ddfdb8878f (diff)
downloadydb-97e1725f1ab6026dfbe6b8d26955ab3f75279f0d.tar.gz
Intermediate changes
commit_hash:c9890b6966b6c4d2d0cb13bda9b30004d232450e
Diffstat (limited to 'yt')
-rw-r--r--yt/yt/client/formats/versioned_writer.cpp28
-rw-r--r--yt/yt/client/formats/versioned_writer.h4
2 files changed, 30 insertions, 2 deletions
diff --git a/yt/yt/client/formats/versioned_writer.cpp b/yt/yt/client/formats/versioned_writer.cpp
index 056c86d5e1..545112d825 100644
--- a/yt/yt/client/formats/versioned_writer.cpp
+++ b/yt/yt/client/formats/versioned_writer.cpp
@@ -1,10 +1,14 @@
#include "versioned_writer.h"
#include "config.h"
+#include <yt/yt/client/table_client/logical_type.h>
+#include <yt/yt/client/table_client/name_table.h>
+
#include <yt/yt/core/concurrency/async_stream.h>
namespace NYT::NFormats {
+using namespace NComplexTypes;
using namespace NConcurrency;
using namespace NYson;
using namespace NTableClient;
@@ -18,7 +22,20 @@ TVersionedWriter::TVersionedWriter(
: Stream_(std::move(stream))
, Schema_(std::move(schema))
, Consumer_(consumerBuilder(&Buffer_))
-{ }
+{
+ auto nameTable = TNameTable::FromSchema(*Schema_);
+
+ for (const auto& column : Schema_->Columns()) {
+ if (IsV3Composite(column.LogicalType())) {
+ auto id = nameTable->GetIdOrThrow(column.Name());
+ TComplexTypeFieldDescriptor descriptor(column.Name(), column.LogicalType());
+ auto converter = CreateYsonServerToClientConverter(descriptor, /*config*/ {});
+ if (converter) {
+ ColumnConverters_.emplace(id, std::move(converter));
+ }
+ }
+ }
+}
TFuture<void> TVersionedWriter::Close()
{
@@ -52,7 +69,14 @@ bool TVersionedWriter::Write(TRange<TVersionedRow> rows)
case EValueType::Any:
Consumer_->OnRaw(value.AsStringBuf(), EYsonType::Node);
return;
- case EValueType::Composite:
+ case EValueType::Composite: {
+ if (auto it = ColumnConverters_.find(value.Id); it != ColumnConverters_.end()) {
+ it->second(value, Consumer_.get());
+ } else {
+ Consumer_->OnRaw(value.AsStringBuf(), EYsonType::Node);
+ }
+ return;
+ }
case EValueType::Min:
case EValueType::Max:
case EValueType::TheBottom:
diff --git a/yt/yt/client/formats/versioned_writer.h b/yt/yt/client/formats/versioned_writer.h
index 1427d5bf71..4432ddfe52 100644
--- a/yt/yt/client/formats/versioned_writer.h
+++ b/yt/yt/client/formats/versioned_writer.h
@@ -2,6 +2,8 @@
#include <yt/yt/client/formats/public.h>
+#include <yt/yt/client/complex_types/yson_format_conversion.h>
+
#include <yt/yt/client/table_client/versioned_writer.h>
#include <yt/yt/client/table_client/schema.h>
@@ -40,6 +42,8 @@ private:
TFuture<void> Result_;
const std::unique_ptr<NYson::IFlushableYsonConsumer> Consumer_;
+
+ THashMap<int, NComplexTypes::TYsonServerToClientConverter> ColumnConverters_;
};
////////////////////////////////////////////////////////////////////////////////