aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoromgronny <omgronny@yandex-team.com>2024-05-27 18:28:42 +0300
committeromgronny <omgronny@yandex-team.com>2024-05-27 18:39:48 +0300
commit1bb371c641bfeaba2e219c8b9d80065b77f8993c (patch)
tree264d53a0f9722c00e4462f8d55a152566d1ca44d
parentc86c923f1db2a4d5e12f9407f23470ddece0ac06 (diff)
downloadydb-1bb371c641bfeaba2e219c8b9d80065b77f8993c.tar.gz
YT-21754: Introduce TSchemalessDynamicTableBufferedWriter
c7cb620b1c2302ddb2a65af8216a7f878629f60a
-rw-r--r--yt/yt/client/table_client/config.cpp12
-rw-r--r--yt/yt/client/table_client/config.h19
-rw-r--r--yt/yt/client/table_client/public.h2
-rw-r--r--yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp196
-rw-r--r--yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h17
-rw-r--r--yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp100
-rw-r--r--yt/yt/client/table_client/schemaless_dynamic_table_writer.h17
-rw-r--r--yt/yt/client/ya.make2
8 files changed, 365 insertions, 0 deletions
diff --git a/yt/yt/client/table_client/config.cpp b/yt/yt/client/table_client/config.cpp
index d30ec5c8d2..e3197ed45d 100644
--- a/yt/yt/client/table_client/config.cpp
+++ b/yt/yt/client/table_client/config.cpp
@@ -529,4 +529,16 @@ void TVersionedRowDigestConfig::Register(TRegistrar registrar)
////////////////////////////////////////////////////////////////////////////////
+void TSchemalessBufferedDynamicTableWriterConfig::Register(TRegistrar registrar)
+{
+ registrar.Parameter("max_batch_size", &TThis::MaxBatchSize)
+ .Default(1000);
+ registrar.Parameter("flush_period", &TThis::FlushPeriod)
+ .Default(TDuration::Seconds(5));
+ registrar.Parameter("exponential_backoff_options", &TThis::ExponentialBackoffOptions)
+ .Default();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/config.h b/yt/yt/client/table_client/config.h
index 4f6ebfbad1..bdc3427d15 100644
--- a/yt/yt/client/table_client/config.h
+++ b/yt/yt/client/table_client/config.h
@@ -8,6 +8,8 @@
#include <yt/yt/core/ytree/yson_struct.h>
+#include <yt/yt/core/misc/config.h>
+
#include <yt/yt/library/quantile_digest/public.h>
namespace NYT::NTableClient {
@@ -479,4 +481,21 @@ struct TRowBatchReadOptions
////////////////////////////////////////////////////////////////////////////////
+class TSchemalessBufferedDynamicTableWriterConfig
+ : public TTableWriterConfig
+{
+public:
+ i64 MaxBatchSize;
+ TDuration FlushPeriod;
+ TExponentialBackoffOptions ExponentialBackoffOptions;
+
+ REGISTER_YSON_STRUCT(TSchemalessBufferedDynamicTableWriterConfig);
+
+ static void Register(TRegistrar registrar);
+};
+
+DEFINE_REFCOUNTED_TYPE(TSchemalessBufferedDynamicTableWriterConfig)
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/public.h b/yt/yt/client/table_client/public.h
index 62092a45cd..392cc45a18 100644
--- a/yt/yt/client/table_client/public.h
+++ b/yt/yt/client/table_client/public.h
@@ -370,6 +370,8 @@ DECLARE_REFCOUNTED_CLASS(TChunkWriterOptions)
DECLARE_REFCOUNTED_CLASS(TVersionedRowDigestConfig)
+DECLARE_REFCOUNTED_CLASS(TSchemalessBufferedDynamicTableWriterConfig)
+
class TSaveContext;
class TLoadContext;
using TPersistenceContext = TCustomPersistenceContext<TSaveContext, TLoadContext>;
diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp
new file mode 100644
index 0000000000..c3ee5a2d55
--- /dev/null
+++ b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.cpp
@@ -0,0 +1,196 @@
+#include "schemaless_dynamic_table_buffered_writer.h"
+#include "config.h"
+
+#include <yt/yt/client/api/client.h>
+#include <yt/yt/client/api/transaction.h>
+
+#include <yt/yt/client/table_client/helpers.h>
+#include <yt/yt/client/table_client/name_table.h>
+#include <yt/yt/client/table_client/schema.h>
+#include <yt/yt/client/table_client/unversioned_writer.h>
+
+#include <yt/yt/client/table_client/schemaless_dynamic_table_writer.h>
+
+#include <yt/yt/core/concurrency/periodic_executor.h>
+
+#include <yt/yt/core/misc/backoff_strategy.h>
+
+#include <yt/yt/core/concurrency/nonblocking_batcher.h>
+
+#include <yt/yt/core/utilex/random.h>
+
+namespace NYT::NTableClient {
+
+using namespace NApi;
+using namespace NConcurrency;
+using namespace NTransactionClient;
+using namespace NYPath;
+using namespace NProfiling;
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSchemalessBufferedDynamicTableWriter
+ : public IUnversionedWriter
+{
+public:
+ TSchemalessBufferedDynamicTableWriter(
+ TYPath path,
+ IClientPtr client,
+ IInvokerPtr invoker,
+ TSchemalessBufferedDynamicTableWriterConfigPtr config)
+ : Path_(std::move(path))
+ , Client_(std::move(client))
+ , Config_(std::move(config))
+ , Batcher_(New<TNonblockingBatcher<TUnversionedRow>>(TBatchSizeLimiter(Config_->MaxBatchSize), Config_->FlushPeriod))
+ , SchemalessDynamicTableWriter_(CreateSchemalessDynamicTableWriter(Path_, Client_))
+ , WriteExecutor_(New<TPeriodicExecutor>(
+ invoker,
+ BIND(&TSchemalessBufferedDynamicTableWriter::DoWrite, MakeWeak(this))))
+ {
+ Logger.AddTag("Path: %v", Path_);
+
+ WriteExecutor_->Start();
+ }
+
+ void UpdateConfig(const TSchemalessBufferedDynamicTableWriterConfigPtr& newConfig)
+ {
+ Batcher_->UpdateSettings(
+ newConfig->FlushPeriod,
+ TBatchSizeLimiter(newConfig->MaxBatchSize),
+ /*allowEmptyBatches*/ false);
+
+ Config_ = std::move(newConfig);
+ }
+
+ bool Write(TRange<TUnversionedRow> rows) override
+ {
+ for (auto row : rows) {
+ Batcher_->Enqueue(std::move(row));
+ }
+ return false;
+ }
+
+ TFuture<void> GetReadyEvent() override
+ {
+ GetReadyPromise_ = NewPromise<void>();
+ return GetReadyPromise_.ToFuture();
+ }
+
+ TFuture<void> Close() override
+ {
+ auto guard = TGuard(PromiseLock_);
+ ClosePromise_ = NewPromise<void>();
+ return ClosePromise_.ToFuture();
+ }
+
+ const TTableSchemaPtr& GetSchema() const override
+ {
+ return Schema_;
+ }
+
+ const TNameTablePtr& GetNameTable() const override
+ {
+ return NameTable_;
+ }
+
+private:
+ const TYPath Path_;
+ const IClientPtr Client_;
+ TSchemalessBufferedDynamicTableWriterConfigPtr Config_;
+ TNonblockingBatcherPtr<TUnversionedRow> Batcher_;
+
+ IUnversionedWriterPtr SchemalessDynamicTableWriter_;
+
+ TPeriodicExecutorPtr WriteExecutor_;
+
+ NThreading::TSpinLock PromiseLock_;
+ TPromise<void> ClosePromise_;
+
+ TPromise<void> GetReadyPromise_ = NewPromise<void>();
+
+ std::atomic<int> PendingCount_ = 0;
+ std::atomic<int> DroppedCount_ = 0;
+ std::atomic<int> WriteFailuresCount_ = 0;
+
+ const TNameTablePtr NameTable_ = New<TNameTable>();
+ const TTableSchemaPtr Schema_ = New<TTableSchema>();
+
+ NLogging::TLogger Logger = NLogging::TLogger("SchemalessBufferedDynamicTableWriter");
+
+private:
+ void DoWrite()
+ {
+ auto batch = [&] {
+ auto guard = TGuard(PromiseLock_);
+ if (!ClosePromise_) {
+ auto asyncBatch = Batcher_->DequeueBatch();
+ return WaitForUnique(asyncBatch)
+ .ValueOrThrow();
+ }
+
+ ClosePromise_.Set();
+ auto batches = Batcher_->Drain();
+
+ std::vector<TUnversionedRow> result;
+ for (auto& batch : batches) {
+ result.insert(result.end(), batch.begin(), batch.end());
+ }
+ return result;
+ }();
+
+ if (batch.empty()) {
+ return;
+ }
+
+ WriteBatchWithExpBackoff(batch);
+
+ GetReadyPromise_.Set();
+ }
+
+ void WriteBatchWithExpBackoff(const std::vector<TUnversionedRow>& batch)
+ {
+ TBackoffStrategy backoffStrategy(Config_->ExponentialBackoffOptions);
+ while (true) {
+ if (TryHandleBatch(batch)) {
+ return;
+ }
+ YT_LOG_WARNING("Failed to upload rows (RetryDelay: %v, PendingRows: %v)",
+ backoffStrategy.GetBackoff().Seconds(),
+ GetPendingCount());
+ TDelayedExecutor::WaitForDuration(backoffStrategy.GetBackoff());
+ backoffStrategy.Next();
+ }
+ }
+
+ bool TryHandleBatch(const std::vector<TUnversionedRow>& batch)
+ {
+ YT_LOG_DEBUG("Table transaction starting (Items: %v, PendingItems: %v)",
+ batch.size(),
+ GetPendingCount());
+
+ if (!SchemalessDynamicTableWriter_->Write(batch)) {
+ return false;
+ }
+
+ YT_LOG_DEBUG("Table transaction committed (CommittedItems: %v)",
+ batch.size());
+
+ return true;
+ }
+
+ int GetPendingCount() const
+ {
+ return PendingCount_.load();
+ }
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(TYPath path, IClientPtr client, IInvokerPtr invoker, TSchemalessBufferedDynamicTableWriterConfigPtr config)
+{
+ return New<TSchemalessBufferedDynamicTableWriter>(std::move(path), std::move(client), std::move(invoker), std::move(config));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h
new file mode 100644
index 0000000000..987152c77b
--- /dev/null
+++ b/yt/yt/client/table_client/schemaless_dynamic_table_buffered_writer.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/api/public.h>
+
+#include <yt/yt/core/ypath/public.h>
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+IUnversionedWriterPtr CreateSchemalessBufferedDynamicTableWriter(NYPath::TYPath path, NApi::IClientPtr client);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp b/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp
new file mode 100644
index 0000000000..9349c6e7fb
--- /dev/null
+++ b/yt/yt/client/table_client/schemaless_dynamic_table_writer.cpp
@@ -0,0 +1,100 @@
+#include "schemaless_dynamic_table_writer.h"
+
+#include <yt/yt/client/api/client.h>
+#include <yt/yt/client/api/transaction.h>
+
+#include <yt/yt/client/table_client/helpers.h>
+#include <yt/yt/client/table_client/name_table.h>
+#include <yt/yt/client/table_client/schema.h>
+#include <yt/yt/client/table_client/unversioned_writer.h>
+
+namespace NYT::NTableClient {
+
+using namespace NApi;
+using namespace NConcurrency;
+using namespace NTransactionClient;
+using namespace NYPath;
+
+////////////////////////////////////////////////////////////////////////////////
+
+static auto Logger = NLogging::TLogger("SchemalessDynamicTableWriter");
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSchemalessDynamicTableWriter
+ : public IUnversionedWriter
+{
+public:
+ TSchemalessDynamicTableWriter(TYPath path, IClientPtr client)
+ : Path_(std::move(path))
+ , Client_(std::move(client))
+ { }
+
+ TFuture<void> GetReadyEvent() override
+ {
+ // NB: this writer is synchronous and throws no errors. It just logs
+ // them instead. See TSchemalessDynamicTableWriter::Write().
+ return VoidFuture;
+ }
+
+ bool Write(TRange<TUnversionedRow> rows) override
+ {
+ try {
+ return DoWrite(rows);
+ } catch (const std::exception& ex) {
+ YT_LOG_WARNING(ex, "Could not write to event log");
+ return false;
+ }
+ }
+
+ bool DoWrite(TRange<TUnversionedRow> rows)
+ {
+ TUnversionedRowsBuilder builder;
+ for (auto row : rows) {
+ builder.AddRow(row);
+ }
+
+ auto sharedRows = builder.Build();
+
+ auto transaction = WaitFor(Client_->StartTransaction(ETransactionType::Tablet))
+ .ValueOrThrow();
+ transaction->WriteRows(Path_, NameTable_, sharedRows);
+ WaitFor(transaction->Commit())
+ .ThrowOnError();
+
+ return true;
+ }
+
+ TFuture<void> Close() override
+ {
+ return VoidFuture;
+ }
+
+ const TTableSchemaPtr& GetSchema() const override
+ {
+ return Schema_;
+ }
+
+ const TNameTablePtr& GetNameTable() const override
+ {
+ return NameTable_;
+ }
+
+private:
+ const TYPath Path_;
+ const IClientPtr Client_;
+
+ const TNameTablePtr NameTable_ = New<TNameTable>();
+ const TTableSchemaPtr Schema_ = New<TTableSchema>();
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+IUnversionedWriterPtr CreateSchemalessDynamicTableWriter(TYPath path, IClientPtr client)
+{
+ return New<TSchemalessDynamicTableWriter>(std::move(path), std::move(client));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/table_client/schemaless_dynamic_table_writer.h b/yt/yt/client/table_client/schemaless_dynamic_table_writer.h
new file mode 100644
index 0000000000..571e73fc21
--- /dev/null
+++ b/yt/yt/client/table_client/schemaless_dynamic_table_writer.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include "public.h"
+
+#include <yt/yt/client/api/public.h>
+
+#include <yt/yt/core/ypath/public.h>
+
+namespace NYT::NTableClient {
+
+////////////////////////////////////////////////////////////////////////////////
+
+IUnversionedWriterPtr CreateSchemalessDynamicTableWriter(NYPath::TYPath path, NApi::IClientPtr client);
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NTableClient
diff --git a/yt/yt/client/ya.make b/yt/yt/client/ya.make
index b06f9fd98c..e51923f956 100644
--- a/yt/yt/client/ya.make
+++ b/yt/yt/client/ya.make
@@ -120,6 +120,8 @@ SRCS(
table_client/row_buffer.cpp
table_client/schema.cpp
table_client/schema_serialization_helpers.cpp
+ table_client/schemaless_dynamic_table_writer.cpp
+ table_client/schemaless_dynamic_table_buffered_writer.cpp
table_client/serialize.cpp
table_client/logical_type.cpp
table_client/name_table.cpp