diff options
| -rw-r--r-- | ydb/apps/ydbd/export.cpp | 15 | ||||
| -rw-r--r-- | ydb/apps/ydbd/export.h | 6 | ||||
| -rw-r--r-- | ydb/core/testlib/basics/appdata.h | 19 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/backup_unit.cpp | 21 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/export_iface.h | 13 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/export_s3.h | 21 | ||||
| -rw-r--r-- | ydb/core/tx/datashard/export_s3_uploader.cpp | 13 |
7 files changed, 60 insertions, 48 deletions
diff --git a/ydb/apps/ydbd/export.cpp b/ydb/apps/ydbd/export.cpp index 785b95f38e6..de0efb5a834 100644 --- a/ydb/apps/ydbd/export.cpp +++ b/ydb/apps/ydbd/export.cpp @@ -2,15 +2,22 @@ #include <ydb/core/tx/datashard/export_s3.h> -NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(bool useTypeV3) const { - Y_UNUSED(useTypeV3); +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ + Y_UNUSED(task); + Y_UNUSED(columns); return nullptr; // not supported } -NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3() const { +NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3( + const IExport::TTask& task, const IExport::TTableColumns& columns) const +{ #ifndef KIKIMR_DISABLE_S3_OPS - return new NKikimr::NDataShard::TS3Export(); + return new NKikimr::NDataShard::TS3Export(task, columns); #else + Y_UNUSED(task); + Y_UNUSED(columns); return nullptr; #endif } diff --git a/ydb/apps/ydbd/export.h b/ydb/apps/ydbd/export.h index c434a71112c..9d077f16aa5 100644 --- a/ydb/apps/ydbd/export.h +++ b/ydb/apps/ydbd/export.h @@ -3,8 +3,10 @@ #include <ydb/core/tx/datashard/export_iface.h> class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { + using IExport = NKikimr::NDataShard::IExport; + public: - NKikimr::NDataShard::IExport *CreateExportToYt(bool useTypeV3) const override; - NKikimr::NDataShard::IExport *CreateExportToS3() const override; + IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; + IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override; void Shutdown() override; }; diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h index 5618db56f34..0db85b527b7 100644 --- a/ydb/core/testlib/basics/appdata.h +++ b/ydb/core/testlib/basics/appdata.h @@ -15,17 +15,26 @@ namespace NKikimr { // FIXME // Split this factory - class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory { + class TDataShardExportFactory : public NDataShard::IExportFactory { + using IExport = NDataShard::IExport; + public: - NKikimr::NDataShard::IExport* CreateExportToYt(bool useTypeV3) const override { - Y_UNUSED(useTypeV3); + IExport* CreateExportToYt( + const IExport::TTask& task, const IExport::TTableColumns& columns) const override + { + Y_UNUSED(task); + Y_UNUSED(columns); return nullptr; } - NKikimr::NDataShard::IExport* CreateExportToS3() const override { + IExport* CreateExportToS3( + const IExport::TTask& task, const IExport::TTableColumns& columns) const override + { #ifndef KIKIMR_DISABLE_S3_OPS - return new NKikimr::NDataShard::TS3Export(); + return new NDataShard::TS3Export(task, columns); #else + Y_UNUSED(task); + Y_UNUSED(columns); return nullptr; #endif } diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp index 002b3e74b2b..a7a0f8429be 100644 --- a/ydb/core/tx/datashard/backup_unit.cpp +++ b/ydb/core/tx/datashard/backup_unit.cpp @@ -41,21 +41,19 @@ protected: Y_VERIFY(txc.DB.GetScheme().GetTableInfo(localTableId)); auto* appData = AppData(ctx); - + const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; std::shared_ptr<::NKikimr::NDataShard::IExport> exp; + if (backup.HasYTSettings()) { - auto* exportFactory = appData->DataShardExportFactory; - if (exportFactory) { - const auto& settings = backup.GetYTSettings(); - std::shared_ptr<IExport>(exportFactory->CreateExportToYt(settings.GetUseTypeV3())).swap(exp); + if (auto* exportFactory = appData->DataShardExportFactory) { + std::shared_ptr<IExport>(exportFactory->CreateExportToYt(backup, columns)).swap(exp); } else { Abort(op, ctx, "Exports to YT are disabled"); return false; } } else if (backup.HasS3Settings()) { - auto* exportFactory = appData->DataShardExportFactory; - if (exportFactory) { - std::shared_ptr<IExport>(exportFactory->CreateExportToS3()).swap(exp); + if (auto* exportFactory = appData->DataShardExportFactory) { + std::shared_ptr<IExport>(exportFactory->CreateExportToS3(backup, columns)).swap(exp); } else { Abort(op, ctx, "Exports to S3 are disabled"); return false; @@ -65,16 +63,15 @@ protected: return false; } - const auto& columns = DataShard.GetUserTables().at(tableId)->Columns; const auto& scanSettings = backup.GetScanSettings(); const ui64 rowsLimit = scanSettings.GetRowsBatchSize() ? scanSettings.GetRowsBatchSize() : Max<ui64>(); const ui64 bytesLimit = scanSettings.GetBytesBatchSize(); - auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), columns, backup, exp]() { - return exp->CreateUploader(self, txId, columns, backup); + auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() { + return exp->CreateUploader(self, txId); }; - THolder<IBuffer> buffer{exp->CreateBuffer(columns, rowsLimit, bytesLimit)}; + THolder<IBuffer> buffer{exp->CreateBuffer(rowsLimit, bytesLimit)}; THolder<NTable::IScan> scan{CreateExportScan(std::move(buffer), createUploader)}; const auto& taskName = appData->DataShardConfig.GetBackupTaskName(); diff --git a/ydb/core/tx/datashard/export_iface.h b/ydb/core/tx/datashard/export_iface.h index 2374a9ab8b6..5509974b6b0 100644 --- a/ydb/core/tx/datashard/export_iface.h +++ b/ydb/core/tx/datashard/export_iface.h @@ -15,13 +15,8 @@ public: public: virtual ~IExport() = default; - virtual IActor* CreateUploader( - const TActorId& dataShard, - ui64 txId, - const TTableColumns& columns, - const TTask& task) const = 0; - - virtual IBuffer* CreateBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) const = 0; + virtual IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const = 0; + virtual IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const = 0; virtual void Shutdown() const = 0; }; @@ -30,8 +25,8 @@ class IExportFactory { public: virtual ~IExportFactory() = default; - virtual IExport* CreateExportToYt(bool useTypeV3) const = 0; - virtual IExport* CreateExportToS3() const = 0; + virtual IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0; + virtual IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0; virtual void Shutdown() = 0; }; diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h index 95cb9de7643..a922288d90f 100644 --- a/ydb/core/tx/datashard/export_s3.h +++ b/ydb/core/tx/datashard/export_s3.h @@ -11,17 +11,24 @@ namespace NDataShard { class TS3Export: public IExport { public: - IActor* CreateUploader( - const TActorId& dataShard, - ui64 txId, - const TTableColumns& columns, - const TTask& task) const override; + explicit TS3Export(const TTask& task, const TTableColumns& columns) + : Task(task) + , Columns(columns) + { + Y_VERIFY(task.HasS3Settings()); + } + + IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override; - IBuffer* CreateBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) const override { - return CreateS3ExportBuffer(columns, rowsLimit, bytesLimit); + IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const override { + return CreateS3ExportBuffer(Columns, rowsLimit, bytesLimit); } void Shutdown() const override {} + +protected: + const TTask Task; + const TTableColumns Columns; }; } // NDataShard diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp index db48213401d..351544de462 100644 --- a/ydb/core/tx/datashard/export_s3_uploader.cpp +++ b/ydb/core/tx/datashard/export_s3_uploader.cpp @@ -20,17 +20,12 @@ public: }; // TS3Uploader -IActor* TS3Export::CreateUploader( - const TActorId& dataShard, - ui64 txId, - const TTableColumns& columns, - const TTask& task) const -{ - auto scheme = (task.GetShardNum() == 0) - ? GenYdbScheme(columns, task.GetTable()) +IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const { + auto scheme = (Task.GetShardNum() == 0) + ? GenYdbScheme(Columns, Task.GetTable()) : Nothing(); - return new TS3Uploader(dataShard, txId, task, std::move(scheme)); + return new TS3Uploader(dataShard, txId, Task, std::move(scheme)); } } // NDataShard |
