aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchertus <azuikov@ydb.tech>2023-01-25 20:22:15 +0300
committerchertus <azuikov@ydb.tech>2023-01-25 20:22:15 +0300
commit42cfc238c37e3c1b1cab50b733448e4e5c754702 (patch)
treec3b7134a488f9b2e3d880d3cddb97086b69ef803
parent060e8315fa63c835a89e4b704e9a8e5153c6e792 (diff)
downloadydb-42cfc238c37e3c1b1cab50b733448e4e5c754702.tar.gz
add retries to TLongTxWrite rpc
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp96
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp19
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp8
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h1
-rw-r--r--ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp70
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h19
-rw-r--r--ydb/public/lib/ydb_cli/import/import.cpp21
-rw-r--r--ydb/public/lib/ydb_cli/import/import.h3
8 files changed, 206 insertions, 31 deletions
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index fe034e2257..b50d99fe42 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -397,7 +397,22 @@ class TLongTxWriteBase : public TActorBootstrapped<TLongTxWriteImpl> {
using TBase = TActorBootstrapped<TLongTxWriteImpl>;
protected:
using TThis = typename TBase::TThis;
+
public:
+ struct TRetryData {
+ static const constexpr ui32 MaxRetriesPerShard = 10;
+ static const constexpr ui32 OverloadedDelayMs = 200;
+
+ ui64 TableId;
+ TString DedupId;
+ TString Data;
+ ui32 NumRetries;
+
+ static TDuration OverloadTimeout() {
+ return TDuration::MilliSeconds(OverloadedDelayMs);
+ }
+ };
+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_REQ;
}
@@ -423,10 +438,6 @@ public:
}
protected:
- void SetLongTxId(const TLongTxId& longTxId) {
- LongTxId = longTxId;
- }
-
void ProceedWithSchema(const NSchemeCache::TSchemeCacheNavigate& resp) {
NWilson::TProfileSpan pSpan = ActorSpan.BuildChildrenSpan("ProceedWithSchema");
if (resp.ErrorCount > 0) {
@@ -502,15 +513,45 @@ protected:
private:
void SendWriteRequest(ui64 shardId, ui64 tableId, const TString& dedupId, const TString& data) {
- WaitShards.insert(shardId);
+ TRetryData retry{
+ .TableId = tableId,
+ .DedupId = dedupId,
+ .Data = data,
+ .NumRetries = 0
+ };
+ WaitShards.emplace(shardId, std::move(retry));
SendToTablet(shardId, MakeHolder<TEvColumnShard::TEvWrite>(this->SelfId(), LongTxId, tableId, dedupId, data));
}
+ bool RetryWriteRequest(ui64 shardId, bool delayed = true) {
+ if (!WaitShards.count(shardId)) {
+ return false;
+ }
+
+ auto& retry = WaitShards[shardId];
+ if (retry.NumRetries < TRetryData::MaxRetriesPerShard) {
+ if (delayed) {
+ if (ShardsToRetry.empty()) {
+ TimeoutTimerActorId = CreateLongTimer(TRetryData::OverloadTimeout(),
+ new IEventHandle(this->SelfId(), this->SelfId(), new TEvents::TEvWakeup()));
+ }
+ ShardsToRetry.insert(shardId);
+ } else {
+ ++retry.NumRetries;
+ SendToTablet(shardId, MakeHolder<TEvColumnShard::TEvWrite>(this->SelfId(), LongTxId,
+ retry.TableId, retry.DedupId, retry.Data));
+ }
+ return true;
+ }
+ return false;
+ }
+
STFUNC(StateWrite) {
Y_UNUSED(ctx);
switch (ev->GetTypeRewrite()) {
hFunc(TEvColumnShard::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
}
@@ -552,9 +593,15 @@ private:
Y_VERIFY(WaitShards.count(shardId) || ShardsWrites.count(shardId));
auto status = msg->Record.GetStatus();
+ if (status == NKikimrTxColumnShard::OVERLOADED) {
+ if (RetryWriteRequest(shardId)) {
+ return;
+ }
+ }
if (status != NKikimrTxColumnShard::SUCCESS) {
auto ydbStatus = ConvertToYdbStatus(status);
- return ReplyError(ydbStatus, "Write error");
+ return ReplyError(ydbStatus,
+ TStringBuilder() << "Cannot write data into shard " << shardId << " in longTx " << LongTxId.ToString());
}
if (!WaitShards.count(shardId)) {
@@ -571,12 +618,31 @@ private:
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
NWilson::TProfileSpan pSpan(0, ActorSpan.GetTraceId(), "DeliveryProblem");
const auto* msg = ev->Get();
+ ui64 shardId = msg->TabletId;
+
+ if (!WaitShards.count(shardId)) {
+ return;
+ }
+ if (RetryWriteRequest(shardId)) {
+ return;
+ }
+
+ TString errMsg = TStringBuilder() << "Shard " << shardId << " is not available after "
+ << WaitShards[shardId].NumRetries << " retries";
if (msg->NotDelivered) {
- return ReplyError(Ydb::StatusIds::UNAVAILABLE, "Shard unavailable");
+ return ReplyError(Ydb::StatusIds::UNAVAILABLE, errMsg);
} else {
- return ReplyError(Ydb::StatusIds::UNDETERMINED, "Shard unavailable");
+ return ReplyError(Ydb::StatusIds::UNDETERMINED, errMsg);
+ }
+ }
+
+ void HandleTimeout(const TActorContext& /*ctx*/) {
+ TimeoutTimerActorId = {};
+ for (ui64 shardId : ShardsToRetry) {
+ RetryWriteRequest(shardId, false);
}
+ ShardsToRetry.clear();
}
private:
@@ -636,13 +702,15 @@ protected:
const TString DatabaseName;
const TString Path;
const TString DedupId;
-private:
TLongTxId LongTxId;
+private:
const TActorId LeaderPipeCache;
std::optional<NACLib::TUserToken> UserToken;
- THashSet<ui64> WaitShards;
+ THashMap<ui64, TRetryData> WaitShards;
THashMap<ui64, ui64> ShardsWrites;
+ THashSet<ui64> ShardsToRetry;
NWilson::TProfileSpan ActorSpan;
+ TActorId TimeoutTimerActorId;
};
@@ -669,11 +737,9 @@ public:
const auto* req = GetProtoRequest();
TString errMsg;
- TLongTxId longTxId;
- if (!longTxId.ParseString(req->tx_id(), &errMsg)) {
+ if (!LongTxId.ParseString(req->tx_id(), &errMsg)) {
return ReplyError(Ydb::StatusIds::BAD_REQUEST, errMsg);
}
- SetLongTxId(longTxId);
if (GetProtoRequest()->data().format() != Ydb::LongTx::Data::APACHE_ARROW) {
return ReplyError(Ydb::StatusIds::BAD_REQUEST, "Only APACHE_ARROW data format is supported");
@@ -829,7 +895,7 @@ class TLongTxReadRPC : public TActorBootstrapped<TLongTxReadRPC> {
using TBase = TActorBootstrapped<TLongTxReadRPC>;
private:
- static const constexpr ui64 MaxRetriesPerShard = 10;
+ static const constexpr ui32 MaxRetriesPerShard = 10;
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
@@ -970,7 +1036,7 @@ private:
ShardRetries[shard] = 0;
}
- ui64 retries = ++ShardRetries[shard];
+ ui32 retries = ++ShardRetries[shard];
if (retries > MaxRetriesPerShard) {
return ReplyError(Ydb::StatusIds::UNAVAILABLE, Sprintf("Failed to connect to shard %lu", shard));
}
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index b4a339c8a0..b2bfa0d18d 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -97,6 +97,8 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
Self->BlobManager->SaveBlobBatch(std::move(Ev->Get()->BlobBatch), blobManagerDb);
} else {
+ LOG_S_DEBUG("TTxWrite duplicate writeId " << writeId << " at tablet " << Self->TabletID());
+
// Return EResultStatus::SUCCESS for dups
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
}
@@ -193,6 +195,23 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
ctx.Send(ev->Get()->GetSource(), result.release());
} else {
+ if (record.HasLongTxId()) {
+ // TODO: multiple blobs in one longTx ({longTxId, dedupId} -> writeId)
+ auto longTxId = NLongTxService::TLongTxId::FromProto(record.GetLongTxId());
+ if (ui64 writeId = (ui64)HasLongTxWrite(longTxId)) {
+ LOG_S_DEBUG("Write (duplicate) into pathId " << tableId
+ << " longTx " << longTxId.ToString()
+ << " at tablet " << TabletID());
+
+ IncCounter(COUNTER_WRITE_DUPLICATE);
+
+ auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
+ TabletID(), metaShard, writeId, tableId, dedupId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ ctx.Send(ev->Get()->GetSource(), result.release());
+ return;
+ }
+ }
+
LOG_S_DEBUG("Write (blob) " << data.size() << " bytes into pathId " << tableId
<< (writeId? (" writeId " + ToString(writeId)).c_str() : "")
<< " at tablet " << TabletID());
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index e3c3eca148..8a2698d929 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -247,6 +247,14 @@ bool TColumnShard::HaveOutdatedTxs() const {
return it->MaxStep <= step;
}
+TWriteId TColumnShard::HasLongTxWrite(const NLongTxService::TLongTxId& longTxId) {
+ auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId);
+ if (it != LongTxWritesByUniqueId.end()) {
+ return (TWriteId)it->second->WriteId;
+ }
+ return (TWriteId)0;
+}
+
TWriteId TColumnShard::GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId) {
auto it = LongTxWritesByUniqueId.find(longTxId.UniqueId);
if (it != LongTxWritesByUniqueId.end()) {
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index dfbd8700d2..53317909e8 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -423,6 +423,7 @@ private:
return PrimaryIndex && PrimaryIndex->HasOverloadedGranules();
}
+ TWriteId HasLongTxWrite(const NLongTxService::TLongTxId& longTxId);
TWriteId GetLongTxWrite(NIceDb::TNiceDb& db, const NLongTxService::TLongTxId& longTxId);
void AddLongTxWrite(TWriteId writeId, ui64 txId);
void LoadLongTxWrite(TWriteId writeId, const NLongTxService::TLongTxId& longTxId);
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 408efc1422..6ad34c2e27 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -478,6 +478,75 @@ void TestWriteReadDup() {
}
}
+void TestWriteReadLongTxDup() {
+ TTestBasicRuntime runtime;
+ TTester::Setup(runtime);
+
+ TActorId sender = runtime.AllocateEdgeActor();
+ CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
+
+ TDispatchOptions options;
+ options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot));
+ runtime.DispatchEvents(options);
+
+ //
+
+ ui64 tableId = 1;
+ auto ydbSchema = TTestSchema::YdbSchema();
+ SetupSchema(runtime, sender, tableId, ydbSchema);
+
+ constexpr ui32 numRows = 10;
+ std::pair<ui64, ui64> portion = {10, 10 + numRows};
+
+ NLongTxService::TLongTxId longTxId;
+ UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));
+
+ ui64 txId = 0;
+ ui64 planStep = 100;
+ std::optional<ui64> writeId;
+
+ // Only the first blob with dedup pair {longTx, dedupId} should be inserted
+ // Others should return OK (write retries emulation)
+ for (ui32 i = 0; i < 4; ++i) {
+ auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema);
+ UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
+
+ auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, "0", data);
+ UNIT_ASSERT(writeIdOpt);
+ if (!i) {
+ writeId = *writeIdOpt;
+ }
+ UNIT_ASSERT_EQUAL(*writeIdOpt, *writeId);
+ }
+
+ ProposeCommit(runtime, sender, ++txId, {*writeId});
+ TSet<ui64> txIds = {txId};
+ PlanCommit(runtime, sender, planStep, txIds);
+
+ // read
+ TAutoPtr<IEventHandle> handle;
+ {
+ ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender,
+ new TEvColumnShard::TEvRead(sender, 0, planStep, txId, tableId));
+ auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle);
+ UNIT_ASSERT(event);
+
+ auto& resRead = Proto(event);
+ UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0);
+ UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
+ UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0);
+ UNIT_ASSERT_EQUAL(resRead.GetFinished(), true);
+ UNIT_ASSERT(resRead.GetData().size() > 0);
+
+ auto data = resRead.GetData();
+ auto meta = resRead.GetMeta();
+ UNIT_ASSERT(CheckColumns(data, meta, TTestSchema::ExtractNames(ydbSchema), numRows));
+ UNIT_ASSERT(DataHas(TVector<TString>{data}, meta.GetSchema(), portion, true));
+ UNIT_ASSERT(DataHasOnly(TVector<TString>{data}, meta.GetSchema(), portion));
+ }
+}
+
void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString codec = "") {
TTestBasicRuntime runtime;
TTester::Setup(runtime);
@@ -1597,6 +1666,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
Y_UNIT_TEST(WriteReadDuplicate) {
TestWriteReadDup();
+ TestWriteReadLongTxDup();
}
Y_UNIT_TEST(WriteRead) {
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index 649b3fc6ca..4a463a5e74 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -105,7 +105,7 @@ private:
TActorId SchemeCache;
TActorId LeaderPipeCache;
TDuration Timeout;
- TInstant Deadline;
+ TInstant StartTime;
TActorId TimeoutTimerActorId;
bool WaitingResolveReply;
bool Finished;
@@ -174,7 +174,7 @@ public:
{}
void Bootstrap(const NActors::TActorContext& ctx) {
- Deadline = AppData(ctx)->TimeProvider->Now() + Timeout;
+ StartTime = TAppData::TimeProvider->Now();
ResolveTable(GetTable(), ctx);
}
@@ -189,6 +189,10 @@ public:
}
protected:
+ TInstant Deadline() const {
+ return StartTime + Timeout;
+ }
+
const NSchemeCache::TSchemeCacheNavigate* GetResolveNameResult() const {
return ResolveNamesResult.get();
}
@@ -471,7 +475,9 @@ private:
void HandleTimeout(const TActorContext& ctx) {
ShardRepliesLeft.clear();
- return ReplyWithError(Ydb::StatusIds::TIMEOUT, "Request timed out", ctx);
+ return ReplyWithError(Ydb::StatusIds::TIMEOUT, TStringBuilder() << "Bulk upsert to table " << GetTable()
+ << " longTx " << LongTxId.ToString()
+ << " timed out, duration: " << (TAppData::TimeProvider->Now() - StartTime).Seconds() << " sec", ctx);
}
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev, const TActorContext& ctx) {
@@ -599,7 +605,7 @@ private:
}
LOG_DEBUG_S(ctx, NKikimrServices::MSGBUS_REQUEST, "Bulk upsert to table " << GetTable()
- << " startint LongTx");
+ << " starting LongTx");
// Begin Long Tx for writing a batch into OLAP table
TActorId longTxServiceId = NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId());
@@ -716,7 +722,8 @@ private:
Y_VERIFY(Batch);
TBase::Become(&TThis::StateWaitWriteBatchResult);
- TString dedupId = LongTxId.ToString(); // TODO: is this a proper dedup_id?
+ ui32 batchNo = 0;
+ TString dedupId = ToString(batchNo);
NGRpcService::DoLongTxWriteSameMailbox(ctx, ctx.SelfID, LongTxId, dedupId,
GetDatabase(), GetTable(), ResolveNamesResult, Batch, Issues);
}
@@ -907,7 +914,7 @@ private:
if (!ev) {
shardRequests[shardIdx].reset(new TEvDataShard::TEvUploadRowsRequest());
ev = shardRequests[shardIdx].get();
- ev->Record.SetCancelDeadlineMs(Deadline.MilliSeconds());
+ ev->Record.SetCancelDeadlineMs(Deadline().MilliSeconds());
ev->Record.SetTableId(keyRange->TableId.PathId.LocalPathId);
for (const auto& fd : KeyColumnPositions) {
diff --git a/ydb/public/lib/ydb_cli/import/import.cpp b/ydb/public/lib/ydb_cli/import/import.cpp
index 1d8e5b1928..afab213815 100644
--- a/ydb/public/lib/ydb_cli/import/import.cpp
+++ b/ydb/public/lib/ydb_cli/import/import.cpp
@@ -51,10 +51,11 @@ TImportFileClient::TImportFileClient(const TDriver& driver, const TClientCommand
, TableClient(std::make_shared<NTable::TTableClient>(driver))
{
UpsertSettings
- .OperationTimeout(TDuration::Seconds(30))
- .ClientTimeout(TDuration::Seconds(35));
+ .OperationTimeout(TDuration::Seconds(TImportFileSettings::OperationTimeoutSec))
+ .ClientTimeout(TDuration::Seconds(TImportFileSettings::ClientTimeoutSec));
RetrySettings
- .MaxRetries(100000).Verbose(rootConfig.IsVerbose());
+ .MaxRetries(TImportFileSettings::MaxRetries)
+ .Verbose(rootConfig.IsVerbose());
}
TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath, const TImportFileSettings& settings) {
@@ -86,7 +87,7 @@ TStatus TImportFileClient::Import(const TString& filePath, const TString& dbPath
}
// If the filename passed is empty, read from stdin, else from the file.
- std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr
+ std::unique_ptr<TFileInput> fileInput = filePath.empty() ? nullptr
: std::make_unique<TFileInput>(filePath, settings.FileBufferSize_);
IInputStream& input = fileInput ? *fileInput : Cin;
@@ -230,7 +231,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
return tableResult;
const TType tableType = GetTableType(tableResult.GetTableDescription());
- const NYdb::EBinaryStringEncoding stringEncoding =
+ const NYdb::EBinaryStringEncoding stringEncoding =
(settings.Format_==EOutputFormat::JsonBase64) ? NYdb::EBinaryStringEncoding::Base64 :
NYdb::EBinaryStringEncoding::Unicode;
@@ -275,7 +276,7 @@ TStatus TImportFileClient::UpsertJson(IInputStream& input, const TString& dbPath
TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename, [[maybe_unused]]const TString& dbPath, [[maybe_unused]]const TImportFileSettings& settings) {
#if defined (_WIN64) || defined (_WIN32) || defined (__WIN32__)
return MakeStatus(EStatus::BAD_REQUEST, TStringBuilder() << "Not supported on Windows");
- #else
+ #else
std::shared_ptr<arrow::io::ReadableFile> infile;
arrow::Result<std::shared_ptr<arrow::io::ReadableFile>> fileResult = arrow::io::ReadableFile::Open(filename);
if (!fileResult.ok()) {
@@ -309,7 +310,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename
}
std::deque<TAsyncStatus> inFlightRequests;
-
+
auto splitUpsertBatch = [this, &inFlightRequests, dbPath, settings](const std::shared_ptr<arrow::RecordBatch> &recordBatch){
std::vector<std::shared_ptr<arrow::RecordBatch>> slicedRecordBatches;
std::deque<std::shared_ptr<arrow::RecordBatch>> batchesDeque;
@@ -334,12 +335,12 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename
}
else {
std::shared_ptr<arrow::RecordBatch> left = nextBatch->Slice(0, nextBatch->num_rows() / 2);
- std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
+ std::shared_ptr<arrow::RecordBatch> right = nextBatch->Slice(nextBatch->num_rows() / 2);
batchesDeque.push_front(right);
batchesDeque.push_front(left);
}
}
- auto schema = recordBatch->schema();
+ auto schema = recordBatch->schema();
TString strSchema = NYdb_cli::NArrow::SerializeSchema(*schema);
for (size_t i = 0; i < slicedRecordBatches.size(); i++) {
TString buffer = NYdb_cli::NArrow::SerializeBatchNoCompression(slicedRecordBatches[i]);
@@ -375,7 +376,7 @@ TStatus TImportFileClient::UpsertParquet([[maybe_unused]]const TString& filename
#endif
}
-inline
+inline
TAsyncStatus TImportFileClient::UpsertParquetBuffer(const TString& dbPath, const TString& buffer, const TString& strSchema) {
auto upsert = [this, dbPath, buffer, strSchema](NYdb::NTable::TTableClient& tableClient) -> TAsyncStatus {
return tableClient.BulkUpsert(dbPath, NTable::EDataFormat::ApacheArrow, buffer, strSchema, UpsertSettings)
diff --git a/ydb/public/lib/ydb_cli/import/import.h b/ydb/public/lib/ydb_cli/import/import.h
index 3eba2c9751..f6290fac71 100644
--- a/ydb/public/lib/ydb_cli/import/import.h
+++ b/ydb/public/lib/ydb_cli/import/import.h
@@ -32,6 +32,9 @@ struct TImportFileSettings : public TOperationRequestSettings<TImportFileSetting
static constexpr ui64 MaxBytesPerRequest = 8_MB;
static constexpr const char * DefaultDelimiter = ",";
+ static constexpr ui32 OperationTimeoutSec = 5 * 60;
+ static constexpr ui32 ClientTimeoutSec = OperationTimeoutSec + 5;
+ static constexpr ui32 MaxRetries = 10000;
// Allowed values: Csv, Tsv, JsonUnicode, JsonBase64. Default means Csv
FLUENT_SETTING_DEFAULT(EOutputFormat, Format, EOutputFormat::Default);