diff options
| author | achains <[email protected]> | 2025-08-14 11:05:32 +0300 |
|---|---|---|
| committer | achains <[email protected]> | 2025-08-14 11:52:13 +0300 |
| commit | 322ee7d149464c6f18d6a330d937227cb022b9f3 (patch) | |
| tree | 38b37d0d2c35b493d94a783d41a1b796ace4edd2 | |
| parent | 491a3d707dcdb960c48b673a67f0bd88fc2bb87d (diff) | |
YT-25072: add blob format writer
* Changelog entry
Type: feature
Component: proxy
Support blob format
commit_hash:9e931f3208087abb0a45c6c0c06589fb54b8c105
| -rw-r--r-- | yt/yt/client/formats/config.cpp | 10 | ||||
| -rw-r--r-- | yt/yt/client/formats/config.h | 15 | ||||
| -rw-r--r-- | yt/yt/client/formats/public.h | 2 | ||||
| -rw-r--r-- | yt/yt/library/formats/blob_writer.cpp | 178 | ||||
| -rw-r--r-- | yt/yt/library/formats/blob_writer.h | 31 | ||||
| -rw-r--r-- | yt/yt/library/formats/format.cpp | 9 | ||||
| -rw-r--r-- | yt/yt/library/formats/ya.make | 1 |
7 files changed, 246 insertions, 0 deletions
diff --git a/yt/yt/client/formats/config.cpp b/yt/yt/client/formats/config.cpp index 89f0a2d27fa..96f847154e4 100644 --- a/yt/yt/client/formats/config.cpp +++ b/yt/yt/client/formats/config.cpp @@ -368,4 +368,14 @@ void TArrowFormatConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TBlobFormatConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("part_index_column_name", &TThis::PartIndexColumnName) + .Default(); + registrar.Parameter("data_column_name", &TThis::DataColumnName) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NFormats diff --git a/yt/yt/client/formats/config.h b/yt/yt/client/formats/config.h index 2016db008c3..b670f1cf6e0 100644 --- a/yt/yt/client/formats/config.h +++ b/yt/yt/client/formats/config.h @@ -434,4 +434,19 @@ DEFINE_REFCOUNTED_TYPE(TArrowFormatConfig) //////////////////////////////////////////////////////////////////////////////// +struct TBlobFormatConfig + : public NYTree::TYsonStruct +{ + std::optional<std::string> PartIndexColumnName; + std::optional<std::string> DataColumnName; + + REGISTER_YSON_STRUCT(TBlobFormatConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TBlobFormatConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NFormats diff --git a/yt/yt/client/formats/public.h b/yt/yt/client/formats/public.h index 555285b44aa..d9b4127ad84 100644 --- a/yt/yt/client/formats/public.h +++ b/yt/yt/client/formats/public.h @@ -59,6 +59,7 @@ DEFINE_ENUM(EFormatType, (Skiff) (Arrow) (Yaml) + (Blob) ); //////////////////////////////////////////////////////////////////////////////// @@ -79,6 +80,7 @@ DECLARE_REFCOUNTED_STRUCT(TWebJsonFormatConfig) DECLARE_REFCOUNTED_STRUCT(TSkiffFormatConfig) DECLARE_REFCOUNTED_STRUCT(TYamlFormatConfig) DECLARE_REFCOUNTED_STRUCT(TArrowFormatConfig) +DECLARE_REFCOUNTED_STRUCT(TBlobFormatConfig) DECLARE_REFCOUNTED_STRUCT(IYamrConsumer) diff --git a/yt/yt/library/formats/blob_writer.cpp b/yt/yt/library/formats/blob_writer.cpp new file mode 100644 index 00000000000..91d3e7ae3a5 --- /dev/null +++ b/yt/yt/library/formats/blob_writer.cpp @@ -0,0 +1,178 @@ +#include "blob_writer.h" + +#include <yt/yt/client/table_client/blob_reader.h> +#include <yt/yt/client/table_client/name_table.h> + +#include <yt/yt/core/misc/error.h> + +namespace NYT::NFormats { + +using namespace NConcurrency; +using namespace NYTree; +using namespace NTableClient; + +//////////////////////////////////////////////////////////////////////////////// + +class TBlobWriter + : public TSchemalessFormatWriterBase +{ +public: + TBlobWriter( + TNameTablePtr nameTable, + IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + TBlobFormatConfigPtr config); +private: + const TBlobFormatConfigPtr Config_; + const std::string DataColumnName_; + const std::string PartIndexColumnName_; + const int DataColumnId_; + const int PartIndexColumnId_; + + std::optional<i64> LastPartIndex_; + + void DoWrite(TRange<TUnversionedRow> rows) override; + + TUnversionedValue GetTypedValue( + TUnversionedRow row, + int columnId, + std::string_view columnName, + EValueType expectedType) const; + + void ValidatePartIndex(i64 currentPartIndex) const; +}; + +//////////////////////////////////////////////////////////////////////////////// + +TBlobWriter::TBlobWriter( + TNameTablePtr nameTable, + IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + TBlobFormatConfigPtr config) + : TSchemalessFormatWriterBase( + nameTable, + std::move(output), + enableContextSaving, + controlAttributesConfig, + /*keyColumnCount*/ 0) + , Config_(config) + , DataColumnName_(Config_->DataColumnName.value_or(TBlobTableSchema::DataColumn)) + , PartIndexColumnName_(Config_->PartIndexColumnName.value_or(TBlobTableSchema::PartIndexColumn)) + , DataColumnId_(NameTable_->GetIdOrRegisterName(DataColumnName_)) + , PartIndexColumnId_(NameTable_->GetIdOrRegisterName(PartIndexColumnName_)) + { } + +void TBlobWriter::DoWrite(TRange<TUnversionedRow> rows) +{ + auto* output = GetOutputStream(); + for (auto row : rows) { + auto partIndexValue = GetTypedValue(row, PartIndexColumnId_, PartIndexColumnName_, EValueType::Int64); + i64 currentPartIndex = partIndexValue.Data.Int64; + ValidatePartIndex(currentPartIndex); + LastPartIndex_ = currentPartIndex; + + auto dataValue = GetTypedValue(row, DataColumnId_, DataColumnName_, EValueType::String); + output->Write(dataValue.AsStringBuf()); + + TryFlushBuffer(false); + } + TryFlushBuffer(true); +} + +TUnversionedValue TBlobWriter::GetTypedValue( + TUnversionedRow row, + int columnId, + std::string_view columnName, + EValueType expectedType) const +{ + std::optional<TUnversionedValue> foundValue; + for (const auto& value : row) { + if (value.Id == columnId) { + foundValue = value; + } + } + + if (!foundValue) { + THROW_ERROR_EXCEPTION("Column %Qv not found", columnName); + } + + if (foundValue->Type != expectedType) { + THROW_ERROR_EXCEPTION("Column %Qv must be of type %Qlv but has type %Qlv", + columnName, + expectedType, + foundValue->Type); + } + + return *foundValue; +} + +void TBlobWriter::ValidatePartIndex(i64 currentPartIndex) const +{ + if (LastPartIndex_ && *LastPartIndex_ + 1 != currentPartIndex) { + THROW_ERROR_EXCEPTION("Values of column %Qv must be consecutive but values %v and %v violate this property", + PartIndexColumnName_, + *LastPartIndex_, + currentPartIndex); + } +} + +//////////////////////////////////////////////////////////////////////////////// + +ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob( + TBlobFormatConfigPtr config, + TNameTablePtr nameTable, + IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + /*keyColumnCount*/ int) +{ + if (controlAttributesConfig->EnableKeySwitch) { + THROW_ERROR_EXCEPTION("Key switches are not supported in blob format"); + } + + if (controlAttributesConfig->EnableRangeIndex) { + THROW_ERROR_EXCEPTION("Range indices are not supported in blob format"); + } + + if (controlAttributesConfig->EnableRowIndex) { + THROW_ERROR_EXCEPTION("Row indices are not supported in blob format"); + } + + if (controlAttributesConfig->EnableTableIndex) { + THROW_ERROR_EXCEPTION("Table indices are not supported in blob format"); + } + + if (controlAttributesConfig->EnableTabletIndex) { + THROW_ERROR_EXCEPTION("Tablet indices are not supported in blob format"); + } + + return New<TBlobWriter>(nameTable, output, enableContextSaving, controlAttributesConfig, config); +} + +ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob( + const IAttributeDictionary& attributes, + TNameTablePtr nameTable, + IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + int keyColumnCount) +{ + try { + auto config = ConvertTo<TBlobFormatConfigPtr>(&attributes); + return CreateSchemalessWriterForBlob( + config, + nameTable, + output, + enableContextSaving, + controlAttributesConfig, + keyColumnCount); + } catch (const std::exception& ex) { + THROW_ERROR_EXCEPTION(NFormats::EErrorCode::InvalidFormat, "Failed to parse config for blob format") << ex; + } +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats diff --git a/yt/yt/library/formats/blob_writer.h b/yt/yt/library/formats/blob_writer.h new file mode 100644 index 00000000000..a88c1b2e758 --- /dev/null +++ b/yt/yt/library/formats/blob_writer.h @@ -0,0 +1,31 @@ +#pragma once + +#include "schemaless_writer_adapter.h" + +#include <yt/yt/client/formats/public.h> + +#include <yt/yt/core/concurrency/async_stream.h> + +namespace NYT::NFormats { + +//////////////////////////////////////////////////////////////////////////////// + +ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob( + TBlobFormatConfigPtr config, + NTableClient::TNameTablePtr nameTable, + NConcurrency::IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + int keyColumnCount); + +ISchemalessFormatWriterPtr CreateSchemalessWriterForBlob( + const NYTree::IAttributeDictionary& attributes, + NTableClient::TNameTablePtr nameTable, + NConcurrency::IAsyncOutputStreamPtr output, + bool enableContextSaving, + TControlAttributesConfigPtr controlAttributesConfig, + int keyColumnCount); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NFormats diff --git a/yt/yt/library/formats/format.cpp b/yt/yt/library/formats/format.cpp index 0edbc352616..6ced7de5b25 100644 --- a/yt/yt/library/formats/format.cpp +++ b/yt/yt/library/formats/format.cpp @@ -2,6 +2,7 @@ #include "arrow_parser.h" #include "arrow_writer.h" +#include "blob_writer.h" #include "dsv_parser.h" #include "dsv_writer.h" #include "protobuf_parser.h" @@ -357,6 +358,14 @@ ISchemalessFormatWriterPtr CreateStaticTableWriterForFormat( enableContextSaving, controlAttributesConfig, keyColumnCount); + case EFormatType::Blob: + return CreateSchemalessWriterForBlob( + format.Attributes(), + nameTable, + std::move(output), + enableContextSaving, + controlAttributesConfig, + keyColumnCount); default: auto adapter = New<TSchemalessWriterAdapter>( nameTable, diff --git a/yt/yt/library/formats/ya.make b/yt/yt/library/formats/ya.make index 0cd99715429..0deee781899 100644 --- a/yt/yt/library/formats/ya.make +++ b/yt/yt/library/formats/ya.make @@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( arrow_parser.cpp arrow_writer.cpp + blob_writer.cpp dsv_parser.cpp dsv_writer.cpp escape.cpp |
