diff options
author | eshcherbin <eshcherbin@yandex-team.com> | 2025-03-07 12:16:45 +0300 |
---|---|---|
committer | eshcherbin <eshcherbin@yandex-team.com> | 2025-03-07 12:30:49 +0300 |
commit | 203633eb83d824fe720dde6186d8bff34defcb15 (patch) | |
tree | 4b0be398c7f724cc6d0e420d30a773f850fc641e | |
parent | 4abcd208e24fe7d9e49f50337fea7ef0a089c30c (diff) | |
download | ydb-203633eb83d824fe720dde6186d8bff34defcb15.tar.gz |
Archive TSchemalessBufferedDynamicTableWriter
commit_hash:97313fd34863bb185f42b7b6824ba6a92182f4c7
-rw-r--r-- | yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp | 154 | ||||
-rw-r--r-- | yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h | 21 | ||||
-rw-r--r-- | yt/yt/client/ya.make | 1 |
3 files changed, 0 insertions, 176 deletions
diff --git a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp b/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp deleted file mode 100644 index db6f2bf8092..00000000000 --- a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp +++ /dev/null @@ -1,154 +0,0 @@ -#include "schemaless_buffered_dynamic_table_writer.h" -#include "schemaless_dynamic_table_writer.h" -#include "unversioned_writer.h" -#include "config.h" - -#include <yt/yt/client/api/client.h> -#include <yt/yt/client/api/transaction.h> - -#include <yt/yt/core/misc/backoff_strategy.h> - -#include <yt/yt/core/concurrency/nonblocking_batcher.h> - -namespace NYT::NTableClient { - -using namespace NApi; -using namespace NConcurrency; -using namespace NTransactionClient; -using namespace NYPath; -using namespace NProfiling; -using namespace NCrypto; - -//////////////////////////////////////////////////////////////////////////////// - -// TODO(eshcherin): Make it reconfigurable. -class TSchemalessBufferedDynamicTableWriter - : public IUnversionedWriter -{ -public: - TSchemalessBufferedDynamicTableWriter( - TYPath path, - IClientPtr client, - TSchemalessBufferedDynamicTableWriterConfigPtr config, - IInvokerPtr invoker) - : Writer_(CreateSchemalessDynamicTableWriter(path, std::move(client))) - , Config_(std::move(config)) - , Batcher_(New<TNonblockingBatcher<TUnversionedRow>>( - TBatchSizeLimiter(Config_->MaxBatchSize), - Config_->FlushPeriod)) - , RetryBackoffStrategy_(Config_->RetryBackoff) - { - invoker->Invoke(BIND(&TSchemalessBufferedDynamicTableWriter::Loop, MakeWeak(this))); - } - - bool Write(TRange<TUnversionedRow> rows) override - { - for (auto row : rows) { - Batcher_->Enqueue(row); - } - - return true; - } - - TFuture<void> GetReadyEvent() override - { - return VoidFuture; - } - - TFuture<void> Close() override - { - Closed_ = true; - return ClosePromise_.ToFuture(); - } - - const TTableSchemaPtr& GetSchema() const override - { - return Writer_->GetSchema(); - } - - const TNameTablePtr& GetNameTable() const override - { - return Writer_->GetNameTable(); - } - - std::optional<TMD5Hash> GetDigest() const override - { - return std::nullopt; - } - -private: - const IUnversionedWriterPtr Writer_; - const TSchemalessBufferedDynamicTableWriterConfigPtr Config_; - const TNonblockingBatcherPtr<TUnversionedRow> Batcher_; - - TBackoffStrategy RetryBackoffStrategy_; - - const TPromise<void> ClosePromise_ = NewPromise<void>(); - std::atomic<bool> Closed_ = false; - - void Loop() - { - while (!Closed_) { - auto asyncBatch = Batcher_->DequeueBatch(); - auto batch = WaitForUnique(asyncBatch) - .ValueOrThrow(); - - if (batch.empty()) { - continue; - } - - WriteBatchWithRetries(batch); - } - - auto batches = Batcher_->Drain(); - for (const auto& batch : batches) { - WriteBatchWithRetries(batch); - } - - ClosePromise_.Set(); - } - - // TODO(eshcherbin): Add logging. - void WriteBatchWithRetries(const std::vector<TUnversionedRow>& batch) - { - RetryBackoffStrategy_.Restart(); - while (RetryBackoffStrategy_.GetInvocationCount() < RetryBackoffStrategy_.GetInvocationCount()) { - if (TryWriteBatch(batch)) { - break; - } - - TDelayedExecutor::WaitForDuration(RetryBackoffStrategy_.GetBackoff()); - RetryBackoffStrategy_.Next(); - } - } - - bool TryWriteBatch(const std::vector<TUnversionedRow>& batch) - { - try { - Y_UNUSED(Writer_->Write(batch)); - } catch (const std::exception& ex) { - return false; - } - - return true; - } -}; - -//////////////////////////////////////////////////////////////////////////////// - -IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter( - TYPath path, - IClientPtr client, - TSchemalessBufferedDynamicTableWriterConfigPtr config, - IInvokerPtr invoker) -{ - return New<TSchemalessBufferedDynamicTableWriter>( - std::move(path), - std::move(client), - std::move(config), - std::move(invoker)); -} - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NTableClient diff --git a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h b/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h deleted file mode 100644 index 00b2a9eb985..00000000000 --- a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include "public.h" - -#include <yt/yt/client/api/public.h> - -#include <yt/yt/core/ypath/public.h> - -namespace NYT::NTableClient { - -//////////////////////////////////////////////////////////////////////////////// - -IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter( - NYPath::TYPath path, - NApi::IClientPtr client, - TSchemalessBufferedDynamicTableWriterConfigPtr config, - IInvokerPtr invoker); - -//////////////////////////////////////////////////////////////////////////////// - -} // namespace NYT::NTableClient diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make index ddc34398676..a1ca7c303d6 100644 --- a/yt/yt/client/ya.make +++ b/yt/yt/client/ya.make @@ -131,7 +131,6 @@ SRCS( table_client/row_buffer.cpp table_client/schema.cpp table_client/schema_serialization_helpers.cpp - table_client/schemaless_buffered_dynamic_table_writer.cpp table_client/schemaless_dynamic_table_writer.cpp table_client/serialize.cpp table_client/table_upload_options.cpp |