diff options
author | omgronny <omgronny@yandex-team.com> | 2024-05-27 18:28:42 +0300 |
---|---|---|
committer | omgronny <omgronny@yandex-team.com> | 2024-05-27 18:39:48 +0300 |
commit | 1bb371c641bfeaba2e219c8b9d80065b77f8993c (patch) | |
tree | 264d53a0f9722c00e4462f8d55a152566d1ca44d | |
parent | c86c923f1db2a4d5e12f9407f23470ddece0ac06 (diff) | |
download | ydb-1bb371c641bfeaba2e219c8b9d80065b77f8993c.tar.gz |
YT-21754: Introduce TSchemalessDynamicTableBufferedWriter
c7cb620b1c2302ddb2a65af8216a7f878629f60a
-rw-r--r-- | yt/yt/client/table_client/config.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/table_client/config.h | 19 | ||||
-rw-r--r-- | yt/yt/client/table_client/public.h | 2 | ||||
-rw-r--r-- | yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp | 196 | ||||
-rw-r--r-- | yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h | 17 | ||||
-rw-r--r-- | yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp | 100 | ||||
-rw-r--r-- | yt/yt/client/table_client/schemaless_dynamic_table_writer.h | 17 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 2 |
8 files changed, 365 insertions, 0 deletions
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp index d30ec5c8d2..e3197ed45d 100644 --- a/yt/yt/client/table_client/config.cpp +++ b/yt/yt/client/table_client/config.cpp @@ -529,4 +529,16 @@ void TVersionedRowDigestConfig::Register(TRegistrar registrar) //////////////////////////////////////////////////////////////////////////////// +void TSchemalessBufferedDynamicTableWriterConfig::Register(TRegistrar registrar) +{ + registrar.Parameter("max_batch_size", &TThis::MaxBatchSize) + .Default(1000); + registrar.Parameter("flush_period", &TThis::FlushPeriod) + .Default(TDuration::Seconds(5)); + registrar.Parameter("exponential_backoff_options", &TThis::ExponentialBackoffOptions) + .Default(); +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h index 4f6ebfbad1..bdc3427d15 100644 --- a/yt/yt/client/table_client/config.h +++ b/yt/yt/client/table_client/config.h @@ -8,6 +8,8 @@ #include <yt/yt/core/ytree/yson_struct.h> +#include <yt/yt/core/misc/config.h> + #include <yt/yt/library/quantile_digest/public.h> namespace NYT::NTableClient { @@ -479,4 +481,21 @@ struct TRowBatchReadOptions //////////////////////////////////////////////////////////////////////////////// +class TSchemalessBufferedDynamicTableWriterConfig + : public TTableWriterConfig +{ +public: + i64 MaxBatchSize; + TDuration FlushPeriod; + TExponentialBackoffOptions ExponentialBackoffOptions; + + REGISTER_YSON_STRUCT(TSchemalessBufferedDynamicTableWriterConfig); + + static void Register(TRegistrar registrar); +}; + +DEFINE_REFCOUNTED_TYPE(TSchemalessBufferedDynamicTableWriterConfig) + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h index 62092a45cd..392cc45a18 100644 --- a/yt/yt/client/table_client/public.h +++ b/yt/yt/client/table_client/public.h @@ -370,6 +370,8 @@ DECLARE_REFCOUNTED_CLASS(TChunkWriterOptions) DECLARE_REFCOUNTED_CLASS(TVersionedRowDigestConfig) +DECLARE_REFCOUNTED_CLASS(TSchemalessBufferedDynamicTableWriterConfig) + class TSaveContext; class TLoadContext; using TPersistenceContext = TCustomPersistenceContext<TSaveContext, TLoadContext>; diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp new file mode 100644 index 0000000000..c3ee5a2d55 --- /dev/null +++ b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp @@ -0,0 +1,196 @@ +#include "schemaless_dynamic_table_buffered_writer.h" +#include "config.h" + +#include <yt/yt/client/api/client.h> +#include <yt/yt/client/api/transaction.h> + +#include <yt/yt/client/table_client/helpers.h> +#include <yt/yt/client/table_client/name_table.h> +#include <yt/yt/client/table_client/schema.h> +#include <yt/yt/client/table_client/unversioned_writer.h> + +#include <yt/yt/client/table_client/schemaless_dynamic_table_writer.h> + +#include <yt/yt/core/concurrency/periodic_executor.h> + +#include <yt/yt/core/misc/backoff_strategy.h> + +#include <yt/yt/core/concurrency/nonblocking_batcher.h> + +#include <yt/yt/core/utilex/random.h> + +namespace NYT::NTableClient { + +using namespace NApi; +using namespace NConcurrency; +using namespace NTransactionClient; +using namespace NYPath; +using namespace NProfiling; + +//////////////////////////////////////////////////////////////////////////////// + +class TSchemalessBufferedDynamicTableWriter + : public IUnversionedWriter +{ +public: + TSchemalessBufferedDynamicTableWriter( + TYPath path, + IClientPtr client, + IInvokerPtr invoker, + TSchemalessBufferedDynamicTableWriterConfigPtr config) + : Path_(std::move(path)) + , Client_(std::move(client)) + , Config_(std::move(config)) + , Batcher_(New<TNonblockingBatcher<TUnversionedRow>>(TBatchSizeLimiter(Config_->MaxBatchSize), Config_->FlushPeriod)) + , SchemalessDynamicTableWriter_(CreateSchemalessDynamicTableWriter(Path_, Client_)) + , WriteExecutor_(New<TPeriodicExecutor>( + invoker, + BIND(&TSchemalessBufferedDynamicTableWriter::DoWrite, MakeWeak(this)))) + { + Logger.AddTag("Path: %v", Path_); + + WriteExecutor_->Start(); + } + + void UpdateConfig(const TSchemalessBufferedDynamicTableWriterConfigPtr& newConfig) + { + Batcher_->UpdateSettings( + newConfig->FlushPeriod, + TBatchSizeLimiter(newConfig->MaxBatchSize), + /*allowEmptyBatches*/ false); + + Config_ = std::move(newConfig); + } + + bool Write(TRange<TUnversionedRow> rows) override + { + for (auto row : rows) { + Batcher_->Enqueue(std::move(row)); + } + return false; + } + + TFuture<void> GetReadyEvent() override + { + GetReadyPromise_ = NewPromise<void>(); + return GetReadyPromise_.ToFuture(); + } + + TFuture<void> Close() override + { + auto guard = TGuard(PromiseLock_); + ClosePromise_ = NewPromise<void>(); + return ClosePromise_.ToFuture(); + } + + const TTableSchemaPtr& GetSchema() const override + { + return Schema_; + } + + const TNameTablePtr& GetNameTable() const override + { + return NameTable_; + } + +private: + const TYPath Path_; + const IClientPtr Client_; + TSchemalessBufferedDynamicTableWriterConfigPtr Config_; + TNonblockingBatcherPtr<TUnversionedRow> Batcher_; + + IUnversionedWriterPtr SchemalessDynamicTableWriter_; + + TPeriodicExecutorPtr WriteExecutor_; + + NThreading::TSpinLock PromiseLock_; + TPromise<void> ClosePromise_; + + TPromise<void> GetReadyPromise_ = NewPromise<void>(); + + std::atomic<int> PendingCount_ = 0; + std::atomic<int> DroppedCount_ = 0; + std::atomic<int> WriteFailuresCount_ = 0; + + const TNameTablePtr NameTable_ = New<TNameTable>(); + const TTableSchemaPtr Schema_ = New<TTableSchema>(); + + NLogging::TLogger Logger = NLogging::TLogger("SchemalessBufferedDynamicTableWriter"); + +private: + void DoWrite() + { + auto batch = [&] { + auto guard = TGuard(PromiseLock_); + if (!ClosePromise_) { + auto asyncBatch = Batcher_->DequeueBatch(); + return WaitForUnique(asyncBatch) + .ValueOrThrow(); + } + + ClosePromise_.Set(); + auto batches = Batcher_->Drain(); + + std::vector<TUnversionedRow> result; + for (auto& batch : batches) { + result.insert(result.end(), batch.begin(), batch.end()); + } + return result; + }(); + + if (batch.empty()) { + return; + } + + WriteBatchWithExpBackoff(batch); + + GetReadyPromise_.Set(); + } + + void WriteBatchWithExpBackoff(const std::vector<TUnversionedRow>& batch) + { + TBackoffStrategy backoffStrategy(Config_->ExponentialBackoffOptions); + while (true) { + if (TryHandleBatch(batch)) { + return; + } + YT_LOG_WARNING("Failed to upload rows (RetryDelay: %v, PendingRows: %v)", + backoffStrategy.GetBackoff().Seconds(), + GetPendingCount()); + TDelayedExecutor::WaitForDuration(backoffStrategy.GetBackoff()); + backoffStrategy.Next(); + } + } + + bool TryHandleBatch(const std::vector<TUnversionedRow>& batch) + { + YT_LOG_DEBUG("Table transaction starting (Items: %v, PendingItems: %v)", + batch.size(), + GetPendingCount()); + + if (!SchemalessDynamicTableWriter_->Write(batch)) { + return false; + } + + YT_LOG_DEBUG("Table transaction committed (CommittedItems: %v)", + batch.size()); + + return true; + } + + int GetPendingCount() const + { + return PendingCount_.load(); + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(TYPath path, IClientPtr client, IInvokerPtr invoker, TSchemalessBufferedDynamicTableWriterConfigPtr config) +{ + return New<TSchemalessBufferedDynamicTableWriter>(std::move(path), std::move(client), std::move(invoker), std::move(config)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h new file mode 100644 index 0000000000..987152c77b --- /dev/null +++ b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h @@ -0,0 +1,17 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/api/public.h> + +#include <yt/yt/core/ypath/public.h> + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(NYPath::TYPath path, NApi::IClientPtr client); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp b/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp new file mode 100644 index 0000000000..9349c6e7fb --- /dev/null +++ b/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp @@ -0,0 +1,100 @@ +#include "schemaless_dynamic_table_writer.h" + +#include <yt/yt/client/api/client.h> +#include <yt/yt/client/api/transaction.h> + +#include <yt/yt/client/table_client/helpers.h> +#include <yt/yt/client/table_client/name_table.h> +#include <yt/yt/client/table_client/schema.h> +#include <yt/yt/client/table_client/unversioned_writer.h> + +namespace NYT::NTableClient { + +using namespace NApi; +using namespace NConcurrency; +using namespace NTransactionClient; +using namespace NYPath; + +//////////////////////////////////////////////////////////////////////////////// + +static auto Logger = NLogging::TLogger("SchemalessDynamicTableWriter"); + +//////////////////////////////////////////////////////////////////////////////// + +class TSchemalessDynamicTableWriter + : public IUnversionedWriter +{ +public: + TSchemalessDynamicTableWriter(TYPath path, IClientPtr client) + : Path_(std::move(path)) + , Client_(std::move(client)) + { } + + TFuture<void> GetReadyEvent() override + { + // NB: this writer is synchronous and throws no errors. It just logs + // them instead. See TSchemalessDynamicTableWriter::Write(). + return VoidFuture; + } + + bool Write(TRange<TUnversionedRow> rows) override + { + try { + return DoWrite(rows); + } catch (const std::exception& ex) { + YT_LOG_WARNING(ex, "Could not write to event log"); + return false; + } + } + + bool DoWrite(TRange<TUnversionedRow> rows) + { + TUnversionedRowsBuilder builder; + for (auto row : rows) { + builder.AddRow(row); + } + + auto sharedRows = builder.Build(); + + auto transaction = WaitFor(Client_->StartTransaction(ETransactionType::Tablet)) + .ValueOrThrow(); + transaction->WriteRows(Path_, NameTable_, sharedRows); + WaitFor(transaction->Commit()) + .ThrowOnError(); + + return true; + } + + TFuture<void> Close() override + { + return VoidFuture; + } + + const TTableSchemaPtr& GetSchema() const override + { + return Schema_; + } + + const TNameTablePtr& GetNameTable() const override + { + return NameTable_; + } + +private: + const TYPath Path_; + const IClientPtr Client_; + + const TNameTablePtr NameTable_ = New<TNameTable>(); + const TTableSchemaPtr Schema_ = New<TTableSchema>(); +}; + +//////////////////////////////////////////////////////////////////////////////// + +IUnversionedWriterPtr CreateSchemalessDynamicTableWriter(TYPath path, IClientPtr client) +{ + return New<TSchemalessDynamicTableWriter>(std::move(path), std::move(client)); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_writer.h b/yt/yt/client/table_client/schemaless_dynamic_table_writer.h new file mode 100644 index 0000000000..571e73fc21 --- /dev/null +++ b/yt/yt/client/table_client/schemaless_dynamic_table_writer.h @@ -0,0 +1,17 @@ +#pragma once + +#include "public.h" + +#include <yt/yt/client/api/public.h> + +#include <yt/yt/core/ypath/public.h> + +namespace NYT::NTableClient { + +//////////////////////////////////////////////////////////////////////////////// + +IUnversionedWriterPtr CreateSchemalessDynamicTableWriter(NYPath::TYPath path, NApi::IClientPtr client); + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NTableClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index b06f9fd98c..e51923f956 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -120,6 +120,8 @@ SRCS( table_client/row_buffer.cpp table_client/schema.cpp table_client/schema_serialization_helpers.cpp + table_client/schemaless_dynamic_table_writer.cpp + table_client/schemaless_dynamic_table_buffered_writer.cpp table_client/serialize.cpp table_client/logical_type.cpp table_client/name_table.cpp |