| author | brgayazov <bulat@ydb.tech> | 2023-04-17 19:40:48 +0300 |
|---|---|---|
| committer | brgayazov <bulat@ydb.tech> | 2023-04-17 19:40:48 +0300 |
| commit | 2ec6f56358c1cc23520aa2029b575b63e21536ed (patch) | |
| tree | a4143c7cd85ee8f73bfdbf9aa8d6a9e450431afb | |
| parent | f822fe37d70e74e7dc30a4809e3376398a719f25 (diff) | |
| download | ydb-2ec6f56358c1cc23520aa2029b575b63e21536ed.tar.gz | |
Add reading from different parts of a file and a --columns option
| mode | path | lines |
|---|---|---|
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp | 8 |
| -rw-r--r-- | ydb/public/lib/ydb_cli/commands/ydb_service_import.h | 2 |
| -rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 283 |
| -rw-r--r-- | ydb/public/lib/ydb_cli/import/import.h | 5 |
4 files changed, 292 insertions, 6 deletions
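
In short, `--columns` supplies a header row on the command line that replaces the one in the file, and `--newline-delimited` asserts that no record contains an embedded newline, which lets the importer split the input file into byte-range chunks and upsert them from parallel threads (`UpsertCsvByBlocks` below). A minimal sketch of driving the same switches through the `TImportFileSettings` API this commit extends; the setter names are taken from the diff, while the column list and sizes are illustrative:

```cpp
// Illustrative only: exercises the settings added/used by this commit.
TImportFileSettings settings;
settings
    .Header(false)               // file carries no header row of its own...
    .HeaderRow("id,name,value")  // ...so --columns supplies one (hypothetical columns)
    .NewlineDelimited(true)      // no '\n' inside records: enables the chunked path
    .BytesPerRequest(1 << 20)    // pre-existing setting, shown for context
    .MaxInFlightRequests(10);    // pre-existing setting, shown for context
```

The full diff follows.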
```diff
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
index e75cbbeb6f..f14948c786 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
@@ -195,6 +195,12 @@ void TCommandImportFromCsv::Config(TConfig& config) {
     config.Opts->AddLongOption("header", "Set if file contains column names at the first not skipped row")
         .StoreTrue(&Header);
+    config.Opts->AddLongOption("columns",
+        "String with column names that replaces header")
+        .RequiredArgument("STR").StoreResult(&HeaderRow);
+    config.Opts->AddLongOption("newline-delimited",
+        "No newline characters inside records, enables some import optimizations (see docs)")
+        .StoreTrue(&NewlineDelimited);
     if (InputFormat == EOutputFormat::Csv) {
         config.Opts->AddLongOption("delimiter", "Field delimiter in rows")
             .RequiredArgument("STRING").StoreResult(&Delimiter).DefaultValue(Delimiter);
@@ -211,6 +217,8 @@ int TCommandImportFromCsv::Run(TConfig& config) {
     settings.BytesPerRequest(NYdb::SizeFromString(BytesPerRequest));
     settings.SkipRows(SkipRows);
     settings.Header(Header);
+    settings.NewlineDelimited(NewlineDelimited);
+    settings.HeaderRow(HeaderRow);
     settings.NullValue(NullValue);
 
     if (Delimiter.size() != 1) {
diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
index ddb2c2c272..2c216ac90c 100644
--- a/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
+++ b/ydb/public/lib/ydb_cli/commands/ydb_service_import.h
@@ -73,10 +73,12 @@ public:
     int Run(TConfig& config) override;
 
 protected:
+    TString HeaderRow;
     TString Delimiter;
     TString NullValue;
     ui32 SkipRows = 0;
     bool Header = false;
+    bool NewlineDelimited = true;
 };
 
 class TCommandImportFromTsv : public TCommandImportFromCsv {
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index c9d75669fe..8a94a859cb 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -14,11 +14,13 @@
 #include <ydb/public/lib/ydb_cli/import/cli_arrow_helpers.h>
 
 #include <library/cpp/string_utils/csv/csv.h>
+#include <library/cpp/threading/future/async.h>
 
 #include <util/folder/path.h>
 #include <util/generic/vector.h>
 #include <util/stream/file.h>
 #include <util/string/builder.h>
+#include <util/system/mutex.h>
 
 #include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
 #include <contrib/libs/apache/arrow/cpp/src/arrow/io/api.h>
@@ -30,6 +32,13 @@
 
 #include <stack>
 
+#if defined(_win32_)
+#include <windows.h>
+#elif defined(_unix_)
+#include <unistd.h>
+#endif
+
+
 namespace NYdb {
 namespace NConsoleClient {
 namespace {
@@ -65,6 +74,179 @@ TStatus WaitForQueue(const size_t maxQueueSize, std::vector<TAsyncStatus>& inFlightRequests) {
     return MakeStatus();
 }
 
+FHANDLE GetStdinFileno() {
+#if defined(_win32_)
+    return GetStdHandle(STD_INPUT_HANDLE);
+#elif defined(_unix_)
+    return STDIN_FILENO;
+#endif
+}
+
+class TCsvFileReader {
+private:
+    // This class exists because NCsvFormat::TLinesSplitter doesn't return the number of characters read (including '\r' and '\n')
+    class TLinesSplitter {
+    private:
+        IInputStream& Input;
+        const char Quote;
+    public:
+        TLinesSplitter(IInputStream& input, const char quote = '"')
+            : Input(input)
+            , Quote(quote) {
+        }
+
+        size_t ConsumeLine(TString& result) {
+            bool escape = false;
+            TString line;
+            size_t length = 0;
+            while (size_t lineLength = Input.ReadLine(line)) {
+                length += lineLength;
+                for (auto it = line.begin(); it != line.end(); ++it) {
+                    if (*it == Quote) {
+                        escape = !escape;
+                    }
+                }
+                if (!result) {
+                    result = line;
+                } else {
+                    result += line;
+                }
+                if (!escape) {
+                    break;
+                } else {
+                    result += "\n";
+                }
+            }
+            return length;
+        }
+    };
+
+    class TFileChunk {
+    public:
+        TFileChunk(TFile file, THolder<IInputStream>&& stream, i64 currentPos = 0, i64 endPos = std::numeric_limits<i64>::max())
+            : File(file)
+            , Stream(std::move(stream))
+            , CurrentPos(currentPos)
+            , EndPos(endPos) {
+        }
+
+        void UpdateEndPos(i64 endPos) {
+            EndPos = endPos;
+        }
+
+        size_t ConsumeLine(TString& line) {
+            size_t len = TLinesSplitter(*Stream).ConsumeLine(line);
+            MoveCurrentPos(len);
+            return len;
+        }
+
+        bool IsEndReached() const {
+            return EndPos <= CurrentPos;
+        }
+
+    private:
+        void MoveCurrentPos(size_t len) {
+            if (len > 0) {
+                CurrentPos += len;
+            } else {
+                CurrentPos = EndPos;
+            }
+        }
+
+        TFile File;
+        THolder<IInputStream> Stream;
+        i64 CurrentPos;
+        i64 EndPos;
+    };
+
+public:
+    TCsvFileReader(const TString& filePath, const TImportFileSettings& settings, TString& headerRow) {
+        TFile file;
+        if (filePath) {
+            file = TFile(filePath, RdOnly);
+        } else {
+            file = TFile(GetStdinFileno());
+        }
+        auto input = MakeHolder<TFileInput>(file);
+        i64 skipSize = 0;
+        TString temp;
+        if (settings.Header_) {
+            skipSize += TLinesSplitter(*input).ConsumeLine(headerRow);
+        }
+        for (ui32 i = 0; i < settings.SkipRows_; ++i) {
+            skipSize += TLinesSplitter(*input).ConsumeLine(temp);
+        }
+
+        MaxInFlight = settings.MaxInFlightRequests_;
+        i64 fileSize = file.GetLength();
+        if (filePath.empty() || fileSize == -1) {
+            SplitCount = 1;
+            Chunks.emplace_back(file, std::move(input));
+            return;
+        }
+
+        SplitCount = Min(settings.MaxInFlightRequests_, (fileSize - skipSize) / settings.BytesPerRequest_ + 1);
+        i64 chunkSize = (fileSize - skipSize) / SplitCount;
+        if (chunkSize == 0) {
+            SplitCount = 1;
+            chunkSize = fileSize - skipSize;
+        }
+
+        i64 seekPos = skipSize;
+        Chunks.reserve(SplitCount);
+        for (size_t i = 0; i < SplitCount; ++i) {
+            i64 beginPos = seekPos;
+            file = TFile(filePath, RdOnly);
+            file.Seek(seekPos, sSet);
+            if (seekPos > 0) {
+                file.Seek(-1, sCur);
+            }
+
+            auto stream = MakeHolder<TFileInput>(file);
+            if (seekPos > 0) {
+                beginPos += stream->ReadLine(temp);
+            }
+            if (!Chunks.empty()) {
+                Chunks.back().UpdateEndPos(beginPos);
+            }
+            Chunks.emplace_back(file, std::move(stream), beginPos);
+            seekPos += chunkSize;
+        }
+        Chunks.back().UpdateEndPos(fileSize);
+    }
+
+    TFileChunk& GetChunk(size_t threadId) {
+        if (threadId >= Chunks.size()) {
+            throw yexception() << "File chunk number is too big";
+        }
+        return Chunks[threadId];
+    }
+
+    static size_t ConsumeLine(TFileChunk& chunk, TString& line) {
+        if (chunk.IsEndReached()) {
+            return 0;
+        }
+        line.clear();
+        size_t len = chunk.ConsumeLine(line);
+        return len;
+    }
+
+    size_t GetThreadLimit(size_t thread_id) const {
+        return MaxInFlight / SplitCount + (thread_id < MaxInFlight % SplitCount ? 1 : 0);
+    }
+
+    size_t GetSplitCount() const {
+        return SplitCount;
+    }
+
+private:
+    TVector<TFileChunk> Chunks;
+    size_t SplitCount;
+    size_t MaxInFlight;
+};
+
 } // namespace
 
 TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand::TConfig& rootConfig)
@@ -118,7 +300,11 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
     case EOutputFormat::Default:
     case EOutputFormat::Csv:
     case EOutputFormat::Tsv:
-        return UpsertCsv(input, dbPath, settings);
+        if (settings.NewlineDelimited_) {
+            return UpsertCsvByBlocks(filePath, dbPath, settings);
+        } else {
+            return UpsertCsv(input, dbPath, settings);
+        }
     case EOutputFormat::Json:
     case EOutputFormat::JsonUnicode:
     case EOutputFormat::JsonBase64:
@@ -162,8 +348,13 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings) {
 
     NCsvFormat::TLinesSplitter splitter(input);
     TString headerRow;
-    if (settings.Header_) {
-        headerRow = splitter.ConsumeLine();
+    if (settings.Header_ || settings.HeaderRow_) {
+        if (settings.Header_) {
+            headerRow = splitter.ConsumeLine();
+        }
+        if (settings.HeaderRow_) {
+            headerRow = settings.HeaderRow_;
+        }
         headerRow += '\n';
         buffer = headerRow;
         csvSettings.set_header(true);
@@ -185,15 +376,14 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings) {
 
     ui32 idx = settings.SkipRows_;
     ui64 readSize = 0;
-    const ui32 mb100 = 1 << 27;
-    ui64 nextBorder = mb100;
+    ui64 nextBorder = VerboseModeReadSize;
     while (TString line = splitter.ConsumeLine()) {
         buffer += line;
         buffer += '\n';
         readSize += line.size();
         ++idx;
         if (readSize >= nextBorder && RetrySettings.Verbose_) {
-            nextBorder += mb100;
+            nextBorder += VerboseModeReadSize;
             Cerr << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
         }
         if (buffer.Size() >= settings.BytesPerRequest_) {
@@ -214,6 +404,87 @@ TStatus TImportFileClient::UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings) {
     return WaitForQueue(0, inFlightRequests);
 }
 
+TStatus TImportFileClient::UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
+    Ydb::Formats::CsvSettings csvSettings;
+    bool special = false;
+    if (settings.Delimiter_ != settings.DefaultDelimiter) {
+        csvSettings.set_delimiter(settings.Delimiter_);
+        special = true;
+    }
+
+    if (settings.NullValue_.size()) {
+        csvSettings.set_null_value(settings.NullValue_);
+        special = true;
+    }
+    TString headerRow;
+    TCsvFileReader splitter(filePath, settings, headerRow);
+
+    if (settings.Header_ || settings.HeaderRow_) {
+        if (settings.HeaderRow_) {
+            headerRow = settings.HeaderRow_;
+        }
+        headerRow += '\n';
+        csvSettings.set_header(true);
+        special = true;
+    }
+
+    if (special) {
+        TString formatSettings;
+        Y_PROTOBUF_SUPPRESS_NODISCARD csvSettings.SerializeToString(&formatSettings);
+        UpsertSettings.FormatSettings(formatSettings);
+    }
+
+    TVector<TAsyncStatus> threadResults(splitter.GetSplitCount());
+    THolder<IThreadPool> pool = CreateThreadPool(splitter.GetSplitCount());
+    for (size_t threadId = 0; threadId < splitter.GetSplitCount(); ++threadId) {
+        auto loadCsv = [this, &settings, &headerRow, &splitter, &dbPath, threadId] () {
+            std::vector<TAsyncStatus> inFlightRequests;
+            TString buffer;
+            buffer = headerRow;
+            ui32 idx = settings.SkipRows_;
+            ui64 readSize = 0;
+            ui64 nextBorder = VerboseModeReadSize;
+            TAsyncStatus status;
+            TString line;
+            while (TCsvFileReader::ConsumeLine(splitter.GetChunk(threadId), line)) {
+                buffer += line;
+                buffer += '\n';
+                readSize += line.size();
+                ++idx;
+                if (readSize >= nextBorder && RetrySettings.Verbose_) {
+                    nextBorder += VerboseModeReadSize;
+                    TStringBuilder builder;
+                    builder << "Processed " << 1.0 * readSize / (1 << 20) << "Mb and " << idx << " records" << Endl;
+                    Cerr << builder;
+                }
+                if (buffer.Size() >= settings.BytesPerRequest_) {
+                    auto status = WaitForQueue(splitter.GetThreadLimit(threadId), inFlightRequests);
+                    if (!status.IsSuccess()) {
+                        return status;
+                    }
+
+                    inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
+                    buffer = headerRow;
+                }
+            }
+
+            if (!buffer.Empty()) {
+                inFlightRequests.push_back(UpsertCsvBuffer(dbPath, buffer));
+            }
+
+            return WaitForQueue(0, inFlightRequests);
+        };
+        threadResults[threadId] = NThreading::Async(loadCsv, *pool);
+    }
+    NThreading::WaitAll(threadResults).Wait();
+    for (size_t i = 0; i < splitter.GetSplitCount(); ++i) {
+        if (!threadResults[i].GetValueSync().IsSuccess()) {
+            return threadResults[i].GetValueSync();
+        }
+    }
+    return MakeStatus();
+}
+
 inline
 TAsyncStatus TImportFileClient::UpsertJsonBuffer(const TString& dbPath, TValueBuilder& builder) {
     auto upsert = [this, dbPath, rows = builder.Build()]
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index f6290fac71..9f61fb3709 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -44,6 +44,8 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSettings> {
     // Settings below are for CSV format only
     FLUENT_SETTING_DEFAULT(ui32, SkipRows, 0);
     FLUENT_SETTING_DEFAULT(bool, Header, false);
+    FLUENT_SETTING_DEFAULT(bool, NewlineDelimited, false);
+    FLUENT_SETTING_DEFAULT(TString, HeaderRow, "");
     FLUENT_SETTING_DEFAULT(TString, Delimiter, DefaultDelimiter);
     FLUENT_SETTING_DEFAULT(TString, NullValue, "");
 };
@@ -67,7 +69,10 @@ private:
     NTable::TBulkUpsertSettings UpsertSettings;
     NTable::TRetryOperationSettings RetrySettings;
 
+    static constexpr ui32 VerboseModeReadSize = 1 << 27; // 128 MiB
+
     TStatus UpsertCsv(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
+    TStatus UpsertCsvByBlocks(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings);
     TAsyncStatus UpsertCsvBuffer(const TString& dbPath, const TString& buffer);
     TStatus UpsertJson(IInputStream& input, const TString& dbPath, const TImportFileSettings& settings);
```
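
Two notes on the implementation above. First, the nested `TLinesSplitter` keeps appending physical lines while a quote is unbalanced, so quoted fields spanning several lines still form one logical CSV record; unlike `NCsvFormat::TLinesSplitter`, it also reports how many bytes it consumed, which is what allows chunk positions to be tracked. Second, the constructor caps `SplitCount` both by `MaxInFlightRequests_` and by roughly one chunk per `BytesPerRequest_` of payload, and `GetThreadLimit` then divides the global in-flight budget across chunks. A standalone restatement of that arithmetic with a worked example (hypothetical helper names, not part of the commit):

```cpp
#include <algorithm>
#include <cstdint>
#include <cstdio>

// Mirrors the chunk-count choice in TCsvFileReader's constructor.
size_t SplitCountFor(int64_t fileSize, int64_t skipSize,
                     int64_t bytesPerRequest, size_t maxInFlight) {
    return static_cast<size_t>(
        std::min<int64_t>(maxInFlight, (fileSize - skipSize) / bytesPerRequest + 1));
}

// Mirrors TCsvFileReader::GetThreadLimit: an even share of the in-flight
// budget, with the first (maxInFlight % splitCount) threads getting one extra.
size_t ThreadLimit(size_t threadId, size_t maxInFlight, size_t splitCount) {
    return maxInFlight / splitCount + (threadId < maxInFlight % splitCount ? 1 : 0);
}

int main() {
    // Worked example: 3 MiB of payload after skipped rows, 1 MiB per request,
    // at most 10 requests in flight -> min(10, 3 + 1) = 4 chunks.
    size_t split = SplitCountFor(3LL << 20, 0, 1LL << 20, 10);
    for (size_t t = 0; t < split; ++t) {
        // 10 / 4 = 2 each, remainder 2: threads 0-1 get 3, threads 2-3 get 2.
        std::printf("thread %zu: limit %zu\n", t, ThreadLimit(t, 10, split));
    }
}
```

This also explains the `--newline-delimited` precondition: after seeking to an arbitrary byte offset, each chunk realigns itself with a plain `ReadLine`, which is only safe when records never contain embedded newlines, and is presumably why the chunked path is gated behind the flag.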