aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbrgayazov <bulat@ydb.tech>2023-04-17 19:40:48 +0300
committerbrgayazov <bulat@ydb.tech>2023-04-17 19:40:48 +0300
commit2ec6f56358c1cc23520aa2029b575b63e21536ed (patch)
treea4143c7cd85ee8f73bfdbf9aa8d6a9e450431afb
parentf822fe37d70e74e7dc30a4809e3376398a719f25 (diff)
downloadydb-2ec6f56358c1cc23520aa2029b575b63e21536ed.tar.gz
Add reading from different parts of file and --columns option
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp8
-rw-r--r--ydb/public/lib/ydb_cli/commands/ydb_service_import.h2
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp283
-rw-r--r--ydb/public/lib/ydb_cli/import/import.h5
4 files changed, 292 insertions, 6 deletions
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index e75cbbeb6f..f14948c786 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -195,6 +195,12 @@ void TCommandImportFromCsv::Config(TConfig& config) {
config.Opts->AddLongOption("header",
"Set if file contains column names at the first not skipped row")
.StoreTrue(&Header);
+ config.Opts->AddLongOption("columns",
+ "String with column names that replaces header")
+ .RequiredArgument("STR").StoreResult(&HeaderRow);
+ config.Opts->AddLongOption("newline-delimited",
+ "No newline characters inside records, enables some import optimizations (see docs)")
+ .StoreTrue(&NewlineDelimited);
if (InputFormat == EOutputFormat::Csv) {
config.Opts->AddLongOption("delimiter", "Field delimiter in rows")
.RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter);
@@ -211,6 +217,8 @@ int TCommandImportFromCsv::Run(TConfig& config) {
settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
settings.SkipRows(SkipRows);
settings.Header(Header);
+ settings.NewlineDelimited(NewlineDelimited);
+ settings.HeaderRow(HeaderRow);
settings.NullValue(NullValue);
if (Delimiter.size() != 1) {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index ddb2c2c272..2c216ac90c 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -73,10 +73,12 @@ public:
int Run(TConfig& config) override;
protected:
+ TString HeaderRow;
TString Delimiter;
TString NullValue;
ui32 SkipRows = 0;
bool Header = false;
+ bool NewlineDelimited = true;
};
class TCommandImportFromTsv : public TCommandImportFromCsv {
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index c9d75669fe..8a94a859cb 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -14,11 +14,13 @@
#include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
#include <library/cpp/string_utils/csv/csv.h>
+#include <library/cpp/threading/future/async.h>
#include <util/folder/path.h>
#include <util/generic/vector.h>
#include <util/stream/file.h>
#include <util/string/builder.h>
+#include <util/system/mutex.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
@@ -30,6 +32,13 @@
#include <stack>
+#if defined(_win32_)
+#include <windows.h>
+#elif defined(_unix_)
+#include <unistd.h>
+#endif
+
+
namespace NYdb {
namespace NConsoleClient {
namespace {
@@ -65,6 +74,179 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFli
return MakeStatus();
}
+FHANDLE GetStdinFileno() {
+#if defined(_win32_)
+ return GetStdHandle(STD_INPUT_HANDLE);
+#elif defined(_unix_)
+ return STDIN_FILENO;
+#endif
+}
+
+class TCsvFileReader {
+private:
+    // This class exists because NCsvFormat::TLinesSplitter doesn't return the number of characters read (including '\r' and '\n')
+ class TLinesSplitter {
+ private:
+ IInputStream& Input;
+ const char Quote;
+ public:
+ TLinesSplitter(IInputStream& input, const char quote = '"')
+ : Input(input)
+ , Quote(quote) {
+ }
+
+ size_t ConsumeLine(TString& result) {
+ bool escape = false;
+ TString line;
+ size_t length = 0;
+ while (size_t lineLength = Input.ReadLine(line)) {
+ length += lineLength;
+
+ for (auto it = line.begin(); it != line.end(); ++it) {
+ if (*it == Quote) {
+ escape = !escape;
+ }
+ }
+ if (!result) {
+ result = line;
+ } else {
+ result += line;
+ }
+ if (!escape) {
+ break;
+ } else {
+ result += "\n";
+ }
+ }
+ return length;
+ }
+ };
+
+ class TFileChunk {
+ public:
+ TFileChunk(TFile file, THolder<IInputStream>&& stream, i64 currentPos = 0, i64 endPos = std::numeric_limits<i64>::max())
+ : File(file)
+ , Stream(std::move(stream))
+ , CurrentPos(currentPos)
+ , EndPos(endPos) {
+ }
+
+ void UpdateEndPos(i64 endPos) {
+ EndPos = endPos;
+ }
+
+ size_t ConsumeLine(TString& line) {
+ size_t len = TLinesSplitter(*Stream).ConsumeLine(line);
+ MoveCurrentPos(len);
+ return len;
+ }
+
+ bool IsEndReached() const {
+ return EndPos <= CurrentPos;
+ }
+
+ private:
+ void MoveCurrentPos(size_t len) {
+ if (len > 0) {
+ CurrentPos += len;
+ } else {
+ CurrentPos = EndPos;
+ }
+ }
+
+ TFile File;
+ THolder<IInputStream> Stream;
+ i64 CurrentPos;
+ i64 EndPos;
+ };
+
+public:
+ TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow) {
+ TFile file;
+ if (filePath) {
+ file = TFile(filePath, RdOnly);
+ } else {
+ file = TFile(GetStdinFileno());
+ }
+ auto input = MakeHolder<TFileInput>(file);
+ i64 skipSize = 0;
+ TString temp;
+ if (settings.Header_) {
+ skipSize += TLinesSplitter(*input).ConsumeLine(headerRow);
+ }
+ for (ui32 i = 0; i < settings.SkipRows_; ++i) {
+ skipSize += TLinesSplitter(*input).ConsumeLine(temp);
+ }
+
+ MaxInFlight = settings.MaxInFlightRequests_;
+ i64 fileSize = file.GetLength();
+ if (filePath.empty() || fileSize == -1) {
+ SplitCount = 1;
+ Chunks.emplace_back(file, std::move(input));
+ return;
+ }
+
+ SplitCount = Min(settings.MaxInFlightRequests_, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
+ i64 chunkSize = (fileSize - skipSize) / SplitCount;
+ if (chunkSize == 0) {
+ SplitCount = 1;
+ chunkSize = fileSize - skipSize;
+ }
+
+ i64 seekPos = skipSize;
+ Chunks.reserve(SplitCount);
+ for (size_t i = 0; i < SplitCount; ++i) {
+ i64 beginPos = seekPos;
+ file = TFile(filePath, RdOnly);
+ file.Seek(seekPos, sSet);
+ if (seekPos > 0) {
+ file.Seek(-1, sCur);
+ }
+
+ auto stream = MakeHolder<TFileInput>(file);
+ if (seekPos > 0) {
+ beginPos += stream->ReadLine(temp);
+ }
+ if (!Chunks.empty()) {
+ Chunks.back().UpdateEndPos(beginPos);
+ }
+ Chunks.emplace_back(file, std::move(stream), beginPos);
+ seekPos += chunkSize;
+ }
+ Chunks.back().UpdateEndPos(fileSize);
+ }
+
+ TFileChunk& GetChunk(size_t threadId) {
+ if (threadId >= Chunks.size()) {
+ throw yexception() << "File chunk number is too big";
+ }
+ return Chunks[threadId];
+ }
+
+ static size_t ConsumeLine(TFileChunk& chunk, TString& line) {
+ if (chunk.IsEndReached()) {
+ return 0;
+ }
+
+ line.clear();
+ size_t len = chunk.ConsumeLine(line);
+ return len;
+ }
+
+ size_t GetThreadLimit(size_t thread_id) const {
+ return MaxInFlight / SplitCount + (thread_id < MaxInFlight % SplitCount ? 1 : 0);
+ }
+
+ size_t GetSplitCount() const {
+ return SplitCount;
+ }
+
+private:
+ TVector<TFileChunk> Chunks;
+ size_t SplitCount;
+ size_t MaxInFlight;
+};
+
} // namespace
TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
@@ -118,7 +300,11 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
case EOutputFormat::Default:
case EOutputFormat::Csv:
case EOutputFormat::Tsv:
- return UpsertCsv(input, dbPath, settings);
+ if (settings.NewlineDelimited_) {
+ return UpsertCsvByBlocks(filePath, dbPath, settings);
+ } else {
+ return UpsertCsv(input, dbPath, settings);
+ }
case EOutputFormat::Json:
case EOutputFormat::JsonUnicode:
case EOutputFormat::JsonBase64:
@@ -162,8 +348,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
NCsvFormat::TLinesSplitter splitter(input);
TString headerRow;
- if (settings.Header_) {
- headerRow = splitter.ConsumeLine();
+ if (settings.Header_ || settings.HeaderRow_) {
+ if (settings.Header_) {
+ headerRow = splitter.ConsumeLine();
+ }
+ if (settings.HeaderRow_) {
+ headerRow = settings.HeaderRow_;
+ }
headerRow += '\n';
buffer = headerRow;
csvSettings.set_header(true);
@@ -185,15 +376,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
ui32 idx = settings.SkipRows_;
ui64 readSize = 0;
- const ui32 mb100 = 1 << 27;
- ui64 nextBorder = mb100;
+ ui64 nextBorder = VerboseModeReadSize;
while (TString line = splitter.ConsumeLine()) {
buffer += line;
buffer += '\n';
readSize += line.size();
++idx;
if (readSize >= nextBorder && RetrySettings.Verbose_) {
- nextBorder += mb100;
+ nextBorder += VerboseModeReadSize;
Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
}
if (buffer.Size() >= settings.BytesPerRequest_) {
@@ -214,6 +404,87 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath,
return WaitForQueue(0, inFlightRequests);
}
+TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
+ Ydb::Formats::CsvSettings csvSettings;
+ bool special = false;
+ if (settings.Delimiter_ != settings.DefaultDelimiter) {
+ csvSettings.set_delimiter(settings.Delimiter_);
+ special = true;
+ }
+
+ if (settings.NullValue_.size()) {
+ csvSettings.set_null_value(settings.NullValue_);
+ special = true;
+ }
+ TString headerRow;
+ TCsvFileReader splitter(filePath, settings, headerRow);
+
+ if (settings.Header_ || settings.HeaderRow_) {
+ if (settings.HeaderRow_) {
+ headerRow = settings.HeaderRow_;
+ }
+ headerRow += '\n';
+ csvSettings.set_header(true);
+ special = true;
+ }
+
+ if (special) {
+ TString formatSettings;
+ Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings);
+ UpsertSettings.FormatSettings(formatSettings);
+ }
+
+ TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
+ THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
+ for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
+ auto loadCsv = [this, &settings, &headerRow, &splitter, &dbPath, threadId] () {
+ std::vector<TAsyncStatus> inFlightRequests;
+ TString buffer;
+ buffer = headerRow;
+ ui32 idx = settings.SkipRows_;
+ ui64 readSize = 0;
+ ui64 nextBorder = VerboseModeReadSize;
+ TAsyncStatus status;
+ TString line;
+ while (TCsvFileReader::ConsumeLine(splitter.GetChunk(threadId), line)) {
+ buffer += line;
+ buffer += '\n';
+ readSize += line.size();
+ ++idx;
+ if (readSize >= nextBorder && RetrySettings.Verbose_) {
+ nextBorder += VerboseModeReadSize;
+ TStringBuilder builder;
+ builder << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
+ Cerr << builder;
+ }
+ if (buffer.Size() >= settings.BytesPerRequest_) {
+ auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests);
+ if (!status.IsSuccess()) {
+ return status;
+ }
+
+ inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
+ buffer = headerRow;
+ }
+ }
+
+ if (!buffer.Empty()) {
+ inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
+ }
+
+ return WaitForQueue(0, inFlightRequests);
+ };
+ threadResults[threadId] = NThreading::Async(loadCsv, *pool);
+ }
+ NThreading::WaitAll(threadResults).Wait();
+ for (size_t i = 0; i < splitter.GetSplitCount(); ++i) {
+ if (!threadResults[i].GetValueSync().IsSuccess()) {
+ return threadResults[i].GetValueSync();
+ }
+ }
+ return MakeStatus();
+}
+
inline
TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) {
auto upsert = [this, dbPath, rows = builder.Build()]
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index f6290fac71..9f61fb3709 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -44,6 +44,8 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
// Settings below are for CSV format only
FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
FLUENT_SETTING_DEFAULT(bool, Header, false);
+ FLUENT_SETTING_DEFAULT(bool, NewlineDelimited, false);
+ FLUENT_SETTING_DEFAULT(TString, HeaderRow, "");
FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter);
FLUENT_SETTING_DEFAULT(TString, NullValue, "");
};
@@ -67,7 +69,10 @@ private:
NTable::TBulkUpsertSettings UpsertSettings;
NTable::TRetryOperationSettings RetrySettings;
+    static constexpr ui32 VerboseModeReadSize = 1 << 27; // 128 MiB
+
TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+ TStatus UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings);
TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer);
TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);