diff options
author | chertus <azuikov@ydb.tech> | 2023-01-25 20:22:15 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-01-25 20:22:15 +0300 |
commit | 42cfc238c37e3c1b1cab50b733448e4e5c754702 (patch) | |
tree | c3b7134a488f9b2e3d880d3cddb97086b69ef803 | |
parent | 060e8315fa63c835a89e4b704e9a8e5153c6e792 (diff) | |
download | ydb-42cfc238c37e3c1b1cab50b733448e4e5c754702.tar.gz |
add retries to TLongTxWrite rpc
-rw-r--r-- | ydb/core/grpc_services/rpc_long_tx.cpp | 96 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/columnshard_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp | 70 | ||||
-rw-r--r-- | ydb/core/tx/tx_proxy/upload_rows_common_impl.h | 19 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.cpp | 21 | ||||
-rw-r--r-- | ydb/public/lib/ydb_cli/import/import.h | 3 |
8 files changed, 206 insertions, 31 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index fe034e2257..b50d99fe42 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -397,7 +397,22 @@ class TLongTxWriteBase : public TActorBootstrapped<TLongTxWriteImpl> { using TBase = TActorBootstrapped<TLongTxWriteImpl>; protected: using TThis = typename TBase::TThis; + public: + struct TRetryData { + static const constexpr ui32 MaxRetriesPerShard = 10; + static const constexpr ui32 OverloadedDelayMs = 200; + + ui64 TableId; + TString DedupId; + TString Data; + ui32 NumRetries; + + static TDuration OverloadTimeout() { + return TDuration::MilliSeconds(OverloadedDelayMs); + } + }; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } @@ -423,10 +438,6 @@ public: } protected: - void SetLongTxId(const TLongTxId& longTxId) { - LongTxId = longTxId; - } - void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) { NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema"); if (resp.ErrorCount > 0) { @@ -502,15 +513,45 @@ protected: private: void SendWriteRequest(ui64 shardId, ui64 tableId, const TString& dedupId, const TString& data) { - WaitShards.insert(shardId); + TRetryData retry{ + .TableId = tableId, + .DedupId = dedupId, + .Data = data, + .NumRetries = 0 + }; + WaitShards.emplace(shardId, std::move(retry)); SendToTablet(shardId, MakeHolder<TEvColumnShard::TEvWrite>(this->SelfId(), LongTxId, tableId, dedupId, data)); } + bool RetryWriteRequest(ui64 shardId, bool delayed = true) { + if (!WaitShards.count(shardId)) { + return false; + } + + auto& retry = WaitShards[shardId]; + if (retry.NumRetries < TRetryData::MaxRetriesPerShard) { + if (delayed) { + if (ShardsToRetry.empty()) { + TimeoutTimerActorId = CreateLongTimer(TRetryData::OverloadTimeout(), + new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup())); + } + ShardsToRetry.insert(shardId); + } else { + ++retry.NumRetries; + SendToTablet(shardId, MakeHolder<TEvColumnShard::TEvWrite>(this->SelfId(), LongTxId, + retry.TableId, retry.DedupId, retry.Data)); + } + return true; + } + return false; + } + STFUNC(StateWrite) { Y_UNUSED(ctx); switch (ev->GetTypeRewrite()) { hFunc(TEvColumnShard::TEvWriteResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); + CFunc(TEvents::TSystem::Wakeup, HandleTimeout); } } @@ -552,9 +593,15 @@ private: Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId)); auto status = msg->Record.GetStatus(); + if (status == NKikimrTxColumnShard::OVERLOADED) { + if (RetryWriteRequest(shardId)) { + return; + } + } if (status != NKikimrTxColumnShard::SUCCESS) { auto ydbStatus = ConvertToYdbStatus(status); - return ReplyError(ydbStatus, "Write error"); + return ReplyError(ydbStatus, + TStringBuilder() << "Cannot write data into shard " << shardId << " in longTx " << LongTxId.ToString()); } if (!WaitShards.count(shardId)) { @@ -571,12 +618,31 @@ private: void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "DeliveryProblem"); const auto* msg = ev->Get(); + ui64 shardId = msg->TabletId; + + if (!WaitShards.count(shardId)) { + return; + } + if (RetryWriteRequest(shardId)) { + return; + } + + TString errMsg = TStringBuilder() << "Shard " << shardId << " is not available after " + << WaitShards[shardId].NumRetries << " retries"; if (msg->NotDelivered) { - return ReplyError(Ydb::StatusIds::UNAVAILABLE, "Shard unavailable"); + return ReplyError(Ydb::StatusIds::UNAVAILABLE, errMsg); } else { - return ReplyError(Ydb::StatusIds::UNDETERMINED, "Shard unavailable"); + return ReplyError(Ydb::StatusIds::UNDETERMINED, errMsg); + } + } + + void HandleTimeout(const TActorContext& /*ctx*/) { + TimeoutTimerActorId = {}; + for (ui64 shardId : ShardsToRetry) { + RetryWriteRequest(shardId, false); } + ShardsToRetry.clear(); } private: @@ -636,13 +702,15 @@ protected: const TString DatabaseName; const TString Path; const TString DedupId; -private: TLongTxId LongTxId; +private: const TActorId LeaderPipeCache; std::optional<NACLib::TUserToken> UserToken; - THashSet<ui64> WaitShards; + THashMap<ui64, TRetryData> WaitShards; THashMap<ui64, ui64> ShardsWrites; + THashSet<ui64> ShardsToRetry; NWilson::TProfileSpan ActorSpan; + TActorId TimeoutTimerActorId; }; @@ -669,11 +737,9 @@ public: const auto* req = GetProtoRequest(); TString errMsg; - TLongTxId longTxId; - if (!longTxId.ParseString(req->tx_id(), &errMsg)) { + if (!LongTxId.ParseString(req->tx_id(), &errMsg)) { return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg); } - SetLongTxId(longTxId); if (GetProtoRequest()->data().format() != Ydb::LongTx::Data::APACHE_ARROW) { return ReplyError(Ydb::StatusIds::BAD_REQUEST, "Only APACHE_ARROW data format is supported"); @@ -829,7 +895,7 @@ class TLongTxReadRPC : public TActorBootstrapped<TLongTxReadRPC> { using TBase = TActorBootstrapped<TLongTxReadRPC>; private: - static const constexpr ui64 MaxRetriesPerShard = 10; + static const constexpr ui32 MaxRetriesPerShard = 10; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -970,7 +1036,7 @@ private: ShardRetries[shard] = 0; } - ui64 retries = ++ShardRetries[shard]; + ui32 retries = ++ShardRetries[shard]; if (retries > MaxRetriesPerShard) { return ReplyError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %lu", shard)); } diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index b4a339c8a0..b2bfa0d18d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -97,6 +97,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb); } else { + LOG_S_DEBUG("TTxWrite duplicate writeId " << writeId << " at tablet " << Self->TabletID()); + // Return EResultStatus::SUCCESS for dups Self->IncCounter(COUNTER_WRITE_DUPLICATE); } @@ -193,6 +195,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED); ctx.Send(ev->Get()->GetSource(), result.release()); } else { + if (record.HasLongTxId()) { + // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId) + auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId()); + if (ui64 writeId = (ui64)HasLongTxWrite(longTxId)) { + LOG_S_DEBUG("Write (duplicate) into pathId " << tableId + << " longTx " << longTxId.ToString() + << " at tablet " << TabletID()); + + IncCounter(COUNTER_WRITE_DUPLICATE); + + auto result = std::make_unique<TEvColumnShard::TEvWriteResult>( + TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::SUCCESS); + ctx.Send(ev->Get()->GetSource(), result.release()); + return; + } + } + LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId << (writeId? (" writeId " + ToString(writeId)).c_str() : "") << " at tablet " << TabletID()); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index e3c3eca148..8a2698d929 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -247,6 +247,14 @@ bool TColumnShard::HaveOutdatedTxs() const { return it->MaxStep <= step; } +TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId) { + auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); + if (it != LongTxWritesByUniqueId.end()) { + return (TWriteId)it->second->WriteId; + } + return (TWriteId)0; +} + TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId) { auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId); if (it != LongTxWritesByUniqueId.end()) { diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index dfbd8700d2..53317909e8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -423,6 +423,7 @@ private: return PrimaryIndex && PrimaryIndex->HasOverloadedGranules(); } + TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId); TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId); void AddLongTxWrite(TWriteId writeId, ui64 txId); void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 408efc1422..6ad34c2e27 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -478,6 +478,75 @@ void TestWriteReadDup() { } } +void TestWriteReadLongTxDup() { + TTestBasicRuntime runtime; + TTester::Setup(runtime); + + TActorId sender = runtime.AllocateEdgeActor(); + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + + TDispatchOptions options; + options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); + runtime.DispatchEvents(options); + + // + + ui64 tableId = 1; + auto ydbSchema = TTestSchema::YdbSchema(); + SetupSchema(runtime, sender, tableId, ydbSchema); + + constexpr ui32 numRows = 10; + std::pair<ui64, ui64> portion = {10, 10 + numRows}; + + NLongTxService::TLongTxId longTxId; + UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); + + ui64 txId = 0; + ui64 planStep = 100; + std::optional<ui64> writeId; + + // Only the first blob with dedup pair {longTx, dedupId} should be inserted + // Others should return OK (write retries emulation) + for (ui32 i = 0; i < 4; ++i) { + auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema); + UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); + + auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data); + UNIT_ASSERT(writeIdOpt); + if (!i) { + writeId = *writeIdOpt; + } + UNIT_ASSERT_EQUAL(*writeIdOpt, *writeId); + } + + ProposeCommit(runtime, sender, ++txId, {*writeId}); + TSet<ui64> txIds = {txId}; + PlanCommit(runtime, sender, planStep, txIds); + + // read + TAutoPtr<IEventHandle> handle; + { + ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, + new TEvColumnShard::TEvRead(sender, 0, planStep, txId, tableId)); + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + + auto& resRead = Proto(event); + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), 0); + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); + UNIT_ASSERT(resRead.GetData().size() > 0); + + auto data = resRead.GetData(); + auto meta = resRead.GetMeta(); + UNIT_ASSERT(CheckColumns(data, meta, TTestSchema::ExtractNames(ydbSchema), numRows)); + UNIT_ASSERT(DataHas(TVector<TString>{data}, meta.GetSchema(), portion, true)); + UNIT_ASSERT(DataHasOnly(TVector<TString>{data}, meta.GetSchema(), portion)); + } +} + void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -1597,6 +1666,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_UNIT_TEST(WriteReadDuplicate) { TestWriteReadDup(); + TestWriteReadLongTxDup(); } Y_UNIT_TEST(WriteRead) { diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index 649b3fc6ca..4a463a5e74 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -105,7 +105,7 @@ private: TActorId SchemeCache; TActorId LeaderPipeCache; TDuration Timeout; - TInstant Deadline; + TInstant StartTime; TActorId TimeoutTimerActorId; bool WaitingResolveReply; bool Finished; @@ -174,7 +174,7 @@ public: {} void Bootstrap(const NActors::TActorContext& ctx) { - Deadline = AppData(ctx)->TimeProvider->Now() + Timeout; + StartTime = TAppData::TimeProvider->Now(); ResolveTable(GetTable(), ctx); } @@ -189,6 +189,10 @@ public: } protected: + TInstant Deadline() const { + return StartTime + Timeout; + } + const NSchemeCache::TSchemeCacheNavigate* GetResolveNameResult() const { return ResolveNamesResult.get(); } @@ -471,7 +475,9 @@ private: void HandleTimeout(const TActorContext& ctx) { ShardRepliesLeft.clear(); - return ReplyWithError(Ydb::StatusIds::TIMEOUT, "Request timed out", ctx); + return ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "Bulk upsert to table " << GetTable() + << " longTx " << LongTxId.ToString() + << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec", ctx); } void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) { @@ -599,7 +605,7 @@ private: } LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Bulk upsert to table " << GetTable() - << " startint LongTx"); + << " starting LongTx"); // Begin Long Tx for writing a batch into OLAP table TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()); @@ -716,7 +722,8 @@ private: Y_VERIFY(Batch); TBase::Become(&TThis::StateWaitWriteBatchResult); - TString dedupId = LongTxId.ToString(); // TODO: is this a proper dedup_id? + ui32 batchNo = 0; + TString dedupId = ToString(batchNo); NGRpcService::DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId, GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues); } @@ -907,7 +914,7 @@ private: if (!ev) { shardRequests[shardIdx].reset(new TEvDataShard::TEvUploadRowsRequest()); ev = shardRequests[shardIdx].get(); - ev->Record.SetCancelDeadlineMs(Deadline.MilliSeconds()); + ev->Record.SetCancelDeadlineMs(Deadline().MilliSeconds()); ev->Record.SetTableId(keyRange->TableId.PathId.LocalPathId); for (const auto& fd : KeyColumnPositions) { diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp index 1d8e5b1928..afab213815 100644 --- a/ydb/public/lib/ydb_cli/import/import.cpp +++ b/ydb/public/lib/ydb_cli/import/import.cpp @@ -51,10 +51,11 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand , TableClient(std::make_shared<NTable::TTableClient>(driver)) { UpsertSettings - .OperationTimeout(TDuration::Seconds(30)) - .ClientTimeout(TDuration::Seconds(35)); + .OperationTimeout(TDuration::Seconds(TImportFileSettings::OperationTimeoutSec)) + .ClientTimeout(TDuration::Seconds(TImportFileSettings::ClientTimeoutSec)); RetrySettings - .MaxRetries(100000).Verbose(rootConfig.IsVerbose()); + .MaxRetries(TImportFileSettings::MaxRetries) + .Verbose(rootConfig.IsVerbose()); } TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) { @@ -86,7 +87,7 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath } // If the filename passed is empty, read from stdin, else from the file. - std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr + std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr : std::make_unique<TFileInput>(filePath, settings.FileBufferSize_); IInputStream& input = fileInput ? *fileInput : Cin; @@ -230,7 +231,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath return tableResult; const TType tableType = GetTableType(tableResult.GetTableDescription()); - const NYdb::EBinaryStringEncoding stringEncoding = + const NYdb::EBinaryStringEncoding stringEncoding = (settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 : NYdb::EBinaryStringEncoding::Unicode; @@ -275,7 +276,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename, [[maybe_unused]]const TString& dbPath, [[maybe_unused]]const TImportFileSettings& settings) { #if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__) return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows"); - #else + #else std::shared_ptr<arrow::io::ReadableFile> infile; arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename); if (!fileResult.ok()) { @@ -309,7 +310,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename } std::deque<TAsyncStatus> inFlightRequests; - + auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch> &recordBatch){ std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches; std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque; @@ -334,12 +335,12 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename } else { std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2); - std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2); + std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2); batchesDeque.push_front(right); batchesDeque.push_front(left); } } - auto schema = recordBatch->schema(); + auto schema = recordBatch->schema(); TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema); for (size_t i = 0; i < slicedRecordBatches.size(); i++) { TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]); @@ -375,7 +376,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename #endif } -inline +inline TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) { auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus { return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings) diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h index 3eba2c9751..f6290fac71 100644 --- a/ydb/public/lib/ydb_cli/import/import.h +++ b/ydb/public/lib/ydb_cli/import/import.h @@ -32,6 +32,9 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting static constexpr ui64 MaxBytesPerRequest = 8_MB; static constexpr const char * DefaultDelimiter = ","; + static constexpr ui32 OperationTimeoutSec = 5 * 60; + static constexpr ui32 ClientTimeoutSec = OperationTimeoutSec + 5; + static constexpr ui32 MaxRetries = 10000; // Allowed values: Csv, Tsv, JsonUnicode, JsonBase64. Default means Csv FLUENT_SETTING_DEFAULT(EOutputFormat, Format, EOutputFormat::Default); |