aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorarcadia-devtools <arcadia-devtools@yandex-team.ru>2022-05-12 16:52:45 +0300
committerVitalii Gridnev <gvit@qavm-ff2ff183.qemu>2022-05-12 20:00:35 +0300
commit826c7eacd043881b3e5e60fbbfe7ca861a6bcffd (patch)
treeb4cd400526efd5c9234a5c090feca5488075ab02
parent5625647c87521120ca5a1f7d24288caa83cd1804 (diff)
downloadydb-826c7eacd043881b3e5e60fbbfe7ca861a6bcffd.tar.gz
intermediate changes
ref:a6dd4541cd4224304d6eeff974067ff591bc40a7
-rw-r--r--build/rules/autocheck.blacklist4
-rw-r--r--library/cpp/yt/memory/ref_counted-inl.h6
-rw-r--r--ydb/core/persqueue/partition.cpp43
-rw-r--r--ydb/core/persqueue/partition.h4
-rw-r--r--ydb/core/tx/datashard/datashard_ut_erase_rows.cpp12
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp18
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp8
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp16
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.cpp2
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h1
-rw-r--r--ydb/core/tx/schemeshard/ut_base.cpp148
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.cpp27
-rw-r--r--ydb/core/tx/schemeshard/ut_helpers/helpers.h4
-rw-r--r--ydb/library/dynumber/dynumber.cpp19
-rw-r--r--ydb/library/dynumber/ut/dynumber_ut.cpp4
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp33
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp12
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp26
-rw-r--r--ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h1
-rw-r--r--ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp1
-rw-r--r--ydb/services/datastreams/datastreams_ut.cpp11
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.cpp19
-rw-r--r--ydb/services/persqueue_v1/grpc_pq_schema.h2
-rw-r--r--ydb/tests/functional/dynumber/test_dynumber.py28
34 files changed, 446 insertions, 155 deletions
diff --git a/build/rules/autocheck.blacklist b/build/rules/autocheck.blacklist
index e29e5db441..5d5ccf8edd 100644
--- a/build/rules/autocheck.blacklist
+++ b/build/rules/autocheck.blacklist
@@ -1329,7 +1329,6 @@ classifieds/autoru-frontend-sitemap
classifieds/beggar-banker
classifieds/infra/goLB
classifieds/infra/shiva-conf
-haas/einebox
classifieds/nginx-tests
practicum/services/jrunner
classifieds/sphinx-configs
@@ -1404,14 +1403,12 @@ classifieds/cabinet
bunker/nodejs-db-connector
bunker/bunker-queue
bunker/database
-haas/factory-packages
adv/pcode/mobileadssdk/android/library
classifieds/realty/feedloader
classifieds/vos2
classifieds/skypper
admins/icecream/ice
admins/icecream/backend
-haas/infra/graphite-sender
noc/packages/telegraf-noc-conf
admins/icecream/node-icecream
adv/pcode/mobileadssdk/sdk/ios/library
@@ -1420,7 +1417,6 @@ trunk/arcadia/crm/space/frontend_static
crm/space/frontend_static
mail/java/furita
toolbox/tunneler
-samsara/samsara-attachments
education/schoolbook/services/gauss
education/schoolbook/packages/accio
adv/pcode/mobileadssdk/tests/frankenstein
diff --git a/library/cpp/yt/memory/ref_counted-inl.h b/library/cpp/yt/memory/ref_counted-inl.h
index e6d64fec18..b41f64e803 100644
--- a/library/cpp/yt/memory/ref_counted-inl.h
+++ b/library/cpp/yt/memory/ref_counted-inl.h
@@ -66,7 +66,7 @@ struct TMemoryReleaser<T, std::enable_if_t<T::EnableHazard>>
Y_FORCE_INLINE int TRefCounter::GetRefCount() const noexcept
{
- return StrongCount_.load(std::memory_order_relaxed);
+ return StrongCount_.load(std::memory_order_acquire);
}
Y_FORCE_INLINE void TRefCounter::Ref() const noexcept
@@ -96,7 +96,7 @@ Y_FORCE_INLINE bool TRefCounter::Unref() const
auto oldStrongCount = StrongCount_.fetch_sub(1, std::memory_order_release);
YT_ASSERT(oldStrongCount > 0);
if (oldStrongCount == 1) {
- StrongCount_.load(std::memory_order_acquire);
+ std::atomic_thread_fence(std::memory_order_acquire);
return true;
} else {
return false;
@@ -119,7 +119,7 @@ Y_FORCE_INLINE bool TRefCounter::WeakUnref() const
auto oldWeakCount = WeakCount_.fetch_sub(1, std::memory_order_release);
YT_ASSERT(oldWeakCount > 0);
if (oldWeakCount == 1) {
- WeakCount_.load(std::memory_order_acquire);
+ std::atomic_thread_fence(std::memory_order_acquire);
return true;
} else {
return false;
diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp
index ae0e5b7a92..ebb8316f62 100644
--- a/ydb/core/persqueue/partition.cpp
+++ b/ydb/core/persqueue/partition.cpp
@@ -1038,7 +1038,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
WriteTimestampEstimate = now;
THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>();
- bool haveChanges = DropOldStuff(request.Get(), false, ctx);
+ bool haveChanges = CleanUp(request.Get(), false, ctx);
if (DiskIsFull) {
AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels());
haveChanges = true;
@@ -1073,29 +1073,30 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
}
-bool TPartition::DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) {
- bool haveChanges = false;
- if (DropOldData(request, hasWrites, ctx))
- haveChanges = true;
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
- if (SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, Config.GetPartitionConfig())) {
- haveChanges = true;
+bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) {
+ bool haveChanges = CleanUpBlobs(request, hasWrites, ctx);
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " <<
+ request->Record.CmdDeleteRangeSize() << " items to delete old stuff");
+
+ haveChanges |= SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition,
+ Config.GetPartitionConfig());
+ if (haveChanges) {
SourceIdStorage.MarkOwnersForDeletedSourceId(Owners);
}
- LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff");
+ LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " <<
+ request->Record.CmdDeleteRangeSize() << " items to delete all stuff");
LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString());
+
return haveChanges;
}
-
-bool TPartition::DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) {
- if (StartOffset == EndOffset)
- return false;
- if (DataKeysBody.size() <= 1)
+bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) {
+ if (StartOffset == EndOffset || DataKeysBody.size() <= 1)
return false;
+ const auto& partConfig = Config.GetPartitionConfig();
ui64 minOffset = EndOffset;
- for (const auto& importantClientId : Config.GetPartitionConfig().GetImportantClientId()) {
+ for (const auto& importantClientId : partConfig.GetImportantClientId()) {
TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantClientId);
ui64 curOffset = StartOffset;
if (userInfo && userInfo->Offset >= 0) //-1 means no offset
@@ -1122,13 +1123,17 @@ bool TPartition::DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, c
Y_VERIFY(!DataKeysBody.empty());
endOffset = DataKeysBody.front().Key.GetOffset();
- if (DataKeysBody.front().Key.GetPartNo() > 0) ++endOffset;
-
+ if (DataKeysBody.front().Key.GetPartNo() > 0) {
+ ++endOffset;
+ }
}
TDataKey lastKey = HeadKeys.empty() ? DataKeysBody.back() : HeadKeys.back();
- if (!hasWrites && ctx.Now() >= lastKey.Timestamp + TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()) && minOffset == EndOffset && false) { // disable drop of all data
+ if (!hasWrites &&
+ ctx.Now() >= lastKey.Timestamp + TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()) &&
+ minOffset == EndOffset &&
+ false) { // disable drop of all data
Y_VERIFY(!HeadKeys.empty() || !DataKeysBody.empty());
Y_VERIFY(CompactedKeys.empty());
@@ -4714,7 +4719,7 @@ void TPartition::HandleWrites(const TActorContext& ctx)
} else {
haveData = ProcessWrites(request.Get(), ctx);
}
- bool haveDrop = DropOldStuff(request.Get(), haveData, ctx);
+ bool haveDrop = CleanUp(request.Get(), haveData, ctx);
ProcessReserveRequests(ctx);
diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h
index 0b505416ba..0cb644ac12 100644
--- a/ydb/core/persqueue/partition.h
+++ b/ydb/core/persqueue/partition.h
@@ -159,7 +159,7 @@ private:
void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
- bool DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx);
+ bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx);
//will fill sourceIds, request and NewHead
//returns true if head is compacted
@@ -372,7 +372,7 @@ private:
};
}
- bool DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx);
+ bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx);
std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared);
void HandleWrites(const TActorContext& ctx);
diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
index dcafb2e92f..9ce091e22b 100644
--- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp
@@ -560,8 +560,8 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES
(6, CAST("636249600000" As DyNumber)),
(7, NULL);
)", R"(
-key = 5, value = .190244160e10
-key = 6, value = .63624960000e12
+key = 5, value = .19024416e10
+key = 6, value = .6362496e12
key = 7, value = (empty maybe)
)", WithMvcc);
}
@@ -576,8 +576,8 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES
(4, CAST("636249600000000" As DyNumber)),
(5, NULL);
)", R"(
-key = 3, value = .1902441600000e13
-key = 4, value = .636249600000000e15
+key = 3, value = .19024416e13
+key = 4, value = .6362496e15
key = 5, value = (empty maybe)
)", WithMvcc);
}
@@ -595,7 +595,7 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES
(7, CAST("9.9999999999999999999999999999999999999E+125" As DyNumber)),
(8, NULL);
)", R"(
-key = 6, value = .190244160000000e16
+key = 6, value = .19024416e16
key = 7, value = .99999999999999999999999999999999999999e126
key = 8, value = (empty maybe)
)", WithMvcc);
@@ -610,7 +610,7 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES
(3, CAST("1902441600000000000" As DyNumber)),
(4, NULL);
)", R"(
-key = 3, value = .1902441600000000000e19
+key = 3, value = .19024416e19
key = 4, value = (empty maybe)
)", WithMvcc);
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
index ffe49f67e7..d6e119676f 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp
@@ -113,10 +113,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.OnComplete.DoneOperation(OperationId);
@@ -286,10 +288,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
index 1a41593ef8..d76e89c4f8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp
@@ -117,10 +117,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.OnComplete.DoneOperation(OperationId);
@@ -335,10 +337,12 @@ THolder<TProposeResponse> TDropFileStore::Propose(
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
index 0818da7209..d471698590 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp
@@ -43,12 +43,12 @@ void DropPath(NIceDb::TNiceDb& db, TOperationContext& context,
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(operationId, path->PathId);
-
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(operationId, path->PathId);
+ }
}
class TPropose: public TSubOperationState {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
index 776814e95b..947ca40808 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp
@@ -108,10 +108,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.OnComplete.DoneOperation(OperationId);
@@ -274,10 +276,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
index 9567113545..283607ef70 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp
@@ -102,10 +102,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts);
return true;
@@ -391,10 +393,12 @@ public:
++parent.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parent.Base());
context.SS->ClearDescribePathCaches(parent.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
index 0370676572..4ee75f39e4 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp
@@ -141,10 +141,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts);
return true;
@@ -451,10 +453,12 @@ public:
++parent.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parent.Base());
context.SS->ClearDescribePathCaches(parent.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
index a718dbaf91..a85a65dca2 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp
@@ -238,10 +238,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
context.OnComplete.ActivateTx(OperationId);
@@ -491,10 +493,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
index a1693c8a35..f6acf0a4d8 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp
@@ -166,12 +166,13 @@ public:
++parentPath->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentPath);
-
context.SS->ClearDescribePathCaches(parentPath);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->ChangeTxState(db, OperationId, TTxState::Done);
return true;
@@ -327,12 +328,13 @@ public:
++parentPath->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentPath.Base());
-
context.SS->ClearDescribePathCaches(parentPath.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId);
+ }
context.OnComplete.ActivateTx(OperationId);
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
index 3f0a2ca03e..16e1cad401 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp
@@ -180,10 +180,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->PersistSequenceRemove(db, pathId);
@@ -392,10 +394,12 @@ public:
++parent->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parent.Base());
context.SS->ClearDescribePathCaches(parent.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parent->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parent->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
index 2d4db56a29..71b3042e6b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp
@@ -93,10 +93,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->ChangeTxState(db, OperationId, TTxState::ProposedDeleteParts);
return true;
@@ -258,10 +260,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
index eff1c9d8cc..c558b21fbc 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp
@@ -44,10 +44,12 @@ void DropPath(NIceDb::TNiceDb& db,
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(operationId, path->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(operationId, path->PathId);
+ }
}
class TDropParts: public TSubOperationState {
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
index 9f4d14e7fd..3652b0b24c 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp
@@ -89,10 +89,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
- for (const TPathId pathId : pathes) {
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ for (const TPathId pathId : pathes) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
}
context.SS->ChangeTxState(db, OperationId, TTxState::ProposedDeleteParts);
@@ -304,10 +306,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
State = NextState();
SetState(SelectStateFunc(State));
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
index cde36e35e8..3701119300 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp
@@ -96,10 +96,12 @@ public:
++parentDir.Base()->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir.Base());
context.SS->ClearDescribePathCaches(parentDir.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
-
context.SS->ClearDescribePathCaches(path.Base());
- context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId);
+ }
return result;
}
@@ -165,10 +167,12 @@ public:
++parentDir->DirAlterVersion;
context.SS->PersistPathDirAlterVersion(db, parentDir);
context.SS->ClearDescribePathCaches(parentDir);
- context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
-
context.SS->ClearDescribePathCaches(path);
- context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+
+ if (!context.SS->DisablePublicationsOfDropping) {
+ context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId);
+ context.OnComplete.PublishToSchemeBoard(OperationId, pathId);
+ }
context.SS->TabletCounters->Simple()[COUNTER_USER_ATTRIBUTES_COUNT].Sub(path->UserAttrs->Size());
context.SS->PersistUserAttributes(db, path->PathId, path->UserAttrs, nullptr);
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
index a16ded6c5d..a117a6690a 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp
@@ -3668,6 +3668,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info)
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
, AllowConditionalEraseOperations(1, 0, 1)
, AllowServerlessStorageBilling(0, 0, 1)
+ , DisablePublicationsOfDropping(0, 0, 1)
, SplitSettings()
, IsReadOnlyMode(false)
, ParentDomainLink(this)
@@ -3797,6 +3798,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
}
appData->Icb->RegisterSharedControl(AllowConditionalEraseOperations, "SchemeShard_AllowConditionalEraseOperations");
+ appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping");
AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable();
appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable");
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index 69577641d4..0eba6b3659 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -114,6 +114,7 @@ public:
TControlWrapper AllowConditionalEraseOperations;
TControlWrapper AllowServerlessStorageBilling;
+ TControlWrapper DisablePublicationsOfDropping;
TSplitSettings SplitSettings;
diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp
index 9b135fc880..28392faadd 100644
--- a/ydb/core/tx/schemeshard/ut_base.cpp
+++ b/ydb/core/tx/schemeshard/ut_base.cpp
@@ -10075,4 +10075,152 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
NLs::MinPartitionsCountEqual(1),
NLs::MaxPartitionsCountEqual(100)});
}
+
+ template <typename TCreateFn, typename TDropFn>
+ void DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp op, TCreateFn&& createFn, TDropFn&& dropFn) {
+ TTestBasicRuntime runtime;
+ TTestEnv env(runtime);
+ ui64 txId = 100;
+
+ // disable publications
+ {
+ TAtomic unused;
+ runtime.GetAppData().Icb->SetValue("SchemeShard_DisablePublicationsOfDropping", true, unused);
+ }
+
+ createFn(runtime, txId);
+ env.TestWaitNotification(runtime, txId);
+
+ {
+ auto nav = Navigate(runtime, "/MyRoot/Obj", op);
+ const auto& entry = nav->ResultSet.at(0);
+ UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok);
+ }
+
+ dropFn(runtime, txId);
+ env.TestWaitNotification(runtime, txId);
+
+ // still ok
+ {
+ auto nav = Navigate(runtime, "/MyRoot/Obj", op);
+ const auto& entry = nav->ResultSet.at(0);
+ UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok);
+ }
+
+ // check after reboot (should be removed in process of sync)
+ RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());
+
+ while (true) {
+ auto nav = Navigate(runtime, "/MyRoot/Obj", op);
+ const auto& entry = nav->ResultSet.at(0);
+ if ((entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown)) {
+ break;
+ }
+
+ env.SimulateSleep(runtime, TDuration::MilliSeconds(100));
+ }
+
+ // enable publications
+ {
+ TAtomic unused;
+ runtime.GetAppData().Icb->SetValue("SchemeShard_DisablePublicationsOfDropping", false, unused);
+ }
+
+ createFn(runtime, txId);
+ env.TestWaitNotification(runtime, txId);
+
+ {
+ auto nav = Navigate(runtime, "/MyRoot/Obj", op);
+ const auto& entry = nav->ResultSet.at(0);
+ UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok);
+ }
+
+ dropFn(runtime, txId);
+ env.TestWaitNotification(runtime, txId);
+
+ {
+ auto nav = Navigate(runtime, "/MyRoot/Obj", op);
+ const auto& entry = nav->ResultSet.at(0);
+ UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown);
+ }
+ }
+
+ Y_UNIT_TEST(DisablePublicationsOfDropping_Dir) {
+ DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpPath,
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestMkDir(runtime, ++txId, "/MyRoot", "Obj");
+ },
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestRmDir(runtime, ++txId, "/MyRoot", "Obj");
+ }
+ );
+ }
+
+ Y_UNIT_TEST(DisablePublicationsOfDropping_Table) {
+ DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTable,
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestCreateTable(runtime, ++txId, "/MyRoot", R"(
+ Name: "Obj"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ )");
+ },
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestDropTable(runtime, ++txId, "/MyRoot", "Obj");
+ }
+ );
+ }
+
+ Y_UNIT_TEST(DisablePublicationsOfDropping_IndexedTable) {
+ DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTable,
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"(
+ TableDescription {
+ Name: "Obj"
+ Columns { Name: "key" Type: "Uint64" }
+ Columns { Name: "value" Type: "Utf8" }
+ KeyColumnNames: ["key"]
+ }
+ IndexDescription {
+ Name: "UserDefinedIndexByValue"
+ KeyColumnNames: ["value"]
+ }
+ )");
+ },
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestDropTable(runtime, ++txId, "/MyRoot", "Obj");
+ }
+ );
+ }
+
+ Y_UNIT_TEST(DisablePublicationsOfDropping_Pq) {
+ DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic,
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"(
+ Name: "Obj"
+ TotalGroupCount: 1
+ PartitionPerTablet: 1
+ PQTabletConfig: { PartitionConfig { LifetimeSeconds: 10 } }
+ )");
+ },
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestDropPQGroup(runtime, ++txId, "/MyRoot", "Obj");
+ }
+ );
+ }
+
+ Y_UNIT_TEST(DisablePublicationsOfDropping_Solomon) {
+ DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpPath,
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestCreateSolomon(runtime, ++txId, "/MyRoot", R"(
+ Name: "Obj"
+ PartitionCount: 1
+ )");
+ },
+ [](TTestBasicRuntime& runtime, ui64& txId) {
+ return TestDropSolomon(runtime, ++txId, "/MyRoot", "Obj");
+ }
+ );
+ }
}
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
index e87d0c6341..b2543b22f1 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp
@@ -294,6 +294,33 @@ namespace NSchemeShardUT_Private {
return record.DebugString();
}
+ THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TString& path,
+ NSchemeCache::TSchemeCacheNavigate::EOp op)
+ {
+ using TNavigate = NSchemeCache::TSchemeCacheNavigate;
+ using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet;
+ using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult;
+
+ const auto sender = runtime.AllocateEdgeActor();
+ auto request = MakeHolder<TNavigate>();
+ auto& entry = request->ResultSet.emplace_back();
+ entry.Path = SplitPath(path);
+ entry.RequestType = TNavigate::TEntry::ERequestType::ByPath;
+ entry.Operation = op;
+ entry.ShowPrivatePath = true;
+ runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release())));
+
+ auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender);
+ UNIT_ASSERT(ev);
+ UNIT_ASSERT(ev->Get());
+
+ auto* response = ev->Get()->Request.Release();
+ UNIT_ASSERT(response);
+ UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1);
+
+ return THolder(response);
+ }
+
TEvSchemeShard::TEvModifySchemeTransaction* CopyTableRequest(ui64 txId, const TString& dstPath, const TString& dstName, const TString& srcFullName, TApplyIf applyIf) {
auto evTx = new TEvSchemeShard::TEvModifySchemeTransaction(txId, TTestTxConfig::SchemeShard);
auto transaction = evTx->Record.AddTransaction();
diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
index 02e8ce5609..de4aaa0f42 100644
--- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h
+++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h
@@ -9,6 +9,7 @@
#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/testlib/minikql_compile.h>
#include <ydb/core/tx/datashard/datashard.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
@@ -63,6 +64,9 @@ namespace NSchemeShardUT_Private {
TString TestLs(TTestActorRuntime& runtime, const TString& path, const NKikimrSchemeOp::TDescribeOptions& opts, NLs::TCheckFunc check = nullptr);
TString TestLsPathId(TTestActorRuntime& runtime, ui64 pathId, NLs::TCheckFunc check = nullptr);
+ THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TString& path,
+ NSchemeCache::TSchemeCacheNavigate::EOp op = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath);
+
////////// modification results
void CheckExpected(const TVector<TEvSchemeShard::EStatus>& expected, TEvSchemeShard::EStatus result, const TString& reason);
void CheckExpected(const TVector<Ydb::StatusIds::StatusCode>& expected, Ydb::StatusIds::StatusCode result, const TString& reason);
diff --git a/ydb/library/dynumber/dynumber.cpp b/ydb/library/dynumber/dynumber.cpp
index af2d50f15a..9287ea01a5 100644
--- a/ydb/library/dynumber/dynumber.cpp
+++ b/ydb/library/dynumber/dynumber.cpp
@@ -5,6 +5,7 @@
#include <util/string/cast.h>
#include <util/string/builder.h>
#include <util/stream/buffer.h>
+#include <util/stream/format.h>
namespace NKikimr::NDyNumber {
@@ -124,6 +125,7 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) {
auto nonZeroAfterDot = 0U;
bool hasNonZeroAfterDot = false;
auto zeroAfterDot = 0U;
+ auto tailZerosBeforeDot = 0U;
i16 ePower = 0;
auto tailZeros = 0U;
TSmallVec<char> data;
@@ -150,7 +152,14 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) {
return Nothing();
if (!hasDot) {
++beforeDot;
- data.emplace_back(c - '0');
+ if (isZero) {
+ ++tailZerosBeforeDot;
+ } else {
+ for (; tailZerosBeforeDot; --tailZerosBeforeDot) {
+ data.emplace_back('\x00');
+ }
+ data.emplace_back(c - '0');
+ }
} else {
if (!isZero)
hasNonZeroAfterDot = true;
@@ -158,6 +167,9 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) {
if (isZero) {
++tailZeros;
} else {
+ for (; tailZerosBeforeDot; --tailZerosBeforeDot) {
+ data.emplace_back('\x00');
+ }
for (; tailZeros; --tailZeros) {
data.emplace_back('\x00');
++nonZeroAfterDot;
@@ -198,6 +210,9 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) {
for (auto i = 0U; i < data.size(); i += 2U)
result.append((data[i] << '\x04') | data[i + 1]);
}
+
+ // Cerr << str << ": " << HexText(TStringBuf{result.c_str(), result.size()}) << Endl;
+
return result;
}
@@ -237,4 +252,4 @@ TMaybe<TString> DyNumberToString(TStringBuf buffer) {
return out;
}
-} \ No newline at end of file
+}
diff --git a/ydb/library/dynumber/ut/dynumber_ut.cpp b/ydb/library/dynumber/ut/dynumber_ut.cpp
index f46db222de..278965eed1 100644
--- a/ydb/library/dynumber/ut/dynumber_ut.cpp
+++ b/ydb/library/dynumber/ut/dynumber_ut.cpp
@@ -62,6 +62,10 @@ Y_UNIT_TEST_SUITE(TDyNumberTests) {
TestDyNumber(".023");
TestDyNumber("0.93");
TestDyNumber("724.1");
+ TestDyNumber("150e2");
+ TestDyNumber("15e3");
+ TestDyNumber("0.150e4");
+ TestDyNumber("0.15e4");
TestDyNumber("1E-130");
TestDyNumber("9.9999999999999999999999999999999999999E+125");
TestDyNumber("9.9999999999999999999999999999999999999000E+125");
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 14a2b8ecd1..9400ba7b6f 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -37,23 +37,21 @@ namespace NKikimrServices {
constexpr ui32 KQP_COMPUTE = 535;
};
-const TString LogPrefix = "PQ sink. ";
-
-#define SINK_LOG_T(s) \
+#define SRC_LOG_T(s) \
LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_D(s) \
+#define SRC_LOG_D(s) \
LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_I(s) \
+#define SRC_LOG_I(s) \
LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_W(s) \
+#define SRC_LOG_W(s) \
LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_N(s) \
+#define SRC_LOG_N(s) \
LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_E(s) \
+#define SRC_LOG_E(s) \
LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG_C(s) \
+#define SRC_LOG_C(s) \
LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
-#define SINK_LOG(prio, s) \
+#define SRC_LOG(prio, s) \
LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
namespace NYql::NDq {
@@ -93,7 +91,7 @@ public:
TDqPqReadActor(
ui64 inputIndex,
- const TString& txId,
+ const TTxId& txId,
const THolderFactory& holderFactory,
NPq::NProto::TDqPqTopicSource&& sourceParams,
NPq::NProto::TDqReadTaskParams&& readParams,
@@ -108,6 +106,7 @@ public:
, BufferSize(bufferSize)
, RangesMode(rangesMode)
, HolderFactory(holderFactory)
+ , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ source. ")
, Driver(std::move(driver))
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
, SourceParams(std::move(sourceParams))
@@ -251,7 +250,7 @@ private:
i64 usedSpace = 0;
for (auto& event : events) {
- std::visit(TPQEventProcessor{*this, buffer, usedSpace}, event);
+ std::visit(TPQEventProcessor{*this, buffer, usedSpace, LogPrefix}, event);
}
SubscribeOnNextEvent();
@@ -309,8 +308,8 @@ private:
for (const auto& message : event.GetMessages()) {
const TString& data = message.GetData();
- LWPROBE(PqReadDataReceived, Self.TxId, Self.SourceParams.GetTopicPath(), data);
- SINK_LOG_T("Data received: " << data);
+ LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data);
+ SRC_LOG_T("Data received: " << message.DebugString(true));
Batch.emplace_back(NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())));
UsedSpace += data.Size();
@@ -346,14 +345,16 @@ private:
TDqPqReadActor& Self;
TUnboxedValueVector& Batch;
i64& UsedSpace;
+ const TString& LogPrefix;
};
private:
const ui64 InputIndex;
- const TString TxId;
+ const TTxId TxId;
const i64 BufferSize;
const bool RangesMode;
const THolderFactory& HolderFactory;
+ const TString LogPrefix;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
const NPq::NProto::TDqPqTopicSource SourceParams;
@@ -395,7 +396,7 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor(
TDqPqReadActor* actor = new TDqPqReadActor(
inputIndex,
- std::holds_alternative<ui64>(txId) ? ToString(txId) : std::get<TString>(txId),
+ txId,
holderFactory,
std::move(settings),
std::move(readTaskParamsMsg),
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
index 0aae5a7558..6cd23adcb8 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp
@@ -37,8 +37,6 @@ namespace NKikimrServices {
constexpr ui32 KQP_COMPUTE = 535;
};
-const TString LogPrefix = "PQ sink. ";
-
#define SINK_LOG_T(s) \
LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_D(s) \
@@ -92,7 +90,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu
public:
TDqPqWriteActor(
ui64 outputIndex,
- const TString& txId,
+ const TTxId& txId,
NPq::NProto::TDqPqTopicSink&& sinkParams,
NYdb::TDriver driver,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
@@ -105,6 +103,7 @@ public:
, Driver(std::move(driver))
, CredentialsProviderFactory(credentialsProviderFactory)
, Callbacks(callbacks)
+ , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ sink. ")
, FreeSpace(freeSpace)
, PersQueueClient(Driver, GetPersQueueClientSettings())
{ }
@@ -138,7 +137,7 @@ public:
TString data(dataCol.AsStringRef());
- LWPROBE(PqWriteDataToSend, TxId, SinkParams.GetTopicPath(), data);
+ LWPROBE(PqWriteDataToSend, TString(TStringBuilder() << TxId), SinkParams.GetTopicPath(), data);
SINK_LOG_T("Received data for sending: " << data);
const auto messageSize = GetItemSize(data);
@@ -367,11 +366,12 @@ private:
private:
const ui64 OutputIndex;
- const TString TxId;
+ const TTxId TxId;
const NPq::NProto::TDqPqTopicSink SinkParams;
NYdb::TDriver Driver;
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
+ const TString LogPrefix;
i64 FreeSpace = 0;
NYdb::NPersQueue::TPersQueueClient PersQueueClient;
@@ -404,7 +404,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor(
TDqPqWriteActor* actor = new TDqPqWriteActor(
outputIndex,
- std::holds_alternative<ui64>(txId) ? ToString(txId) : std::get<TString>(txId),
+ txId,
std::move(settings),
std::move(driver),
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
index d6d4141454..584bfcd7be 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp
@@ -32,21 +32,21 @@
#include <variant>
#define SINK_LOG_T(s) \
- LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_D(s) \
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_I(s) \
- LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_W(s) \
- LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_N(s) \
- LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_E(s) \
- LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG_C(s) \
- LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define SINK_LOG(prio, s) \
- LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s)
+ LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
namespace NYql::NDq {
@@ -108,6 +108,7 @@ public:
TDqSolomonWriteActor(
ui64 outputIndex,
+ const TTxId& txId,
TDqSolomonWriteParams&& writeParams,
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
@@ -115,6 +116,8 @@ public:
i64 freeSpace)
: TActor<TDqSolomonWriteActor>(&TDqSolomonWriteActor::StateFunc)
, OutputIndex(outputIndex)
+ , TxId(txId)
+ , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", Solomon sink. ")
, WriteParams(std::move(writeParams))
, Url(GetUrl())
, Callbacks(callbacks)
@@ -246,7 +249,7 @@ private:
}
TIssues issues { TIssue(errorBuilder) };
- SINK_LOG_W("Got error response from solomon " << issues.ToString());
+ SINK_LOG_W("Got error response from solomon " << issues.ToOneLineString());
Callbacks->OnSinkError(OutputIndex, issues, res->IsTerminal);
return;
}
@@ -432,6 +435,8 @@ private:
private:
const ui64 OutputIndex;
+ const TTxId TxId;
+ const TString LogPrefix;
const TDqSolomonWriteParams WriteParams;
const TString Url;
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* const Callbacks;
@@ -453,6 +458,7 @@ private:
std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor(
NYql::NSo::NProto::TDqSolomonShard&& settings,
ui64 outputIndex,
+ const TTxId& txId,
const THashMap<TString, TString>& secureParams,
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
@@ -471,6 +477,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo
TDqSolomonWriteActor* actor = new TDqSolomonWriteActor(
outputIndex,
+ txId,
std::move(params),
callbacks,
counters,
@@ -490,6 +497,7 @@ void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredService
return CreateDqSolomonWriteActor(
std::move(settings),
args.OutputIndex,
+ args.TxId,
args.SecureParams,
args.Callback,
counters,
diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
index aa5469f10b..3f257970b1 100644
--- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
+++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h
@@ -21,6 +21,7 @@ constexpr i64 DqSolomonDefaultFreeSpace = 16_MB;
std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor(
NYql::NSo::NProto::TDqSolomonShard&& settings,
ui64 outputIndex,
+ const TTxId& txId,
const THashMap<TString, TString>& secureParams,
NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks,
const NMonitoring::TDynamicCounterPtr& counters,
diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
index a97b2af780..aa82671e1d 100644
--- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
+++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp
@@ -48,6 +48,7 @@ void InitSink(
auto [dqSink, dqSinkAsActor] = CreateDqSolomonWriteActor(
std::move(settings),
0,
+ "TxId-42",
secureParams,
&actor.GetSinkCallbacks(),
counters,
diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp
index f15c26dfbc..cffb6ebc0c 100644
--- a/ydb/services/datastreams/datastreams_ut.cpp
+++ b/ydb/services/datastreams/datastreams_ut.cpp
@@ -32,6 +32,9 @@ static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin";
static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin";
static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin";
+static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud";
+static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder";
+
template<class TKikimr, bool secure>
class TDatastreamsTestServer {
public:
@@ -83,7 +86,9 @@ public:
TClient client(*(KikimrServer->ServerSettings));
UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK,
- client.AlterUserAttributes("/", "Root", {{"folder_id", "somefolder"},{"cloud_id", "somecloud"}, {"database_id", "root"}}));
+ client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID},
+ {"cloud_id", DEFAULT_CLOUD_ID},
+ {"database_id", "root"}}));
}
public:
@@ -126,8 +131,8 @@ ui32 CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath,
UNIT_ASSERT(map.contains("labels"));
UNIT_ASSERT(map.contains("source_id"));
UNIT_ASSERT(map.contains("source_wt"));
- UNIT_ASSERT(map.find("cloud_id")->second.GetString() == "somecloud");
- UNIT_ASSERT(map.find("folder_id")->second.GetString() == "somefolder");
+ UNIT_ASSERT(map.find("cloud_id")->second.GetString() == DEFAULT_CLOUD_ID);
+ UNIT_ASSERT(map.find("folder_id")->second.GetString() == DEFAULT_FOLDER_ID);
UNIT_ASSERT(map.find("resource_id")->second.GetString() == streamPath);
tags_check(map);
labels_check(map);
diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp
index b7f4e3f004..11f106cbca 100644
--- a/ydb/services/lib/actors/pq_schema_actor.cpp
+++ b/ydb/services/lib/actors/pq_schema_actor.cpp
@@ -11,13 +11,11 @@
namespace NKikimr::NGRpcProxy::V1 {
-#define DEFAULT_PARTITION_SPEED 1048576 // 1Mb
-
- constexpr i32 MAX_READ_RULES_COUNT = 3000;
-
constexpr TStringBuf GRPCS_ENDPOINT_PREFIX = "grpcs://";
- static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000;
-
+ constexpr i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS =
+ TDuration::Days(16).MilliSeconds();
+ constexpr ui64 DEFAULT_PARTITION_SPEED = 1_MB;
+ constexpr i32 MAX_READ_RULES_COUNT = 3000;
constexpr i32 MAX_SUPPORTED_CODECS_COUNT = 100;
TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx) {
@@ -312,8 +310,13 @@ namespace NKikimr::NGRpcProxy::V1 {
error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")";
return Ydb::StatusIds::BAD_REQUEST;
}
- if (settings.message_group_seqno_retention_period_ms() > DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD) {
- error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be less then default limit for database " << DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD;
+ if (settings.message_group_seqno_retention_period_ms() >
+ DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS) {
+ error = TStringBuilder() <<
+ "message_group_seqno_retention_period_ms (provided " <<
+ settings.message_group_seqno_retention_period_ms() <<
+ ") must be less then default limit for database " <<
+ DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS;
return Ydb::StatusIds::BAD_REQUEST;
}
if (settings.message_group_seqno_retention_period_ms() < 0) {
diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h
index 55c4138905..36b24ac43d 100644
--- a/ydb/services/persqueue_v1/grpc_pq_schema.h
+++ b/ydb/services/persqueue_v1/grpc_pq_schema.h
@@ -14,8 +14,6 @@
namespace NKikimr::NGRpcProxy::V1 {
-static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000;
-
inline TActorId GetPQSchemaServiceActorID() {
return TActorId(0, "PQSchmSvc");
}
diff --git a/ydb/tests/functional/dynumber/test_dynumber.py b/ydb/tests/functional/dynumber/test_dynumber.py
new file mode 100644
index 0000000000..bc6229ad89
--- /dev/null
+++ b/ydb/tests/functional/dynumber/test_dynumber.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+import os
+import ydb
+
+
+def test_dynumber():
+ config = ydb.DriverConfig(database=os.getenv("YDB_DATABASE"), endpoint=os.getenv("YDB_ENDPOINT"))
+ table_name = os.path.join("/", os.getenv("YDB_DATABASE"), "table")
+ with ydb.Driver(config) as driver:
+ driver.wait(timeout=5)
+ session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
+ session.create_table(
+ table_name,
+ ydb.TableDescription()
+ .with_primary_key('key')
+ .with_columns(
+ ydb.Column('key', ydb.OptionalType(ydb.PrimitiveType.DyNumber)),
+ )
+ )
+
+ for value in ["DyNumber(\".149e4\")", "DyNumber(\"15e2\")", "DyNumber(\"150e1\")", "DyNumber(\".151e4\")", "DyNumber(\"1500.1\")"]:
+ session.transaction().execute(
+ "upsert into `%s` (key ) values (%s );" % (table_name, value),
+ commit_tx=True,
+ )
+
+ result = session.transaction().execute("select count(*) cnt from `%s`" % table_name, commit_tx=True)
+ assert result[0].rows[0].cnt == 4