aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoreshcherbin <eshcherbin@yandex-team.com>2025-03-07 12:16:45 +0300
committereshcherbin <eshcherbin@yandex-team.com>2025-03-07 12:30:49 +0300
commit203633eb83d824fe720dde6186d8bff34defcb15 (patch)
tree4b0be398c7f724cc6d0e420d30a773f850fc641e
parent4abcd208e24fe7d9e49f50337fea7ef0a089c30c (diff)
downloadydb-203633eb83d824fe720dde6186d8bff34defcb15.tar.gz
Archive TSchemalessBufferedDynamicTableWriter
commit_hash:97313fd34863bb185f42b7b6824ba6a92182f4c7
-rw-r--r--yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp154
-rw-r--r--yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h21
-rw-r--r--yt/yt/client/ya.make1
3 files changed, 0 insertions, 176 deletions
diff --git a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp b/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp
deleted file mode 100644
index db6f2bf8092..00000000000
--- a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.cpp
+++ /dev/null
@@ -1,154 +0,0 @@
-#include "schemaless_buffered_dynamic_table_writer.h"
-#include "schemaless_dynamic_table_writer.h"
-#include "unversioned_writer.h"
-#include "config.h"
-
-#include <yt/yt/client/api/client.h>
-#include <yt/yt/client/api/transaction.h>
-
-#include <yt/yt/core/misc/backoff_strategy.h>
-
-#include <yt/yt/core/concurrency/nonblocking_batcher.h>
-
-namespace NYT::NTableClient {
-
-using namespace NApi;
-using namespace NConcurrency;
-using namespace NTransactionClient;
-using namespace NYPath;
-using namespace NProfiling;
-using namespace NCrypto;
-
-////////////////////////////////////////////////////////////////////////////////
-
-// TODO(eshcherin): Make it reconfigurable.
-class TSchemalessBufferedDynamicTableWriter
- : public IUnversionedWriter
-{
-public:
- TSchemalessBufferedDynamicTableWriter(
- TYPath path,
- IClientPtr client,
- TSchemalessBufferedDynamicTableWriterConfigPtr config,
- IInvokerPtr invoker)
- : Writer_(CreateSchemalessDynamicTableWriter(path, std::move(client)))
- , Config_(std::move(config))
- , Batcher_(New<TNonblockingBatcher<TUnversionedRow>>(
- TBatchSizeLimiter(Config_->MaxBatchSize),
- Config_->FlushPeriod))
- , RetryBackoffStrategy_(Config_->RetryBackoff)
- {
- invoker->Invoke(BIND(&TSchemalessBufferedDynamicTableWriter::Loop, MakeWeak(this)));
- }
-
- bool Write(TRange<TUnversionedRow> rows) override
- {
- for (auto row : rows) {
- Batcher_->Enqueue(row);
- }
-
- return true;
- }
-
- TFuture<void> GetReadyEvent() override
- {
- return VoidFuture;
- }
-
- TFuture<void> Close() override
- {
- Closed_ = true;
- return ClosePromise_.ToFuture();
- }
-
- const TTableSchemaPtr& GetSchema() const override
- {
- return Writer_->GetSchema();
- }
-
- const TNameTablePtr& GetNameTable() const override
- {
- return Writer_->GetNameTable();
- }
-
- std::optional<TMD5Hash> GetDigest() const override
- {
- return std::nullopt;
- }
-
-private:
- const IUnversionedWriterPtr Writer_;
- const TSchemalessBufferedDynamicTableWriterConfigPtr Config_;
- const TNonblockingBatcherPtr<TUnversionedRow> Batcher_;
-
- TBackoffStrategy RetryBackoffStrategy_;
-
- const TPromise<void> ClosePromise_ = NewPromise<void>();
- std::atomic<bool> Closed_ = false;
-
- void Loop()
- {
- while (!Closed_) {
- auto asyncBatch = Batcher_->DequeueBatch();
- auto batch = WaitForUnique(asyncBatch)
- .ValueOrThrow();
-
- if (batch.empty()) {
- continue;
- }
-
- WriteBatchWithRetries(batch);
- }
-
- auto batches = Batcher_->Drain();
- for (const auto& batch : batches) {
- WriteBatchWithRetries(batch);
- }
-
- ClosePromise_.Set();
- }
-
- // TODO(eshcherbin): Add logging.
- void WriteBatchWithRetries(const std::vector<TUnversionedRow>& batch)
- {
- RetryBackoffStrategy_.Restart();
- while (RetryBackoffStrategy_.GetInvocationCount() < RetryBackoffStrategy_.GetInvocationCount()) {
- if (TryWriteBatch(batch)) {
- break;
- }
-
- TDelayedExecutor::WaitForDuration(RetryBackoffStrategy_.GetBackoff());
- RetryBackoffStrategy_.Next();
- }
- }
-
- bool TryWriteBatch(const std::vector<TUnversionedRow>& batch)
- {
- try {
- Y_UNUSED(Writer_->Write(batch));
- } catch (const std::exception& ex) {
- return false;
- }
-
- return true;
- }
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(
- TYPath path,
- IClientPtr client,
- TSchemalessBufferedDynamicTableWriterConfigPtr config,
- IInvokerPtr invoker)
-{
- return New<TSchemalessBufferedDynamicTableWriter>(
- std::move(path),
- std::move(client),
- std::move(config),
- std::move(invoker));
-}
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h b/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h
deleted file mode 100644
index 00b2a9eb985..00000000000
--- a/yt/yt/client/table_client/schemaless_buffered_dynamic_table_writer.h
+++ /dev/null
@@ -1,21 +0,0 @@
-#pragma once
-
-#include "public.h"
-
-#include <yt/yt/client/api/public.h>
-
-#include <yt/yt/core/ypath/public.h>
-
-namespace NYT::NTableClient {
-
-////////////////////////////////////////////////////////////////////////////////
-
-IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(
- NYPath::TYPath path,
- NApi::IClientPtr client,
- TSchemalessBufferedDynamicTableWriterConfigPtr config,
- IInvokerPtr invoker);
-
-////////////////////////////////////////////////////////////////////////////////
-
-} // namespace NYT::NTableClient
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index ddc34398676..a1ca7c303d6 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -131,7 +131,6 @@ SRCS(
table_client/row_buffer.cpp
table_client/schema.cpp
table_client/schema_serialization_helpers.cpp
- table_client/schemaless_buffered_dynamic_table_writer.cpp
table_client/schemaless_dynamic_table_writer.cpp
table_client/serialize.cpp
table_client/table_upload_options.cpp