summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorachains <[email protected]>2025-08-14 11:05:32 +0300
committerachains <[email protected]>2025-08-14 11:52:13 +0300
commit322ee7d149464c6f18d6a330d937227cb022b9f3 (patch)
tree38b37d0d2c35b493d94a783d41a1b796ace4edd2
parent491a3d707dcdb960c48b673a67f0bd88fc2bb87d (diff)
YT-25072: add blob format writer
* Changelog entry Type: feature Component: proxy Support blob format commit_hash:9e931f3208087abb0a45c6c0c06589fb54b8c105
-rw-r--r--yt/yt/client/formats/config.cpp10
-rw-r--r--yt/yt/client/formats/config.h15
-rw-r--r--yt/yt/client/formats/public.h2
-rw-r--r--yt/yt/library/formats/blob_writer.cpp178
-rw-r--r--yt/yt/library/formats/blob_writer.h31
-rw-r--r--yt/yt/library/formats/format.cpp9
-rw-r--r--yt/yt/library/formats/ya.make1
7 files changed, 246 insertions, 0 deletions
diff --git a/yt/yt/client/formats/config.cpp b/yt/yt/client/formats/config.cpp
index 89f0a2d27fa..96f847154e4 100644
--- a/yt/yt/client/formats/config.cpp
+++ b/yt/yt/client/formats/config.cpp
@@ -368,4 +368,14 @@ void TArrowFormatConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+//! Declares the YSON parameters accepted by the blob format config.
+/*!
+ *  Both parameters are registered with an argument-less |Default()|;
+ *  the blob writer substitutes the |TBlobTableSchema| column names for
+ *  whichever of them is left unset.
+ */
+void TBlobFormatConfig::Register(TRegistrar registrar)
+{
+    // Column holding the sequential index of each blob part.
+    registrar.Parameter("part_index_column_name", &TThis::PartIndexColumnName)
+        .Default();
+
+    // Column holding the bytes of each blob part.
+    registrar.Parameter("data_column_name", &TThis::DataColumnName)
+        .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NFormats
diff --git a/yt/yt/client/formats/config.h b/yt/yt/client/formats/config.h
index 2016db008c3..b670f1cf6e0 100644
--- a/yt/yt/client/formats/config.h
+++ b/yt/yt/client/formats/config.h
@@ -434,4 +434,19 @@ DEFINE_REFCOUNTED_TYPE(TArrowFormatConfig)
////////////////////////////////////////////////////////////////////////////////
+//! Options of the blob output format.
+struct TBlobFormatConfig
+    : public NYTree::TYsonStruct
+{
+    //! Name of the column holding part indices ("part_index_column_name" in YSON).
+    //! When unset, the blob writer uses |TBlobTableSchema::PartIndexColumn|.
+    std::optional<std::string> PartIndexColumnName;
+
+    //! Name of the column holding part payloads ("data_column_name" in YSON).
+    //! When unset, the blob writer uses |TBlobTableSchema::DataColumn|.
+    std::optional<std::string> DataColumnName;
+
+    REGISTER_YSON_STRUCT(TBlobFormatConfig);
+
+    static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TBlobFormatConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NFormats
diff --git a/yt/yt/client/formats/public.h b/yt/yt/client/formats/public.h
index 555285b44aa..d9b4127ad84 100644
--- a/yt/yt/client/formats/public.h
+++ b/yt/yt/client/formats/public.h
@@ -59,6 +59,7 @@ DEFINE_ENUM(EFormatType,
(Skiff)
(Arrow)
(Yaml)
+ (Blob)
);
////////////////////////////////////////////////////////////////////////////////
@@ -79,6 +80,7 @@ DECLARE_REFCOUNTED_STRUCT(TWebJsonFormatConfig)
DECLARE_REFCOUNTED_STRUCT(TSkiffFormatConfig)
DECLARE_REFCOUNTED_STRUCT(TYamlFormatConfig)
DECLARE_REFCOUNTED_STRUCT(TArrowFormatConfig)
+DECLARE_REFCOUNTED_STRUCT(TBlobFormatConfig)
DECLARE_REFCOUNTED_STRUCT(IYamrConsumer)
diff --git a/yt/yt/library/formats/blob_writer.cpp b/yt/yt/library/formats/blob_writer.cpp
new file mode 100644
index 00000000000..91d3e7ae3a5
--- /dev/null
+++ b/yt/yt/library/formats/blob_writer.cpp
@@ -0,0 +1,178 @@
+#include "blob_writer.h"
+
+#include <yt/yt/client/table_client/blob_reader.h>
+#include <yt/yt/client/table_client/name_table.h>
+
+#include <yt/yt/core/misc/error.h>
+
+namespace NYT::NFormats {
+
+using namespace NConcurrency;
+using namespace NYTree;
+using namespace NTableClient;
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Schemaless format writer that emits the raw contents of the data column,
+//! row after row, while checking that part indices are consecutive.
+class TBlobWriter
+    : public TSchemalessFormatWriterBase
+{
+public:
+    TBlobWriter(
+        TNameTablePtr nameTable,
+        IAsyncOutputStreamPtr output,
+        bool enableContextSaving,
+        TControlAttributesConfigPtr controlAttributesConfig,
+        TBlobFormatConfigPtr config);
+
+private:
+    // NB: Declaration order matters; the column ids are initialized from the
+    // column names, which in turn are initialized from the config.
+    const TBlobFormatConfigPtr Config_;
+    const std::string DataColumnName_;
+    const std::string PartIndexColumnName_;
+    const int DataColumnId_;
+    const int PartIndexColumnId_;
+
+    //! Part index of the most recently written row; empty until the first row is seen.
+    std::optional<i64> LastPartIndex_;
+
+    //! Validates each row and appends its data column value to the output stream.
+    void DoWrite(TRange<TUnversionedRow> rows) override;
+
+    //! Returns the value of |row| with id |columnId|; throws if the value is
+    //! absent or its type differs from |expectedType|.
+    TUnversionedValue GetTypedValue(
+        TUnversionedRow row,
+        int columnId,
+        std::string_view columnName,
+        EValueType expectedType) const;
+
+    //! Throws unless |currentPartIndex| directly follows the previously seen part index.
+    void ValidatePartIndex(i64 currentPartIndex) const;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Constructs the writer and resolves the ids of the data and part index columns.
+/*!
+ *  Column names come from |config| when set and from |TBlobTableSchema| otherwise;
+ *  both names are registered in the name table if not yet known to it.
+ *  The base class is always given a key column count of zero.
+ */
+TBlobWriter::TBlobWriter(
+    TNameTablePtr nameTable,
+    IAsyncOutputStreamPtr output,
+    bool enableContextSaving,
+    TControlAttributesConfigPtr controlAttributesConfig,
+    TBlobFormatConfigPtr config)
+    // All pointer arguments are taken by value, so hand them over with
+    // std::move instead of paying for another ref-count round trip.
+    : TSchemalessFormatWriterBase(
+        std::move(nameTable),
+        std::move(output),
+        enableContextSaving,
+        std::move(controlAttributesConfig),
+        /*keyColumnCount*/ 0)
+    , Config_(std::move(config))
+    , DataColumnName_(Config_->DataColumnName.value_or(TBlobTableSchema::DataColumn))
+    , PartIndexColumnName_(Config_->PartIndexColumnName.value_or(TBlobTableSchema::PartIndexColumn))
+    // NB: Use the base class' NameTable_ here; the constructor argument has been moved from.
+    , DataColumnId_(NameTable_->GetIdOrRegisterName(DataColumnName_))
+    , PartIndexColumnId_(NameTable_->GetIdOrRegisterName(PartIndexColumnName_))
+{ }
+
+//! Writes the data column of every row in |rows| to the output stream.
+/*!
+ *  For each row the part index is fetched and validated first, so a broken
+ *  sequence is reported before any bytes of that row are written.
+ *  Throws if a required column is missing, has an unexpected type,
+ *  or if part indices are not consecutive.
+ */
+void TBlobWriter::DoWrite(TRange<TUnversionedRow> rows)
+{
+    auto* stream = GetOutputStream();
+
+    for (auto row : rows) {
+        i64 partIndex = GetTypedValue(
+            row,
+            PartIndexColumnId_,
+            PartIndexColumnName_,
+            EValueType::Int64).Data.Int64;
+        ValidatePartIndex(partIndex);
+        LastPartIndex_ = partIndex;
+
+        auto payload = GetTypedValue(
+            row,
+            DataColumnId_,
+            DataColumnName_,
+            EValueType::String);
+        stream->Write(payload.AsStringBuf());
+
+        // Per-row flush attempt.
+        TryFlushBuffer(false);
+    }
+
+    // Final flush attempt for the batch (presumably forced; confirm against the base class).
+    TryFlushBuffer(true);
+}
+
+//! Looks up the value with id |columnId| in |row| and checks its type.
+/*!
+ *  The whole row is scanned; if several values carry the requested id,
+ *  the last one is used.
+ *
+ *  \param columnName Used for error messages only.
+ *  \returns A copy of the matching value.
+ *  Throws if no value has the requested id or if its type differs from |expectedType|.
+ */
+TUnversionedValue TBlobWriter::GetTypedValue(
+    TUnversionedRow row,
+    int columnId,
+    std::string_view columnName,
+    EValueType expectedType) const
+{
+    const TUnversionedValue* match = nullptr;
+    for (const auto& candidate : row) {
+        if (candidate.Id == columnId) {
+            match = &candidate;
+        }
+    }
+
+    if (match == nullptr) {
+        THROW_ERROR_EXCEPTION("Column %Qv not found", columnName);
+    }
+
+    if (match->Type != expectedType) {
+        THROW_ERROR_EXCEPTION("Column %Qv must be of type %Qlv but has type %Qlv",
+            columnName,
+            expectedType,
+            match->Type);
+    }
+
+    return *match;
+}
+
+//! Checks that |currentPartIndex| directly follows the previously written part index.
+/*!
+ *  The very first row is always accepted since there is nothing to compare it with.
+ *  Throws if the previous and the current indices are not consecutive.
+ */
+void TBlobWriter::ValidatePartIndex(i64 currentPartIndex) const
+{
+    if (!LastPartIndex_) {
+        return;
+    }
+
+    i64 previousPartIndex = *LastPartIndex_;
+
+    // Part indices come from table data, so the previous one may be the maximum i64;
+    // computing |previousPartIndex + 1| would then be signed overflow. Once we know that
+    // |currentPartIndex > previousPartIndex|, |currentPartIndex - 1| cannot overflow.
+    bool consecutive =
+        currentPartIndex > previousPartIndex &&
+        currentPartIndex - 1 == previousPartIndex;
+
+    if (!consecutive) {
+        THROW_ERROR_EXCEPTION("Values of column %Qv must be consecutive but values %v and %v violate this property",
+            PartIndexColumnName_,
+            previousPartIndex,
+            currentPartIndex);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Creates a blob format writer from an already parsed config.
+/*!
+ *  The blob format has no way to express control attributes, hence the request
+ *  is rejected if key switches or range, row, table or tablet indices are enabled.
+ *  The key column count argument is ignored.
+ */
+ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob(
+    TBlobFormatConfigPtr config,
+    TNameTablePtr nameTable,
+    IAsyncOutputStreamPtr output,
+    bool enableContextSaving,
+    TControlAttributesConfigPtr controlAttributesConfig,
+    /*keyColumnCount*/ int)
+{
+    if (controlAttributesConfig->EnableKeySwitch) {
+        THROW_ERROR_EXCEPTION("Key switches are not supported in blob format");
+    }
+
+    if (controlAttributesConfig->EnableRangeIndex) {
+        THROW_ERROR_EXCEPTION("Range indices are not supported in blob format");
+    }
+
+    if (controlAttributesConfig->EnableRowIndex) {
+        THROW_ERROR_EXCEPTION("Row indices are not supported in blob format");
+    }
+
+    if (controlAttributesConfig->EnableTableIndex) {
+        THROW_ERROR_EXCEPTION("Table indices are not supported in blob format");
+    }
+
+    if (controlAttributesConfig->EnableTabletIndex) {
+        THROW_ERROR_EXCEPTION("Tablet indices are not supported in blob format");
+    }
+
+    // The arguments are owned by this function; pass ownership on instead of copying.
+    return New<TBlobWriter>(
+        std::move(nameTable),
+        std::move(output),
+        enableContextSaving,
+        std::move(controlAttributesConfig),
+        std::move(config));
+}
+
+//! Creates a blob format writer from raw format attributes.
+/*!
+ *  Parses |attributes| into a |TBlobFormatConfig| and delegates to the config-based overload.
+ *
+ *  NB: Any exception raised while parsing the config or while creating the writer
+ *  (including rejected control attributes) is rethrown wrapped into
+ *  an |EErrorCode::InvalidFormat| error.
+ */
+ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob(
+    const IAttributeDictionary& attributes,
+    TNameTablePtr nameTable,
+    IAsyncOutputStreamPtr output,
+    bool enableContextSaving,
+    TControlAttributesConfigPtr controlAttributesConfig,
+    int keyColumnCount)
+{
+    try {
+        auto config = ConvertTo<TBlobFormatConfigPtr>(&attributes);
+        // None of the pointers is used after this call, so move them along.
+        return CreateSchemalessWriterForBlob(
+            std::move(config),
+            std::move(nameTable),
+            std::move(output),
+            enableContextSaving,
+            std::move(controlAttributesConfig),
+            keyColumnCount);
+    } catch (const std::exception& ex) {
+        THROW_ERROR_EXCEPTION(NFormats::EErrorCode::InvalidFormat, "Failed to parse config for blob format") << ex;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NFormats
diff --git a/yt/yt/library/formats/blob_writer.h b/yt/yt/library/formats/blob_writer.h
new file mode 100644
index 00000000000..a88c1b2e758
--- /dev/null
+++ b/yt/yt/library/formats/blob_writer.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "schemaless_writer_adapter.h"
+
+#include <yt/yt/client/formats/public.h>
+
+#include <yt/yt/core/concurrency/async_stream.h>
+
+namespace NYT::NFormats {
+
+////////////////////////////////////////////////////////////////////////////////
+
+//! Creates a writer that outputs the contents of the blob data column.
+/*!
+ *  Throws if |controlAttributesConfig| enables key switches or
+ *  range, row, table or tablet indices.
+ *  |keyColumnCount| is accepted for signature uniformity and is not used.
+ */
+ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob(
+    TBlobFormatConfigPtr config,
+    NTableClient::TNameTablePtr nameTable,
+    NConcurrency::IAsyncOutputStreamPtr output,
+    bool enableContextSaving,
+    TControlAttributesConfigPtr controlAttributesConfig,
+    int keyColumnCount);
+
+//! Same as above but parses the blob format config from |attributes|.
+/*!
+ *  Failures are reported as an |EErrorCode::InvalidFormat| error
+ *  wrapping the original one.
+ */
+ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob(
+    const NYTree::IAttributeDictionary& attributes,
+    NTableClient::TNameTablePtr nameTable,
+    NConcurrency::IAsyncOutputStreamPtr output,
+    bool enableContextSaving,
+    TControlAttributesConfigPtr controlAttributesConfig,
+    int keyColumnCount);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NFormats
diff --git a/yt/yt/library/formats/format.cpp b/yt/yt/library/formats/format.cpp
index 0edbc352616..6ced7de5b25 100644
--- a/yt/yt/library/formats/format.cpp
+++ b/yt/yt/library/formats/format.cpp
@@ -2,6 +2,7 @@
#include "arrow_parser.h"
#include "arrow_writer.h"
+#include "blob_writer.h"
#include "dsv_parser.h"
#include "dsv_writer.h"
#include "protobuf_parser.h"
@@ -357,6 +358,14 @@ ISchemalessFormatWriterPtr CreateStaticTableWriterForFormat(
enableContextSaving,
controlAttributesConfig,
keyColumnCount);
+ case EFormatType::Blob:
+ return CreateSchemalessWriterForBlob(
+ format.Attributes(),
+ nameTable,
+ std::move(output),
+ enableContextSaving,
+ controlAttributesConfig,
+ keyColumnCount);
default:
auto adapter = New<TSchemalessWriterAdapter>(
nameTable,
diff --git a/yt/yt/library/formats/ya.make b/yt/yt/library/formats/ya.make
index 0cd99715429..0deee781899 100644
--- a/yt/yt/library/formats/ya.make
+++ b/yt/yt/library/formats/ya.make
@@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
arrow_parser.cpp
arrow_writer.cpp
+ blob_writer.cpp
dsv_parser.cpp
dsv_writer.cpp
escape.cpp