diff options
author | arcadia-devtools <arcadia-devtools@yandex-team.ru> | 2022-05-12 16:52:45 +0300 |
---|---|---|
committer | Vitalii Gridnev <gvit@qavm-ff2ff183.qemu> | 2022-05-12 20:00:35 +0300 |
commit | 826c7eacd043881b3e5e60fbbfe7ca861a6bcffd (patch) | |
tree | b4cd400526efd5c9234a5c090feca5488075ab02 | |
parent | 5625647c87521120ca5a1f7d24288caa83cd1804 (diff) | |
download | ydb-826c7eacd043881b3e5e60fbbfe7ca861a6bcffd.tar.gz |
intermediate changes
ref:a6dd4541cd4224304d6eeff974067ff591bc40a7
34 files changed, 446 insertions, 155 deletions
diff --git a/build/rules/autocheck.blacklist b/build/rules/autocheck.blacklist index e29e5db441..5d5ccf8edd 100644 --- a/build/rules/autocheck.blacklist +++ b/build/rules/autocheck.blacklist @@ -1329,7 +1329,6 @@ classifieds/autoru-frontend-sitemap classifieds/beggar-banker classifieds/infra/goLB classifieds/infra/shiva-conf -haas/einebox classifieds/nginx-tests practicum/services/jrunner classifieds/sphinx-configs @@ -1404,14 +1403,12 @@ classifieds/cabinet bunker/nodejs-db-connector bunker/bunker-queue bunker/database -haas/factory-packages adv/pcode/mobileadssdk/android/library classifieds/realty/feedloader classifieds/vos2 classifieds/skypper admins/icecream/ice admins/icecream/backend -haas/infra/graphite-sender noc/packages/telegraf-noc-conf admins/icecream/node-icecream adv/pcode/mobileadssdk/sdk/ios/library @@ -1420,7 +1417,6 @@ trunk/arcadia/crm/space/frontend_static crm/space/frontend_static mail/java/furita toolbox/tunneler -samsara/samsara-attachments education/schoolbook/services/gauss education/schoolbook/packages/accio adv/pcode/mobileadssdk/tests/frankenstein diff --git a/library/cpp/yt/memory/ref_counted-inl.h b/library/cpp/yt/memory/ref_counted-inl.h index e6d64fec18..b41f64e803 100644 --- a/library/cpp/yt/memory/ref_counted-inl.h +++ b/library/cpp/yt/memory/ref_counted-inl.h @@ -66,7 +66,7 @@ struct TMemoryReleaser<T, std::enable_if_t<T::EnableHazard>> Y_FORCE_INLINE int TRefCounter::GetRefCount() const noexcept { - return StrongCount_.load(std::memory_order_relaxed); + return StrongCount_.load(std::memory_order_acquire); } Y_FORCE_INLINE void TRefCounter::Ref() const noexcept @@ -96,7 +96,7 @@ Y_FORCE_INLINE bool TRefCounter::Unref() const auto oldStrongCount = StrongCount_.fetch_sub(1, std::memory_order_release); YT_ASSERT(oldStrongCount > 0); if (oldStrongCount == 1) { - StrongCount_.load(std::memory_order_acquire); + std::atomic_thread_fence(std::memory_order_acquire); return true; } else { return false; @@ -119,7 +119,7 @@ Y_FORCE_INLINE bool TRefCounter::WeakUnref() const auto oldWeakCount = WeakCount_.fetch_sub(1, std::memory_order_release); YT_ASSERT(oldWeakCount > 0); if (oldWeakCount == 1) { - WeakCount_.load(std::memory_order_acquire); + std::atomic_thread_fence(std::memory_order_acquire); return true; } else { return false; diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index ae0e5b7a92..ebb8316f62 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -1038,7 +1038,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) { WriteTimestampEstimate = now; THolder <TEvKeyValue::TEvRequest> request = MakeHolder<TEvKeyValue::TEvRequest>(); - bool haveChanges = DropOldStuff(request.Get(), false, ctx); + bool haveChanges = CleanUp(request.Get(), false, ctx); if (DiskIsFull) { AddCheckDiskRequest(request.Get(), Config.GetPartitionConfig().GetNumChannels()); haveChanges = true; @@ -1073,29 +1073,30 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) { } -bool TPartition::DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) { - bool haveChanges = false; - if (DropOldData(request, hasWrites, ctx)) - haveChanges = true; - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); - if (SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, Config.GetPartitionConfig())) { - haveChanges = true; +bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx) { + bool haveChanges = CleanUpBlobs(request, hasWrites, ctx); + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << + request->Record.CmdDeleteRangeSize() << " items to delete old stuff"); + + haveChanges |= SourceIdStorage.DropOldSourceIds(request, ctx.Now(), StartOffset, Partition, + Config.GetPartitionConfig()); + if (haveChanges) { SourceIdStorage.MarkOwnersForDeletedSourceId(Owners); } - LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << request->Record.CmdDeleteRangeSize() << " items to delete all stuff"); + LOG_DEBUG(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Have " << + request->Record.CmdDeleteRangeSize() << " items to delete all stuff"); LOG_TRACE(ctx, NKikimrServices::PERSQUEUE, TStringBuilder() << "Delete command " << request->ToString()); + return haveChanges; } - -bool TPartition::DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) { - if (StartOffset == EndOffset) - return false; - if (DataKeysBody.size() <= 1) +bool TPartition::CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx) { + if (StartOffset == EndOffset || DataKeysBody.size() <= 1) return false; + const auto& partConfig = Config.GetPartitionConfig(); ui64 minOffset = EndOffset; - for (const auto& importantClientId : Config.GetPartitionConfig().GetImportantClientId()) { + for (const auto& importantClientId : partConfig.GetImportantClientId()) { TUserInfo* userInfo = UsersInfoStorage.GetIfExists(importantClientId); ui64 curOffset = StartOffset; if (userInfo && userInfo->Offset >= 0) //-1 means no offset @@ -1122,13 +1123,17 @@ bool TPartition::DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, c Y_VERIFY(!DataKeysBody.empty()); endOffset = DataKeysBody.front().Key.GetOffset(); - if (DataKeysBody.front().Key.GetPartNo() > 0) ++endOffset; - + if (DataKeysBody.front().Key.GetPartNo() > 0) { + ++endOffset; + } } TDataKey lastKey = HeadKeys.empty() ? DataKeysBody.back() : HeadKeys.back(); - if (!hasWrites && ctx.Now() >= lastKey.Timestamp + TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()) && minOffset == EndOffset && false) { // disable drop of all data + if (!hasWrites && + ctx.Now() >= lastKey.Timestamp + TDuration::Seconds(Config.GetPartitionConfig().GetLifetimeSeconds()) && + minOffset == EndOffset && + false) { // disable drop of all data Y_VERIFY(!HeadKeys.empty() || !DataKeysBody.empty()); Y_VERIFY(CompactedKeys.empty()); @@ -4714,7 +4719,7 @@ void TPartition::HandleWrites(const TActorContext& ctx) } else { haveData = ProcessWrites(request.Get(), ctx); } - bool haveDrop = DropOldStuff(request.Get(), haveData, ctx); + bool haveDrop = CleanUp(request.Get(), haveData, ctx); ProcessReserveRequests(ctx); diff --git a/ydb/core/persqueue/partition.h b/ydb/core/persqueue/partition.h index 0b505416ba..0cb644ac12 100644 --- a/ydb/core/persqueue/partition.h +++ b/ydb/core/persqueue/partition.h @@ -159,7 +159,7 @@ private: void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx); void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx); - bool DropOldStuff(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx); + bool CleanUp(TEvKeyValue::TEvRequest* request, bool hasWrites, const TActorContext& ctx); //will fill sourceIds, request and NewHead //returns true if head is compacted @@ -372,7 +372,7 @@ private: }; } - bool DropOldData(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx); + bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, bool hasWrites, const TActorContext& ctx); std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared); void HandleWrites(const TActorContext& ctx); diff --git a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp index dcafb2e92f..9ce091e22b 100644 --- a/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp +++ b/ydb/core/tx/datashard/datashard_ut_erase_rows.cpp @@ -560,8 +560,8 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES (6, CAST("636249600000" As DyNumber)), (7, NULL); )", R"( -key = 5, value = .190244160e10 -key = 6, value = .63624960000e12 +key = 5, value = .19024416e10 +key = 6, value = .6362496e12 key = 7, value = (empty maybe) )", WithMvcc); } @@ -576,8 +576,8 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES (4, CAST("636249600000000" As DyNumber)), (5, NULL); )", R"( -key = 3, value = .1902441600000e13 -key = 4, value = .636249600000000e15 +key = 3, value = .19024416e13 +key = 4, value = .6362496e15 key = 5, value = (empty maybe) )", WithMvcc); } @@ -595,7 +595,7 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES (7, CAST("9.9999999999999999999999999999999999999E+125" As DyNumber)), (8, NULL); )", R"( -key = 6, value = .190244160000000e16 +key = 6, value = .19024416e16 key = 7, value = .99999999999999999999999999999999999999e126 key = 8, value = (empty maybe) )", WithMvcc); @@ -610,7 +610,7 @@ UPSERT INTO `/Root/table-1` (key, value) VALUES (3, CAST("1902441600000000000" As DyNumber)), (4, NULL); )", R"( -key = 3, value = .1902441600000000000e19 +key = 3, value = .19024416e19 key = 4, value = (empty maybe) )", WithMvcc); } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp index ffe49f67e7..d6e119676f 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_bsv.cpp @@ -113,10 +113,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.OnComplete.DoneOperation(OperationId); @@ -286,10 +288,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp index 1a41593ef8..d76e89c4f8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_fs.cpp @@ -117,10 +117,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.OnComplete.DoneOperation(OperationId); @@ -335,10 +337,12 @@ THolder<TProposeResponse> TDropFileStore::Propose( ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp index 0818da7209..d471698590 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_indexed_table.cpp @@ -43,12 +43,12 @@ void DropPath(NIceDb::TNiceDb& db, TOperationContext& context, context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(operationId, path->PathId); - + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(operationId, path->PathId); + } } class TPropose: public TSubOperationState { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp index 776814e95b..947ca40808 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_kesus.cpp @@ -108,10 +108,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.OnComplete.DoneOperation(OperationId); @@ -274,10 +276,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp index 9567113545..283607ef70 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_store.cpp @@ -102,10 +102,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts); return true; @@ -391,10 +393,12 @@ public: ++parent.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parent.Base()); context.SS->ClearDescribePathCaches(parent.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp index 0370676572..4ee75f39e4 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_olap_table.cpp @@ -141,10 +141,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->ChangeTxState(db, OperationId, TTxState::ProposedWaitParts); return true; @@ -451,10 +453,12 @@ public: ++parent.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parent.Base()); context.SS->ClearDescribePathCaches(parent.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parent.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp index a718dbaf91..a85a65dca2 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_pq.cpp @@ -238,10 +238,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->ChangeTxState(db, OperationId, TTxState::Done); context.OnComplete.ActivateTx(OperationId); @@ -491,10 +493,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp index a1693c8a35..f6acf0a4d8 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_replication.cpp @@ -166,12 +166,13 @@ public: ++parentPath->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentPath); - context.SS->ClearDescribePathCaches(parentPath); - context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->ChangeTxState(db, OperationId, TTxState::Done); return true; @@ -327,12 +328,13 @@ public: ++parentPath->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentPath.Base()); - context.SS->ClearDescribePathCaches(parentPath.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentPath->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId); + } context.OnComplete.ActivateTx(OperationId); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp index 3f0a2ca03e..16e1cad401 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_sequence.cpp @@ -180,10 +180,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->PersistSequenceRemove(db, pathId); @@ -392,10 +394,12 @@ public: ++parent->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parent.Base()); context.SS->ClearDescribePathCaches(parent.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parent->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parent->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp index 2d4db56a29..71b3042e6b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_solomon.cpp @@ -93,10 +93,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->ChangeTxState(db, OperationId, TTxState::ProposedDeleteParts); return true; @@ -258,10 +260,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp index eff1c9d8cc..c558b21fbc 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_table.cpp @@ -44,10 +44,12 @@ void DropPath(NIceDb::TNiceDb& db, context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(operationId, path->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(operationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(operationId, path->PathId); + } } class TDropParts: public TSubOperationState { diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp index 9f4d14e7fd..3652b0b24c 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_drop_unsafe.cpp @@ -89,10 +89,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - for (const TPathId pathId : pathes) { - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + for (const TPathId pathId : pathes) { + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } } context.SS->ChangeTxState(db, OperationId, TTxState::ProposedDeleteParts); @@ -304,10 +306,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } State = NextState(); SetState(SelectStateFunc(State)); diff --git a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp index cde36e35e8..3701119300 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation_rmdir.cpp @@ -96,10 +96,12 @@ public: ++parentDir.Base()->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir.Base()); context.SS->ClearDescribePathCaches(parentDir.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); - context.SS->ClearDescribePathCaches(path.Base()); - context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir.Base()->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, path.Base()->PathId); + } return result; } @@ -165,10 +167,12 @@ public: ++parentDir->DirAlterVersion; context.SS->PersistPathDirAlterVersion(db, parentDir); context.SS->ClearDescribePathCaches(parentDir); - context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); - context.SS->ClearDescribePathCaches(path); - context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + + if (!context.SS->DisablePublicationsOfDropping) { + context.OnComplete.PublishToSchemeBoard(OperationId, parentDir->PathId); + context.OnComplete.PublishToSchemeBoard(OperationId, pathId); + } context.SS->TabletCounters->Simple()[COUNTER_USER_ATTRIBUTES_COUNT].Sub(path->UserAttrs->Size()); context.SS->PersistUserAttributes(db, path->PathId, path->UserAttrs, nullptr); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index a16ded6c5d..a117a6690a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -3668,6 +3668,7 @@ TSchemeShard::TSchemeShard(const TActorId &tablet, TTabletStorageInfo *info) , TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory) , AllowConditionalEraseOperations(1, 0, 1) , AllowServerlessStorageBilling(0, 0, 1) + , DisablePublicationsOfDropping(0, 0, 1) , SplitSettings() , IsReadOnlyMode(false) , ParentDomainLink(this) @@ -3797,6 +3798,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) { } appData->Icb->RegisterSharedControl(AllowConditionalEraseOperations, "SchemeShard_AllowConditionalEraseOperations"); + appData->Icb->RegisterSharedControl(DisablePublicationsOfDropping, "SchemeShard_DisablePublicationsOfDropping"); AllowDataColumnForIndexTable = appData->FeatureFlags.GetEnableDataColumnForIndexTable(); appData->Icb->RegisterSharedControl(AllowDataColumnForIndexTable, "SchemeShard_AllowDataColumnForIndexTable"); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 69577641d4..0eba6b3659 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -114,6 +114,7 @@ public: TControlWrapper AllowConditionalEraseOperations; TControlWrapper AllowServerlessStorageBilling; + TControlWrapper DisablePublicationsOfDropping; TSplitSettings SplitSettings; diff --git a/ydb/core/tx/schemeshard/ut_base.cpp b/ydb/core/tx/schemeshard/ut_base.cpp index 9b135fc880..28392faadd 100644 --- a/ydb/core/tx/schemeshard/ut_base.cpp +++ b/ydb/core/tx/schemeshard/ut_base.cpp @@ -10075,4 +10075,152 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) { NLs::MinPartitionsCountEqual(1), NLs::MaxPartitionsCountEqual(100)}); } + + template <typename TCreateFn, typename TDropFn> + void DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp op, TCreateFn&& createFn, TDropFn&& dropFn) { + TTestBasicRuntime runtime; + TTestEnv env(runtime); + ui64 txId = 100; + + // disable publications + { + TAtomic unused; + runtime.GetAppData().Icb->SetValue("SchemeShard_DisablePublicationsOfDropping", true, unused); + } + + createFn(runtime, txId); + env.TestWaitNotification(runtime, txId); + + { + auto nav = Navigate(runtime, "/MyRoot/Obj", op); + const auto& entry = nav->ResultSet.at(0); + UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok); + } + + dropFn(runtime, txId); + env.TestWaitNotification(runtime, txId); + + // still ok + { + auto nav = Navigate(runtime, "/MyRoot/Obj", op); + const auto& entry = nav->ResultSet.at(0); + UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok); + } + + // check after reboot (should be removed in process of sync) + RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor()); + + while (true) { + auto nav = Navigate(runtime, "/MyRoot/Obj", op); + const auto& entry = nav->ResultSet.at(0); + if ((entry.Status == NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown)) { + break; + } + + env.SimulateSleep(runtime, TDuration::MilliSeconds(100)); + } + + // enable publications + { + TAtomic unused; + runtime.GetAppData().Icb->SetValue("SchemeShard_DisablePublicationsOfDropping", false, unused); + } + + createFn(runtime, txId); + env.TestWaitNotification(runtime, txId); + + { + auto nav = Navigate(runtime, "/MyRoot/Obj", op); + const auto& entry = nav->ResultSet.at(0); + UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::Ok); + } + + dropFn(runtime, txId); + env.TestWaitNotification(runtime, txId); + + { + auto nav = Navigate(runtime, "/MyRoot/Obj", op); + const auto& entry = nav->ResultSet.at(0); + UNIT_ASSERT_VALUES_EQUAL(entry.Status, NSchemeCache::TSchemeCacheNavigate::EStatus::PathErrorUnknown); + } + } + + Y_UNIT_TEST(DisablePublicationsOfDropping_Dir) { + DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpPath, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestMkDir(runtime, ++txId, "/MyRoot", "Obj"); + }, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestRmDir(runtime, ++txId, "/MyRoot", "Obj"); + } + ); + } + + Y_UNIT_TEST(DisablePublicationsOfDropping_Table) { + DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTable, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestCreateTable(runtime, ++txId, "/MyRoot", R"( + Name: "Obj" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )"); + }, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestDropTable(runtime, ++txId, "/MyRoot", "Obj"); + } + ); + } + + Y_UNIT_TEST(DisablePublicationsOfDropping_IndexedTable) { + DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTable, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestCreateIndexedTable(runtime, ++txId, "/MyRoot", R"( + TableDescription { + Name: "Obj" + Columns { Name: "key" Type: "Uint64" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + } + IndexDescription { + Name: "UserDefinedIndexByValue" + KeyColumnNames: ["value"] + } + )"); + }, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestDropTable(runtime, ++txId, "/MyRoot", "Obj"); + } + ); + } + + Y_UNIT_TEST(DisablePublicationsOfDropping_Pq) { + DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpTopic, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestCreatePQGroup(runtime, ++txId, "/MyRoot", R"( + Name: "Obj" + TotalGroupCount: 1 + PartitionPerTablet: 1 + PQTabletConfig: { PartitionConfig { LifetimeSeconds: 10 } } + )"); + }, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestDropPQGroup(runtime, ++txId, "/MyRoot", "Obj"); + } + ); + } + + Y_UNIT_TEST(DisablePublicationsOfDropping_Solomon) { + DisablePublicationsOfDropping(NSchemeCache::TSchemeCacheNavigate::EOp::OpPath, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestCreateSolomon(runtime, ++txId, "/MyRoot", R"( + Name: "Obj" + PartitionCount: 1 + )"); + }, + [](TTestBasicRuntime& runtime, ui64& txId) { + return TestDropSolomon(runtime, ++txId, "/MyRoot", "Obj"); + } + ); + } } diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp index e87d0c6341..b2543b22f1 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.cpp @@ -294,6 +294,33 @@ namespace NSchemeShardUT_Private { return record.DebugString(); } + THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TString& path, + NSchemeCache::TSchemeCacheNavigate::EOp op) + { + using TNavigate = NSchemeCache::TSchemeCacheNavigate; + using TEvRequest = TEvTxProxySchemeCache::TEvNavigateKeySet; + using TEvResponse = TEvTxProxySchemeCache::TEvNavigateKeySetResult; + + const auto sender = runtime.AllocateEdgeActor(); + auto request = MakeHolder<TNavigate>(); + auto& entry = request->ResultSet.emplace_back(); + entry.Path = SplitPath(path); + entry.RequestType = TNavigate::TEntry::ERequestType::ByPath; + entry.Operation = op; + entry.ShowPrivatePath = true; + runtime.Send(new IEventHandle(MakeSchemeCacheID(), sender, new TEvRequest(request.Release()))); + + auto ev = runtime.GrabEdgeEventRethrow<TEvResponse>(sender); + UNIT_ASSERT(ev); + UNIT_ASSERT(ev->Get()); + + auto* response = ev->Get()->Request.Release(); + UNIT_ASSERT(response); + UNIT_ASSERT_VALUES_EQUAL(response->ResultSet.size(), 1); + + return THolder(response); + } + TEvSchemeShard::TEvModifySchemeTransaction* CopyTableRequest(ui64 txId, const TString& dstPath, const TString& dstName, const TString& srcFullName, TApplyIf applyIf) { auto evTx = new TEvSchemeShard::TEvModifySchemeTransaction(txId, TTestTxConfig::SchemeShard); auto transaction = evTx->Record.AddTransaction(); diff --git a/ydb/core/tx/schemeshard/ut_helpers/helpers.h b/ydb/core/tx/schemeshard/ut_helpers/helpers.h index 02e8ce5609..de4aaa0f42 100644 --- a/ydb/core/tx/schemeshard/ut_helpers/helpers.h +++ b/ydb/core/tx/schemeshard/ut_helpers/helpers.h @@ -9,6 +9,7 @@ #include <ydb/core/protos/tx_datashard.pb.h> #include <ydb/core/testlib/minikql_compile.h> #include <ydb/core/tx/datashard/datashard.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> @@ -63,6 +64,9 @@ namespace NSchemeShardUT_Private { TString TestLs(TTestActorRuntime& runtime, const TString& path, const NKikimrSchemeOp::TDescribeOptions& opts, NLs::TCheckFunc check = nullptr); TString TestLsPathId(TTestActorRuntime& runtime, ui64 pathId, NLs::TCheckFunc check = nullptr); + THolder<NSchemeCache::TSchemeCacheNavigate> Navigate(TTestActorRuntime& runtime, const TString& path, + NSchemeCache::TSchemeCacheNavigate::EOp op = NSchemeCache::TSchemeCacheNavigate::EOp::OpPath); + ////////// modification results void CheckExpected(const TVector<TEvSchemeShard::EStatus>& expected, TEvSchemeShard::EStatus result, const TString& reason); void CheckExpected(const TVector<Ydb::StatusIds::StatusCode>& expected, Ydb::StatusIds::StatusCode result, const TString& reason); diff --git a/ydb/library/dynumber/dynumber.cpp b/ydb/library/dynumber/dynumber.cpp index af2d50f15a..9287ea01a5 100644 --- a/ydb/library/dynumber/dynumber.cpp +++ b/ydb/library/dynumber/dynumber.cpp @@ -5,6 +5,7 @@ #include <util/string/cast.h> #include <util/string/builder.h> #include <util/stream/buffer.h> +#include <util/stream/format.h> namespace NKikimr::NDyNumber { @@ -124,6 +125,7 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) { auto nonZeroAfterDot = 0U; bool hasNonZeroAfterDot = false; auto zeroAfterDot = 0U; + auto tailZerosBeforeDot = 0U; i16 ePower = 0; auto tailZeros = 0U; TSmallVec<char> data; @@ -150,7 +152,14 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) { return Nothing(); if (!hasDot) { ++beforeDot; - data.emplace_back(c - '0'); + if (isZero) { + ++tailZerosBeforeDot; + } else { + for (; tailZerosBeforeDot; --tailZerosBeforeDot) { + data.emplace_back('\x00'); + } + data.emplace_back(c - '0'); + } } else { if (!isZero) hasNonZeroAfterDot = true; @@ -158,6 +167,9 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) { if (isZero) { ++tailZeros; } else { + for (; tailZerosBeforeDot; --tailZerosBeforeDot) { + data.emplace_back('\x00'); + } for (; tailZeros; --tailZeros) { data.emplace_back('\x00'); ++nonZeroAfterDot; @@ -198,6 +210,9 @@ TMaybe<TString> ParseDyNumberString(TStringBuf str) { for (auto i = 0U; i < data.size(); i += 2U) result.append((data[i] << '\x04') | data[i + 1]); } + + // Cerr << str << ": " << HexText(TStringBuf{result.c_str(), result.size()}) << Endl; + return result; } @@ -237,4 +252,4 @@ TMaybe<TString> DyNumberToString(TStringBuf buffer) { return out; } -}
\ No newline at end of file +} diff --git a/ydb/library/dynumber/ut/dynumber_ut.cpp b/ydb/library/dynumber/ut/dynumber_ut.cpp index f46db222de..278965eed1 100644 --- a/ydb/library/dynumber/ut/dynumber_ut.cpp +++ b/ydb/library/dynumber/ut/dynumber_ut.cpp @@ -62,6 +62,10 @@ Y_UNIT_TEST_SUITE(TDyNumberTests) { TestDyNumber(".023"); TestDyNumber("0.93"); TestDyNumber("724.1"); + TestDyNumber("150e2"); + TestDyNumber("15e3"); + TestDyNumber("0.150e4"); + TestDyNumber("0.15e4"); TestDyNumber("1E-130"); TestDyNumber("9.9999999999999999999999999999999999999E+125"); TestDyNumber("9.9999999999999999999999999999999999999000E+125"); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 14a2b8ecd1..9400ba7b6f 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -37,23 +37,21 @@ namespace NKikimrServices { constexpr ui32 KQP_COMPUTE = 535; }; -const TString LogPrefix = "PQ sink. "; - -#define SINK_LOG_T(s) \ +#define SRC_LOG_T(s) \ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_D(s) \ +#define SRC_LOG_D(s) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_I(s) \ +#define SRC_LOG_I(s) \ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_W(s) \ +#define SRC_LOG_W(s) \ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_N(s) \ +#define SRC_LOG_N(s) \ LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_E(s) \ +#define SRC_LOG_E(s) \ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG_C(s) \ +#define SRC_LOG_C(s) \ LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) -#define SINK_LOG(prio, s) \ +#define SRC_LOG(prio, s) \ LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) namespace NYql::NDq { @@ -93,7 +91,7 @@ public: TDqPqReadActor( ui64 inputIndex, - const TString& txId, + const TTxId& txId, const THolderFactory& holderFactory, NPq::NProto::TDqPqTopicSource&& sourceParams, NPq::NProto::TDqReadTaskParams&& readParams, @@ -108,6 +106,7 @@ public: , BufferSize(bufferSize) , RangesMode(rangesMode) , HolderFactory(holderFactory) + , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ source. ") , Driver(std::move(driver)) , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , SourceParams(std::move(sourceParams)) @@ -251,7 +250,7 @@ private: i64 usedSpace = 0; for (auto& event : events) { - std::visit(TPQEventProcessor{*this, buffer, usedSpace}, event); + std::visit(TPQEventProcessor{*this, buffer, usedSpace, LogPrefix}, event); } SubscribeOnNextEvent(); @@ -309,8 +308,8 @@ private: for (const auto& message : event.GetMessages()) { const TString& data = message.GetData(); - LWPROBE(PqReadDataReceived, Self.TxId, Self.SourceParams.GetTopicPath(), data); - SINK_LOG_T("Data received: " << data); + LWPROBE(PqReadDataReceived, TString(TStringBuilder() << Self.TxId), Self.SourceParams.GetTopicPath(), data); + SRC_LOG_T("Data received: " << message.DebugString(true)); Batch.emplace_back(NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()))); UsedSpace += data.Size(); @@ -346,14 +345,16 @@ private: TDqPqReadActor& Self; TUnboxedValueVector& Batch; i64& UsedSpace; + const TString& LogPrefix; }; private: const ui64 InputIndex; - const TString TxId; + const TTxId TxId; const i64 BufferSize; const bool RangesMode; const THolderFactory& HolderFactory; + const TString LogPrefix; NYdb::TDriver Driver; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory; const NPq::NProto::TDqPqTopicSource SourceParams; @@ -395,7 +396,7 @@ std::pair<IDqSourceActor*, NActors::IActor*> CreateDqPqReadActor( TDqPqReadActor* actor = new TDqPqReadActor( inputIndex, - std::holds_alternative<ui64>(txId) ? ToString(txId) : std::get<TString>(txId), + txId, holderFactory, std::move(settings), std::move(readTaskParamsMsg), diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp index 0aae5a7558..6cd23adcb8 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.cpp @@ -37,8 +37,6 @@ namespace NKikimrServices { constexpr ui32 KQP_COMPUTE = 535; }; -const TString LogPrefix = "PQ sink. "; - #define SINK_LOG_T(s) \ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_D(s) \ @@ -92,7 +90,7 @@ class TDqPqWriteActor : public NActors::TActor<TDqPqWriteActor>, public IDqCompu public: TDqPqWriteActor( ui64 outputIndex, - const TString& txId, + const TTxId& txId, NPq::NProto::TDqPqTopicSink&& sinkParams, NYdb::TDriver driver, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, @@ -105,6 +103,7 @@ public: , Driver(std::move(driver)) , CredentialsProviderFactory(credentialsProviderFactory) , Callbacks(callbacks) + , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ sink. ") , FreeSpace(freeSpace) , PersQueueClient(Driver, GetPersQueueClientSettings()) { } @@ -138,7 +137,7 @@ public: TString data(dataCol.AsStringRef()); - LWPROBE(PqWriteDataToSend, TxId, SinkParams.GetTopicPath(), data); + LWPROBE(PqWriteDataToSend, TString(TStringBuilder() << TxId), SinkParams.GetTopicPath(), data); SINK_LOG_T("Received data for sending: " << data); const auto messageSize = GetItemSize(data); @@ -367,11 +366,12 @@ private: private: const ui64 OutputIndex; - const TString TxId; + const TTxId TxId; const NPq::NProto::TDqPqTopicSink SinkParams; NYdb::TDriver Driver; std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory; IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; + const TString LogPrefix; i64 FreeSpace = 0; NYdb::NPersQueue::TPersQueueClient PersQueueClient; @@ -404,7 +404,7 @@ std::pair<IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqPqWriteActor( TDqPqWriteActor* actor = new TDqPqWriteActor( outputIndex, - std::holds_alternative<ui64>(txId) ? ToString(txId) : std::get<TString>(txId), + txId, std::move(settings), std::move(driver), CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken), diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp index d6d4141454..584bfcd7be 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.cpp @@ -32,21 +32,21 @@ #include <variant> #define SINK_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define SINK_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, "Solomon sink. " << s) + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) namespace NYql::NDq { @@ -108,6 +108,7 @@ public: TDqSolomonWriteActor( ui64 outputIndex, + const TTxId& txId, TDqSolomonWriteParams&& writeParams, NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, @@ -115,6 +116,8 @@ public: i64 freeSpace) : TActor<TDqSolomonWriteActor>(&TDqSolomonWriteActor::StateFunc) , OutputIndex(outputIndex) + , TxId(txId) + , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", Solomon sink. ") , WriteParams(std::move(writeParams)) , Url(GetUrl()) , Callbacks(callbacks) @@ -246,7 +249,7 @@ private: } TIssues issues { TIssue(errorBuilder) }; - SINK_LOG_W("Got error response from solomon " << issues.ToString()); + SINK_LOG_W("Got error response from solomon " << issues.ToOneLineString()); Callbacks->OnSinkError(OutputIndex, issues, res->IsTerminal); return; } @@ -432,6 +435,8 @@ private: private: const ui64 OutputIndex; + const TTxId TxId; + const TString LogPrefix; const TDqSolomonWriteParams WriteParams; const TString Url; NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* const Callbacks; @@ -453,6 +458,7 @@ private: std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor( NYql::NSo::NProto::TDqSolomonShard&& settings, ui64 outputIndex, + const TTxId& txId, const THashMap<TString, TString>& secureParams, NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, @@ -471,6 +477,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolo TDqSolomonWriteActor* actor = new TDqSolomonWriteActor( outputIndex, + txId, std::move(params), callbacks, counters, @@ -490,6 +497,7 @@ void RegisterDQSolomonWriteActorFactory(TDqSinkFactory& factory, ISecuredService return CreateDqSolomonWriteActor( std::move(settings), args.OutputIndex, + args.TxId, args.SecureParams, args.Callback, counters, diff --git a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h index aa5469f10b..3f257970b1 100644 --- a/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h +++ b/ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h @@ -21,6 +21,7 @@ constexpr i64 DqSolomonDefaultFreeSpace = 16_MB; std::pair<NYql::NDq::IDqComputeActorAsyncOutput*, NActors::IActor*> CreateDqSolomonWriteActor( NYql::NSo::NProto::TDqSolomonShard&& settings, ui64 outputIndex, + const TTxId& txId, const THashMap<TString, TString>& secureParams, NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks* callbacks, const NMonitoring::TDynamicCounterPtr& counters, diff --git a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp index a97b2af780..aa82671e1d 100644 --- a/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp +++ b/ydb/library/yql/providers/solomon/async_io/ut/ut_helpers.cpp @@ -48,6 +48,7 @@ void InitSink( auto [dqSink, dqSinkAsActor] = CreateDqSolomonWriteActor( std::move(settings), 0, + "TxId-42", secureParams, &actor.GetSinkCallbacks(), counters, diff --git a/ydb/services/datastreams/datastreams_ut.cpp b/ydb/services/datastreams/datastreams_ut.cpp index f15c26dfbc..cffb6ebc0c 100644 --- a/ydb/services/datastreams/datastreams_ut.cpp +++ b/ydb/services/datastreams/datastreams_ut.cpp @@ -32,6 +32,9 @@ static constexpr const char NON_CHARGEABLE_USER[] = "superuser@builtin"; static constexpr const char NON_CHARGEABLE_USER_X[] = "superuser_x@builtin"; static constexpr const char NON_CHARGEABLE_USER_Y[] = "superuser_y@builtin"; +static constexpr const char DEFAULT_CLOUD_ID[] = "somecloud"; +static constexpr const char DEFAULT_FOLDER_ID[] = "somefolder"; + template<class TKikimr, bool secure> class TDatastreamsTestServer { public: @@ -83,7 +86,9 @@ public: TClient client(*(KikimrServer->ServerSettings)); UNIT_ASSERT_VALUES_EQUAL(NMsgBusProxy::MSTATUS_OK, - client.AlterUserAttributes("/", "Root", {{"folder_id", "somefolder"},{"cloud_id", "somecloud"}, {"database_id", "root"}})); + client.AlterUserAttributes("/", "Root", {{"folder_id", DEFAULT_FOLDER_ID}, + {"cloud_id", DEFAULT_CLOUD_ID}, + {"database_id", "root"}})); } public: @@ -126,8 +131,8 @@ ui32 CheckMeteringFile(TTempFileHandle* meteringFile, const TString& streamPath, UNIT_ASSERT(map.contains("labels")); UNIT_ASSERT(map.contains("source_id")); UNIT_ASSERT(map.contains("source_wt")); - UNIT_ASSERT(map.find("cloud_id")->second.GetString() == "somecloud"); - UNIT_ASSERT(map.find("folder_id")->second.GetString() == "somefolder"); + UNIT_ASSERT(map.find("cloud_id")->second.GetString() == DEFAULT_CLOUD_ID); + UNIT_ASSERT(map.find("folder_id")->second.GetString() == DEFAULT_FOLDER_ID); UNIT_ASSERT(map.find("resource_id")->second.GetString() == streamPath); tags_check(map); labels_check(map); diff --git a/ydb/services/lib/actors/pq_schema_actor.cpp b/ydb/services/lib/actors/pq_schema_actor.cpp index b7f4e3f004..11f106cbca 100644 --- a/ydb/services/lib/actors/pq_schema_actor.cpp +++ b/ydb/services/lib/actors/pq_schema_actor.cpp @@ -11,13 +11,11 @@ namespace NKikimr::NGRpcProxy::V1 { -#define DEFAULT_PARTITION_SPEED 1048576 // 1Mb - - constexpr i32 MAX_READ_RULES_COUNT = 3000; - constexpr TStringBuf GRPCS_ENDPOINT_PREFIX = "grpcs://"; - static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000; - + constexpr i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS = + TDuration::Days(16).MilliSeconds(); + constexpr ui64 DEFAULT_PARTITION_SPEED = 1_MB; + constexpr i32 MAX_READ_RULES_COUNT = 3000; constexpr i32 MAX_SUPPORTED_CODECS_COUNT = 100; TClientServiceTypes GetSupportedClientServiceTypes(const TActorContext& ctx) { @@ -312,8 +310,13 @@ namespace NKikimr::NGRpcProxy::V1 { error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be more then retention_period_ms (provided " << settings.retention_period_ms() << ")"; return Ydb::StatusIds::BAD_REQUEST; } - if (settings.message_group_seqno_retention_period_ms() > DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD) { - error = TStringBuilder() << "message_group_seqno_retention_period_ms (provided " << settings.message_group_seqno_retention_period_ms() << ") must be less then default limit for database " << DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD; + if (settings.message_group_seqno_retention_period_ms() > + DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS) { + error = TStringBuilder() << + "message_group_seqno_retention_period_ms (provided " << + settings.message_group_seqno_retention_period_ms() << + ") must be less then default limit for database " << + DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD_MS; return Ydb::StatusIds::BAD_REQUEST; } if (settings.message_group_seqno_retention_period_ms() < 0) { diff --git a/ydb/services/persqueue_v1/grpc_pq_schema.h b/ydb/services/persqueue_v1/grpc_pq_schema.h index 55c4138905..36b24ac43d 100644 --- a/ydb/services/persqueue_v1/grpc_pq_schema.h +++ b/ydb/services/persqueue_v1/grpc_pq_schema.h @@ -14,8 +14,6 @@ namespace NKikimr::NGRpcProxy::V1 { -static const i64 DEFAULT_MAX_DATABASE_MESSAGEGROUP_SEQNO_RETENTION_PERIOD = 16*24*60*60*1000; - inline TActorId GetPQSchemaServiceActorID() { return TActorId(0, "PQSchmSvc"); } diff --git a/ydb/tests/functional/dynumber/test_dynumber.py b/ydb/tests/functional/dynumber/test_dynumber.py new file mode 100644 index 0000000000..bc6229ad89 --- /dev/null +++ b/ydb/tests/functional/dynumber/test_dynumber.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +import os +import ydb + + +def test_dynumber(): + config = ydb.DriverConfig(database=os.getenv("YDB_DATABASE"), endpoint=os.getenv("YDB_ENDPOINT")) + table_name = os.path.join("/", os.getenv("YDB_DATABASE"), "table") + with ydb.Driver(config) as driver: + driver.wait(timeout=5) + session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) + session.create_table( + table_name, + ydb.TableDescription() + .with_primary_key('key') + .with_columns( + ydb.Column('key', ydb.OptionalType(ydb.PrimitiveType.DyNumber)), + ) + ) + + for value in ["DyNumber(\".149e4\")", "DyNumber(\"15e2\")", "DyNumber(\"150e1\")", "DyNumber(\".151e4\")", "DyNumber(\"1500.1\")"]: + session.transaction().execute( + "upsert into `%s` (key ) values (%s );" % (table_name, value), + commit_tx=True, + ) + + result = session.transaction().execute("select count(*) cnt from `%s`" % table_name, commit_tx=True) + assert result[0].rows[0].cnt == 4 |