summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ydb/apps/ydbd/export.cpp15
-rw-r--r--ydb/apps/ydbd/export.h6
-rw-r--r--ydb/core/testlib/basics/appdata.h19
-rw-r--r--ydb/core/tx/datashard/backup_unit.cpp21
-rw-r--r--ydb/core/tx/datashard/export_iface.h13
-rw-r--r--ydb/core/tx/datashard/export_s3.h21
-rw-r--r--ydb/core/tx/datashard/export_s3_uploader.cpp13
7 files changed, 60 insertions, 48 deletions
diff --git a/ydb/apps/ydbd/export.cpp b/ydb/apps/ydbd/export.cpp
index 785b95f38e6..de0efb5a834 100644
--- a/ydb/apps/ydbd/export.cpp
+++ b/ydb/apps/ydbd/export.cpp
@@ -2,15 +2,22 @@
#include <ydb/core/tx/datashard/export_s3.h>
-NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(bool useTypeV3) const {
- Y_UNUSED(useTypeV3);
+NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToYt(
+ const IExport::TTask& task, const IExport::TTableColumns& columns) const
+{
+ Y_UNUSED(task);
+ Y_UNUSED(columns);
return nullptr; // not supported
}
-NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3() const {
+NKikimr::NDataShard::IExport* TDataShardExportFactory::CreateExportToS3(
+ const IExport::TTask& task, const IExport::TTableColumns& columns) const
+{
#ifndef KIKIMR_DISABLE_S3_OPS
- return new NKikimr::NDataShard::TS3Export();
+ return new NKikimr::NDataShard::TS3Export(task, columns);
#else
+ Y_UNUSED(task);
+ Y_UNUSED(columns);
return nullptr;
#endif
}
diff --git a/ydb/apps/ydbd/export.h b/ydb/apps/ydbd/export.h
index c434a71112c..9d077f16aa5 100644
--- a/ydb/apps/ydbd/export.h
+++ b/ydb/apps/ydbd/export.h
@@ -3,8 +3,10 @@
#include <ydb/core/tx/datashard/export_iface.h>
class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory {
+ using IExport = NKikimr::NDataShard::IExport;
+
public:
- NKikimr::NDataShard::IExport *CreateExportToYt(bool useTypeV3) const override;
- NKikimr::NDataShard::IExport *CreateExportToS3() const override;
+ IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
+ IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const override;
void Shutdown() override;
};
diff --git a/ydb/core/testlib/basics/appdata.h b/ydb/core/testlib/basics/appdata.h
index 5618db56f34..0db85b527b7 100644
--- a/ydb/core/testlib/basics/appdata.h
+++ b/ydb/core/testlib/basics/appdata.h
@@ -15,17 +15,26 @@ namespace NKikimr {
// FIXME
// Split this factory
- class TDataShardExportFactory : public NKikimr::NDataShard::IExportFactory {
+ class TDataShardExportFactory : public NDataShard::IExportFactory {
+ using IExport = NDataShard::IExport;
+
public:
- NKikimr::NDataShard::IExport* CreateExportToYt(bool useTypeV3) const override {
- Y_UNUSED(useTypeV3);
+ IExport* CreateExportToYt(
+ const IExport::TTask& task, const IExport::TTableColumns& columns) const override
+ {
+ Y_UNUSED(task);
+ Y_UNUSED(columns);
return nullptr;
}
- NKikimr::NDataShard::IExport* CreateExportToS3() const override {
+ IExport* CreateExportToS3(
+ const IExport::TTask& task, const IExport::TTableColumns& columns) const override
+ {
#ifndef KIKIMR_DISABLE_S3_OPS
- return new NKikimr::NDataShard::TS3Export();
+ return new NDataShard::TS3Export(task, columns);
#else
+ Y_UNUSED(task);
+ Y_UNUSED(columns);
return nullptr;
#endif
}
diff --git a/ydb/core/tx/datashard/backup_unit.cpp b/ydb/core/tx/datashard/backup_unit.cpp
index 002b3e74b2b..a7a0f8429be 100644
--- a/ydb/core/tx/datashard/backup_unit.cpp
+++ b/ydb/core/tx/datashard/backup_unit.cpp
@@ -41,21 +41,19 @@ protected:
Y_VERIFY(txc.DB.GetScheme().GetTableInfo(localTableId));
auto* appData = AppData(ctx);
-
+ const auto& columns = DataShard.GetUserTables().at(tableId)->Columns;
std::shared_ptr<::NKikimr::NDataShard::IExport> exp;
+
if (backup.HasYTSettings()) {
- auto* exportFactory = appData->DataShardExportFactory;
- if (exportFactory) {
- const auto& settings = backup.GetYTSettings();
- std::shared_ptr<IExport>(exportFactory->CreateExportToYt(settings.GetUseTypeV3())).swap(exp);
+ if (auto* exportFactory = appData->DataShardExportFactory) {
+ std::shared_ptr<IExport>(exportFactory->CreateExportToYt(backup, columns)).swap(exp);
} else {
Abort(op, ctx, "Exports to YT are disabled");
return false;
}
} else if (backup.HasS3Settings()) {
- auto* exportFactory = appData->DataShardExportFactory;
- if (exportFactory) {
- std::shared_ptr<IExport>(exportFactory->CreateExportToS3()).swap(exp);
+ if (auto* exportFactory = appData->DataShardExportFactory) {
+ std::shared_ptr<IExport>(exportFactory->CreateExportToS3(backup, columns)).swap(exp);
} else {
Abort(op, ctx, "Exports to S3 are disabled");
return false;
@@ -65,16 +63,15 @@ protected:
return false;
}
- const auto& columns = DataShard.GetUserTables().at(tableId)->Columns;
const auto& scanSettings = backup.GetScanSettings();
const ui64 rowsLimit = scanSettings.GetRowsBatchSize() ? scanSettings.GetRowsBatchSize() : Max<ui64>();
const ui64 bytesLimit = scanSettings.GetBytesBatchSize();
- auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), columns, backup, exp]() {
- return exp->CreateUploader(self, txId, columns, backup);
+ auto createUploader = [self = DataShard.SelfId(), txId = op->GetTxId(), exp]() {
+ return exp->CreateUploader(self, txId);
};
- THolder<IBuffer> buffer{exp->CreateBuffer(columns, rowsLimit, bytesLimit)};
+ THolder<IBuffer> buffer{exp->CreateBuffer(rowsLimit, bytesLimit)};
THolder<NTable::IScan> scan{CreateExportScan(std::move(buffer), createUploader)};
const auto& taskName = appData->DataShardConfig.GetBackupTaskName();
diff --git a/ydb/core/tx/datashard/export_iface.h b/ydb/core/tx/datashard/export_iface.h
index 2374a9ab8b6..5509974b6b0 100644
--- a/ydb/core/tx/datashard/export_iface.h
+++ b/ydb/core/tx/datashard/export_iface.h
@@ -15,13 +15,8 @@ public:
public:
virtual ~IExport() = default;
- virtual IActor* CreateUploader(
- const TActorId& dataShard,
- ui64 txId,
- const TTableColumns& columns,
- const TTask& task) const = 0;
-
- virtual IBuffer* CreateBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) const = 0;
+ virtual IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const = 0;
+ virtual IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const = 0;
virtual void Shutdown() const = 0;
};
@@ -30,8 +25,8 @@ class IExportFactory {
public:
virtual ~IExportFactory() = default;
- virtual IExport* CreateExportToYt(bool useTypeV3) const = 0;
- virtual IExport* CreateExportToS3() const = 0;
+ virtual IExport* CreateExportToYt(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0;
+ virtual IExport* CreateExportToS3(const IExport::TTask& task, const IExport::TTableColumns& columns) const = 0;
virtual void Shutdown() = 0;
};
diff --git a/ydb/core/tx/datashard/export_s3.h b/ydb/core/tx/datashard/export_s3.h
index 95cb9de7643..a922288d90f 100644
--- a/ydb/core/tx/datashard/export_s3.h
+++ b/ydb/core/tx/datashard/export_s3.h
@@ -11,17 +11,24 @@ namespace NDataShard {
class TS3Export: public IExport {
public:
- IActor* CreateUploader(
- const TActorId& dataShard,
- ui64 txId,
- const TTableColumns& columns,
- const TTask& task) const override;
+ explicit TS3Export(const TTask& task, const TTableColumns& columns)
+ : Task(task)
+ , Columns(columns)
+ {
+ Y_VERIFY(task.HasS3Settings());
+ }
+
+ IActor* CreateUploader(const TActorId& dataShard, ui64 txId) const override;
- IBuffer* CreateBuffer(const TTableColumns& columns, ui64 rowsLimit, ui64 bytesLimit) const override {
- return CreateS3ExportBuffer(columns, rowsLimit, bytesLimit);
+ IBuffer* CreateBuffer(ui64 rowsLimit, ui64 bytesLimit) const override {
+ return CreateS3ExportBuffer(Columns, rowsLimit, bytesLimit);
}
void Shutdown() const override {}
+
+protected:
+ const TTask Task;
+ const TTableColumns Columns;
};
} // NDataShard
diff --git a/ydb/core/tx/datashard/export_s3_uploader.cpp b/ydb/core/tx/datashard/export_s3_uploader.cpp
index db48213401d..351544de462 100644
--- a/ydb/core/tx/datashard/export_s3_uploader.cpp
+++ b/ydb/core/tx/datashard/export_s3_uploader.cpp
@@ -20,17 +20,12 @@ public:
}; // TS3Uploader
-IActor* TS3Export::CreateUploader(
- const TActorId& dataShard,
- ui64 txId,
- const TTableColumns& columns,
- const TTask& task) const
-{
- auto scheme = (task.GetShardNum() == 0)
- ? GenYdbScheme(columns, task.GetTable())
+IActor* TS3Export::CreateUploader(const TActorId& dataShard, ui64 txId) const {
+ auto scheme = (Task.GetShardNum() == 0)
+ ? GenYdbScheme(Columns, Task.GetTable())
: Nothing();
- return new TS3Uploader(dataShard, txId, task, std::move(scheme));
+ return new TS3Uploader(dataShard, txId, Task, std::move(scheme));
}
} // NDataShard