summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIlia Shakhov <[email protected]>2024-12-02 14:10:06 +0300
committerGitHub <[email protected]>2024-12-02 14:10:06 +0300
commitcdcd6b8d553f101b8d7d548eb82b60ad1182a736 (patch)
tree8b5af9ace6ee81418ceadd5a5d375cd7a63c6e39
parentd3a36a5d1c2f49d3328864257652444163453c54 (diff)
Enhanced parallelism of data restoring in `ydb tools restore` (#12203)
-rw-r--r--ydb/apps/ydb/CHANGELOG.md2
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_compat.cpp30
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_compat.h2
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_impl.cpp199
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_impl.h17
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_import_data.cpp59
-rw-r--r--ydb/public/lib/ydb_cli/dump/restore_import_data.h2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_import/out.cpp5
-rw-r--r--ydb/public/sdk/cpp/client/ydb_import/ya.make1
9 files changed, 227 insertions, 90 deletions
diff --git a/ydb/apps/ydb/CHANGELOG.md b/ydb/apps/ydb/CHANGELOG.md
index c8568d03354..b7f17f942c1 100644
--- a/ydb/apps/ydb/CHANGELOG.md
+++ b/ydb/apps/ydb/CHANGELOG.md
@@ -1,4 +1,6 @@
+* Enhanced parallelism of data restoring in `ydb tools restore`
+
## 2.16.0 ##
* Improved throughput of `ydb import file csv` command. It is now approximately x3 times faster
diff --git a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
index 6376406adeb..f53295792b1 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_compat.cpp
@@ -95,7 +95,10 @@ public:
Bytes = 0;
RequestUnitsX2 = 0;
- return {}; // Writer gets data directly from accumulator
+ // Writer gets data directly from accumulator
+ NPrivate::TBatch batch;
+ batch.SetOriginAccumulator(this);
+ return batch;
}
private:
@@ -115,14 +118,14 @@ public:
explicit TDataWriter(
const TString& path,
TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings)
: Path(path)
, TableClient(tableClient)
- , Accumulator(dynamic_cast<TDataAccumulator*>(accumulator))
, UseBulkUpsert(settings.Mode_ == TRestoreSettings::EMode::BulkUpsert)
- {
- Y_ENSURE(Accumulator);
+ {
+ const auto* dataAccumulator = dynamic_cast<const TDataAccumulator*>(accumulator);
+ Y_ENSURE(dataAccumulator);
TUploader::TOptions opts;
opts.InFly = settings.InFly_;
@@ -130,20 +133,22 @@ public:
opts.Interval = settings.RateLimiterSettings_.Interval_;
opts.ReactionTime = settings.RateLimiterSettings_.ReactionTime_;
- Uploader = MakeHolder<TUploader>(opts, TableClient, Accumulator->GetQueryString());
+ Uploader = MakeHolder<TUploader>(opts, TableClient, dataAccumulator->GetQueryString());
}
- bool Push(NPrivate::TBatch&&) override {
- bool ok;
+ bool Push(NPrivate::TBatch&& batch) override {
+ auto* accumulator = dynamic_cast<TDataAccumulator*>(batch.GetOriginAccumulator());
+ Y_ENSURE(accumulator);
+ bool ok;
if (UseBulkUpsert) {
- ok = Uploader->Push(Path, Accumulator->EndAndGetResultingValue());
+ ok = Uploader->Push(Path, accumulator->EndAndGetResultingValue());
} else {
- ok = Uploader->Push(Accumulator->EndAndGetResultingParams());
+ ok = Uploader->Push(accumulator->EndAndGetResultingParams());
}
if (ok) {
- Accumulator->Begin();
+ accumulator->Begin();
}
return ok;
@@ -156,7 +161,6 @@ public:
private:
const TString Path;
TTableClient& TableClient;
- TDataAccumulator* Accumulator;
const bool UseBulkUpsert;
THolder<TUploader> Uploader;
@@ -174,7 +178,7 @@ NPrivate::IDataAccumulator* CreateCompatAccumulator(
NPrivate::IDataWriter* CreateCompatWriter(
const TString& path,
TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings) {
return new TDataWriter(path, tableClient, accumulator, settings);
}
diff --git a/ydb/public/lib/ydb_cli/dump/restore_compat.h b/ydb/public/lib/ydb_cli/dump/restore_compat.h
index c8a20d468d7..0853f4547cf 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_compat.h
+++ b/ydb/public/lib/ydb_cli/dump/restore_compat.h
@@ -13,7 +13,7 @@ NPrivate::IDataAccumulator* CreateCompatAccumulator(
NPrivate::IDataWriter* CreateCompatWriter(
const TString& path,
NTable::TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const NPrivate::IDataAccumulator* accumulator,
const TRestoreSettings& settings);
} // NDump
diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
index fb41ef19aa5..82d8e306a97 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_impl.cpp
@@ -80,6 +80,24 @@ bool IsOperationStarted(TStatus operationStatus) {
return operationStatus.IsSuccess() || operationStatus.GetStatus() == EStatus::STATUS_UNDEFINED;
}
+ui32 CountDataFiles(const TFsPath& fsPath) {
+ ui32 dataFileId = 0;
+ TFsPath dataFile = fsPath.Child(DataFileName(dataFileId));
+ while (dataFile.Exists()) {
+ dataFile = fsPath.Child(DataFileName(++dataFileId));
+ }
+ return dataFileId;
+}
+
+TRestoreResult CombineResults(const TVector<TRestoreResult>& results) {
+ for (auto result : results) {
+ if (!result.IsSuccess()) {
+ return result;
+ }
+ }
+ return Result<TRestoreResult>();
+}
+
} // anonymous
namespace NPrivate {
@@ -392,31 +410,39 @@ TRestoreResult TRestoreClient::CheckSchema(const TString& dbPath, const TTableDe
return Result<TRestoreResult>();
}
-struct TWriterWaiter {
- NPrivate::IDataWriter& Writer;
-
- TWriterWaiter(NPrivate::IDataWriter& writer)
- : Writer(writer)
- {
- }
-
- ~TWriterWaiter() {
- Writer.Wait();
- }
-};
-
-TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const TTableDescription& desc) {
- THolder<NPrivate::IDataAccumulator> accumulator;
+THolder<NPrivate::IDataWriter> TRestoreClient::CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,
+ const TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators)
+{
THolder<NPrivate::IDataWriter> writer;
-
switch (settings.Mode_) {
case TRestoreSettings::EMode::Yql:
case TRestoreSettings::EMode::BulkUpsert: {
- accumulator.Reset(CreateCompatAccumulator(dbPath, desc, settings));
- writer.Reset(CreateCompatWriter(dbPath, TableClient, accumulator.Get(), settings));
+ // Need only one accumulator to initialize query string
+ writer.Reset(CreateCompatWriter(dbPath, TableClient, accumulators[0].Get(), settings));
+ break;
+ }
+ case TRestoreSettings::EMode::ImportData: {
+ writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulators, settings, Log));
break;
}
+ }
+ return writer;
+}
+
+TRestoreResult TRestoreClient::CreateDataAccumulators(TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators,
+ const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc, ui32 dataFilesCount)
+{
+ const ui32 accumulatorsCount = std::min(settings.InFly_, dataFilesCount);
+ outAccumulators.resize(accumulatorsCount);
+
+ switch (settings.Mode_) {
+ case TRestoreSettings::EMode::Yql:
+ case TRestoreSettings::EMode::BulkUpsert:
+ for (size_t i = 0; i < accumulatorsCount; ++i) {
+ outAccumulators[i].Reset(CreateCompatAccumulator(dbPath, desc, settings));
+ }
+ break;
case TRestoreSettings::EMode::ImportData: {
TMaybe<TTableDescription> actualDesc;
@@ -424,63 +450,124 @@ TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString&
if (!descResult.IsSuccess()) {
return Result<TRestoreResult>(dbPath, std::move(descResult));
}
-
- accumulator.Reset(CreateImportDataAccumulator(desc, *actualDesc, settings, Log));
- writer.Reset(CreateImportDataWriter(dbPath, desc, ImportClient, TableClient, accumulator.Get(), settings, Log));
-
+ for (size_t i = 0; i < accumulatorsCount; ++i) {
+ outAccumulators[i].Reset(CreateImportDataAccumulator(desc, *actualDesc, settings, Log));
+ }
break;
}
}
+ return Result<TRestoreResult>();
+}
- TWriterWaiter waiter(*writer);
+TRestoreResult TRestoreClient::RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const TTableDescription& desc) {
+ const ui32 dataFilesCount = CountDataFiles(fsPath);
+ if (dataFilesCount == 0) {
+ return Result<TRestoreResult>();
+ }
- ui32 dataFileId = 0;
- TFsPath dataFile = fsPath.Child(DataFileName(dataFileId));
- TVector<TString> dataFileNames;
+ TVector<THolder<NPrivate::IDataAccumulator>> accumulators;
+ if (auto res = CreateDataAccumulators(accumulators, dbPath, settings, desc, dataFilesCount); !res.IsSuccess()) {
+ return res;
+ }
+
+ THolder<NPrivate::IDataWriter> writer = CreateDataWriter(dbPath, settings, desc, accumulators);
- while (dataFile.Exists()) {
- LOG_D("Read data from " << dataFile.GetPath().Quote());
+ TVector<TRestoreResult> accumulatorWorkersResults(accumulators.size(), Result<TRestoreResult>());
+ TThreadPool accumulatorWorkers(TThreadPool::TParams().SetBlocking(true));
+ accumulatorWorkers.Start(accumulators.size(), accumulators.size());
- dataFileNames.push_back(dataFile);
- TFileInput input(dataFile, settings.FileBufferSize_);
- TString line;
- ui64 lineNo = 0;
+ const ui32 dataFilesPerAccumulator = dataFilesCount / accumulators.size();
+ const ui32 dataFilesPerAccumulatorRemainder = dataFilesCount % accumulators.size();
+ for (ui32 i = 0; i < accumulators.size(); ++i) {
+ auto* accumulator = accumulators[i].Get();
- while (input.ReadLine(line)) {
- auto l = NPrivate::TLine(std::move(line), dataFileNames.back(), ++lineNo);
- for (auto status = accumulator->Check(l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check(l)) {
- if (status == NPrivate::IDataAccumulator::ERROR) {
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
- TStringBuilder() << "Invalid data: " << l.GetLocation());
- }
+ ui32 dataFileIdStart = dataFilesPerAccumulator * i + std::min(i, dataFilesPerAccumulatorRemainder);
+ ui32 dataFileIdEnd = dataFilesPerAccumulator * (i + 1) + std::min(i + 1, dataFilesPerAccumulatorRemainder);
+ auto func = [&, i, dataFileIdStart, dataFileIdEnd, accumulator]() {
+ for (size_t id = dataFileIdStart; id < dataFileIdEnd; ++id) {
+ TFsPath dataFile = fsPath.Child(DataFileName(id));
- if (!accumulator->Ready(true)) {
- LOG_E("Error reading data from " << dataFile.GetPath().Quote());
- return Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, "Data is not ready");
- }
+ LOG_D("Read data from " << dataFile.GetPath().Quote());
- if (!writer->Push(accumulator->GetData(true))) {
- LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #1");
+ TFileInput input(dataFile, settings.FileBufferSize_);
+ TString line;
+ ui64 lineNo = 0;
+
+ while (input.ReadLine(line)) {
+ auto l = NPrivate::TLine(std::move(line), dataFile.GetPath(), ++lineNo);
+
+ for (auto status = accumulator->Check(l); status != NPrivate::IDataAccumulator::OK; status = accumulator->Check(l)) {
+ if (status == NPrivate::IDataAccumulator::ERROR) {
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR,
+ TStringBuilder() << "Invalid data: " << l.GetLocation());
+ return;
+ }
+
+ if (!accumulator->Ready(true)) {
+ LOG_E("Error reading data from " << dataFile.GetPath().Quote());
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::INTERNAL_ERROR, "Data is not ready");
+ return;
+ }
+
+ if (!writer->Push(accumulator->GetData(true))) {
+ LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #1");
+ return;
+ }
+ }
+
+ accumulator->Feed(std::move(l));
+ if (accumulator->Ready()) {
+ if (!writer->Push(accumulator->GetData())) {
+ LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #2");
+ return;
+ }
+ }
}
}
- accumulator->Feed(std::move(l));
- if (accumulator->Ready()) {
- if (!writer->Push(accumulator->GetData())) {
- LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #2");
+ while (accumulator->Ready(true)) {
+ if (!writer->Push(accumulator->GetData(true))) {
+ accumulatorWorkersResults[i] = Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #3");
+ return;
}
}
+ };
+
+ if (!accumulatorWorkers.AddFunc(std::move(func))) {
+ return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Can't start restoring data: queue is full or shutting down");
}
+ }
- dataFile = fsPath.Child(DataFileName(++dataFileId));
+ accumulatorWorkers.Stop();
+ if (auto res = CombineResults(accumulatorWorkersResults); !res.IsSuccess()) {
+ return res;
}
- while (accumulator->Ready(true)) {
- if (!writer->Push(accumulator->GetData(true))) {
- LOG_E("Error writing data to " << dbPath.Quote() << ", file: " << dataFile.GetPath().Quote());
- return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #3");
+ // ensure that all data is restored
+ while (true) {
+ writer->Wait();
+
+ bool dataFound = false;
+ for (auto& acc : accumulators) {
+ if (acc->Ready(true)) {
+ dataFound = true;
+ break;
+ }
+ }
+
+ if (dataFound) {
+ writer = CreateDataWriter(dbPath, settings, desc, accumulators);
+ for (auto& acc : accumulators) {
+ while (acc->Ready(true)) {
+ if (!writer->Push(acc->GetData(true))) {
+ return Result<TRestoreResult>(dbPath, EStatus::GENERIC_ERROR, "Cannot write data #4");
+ }
+ }
+ }
+ } else {
+ break;
}
}
diff --git a/ydb/public/lib/ydb_cli/dump/restore_impl.h b/ydb/public/lib/ydb_cli/dump/restore_impl.h
index 402eeed08d2..2bf23baca3b 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_impl.h
+++ b/ydb/public/lib/ydb_cli/dump/restore_impl.h
@@ -61,9 +61,12 @@ public:
}
};
+class IDataAccumulator;
+
class TBatch {
TStringBuilder Data;
TVector<TLocation> Locations;
+ IDataAccumulator* OriginAccumulator;
public:
void Add(const TLine& line);
@@ -84,6 +87,14 @@ public:
inline auto size() const {
return Data.size();
}
+
+ inline void SetOriginAccumulator(IDataAccumulator* originAccumulator) {
+ OriginAccumulator = originAccumulator;
+ }
+
+ inline IDataAccumulator* GetOriginAccumulator() const {
+ return OriginAccumulator;
+ }
};
class IDataAccumulator {
@@ -120,6 +131,12 @@ class TRestoreClient {
TRestoreResult RestoreIndexes(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestorePermissions(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const THashSet<TString>& oldEntries);
+ THolder<NPrivate::IDataWriter> CreateDataWriter(const TString& dbPath, const TRestoreSettings& settings,
+ const NTable::TTableDescription& desc, const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators);
+ TRestoreResult CreateDataAccumulators(TVector<THolder<NPrivate::IDataAccumulator>>& outAccumulators,
+ const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc,
+ ui32 dataFilesCount);
+
public:
explicit TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog>& log);
diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
index 1f37fa9e65f..37b095fbe09 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
+++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.cpp
@@ -745,6 +745,19 @@ public:
Rows.Add(KeyBuilder.Build(line), std::move(line));
}
+ void Feed(const NPrivate::TBatch& data) {
+ TGuard<TMutex> lock(Mutex);
+
+ TStringInput input(data.GetData());
+ TString line;
+
+ ui64 idx = 0;
+ while (input.ReadLine(line)) {
+ auto l = NPrivate::TLine(std::move(line), data.GetLocation(idx++));
+ Rows.Add(KeyBuilder.Build(l), std::move(l));
+ }
+ }
+
bool Ready(bool force) const override {
TGuard<TMutex> lock(Mutex);
return Rows.HasData(MemLimit, BatchSize, force);
@@ -752,21 +765,14 @@ public:
NPrivate::TBatch GetData(bool force) override {
TGuard<TMutex> lock(Mutex);
- return Rows.GetData(MemLimit, BatchSize, force);
+ auto batch = Rows.GetData(MemLimit, BatchSize, force);
+ batch.SetOriginAccumulator(this);
+ return batch;
}
- void Reshard(const TVector<TKeyRange>& keyRanges, const NPrivate::TBatch& data) {
+ void Reshard(const TVector<TKeyRange>& keyRanges) {
TGuard<TMutex> lock(Mutex);
Rows.Reshard(keyRanges);
-
- TStringInput input(data.GetData());
- TString line;
-
- ui64 idx = 0;
- while (input.ReadLine(line)) {
- auto l = NPrivate::TLine(std::move(line), data.GetLocation(idx++));
- Rows.Add(KeyBuilder.Build(l), std::move(l));
- }
}
private:
@@ -816,7 +822,7 @@ class TDataWriter: public NPrivate::IDataWriter {
}
if (retryNumber == maxRetries) {
- LOG_E("There is no retries left, last result: " << importResult.GetIssues().ToOneLineString());
+ LOG_E("There is no retries left, last result: " << importResult);
return false;
}
@@ -830,7 +836,14 @@ class TDataWriter: public NPrivate::IDataWriter {
return false;
}
- Accumulator->Reshard(desc->GetKeyRanges(), data);
+ for (auto* acc : Accumulators) {
+ acc->Reshard(desc->GetKeyRanges());
+ }
+
+ auto* originAcc = dynamic_cast<TDataAccumulator*>(data.GetOriginAccumulator());
+ Y_ENSURE(originAcc);
+ originAcc->Feed(data);
+
return true;
}
@@ -851,6 +864,9 @@ class TDataWriter: public NPrivate::IDataWriter {
break;
default:
+ LOG_E("Can't import data to " << Path.Quote()
+ << " at location " << data.GetLocation()
+ << ", result: " << importResult);
return false;
}
}
@@ -873,18 +889,23 @@ public:
const TRestoreSettings& settings,
TImportClient& importClient,
TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,
const std::shared_ptr<TLog>& log)
: Path(path)
, Settings(MakeSettings(settings, desc))
, ImportClient(importClient)
, TableClient(tableClient)
- , Accumulator(dynamic_cast<TDataAccumulator*>(accumulator))
+ , Accumulators(accumulators.size())
, Log(log)
, RateLimiterSettings(settings.RateLimiterSettings_)
, RequestLimiter(RateLimiterSettings.GetRps(), RateLimiterSettings.GetRps())
+ , Stopped(0)
{
- Y_ENSURE(Accumulator);
+ Y_ENSURE(!accumulators.empty());
+ for (size_t i = 0; i < accumulators.size(); ++i) {
+ Accumulators[i] = dynamic_cast<TDataAccumulator*>(accumulators[i].Get());
+ Y_ENSURE(Accumulators[i]);
+ }
TasksQueue = MakeHolder<TThreadPool>(TThreadPool::TParams().SetBlocking(true).SetCatching(true));
TasksQueue->Start(settings.InFly_, settings.InFly_ + 1);
@@ -918,7 +939,7 @@ private:
const TImportYdbDumpDataSettings Settings;
TImportClient& ImportClient;
TTableClient& TableClient;
- TDataAccumulator* Accumulator;
+ TVector<TDataAccumulator*> Accumulators;
const std::shared_ptr<TLog> Log;
const TRateLimiterSettings RateLimiterSettings;
@@ -946,10 +967,10 @@ NPrivate::IDataWriter* CreateImportDataWriter(
const TTableDescription& desc,
TImportClient& importClient,
TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log) {
- return new TDataWriter(path, desc, settings, importClient, tableClient, accumulator, log);
+ return new TDataWriter(path, desc, settings, importClient, tableClient, accumulators, log);
}
} // NDump
diff --git a/ydb/public/lib/ydb_cli/dump/restore_import_data.h b/ydb/public/lib/ydb_cli/dump/restore_import_data.h
index 10ff764eac8..f8d5044c7e2 100644
--- a/ydb/public/lib/ydb_cli/dump/restore_import_data.h
+++ b/ydb/public/lib/ydb_cli/dump/restore_import_data.h
@@ -18,7 +18,7 @@ NPrivate::IDataWriter* CreateImportDataWriter(
const NTable::TTableDescription& desc,
NImport::TImportClient& importClient,
NTable::TTableClient& tableClient,
- NPrivate::IDataAccumulator* accumulator,
+ const TVector<THolder<NPrivate::IDataAccumulator>>& accumulators,
const TRestoreSettings& settings,
const std::shared_ptr<TLog>& log);
diff --git a/ydb/public/sdk/cpp/client/ydb_import/out.cpp b/ydb/public/sdk/cpp/client/ydb_import/out.cpp
new file mode 100644
index 00000000000..3d710034f5b
--- /dev/null
+++ b/ydb/public/sdk/cpp/client/ydb_import/out.cpp
@@ -0,0 +1,5 @@
+#include "import.h"
+
+Y_DECLARE_OUT_SPEC(, NYdb::NImport::TImportDataResult, o, x) {
+ return x.Out(o);
+}
diff --git a/ydb/public/sdk/cpp/client/ydb_import/ya.make b/ydb/public/sdk/cpp/client/ydb_import/ya.make
index a670ca501bb..7fa5b23d62d 100644
--- a/ydb/public/sdk/cpp/client/ydb_import/ya.make
+++ b/ydb/public/sdk/cpp/client/ydb_import/ya.make
@@ -2,6 +2,7 @@ LIBRARY()
SRCS(
import.cpp
+ out.cpp
)
GENERATE_ENUM_SERIALIZATION(import.h)