diff options
author | Vitaly Isaev <vitalyisaev@ydb.tech> | 2024-12-12 15:39:00 +0000 |
---|---|---|
committer | Vitaly Isaev <vitalyisaev@ydb.tech> | 2024-12-12 15:39:00 +0000 |
commit | 827b115675004838023427572a7c69f40a86a80a (patch) | |
tree | e99c953fe494b9de8d8597a15859d77c81f118c7 | |
parent | 42701242eaf5be980cb935631586d0e90b82641c (diff) | |
parent | fab222fd8176d00eee5ddafc6bce8cb95a6e3ab0 (diff) | |
download | ydb-827b115675004838023427572a7c69f40a86a80a.tar.gz |
Merge branch 'rightlib' into rightlib_20241212
41 files changed, 502 insertions, 233 deletions
diff --git a/build/sysincl/unsorted.yml b/build/sysincl/unsorted.yml index d101a7cf2d..387eb86d72 100644 --- a/build/sysincl/unsorted.yml +++ b/build/sysincl/unsorted.yml @@ -259,7 +259,6 @@ #endif #if defined(_TRASH_) && TODO - gcrypt.h - - winmmap.h #endif #if defined(__TURBOC__) || defined(__BORLANDC__) - alloc.h diff --git a/library/cpp/testing/common/env.cpp b/library/cpp/testing/common/env.cpp index 67e081c371..1440186d78 100644 --- a/library/cpp/testing/common/env.cpp +++ b/library/cpp/testing/common/env.cpp @@ -41,31 +41,7 @@ TString BinaryPath(TStringBuf path) { } TString GetArcadiaTestsData() { - if (GetEnv("USE_ATD_FROM_SNAPSHOT")) { - return ArcadiaSourceRoot() + "/atd_ro_snapshot"; - } - - TString atdRoot = NPrivate::GetTestEnv().ArcadiaTestsDataDir; - if (atdRoot) { - return atdRoot; - } - - TString path = NPrivate::GetCwd(); - const char pathsep = GetDirectorySeparator(); - while (!path.empty()) { - TString dataDir = path + "/arcadia_tests_data"; - if (IsDir(dataDir)) { - return dataDir; - } - - size_t pos = path.find_last_of(pathsep); - if (pos == TString::npos) { - pos = 0; - } - path.erase(pos); - } - - return {}; + return ArcadiaSourceRoot() + "/atd_ro_snapshot"; } TString GetWorkPath() { diff --git a/library/cpp/testing/common/ut/env_ut.cpp b/library/cpp/testing/common/ut/env_ut.cpp index fe4946a65f..4b67d05efb 100644 --- a/library/cpp/testing/common/ut/env_ut.cpp +++ b/library/cpp/testing/common/ut/env_ut.cpp @@ -45,24 +45,6 @@ TEST(Runtime, BinaryPath) { EXPECT_TRUE(TFsPath(BinaryPath("library/cpp/testing/common/ut")).Exists()); } -TEST(Runtime, GetArcadiaTestsData) { - NTesting::TScopedEnvironment contextGuard("YA_TEST_CONTEXT_FILE", ""); // remove context filename - { - auto tmpDir = ::GetSystemTempDir(); - NTesting::TScopedEnvironment guard("ARCADIA_TESTS_DATA_DIR", tmpDir); - Singleton<NPrivate::TTestEnv>()->ReInitialize(); - EXPECT_EQ(tmpDir, GetArcadiaTestsData()); - } - { - NTesting::TScopedEnvironment guard("ARCADIA_TESTS_DATA_DIR", ""); - Singleton<NPrivate::TTestEnv>()->ReInitialize(); - auto path = GetArcadiaTestsData(); - // it is not error if path is empty - const bool ok = (path.empty() || GetBaseName(path) == "arcadia_tests_data"); - EXPECT_TRUE(ok); - } -} - TEST(Runtime, GetWorkPath) { NTesting::TScopedEnvironment contextGuard("YA_TEST_CONTEXT_FILE", ""); // remove context filename { diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py index e4f246d5c8..99a6799109 100644 --- a/library/python/testing/yatest_common/yatest/common/runtime.py +++ b/library/python/testing/yatest_common/yatest/common/runtime.py @@ -190,9 +190,9 @@ def data_path(path=None): :param path: path relative to the arcadia_tests_data directory, e.g. yatest.common.data_path("pers/rerank_service") :return: absolute path inside arcadia_tests_data """ - if "USE_ATD_FROM_SNAPSHOT" in os.environ: - return os.path.join(source_path(), "atd_ro_snapshot", path) - return _join_path(_get_ya_plugin_instance().data_root, path) + if "USE_ATD_FROM_ATD" in os.environ: + return _join_path(_get_ya_plugin_instance().data_root, path) + return _join_path(source_path("atd_ro_snapshot"), path) @default_arg0 diff --git a/yql/essentials/core/common_opt/yql_co_pgselect.cpp b/yql/essentials/core/common_opt/yql_co_pgselect.cpp index 93b307aab7..e8d37492a8 100644 --- a/yql/essentials/core/common_opt/yql_co_pgselect.cpp +++ b/yql/essentials/core/common_opt/yql_co_pgselect.cpp @@ -1022,7 +1022,7 @@ TUsedColumns GatherUsedColumns(const TExprNode::TPtr& result, const TExprNode::T void FillInputIndices(const TExprNode::TPtr& from, const TExprNode::TPtr& finalExtTypes, TUsedColumns& usedColumns, TOptimizeContext& optCtx) { for (auto& x : usedColumns) { - TStringBuf alias; + TString alias; TStringBuf column = NTypeAnnImpl::RemoveAlias(x.first, alias); bool foundColumn = false; diff --git a/yql/essentials/core/type_ann/type_ann_pg.cpp b/yql/essentials/core/type_ann/type_ann_pg.cpp index d0d73f22bb..78a035b70e 100644 --- a/yql/essentials/core/type_ann/type_ann_pg.cpp +++ b/yql/essentials/core/type_ann/type_ann_pg.cpp @@ -346,7 +346,7 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode TVector<const TItemExprType*> items; for (size_t i = 0; i < proc.OutputArgTypes.size(); ++i) { items.push_back(ctx.Expr.MakeType<TItemExprType>( - resultColumnOrder->AddColumn(proc.OutputArgNames[i]), + resultColumnOrder->AddColumn(proc.OutputArgNames[i]), ctx.Expr.MakeType<TPgExprType>(proc.OutputArgTypes[i]))); } @@ -697,7 +697,7 @@ IGraphTransformer::TStatus PgArrayOpWrapper(const TExprNode::TPtr& input, TExprN const auto& opResDesc = NPg::LookupType(oper.ResultType); if (opResDesc.Name != "bool") { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << + ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() << "Expected boolean operator result, but got: " << opResDesc.Name)); return IGraphTransformer::TStatus::Error; } @@ -1021,7 +1021,7 @@ IGraphTransformer::TStatus PgNullIfWrapper(const TExprNode::TPtr& input, TExprNo input->ChildRef(0) = WrapWithPgCast(std::move(input->ChildRef(0)), commonType->TypeId, ctx.Expr); return IGraphTransformer::TStatus::Repeat; } - + input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(commonType->TypeId)); return IGraphTransformer::TStatus::Ok; } @@ -1144,7 +1144,7 @@ IGraphTransformer::TStatus PgReplaceUnknownWrapper(const TExprNode::TPtr& input, auto structType = listType->GetItemType()->Cast<TStructExprType>(); auto structItemTypes = structType->GetItems(); - const bool noUnknowns = std::none_of(structItemTypes.cbegin(), structItemTypes.cend(), + const bool noUnknowns = std::none_of(structItemTypes.cbegin(), structItemTypes.cend(), [] (const TItemExprType* item) { const auto* itemType = item->GetItemType(); return itemType->GetKind() == ETypeAnnotationKind::Pg && itemType->Cast<TPgExprType>()->GetId() == NPg::UnknownOid; @@ -2010,12 +2010,23 @@ bool ValidateWindowRefs(const TExprNode::TPtr& root, const TExprNode* windows, T return !isError; } +TString EscapeDotsInAlias(TStringBuf alias) { + TStringBuilder sb; + for (auto c: alias) { + if (c == '.' || c == '\\') { + sb << '\\'; + } + sb << c; + } + return sb; +} + TString MakeAliasedColumn(TStringBuf alias, TStringBuf column) { if (!alias) { return TString(column); } - return TStringBuilder() << "_alias_" << alias << "." << column; + return TStringBuilder() << "_alias_" << EscapeDotsInAlias(alias) << "." << column; } const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, TExprContext& ctx) { @@ -2027,22 +2038,30 @@ const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, T } TStringBuf RemoveAlias(TStringBuf column) { - TStringBuf tmp; + TString tmp; return RemoveAlias(column, tmp); } -TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias) { +TStringBuf RemoveAlias(TStringBuf column, TString& alias) { if (!column.StartsWith("_alias_")) { alias = ""; return column; } - - auto columnPos = column.find('.', 7); - YQL_ENSURE(columnPos != TString::npos); - columnPos += 1; - YQL_ENSURE(columnPos != column.size()); - alias = column.substr(7, columnPos - 7 - 1); - return column.substr(columnPos); + column = column.substr(7); + TStringBuilder aliasBuilder; + for (size_t i = 0; i < column.size(); ++i) { + if (column[i] == '\\') { + YQL_ENSURE(i + 1 < column.size()); + aliasBuilder << column[++i]; + continue; + } + if (column[i] == '.') { + alias = aliasBuilder; + return column.substr(i + 1); + } + aliasBuilder << column[i]; + } + YQL_ENSURE(false, "No dot ('.') found in alised column"); } const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx) { @@ -2877,7 +2896,7 @@ bool ReplaceProjectionRefs(TExprNode::TPtr& lambda, const TStringBuf& scope, con for (ui32 i = 0; i < projectionOrders.size(); ++i) { if (index >= current && index < current + projectionOrders[i]->first.Size()) { TStringBuf column = projectionOrders[i]->first[index - current].PhysicalName; - TStringBuf alias; + TString alias; column = RemoveAlias(column, alias); if (result && projectionOrders[i]->second) { @@ -3346,7 +3365,7 @@ bool GatherExtraSortColumns(const TExprNode& data, const TInputs& inputs, TExprN } if (node->IsCallable("Member") && &node->Head() == arg) { - TStringBuf alias; + TString alias; TStringBuf column = NTypeAnnImpl::RemoveAlias(node->Tail().Content(), alias); TMaybe<ui32> index; @@ -3495,7 +3514,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN "Incorrect fill_target_columns option")); return IGraphTransformer::TStatus::Error; } - } + } else if (optionName == "unknowns_allowed") { hasUnknownsAllowed = true; } @@ -4456,7 +4475,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN return IGraphTransformer::TStatus::Error; } } - + if (rightSideUsing.contains(lcase)) { lrNames[1] = ctx.Expr.NewList(inp->Pos(), {ctx.Expr.NewAtom(inp->Pos(), rightSideUsing[lcase])}); } else { @@ -5442,7 +5461,7 @@ IGraphTransformer::TStatus PgArrayWrapper(const TExprNode::TPtr& input, TExprNod bool castsNeeded = false; const NPg::TTypeDesc* elemTypeDesc; - if (const auto issue = NPg::LookupCommonType(argTypes, + if (const auto issue = NPg::LookupCommonType(argTypes, [&input, &ctx](size_t i) { return ctx.Expr.GetPosition(input->Child(i)->Pos()); }, elemTypeDesc, castsNeeded)) diff --git a/yql/essentials/core/type_ann/type_ann_pg.h b/yql/essentials/core/type_ann/type_ann_pg.h index 85ca8285ef..042d2c0092 100644 --- a/yql/essentials/core/type_ann/type_ann_pg.h +++ b/yql/essentials/core/type_ann/type_ann_pg.h @@ -12,7 +12,7 @@ TExprNodePtr WrapWithPgCast(TExprNodePtr&& node, ui32 targetTypeId, TExprContext TString MakeAliasedColumn(TStringBuf alias, TStringBuf column); const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, TExprContext& ctx); TStringBuf RemoveAlias(TStringBuf column); -TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias); +TStringBuf RemoveAlias(TStringBuf column, TString& alias); const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx); TMap<TString, ui32> ExtractExternalColumns(const TExprNode& select); bool IsPlainMemberOverArg(const TExprNode& expr, TStringBuf& memberName); diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp index 08986bc8fd..871321786b 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp @@ -643,12 +643,15 @@ private: } void SwitchMode(EOperatingMode mode, TComputationContext& ctx) { + LogMemoryUsage(); switch(mode) { case EOperatingMode::InMemory: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory"; MKQL_ENSURE(false, "Internal logic error"); break; } case EOperatingMode::Spilling: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"; MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); auto spiller = ctx.SpillerFactory->CreateSpiller(); RightPacker->TablePtr->InitializeBucketSpillers(spiller); @@ -656,6 +659,7 @@ private: break; } case EOperatingMode::ProcessSpilled: { + YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled"; SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets); for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i); @@ -843,9 +847,6 @@ private: if (isYield == EFetchResult::One) return isYield; if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) { - LogMemoryUsage(); - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling"; - SwitchMode(EOperatingMode::Spilling, ctx); return EFetchResult::Yield; } @@ -861,14 +862,18 @@ private: << " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize << " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize ; + + auto& leftTable = *LeftPacker->TablePtr; + auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr; + if (IsSpillingAllowed && ctx.SpillerFactory && !JoinedTablePtr->TryToPreallocateMemoryForJoin(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows)) { + SwitchMode(EOperatingMode::Spilling, ctx); + return EFetchResult::Yield; + } + *PartialJoinCompleted = true; LeftPacker->StartTime = std::chrono::system_clock::now(); RightPacker->StartTime = std::chrono::system_clock::now(); - if ( SelfJoinSameKeys_ ) { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - } else { - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); - } + JoinedTablePtr->Join(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); JoinedTablePtr->ResetIterator(); LeftPacker->EndTime = std::chrono::system_clock::now(); RightPacker->EndTime = std::chrono::system_clock::now(); @@ -945,7 +950,6 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal } if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield; - YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled"; SwitchMode(EOperatingMode::ProcessSpilled, ctx); return EFetchResult::Finish; } diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp index f9b19fdfbc..ae7c89daef 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -320,6 +320,63 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ } +bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) { + return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch; +} + +ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) { + ui64 tuplesNum = bucketStats.TuplesNum; + + ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1; + ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx + if (tableHasKeyStringColumns || tableHasKeyIColumns) { + slotSize = slotSize + avgStringsSize; + } + + return slotSize; +} + +ui64 ComputeNumberOfSlots(ui64 tuplesNum) { + return (3 * tuplesNum + 1) | 1; +} + +bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples) { + // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing. + // So, no preallocation is required. + if (!hasMoreLeftTuples && !hasMoreRightTuples) return true; + + for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) { + ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum; + ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum; + + TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2; + if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue; + + TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket]; + const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket]; + + const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize, + tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0); + const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum); + + try { + bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize); + } catch (TMemoryLimitExceededException) { + for (ui64 i = 0; i < bucket; ++i) { + GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i]; + b1->JoinSlots.resize(0); + b1->JoinSlots.shrink_to_fit(); + GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i]; + b2->JoinSlots.resize(0); + b2->JoinSlots.shrink_to_fit(); + } + return false; + } + } + + return true; +} + // Joins two tables and returns join result in joined table. Tuples of joined table could be received by // joined table iterator @@ -368,7 +425,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0); bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0); bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0); - bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch; + bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch); if (swapTables) { @@ -402,13 +459,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches)) continue; - ui64 slotSize = headerSize2 + 1; // Header [Short Strings] SlotIdx - - ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1; - - if (table2HasKeyStringColumns || table2HasKeyIColumns ) { - slotSize = slotSize + avgStringsSize; - } + ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns); ui64 &nSlots = bucket2->NSlots; auto &joinSlots = bucket2->JoinSlots; @@ -417,7 +468,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize); if (!nSlots) { - nSlots = (3 * tuplesNum2 + 1) | 1; + nSlots = ComputeNumberOfSlots(tuplesNum2); joinSlots.resize(nSlots*slotSize, 0); bloomFilter.Resize(tuplesNum2); initHashTable = true; diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h index a4846926d1..d6b9a54aca 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h @@ -346,6 +346,8 @@ public: // Returns value of next tuple. Returs true if there are more tuples bool NextTuple(TupleData& td); + bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples); + // Joins two tables and stores join result in table data. Tuples of joined table could be received by // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false diff --git a/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp b/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp index 0f758c7a8b..80caaad37e 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp @@ -54,7 +54,7 @@ public: ) : PartitionKey(std::move(partitionKey)) , Parameters(parameters) - , Nfa(nfaTransitions, parameters.MatchedVarsArg, parameters.Defines) + , Nfa(nfaTransitions, parameters.MatchedVarsArg, parameters.Defines, parameters.SkipTo) , Cache(cache) { } @@ -72,10 +72,10 @@ public: NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) { auto match = Nfa.GetMatched(); - if (!match.has_value()) { + if (!match) { return NUdf::TUnboxedValue{}; } - Parameters.MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, match.value())); + Parameters.MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, match->Vars)); Parameters.MeasureInputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TMeasureInputDataValue>( ctx.HolderFactory.Create<TListValue<TSparseList>>(Rows), Parameters.MeasureInputColumnOrder, @@ -95,9 +95,7 @@ public: break; } } - if (EAfterMatchSkipTo::PastLastRow == Parameters.SkipTo.To) { - Nfa.Clear(); - } + Nfa.AfterMatchSkip(*match); return result; } bool ProcessEndOfData(TComputationContext& ctx) { diff --git a/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h b/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h index 944164e4bc..2b194212f4 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h +++ b/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h @@ -350,19 +350,25 @@ class TNfa { using TRange = TSparseList::TRange; using TMatchedVars = TMatchedVars<TRange>; +public: + struct TMatch { + size_t BeginIndex; + size_t EndIndex; + TMatchedVars Vars; + }; + +private: struct TState { - size_t BeginMatchIndex; - size_t EndMatchIndex; size_t Index; - TMatchedVars Vars; + TMatch Match; std::deque<ui64, TMKQLAllocator<ui64>> Quantifiers; void Save(TMrOutputSerializer& serializer) const { - serializer.Write(BeginMatchIndex); - serializer.Write(EndMatchIndex); serializer.Write(Index); - serializer.Write(Vars.size()); - for (const auto& vector : Vars) { + serializer.Write(Match.BeginIndex); + serializer.Write(Match.EndIndex); + serializer.Write(Match.Vars.size()); + for (const auto& vector : Match.Vars) { serializer.Write(vector.size()); for (const auto& range : vector) { range.Save(serializer); @@ -375,13 +381,13 @@ class TNfa { } void Load(TMrInputSerializer& serializer) { - serializer.Read(BeginMatchIndex); - serializer.Read(EndMatchIndex); serializer.Read(Index); + serializer.Read(Match.BeginIndex); + serializer.Read(Match.EndIndex); auto varsSize = serializer.Read<TMatchedVars::size_type>(); - Vars.clear(); - Vars.resize(varsSize); - for (auto& subvec: Vars) { + Match.Vars.clear(); + Match.Vars.resize(varsSize); + for (auto& subvec: Match.Vars) { ui64 vectorSize = serializer.Read<ui64>(); subvec.resize(vectorSize); for (auto& item : subvec) { @@ -397,24 +403,29 @@ class TNfa { } friend inline bool operator<(const TState& lhs, const TState& rhs) { - auto lhsEndMatchIndex = -static_cast<i64>(lhs.EndMatchIndex); - auto rhsEndMatchIndex = -static_cast<i64>(rhs.EndMatchIndex); - return std::tie(lhs.BeginMatchIndex, lhsEndMatchIndex, lhs.Index, lhs.Quantifiers, lhs.Vars) < std::tie(rhs.BeginMatchIndex, rhsEndMatchIndex, rhs.Index, rhs.Quantifiers, rhs.Vars); + auto lhsMatchEndIndex = -static_cast<i64>(lhs.Match.EndIndex); + auto rhsMatchEndIndex = -static_cast<i64>(rhs.Match.EndIndex); + return std::tie(lhs.Match.BeginIndex, lhsMatchEndIndex, lhs.Index, lhs.Match.Vars, lhs.Quantifiers) < std::tie(rhs.Match.BeginIndex, rhsMatchEndIndex, rhs.Index, rhs.Match.Vars, rhs.Quantifiers); } friend inline bool operator==(const TState& lhs, const TState& rhs) { - return std::tie(lhs.BeginMatchIndex, lhs.EndMatchIndex, lhs.Index, lhs.Quantifiers, lhs.Vars) == std::tie(rhs.BeginMatchIndex, rhs.EndMatchIndex, rhs.Index, rhs.Quantifiers, rhs.Vars); + return std::tie(lhs.Match.BeginIndex, lhs.Match.EndIndex, lhs.Index, lhs.Match.Vars, lhs.Quantifiers) == std::tie(rhs.Match.BeginIndex, rhs.Match.EndIndex, rhs.Index, rhs.Match.Vars, rhs.Quantifiers); } }; public: - TNfa(TNfaTransitionGraph::TPtr transitionGraph, IComputationExternalNode* matchedRangesArg, const TComputationNodePtrVector& defines) - : TransitionGraph(transitionGraph) - , MatchedRangesArg(matchedRangesArg) - , Defines(defines) { - } + TNfa( + TNfaTransitionGraph::TPtr transitionGraph, + IComputationExternalNode* matchedRangesArg, + const TComputationNodePtrVector& defines, + TAfterMatchSkipTo skipTo) + : TransitionGraph(transitionGraph) + , MatchedRangesArg(matchedRangesArg) + , Defines(defines) + , SkipTo_(skipTo) + {} void ProcessRow(TSparseList::TRange&& currentRowLock, TComputationContext& ctx) { - TState state(currentRowLock.From(), currentRowLock.To(), TransitionGraph->Input, TMatchedVars(Defines.size()), std::deque<ui64, TMKQLAllocator<ui64>>{}); + TState state(TransitionGraph->Input, TMatch{currentRowLock.From(), currentRowLock.To(), TMatchedVars(Defines.size())}, std::deque<ui64, TMKQLAllocator<ui64>>{}); Insert(std::move(state)); MakeEpsilonTransitions(); TStateSet newStates; @@ -423,17 +434,17 @@ public: //Here we handle only transitions of TMatchedVarTransition type, //all other transitions are handled in MakeEpsilonTransitions if (const auto* matchedVarTransition = std::get_if<TMatchedVarTransition>(&TransitionGraph->Transitions[state.Index])) { - MatchedRangesArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, state.Vars)); + MatchedRangesArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, state.Match.Vars)); const auto varIndex = matchedVarTransition->VarIndex; const auto& v = Defines[varIndex]->GetValue(ctx); if (v && v.Get<bool>()) { if (matchedVarTransition->SaveState) { - auto vars = state.Vars; //TODO get rid of this copy + auto vars = state.Match.Vars; //TODO get rid of this copy auto& matchedVar = vars[varIndex]; Extend(matchedVar, currentRowLock); - newStates.emplace(state.BeginMatchIndex, currentRowLock.To(), matchedVarTransition->To, std::move(vars), state.Quantifiers); + newStates.emplace(matchedVarTransition->To, TMatch{state.Match.BeginIndex, currentRowLock.To(), std::move(vars)}, state.Quantifiers); } else { - newStates.emplace(state.BeginMatchIndex, currentRowLock.To(), matchedVarTransition->To, state.Vars, state.Quantifiers); + newStates.emplace(matchedVarTransition->To, TMatch{state.Match.BeginIndex, currentRowLock.To(), state.Match.Vars}, state.Quantifiers); } } deletedStates.insert(state); @@ -450,8 +461,8 @@ public: bool HasMatched() const { for (auto& state: ActiveStates) { - if (auto activeStateIter = ActiveStateCounters.find(state.BeginMatchIndex), - finishedStateIter = FinishedStateCounters.find(state.BeginMatchIndex); + if (auto activeStateIter = ActiveStateCounters.find(state.Match.BeginIndex), + finishedStateIter = FinishedStateCounters.find(state.Match.BeginIndex); ((activeStateIter != ActiveStateCounters.end() && finishedStateIter != FinishedStateCounters.end() && activeStateIter->second == finishedStateIter->second) || @@ -463,16 +474,16 @@ public: return false; } - std::optional<TMatchedVars> GetMatched() { + std::optional<TMatch> GetMatched() { for (auto& state: ActiveStates) { - if (auto activeStateIter = ActiveStateCounters.find(state.BeginMatchIndex), - finishedStateIter = FinishedStateCounters.find(state.BeginMatchIndex); + if (auto activeStateIter = ActiveStateCounters.find(state.Match.BeginIndex), + finishedStateIter = FinishedStateCounters.find(state.Match.BeginIndex); ((activeStateIter != ActiveStateCounters.end() && finishedStateIter != FinishedStateCounters.end() && activeStateIter->second == finishedStateIter->second) || EndOfData) && state.Index == TransitionGraph->Output) { - auto result = state.Vars; + auto result = state.Match; Erase(std::move(state)); return result; } @@ -515,9 +526,9 @@ public: auto activeStateCountersSize = serializer.Read<ui64>(); for (size_t i = 0; i < activeStateCountersSize; ++i) { using map_type = decltype(ActiveStateCounters); - auto beginMatchIndex = serializer.Read<map_type::key_type>(); + auto matchBeginIndex = serializer.Read<map_type::key_type>(); auto counter = serializer.Read<map_type::mapped_type>(); - ActiveStateCounters.emplace(beginMatchIndex, counter); + ActiveStateCounters.emplace(matchBeginIndex, counter); } } { @@ -525,9 +536,9 @@ public: auto finishedStateCountersSize = serializer.Read<ui64>(); for (size_t i = 0; i < finishedStateCountersSize; ++i) { using map_type = decltype(FinishedStateCounters); - auto beginMatchIndex = serializer.Read<map_type::key_type>(); + auto matchBeginIndex = serializer.Read<map_type::key_type>(); auto counter = serializer.Read<map_type::mapped_type>(); - FinishedStateCounters.emplace(beginMatchIndex, counter); + FinishedStateCounters.emplace(matchBeginIndex, counter); } } } @@ -537,10 +548,31 @@ public: return HasMatched(); } - void Clear() { - ActiveStates.clear(); - ActiveStateCounters.clear(); - FinishedStateCounters.clear(); + void AfterMatchSkip(const TMatch& match) { + const auto skipToRowIndex = [&]() { + switch (SkipTo_.To) { + case EAfterMatchSkipTo::NextRow: + return match.BeginIndex + 1; + case EAfterMatchSkipTo::PastLastRow: + return match.EndIndex + 1; + case EAfterMatchSkipTo::ToFirst: + MKQL_ENSURE(false, "AFTER MATCH SKIP TO FIRST is not implemented yet"); + case EAfterMatchSkipTo::ToLast: + [[fallthrough]]; + case EAfterMatchSkipTo::To: + MKQL_ENSURE(false, "AFTER MATCH SKIP TO LAST is not implemented yet"); + } + }(); + + TStateSet deletedStates; + for (const auto& state : ActiveStates) { + if (state.Match.BeginIndex < skipToRowIndex) { + deletedStates.insert(state); + } + } + for (auto& state : deletedStates) { + Erase(std::move(state)); + } } private: @@ -561,14 +593,14 @@ private: [&](const TEpsilonTransitions& epsilonTransitions) { deletedStates.insert(state); for (const auto& i : epsilonTransitions.To) { - newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, i, state.Vars, state.Quantifiers); + newStates.emplace(i, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, state.Quantifiers); } }, [&](const TQuantityEnterTransition& quantityEnterTransition) { deletedStates.insert(state); auto quantifiers = state.Quantifiers; //TODO get rid of this copy quantifiers.push_back(0); - newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, quantityEnterTransition.To, state.Vars, std::move(quantifiers)); + newStates.emplace(quantityEnterTransition.To, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(quantifiers)); }, [&](const TQuantityExitTransition& quantityExitTransition) { deletedStates.insert(state); @@ -576,12 +608,12 @@ private: if (state.Quantifiers.back() + 1 < quantityMax) { auto q = state.Quantifiers; q.back()++; - newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, toFindMore, state.Vars, std::move(q)); + newStates.emplace(toFindMore, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(q)); } if (quantityMin <= state.Quantifiers.back() + 1 && state.Quantifiers.back() + 1 <= quantityMax) { auto q = state.Quantifiers; q.pop_back(); - newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, toMatched, state.Vars, std::move(q)); + newStates.emplace(toMatched, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(q)); } }, }, TransitionGraph->Transitions[state.Index]); @@ -610,22 +642,22 @@ private: } void Insert(TState state) { - auto beginMatchIndex = state.BeginMatchIndex; + auto matchBeginIndex = state.Match.BeginIndex; const auto& transition = TransitionGraph->Transitions[state.Index]; auto diff = static_cast<i64>(ActiveStates.insert(std::move(state)).second); - Add(ActiveStateCounters, beginMatchIndex, diff); + Add(ActiveStateCounters, matchBeginIndex, diff); if (std::holds_alternative<TVoidTransition>(transition)) { - Add(FinishedStateCounters, beginMatchIndex, diff); + Add(FinishedStateCounters, matchBeginIndex, diff); } } void Erase(TState state) { - auto beginMatchIndex = state.BeginMatchIndex; + auto matchBeginIndex = state.Match.BeginIndex; const auto& transition = TransitionGraph->Transitions[state.Index]; auto diff = -static_cast<i64>(ActiveStates.erase(std::move(state))); - Add(ActiveStateCounters, beginMatchIndex, diff); + Add(ActiveStateCounters, matchBeginIndex, diff); if (std::holds_alternative<TVoidTransition>(transition)) { - Add(FinishedStateCounters, beginMatchIndex, diff); + Add(FinishedStateCounters, matchBeginIndex, diff); } } @@ -636,6 +668,7 @@ private: THashMap<size_t, i64> ActiveStateCounters; THashMap<size_t, i64> FinishedStateCounters; bool EndOfData = false; + TAfterMatchSkipTo SkipTo_; }; }//namespace NKikimr::NMiniKQL::NMatchRecognize diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp index 745c88e084..afdbd6b8e8 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp @@ -59,7 +59,7 @@ struct TNfaSetup { for (auto& d: Defines) { defines.push_back(d); } - return TNfa(transitionGraph, MatchedVars, defines); + return TNfa(transitionGraph, MatchedVars, defines, TAfterMatchSkipTo{EAfterMatchSkipTo::PastLastRow, ""}); } TComputationNodeFactory GetAuxCallableFactory() { @@ -261,8 +261,8 @@ Y_UNIT_TEST_SUITE(MatchRecognizeNfa) { Iota(expectedTo.begin(), expectedTo.end(), i - seriesLength + 1); for (size_t matchCount = 0; matchCount < seriesLength; ++matchCount) { auto match = setup.Nfa.GetMatched(); - UNIT_ASSERT_C(match.has_value(), i); - auto vars = match.value(); + UNIT_ASSERT_C(match, i); + auto vars = match->Vars; UNIT_ASSERT_VALUES_EQUAL_C(1, vars.size(), i); auto var = vars[0]; UNIT_ASSERT_VALUES_EQUAL_C(1, var.size(), i); diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp index 2e213d02ad..e17dcf09aa 100644 --- a/yql/essentials/sql/v1/sql_query.cpp +++ b/yql/essentials/sql/v1/sql_query.cpp @@ -67,7 +67,8 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out, }; TSet<TString> modeSettings = { - "consistency_mode", + "consistency_mode", // TODO(ilnaz): deprecated + "consistency_level", "commit_interval", }; diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp index e0d243929f..3ef964fa0d 100644 --- a/yql/essentials/sql/v1/sql_ut.cpp +++ b/yql/essentials/sql/v1/sql_ut.cpp @@ -3086,11 +3086,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { { auto req = R"( USE plato; - ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG"); + ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_LEVEL = "GLOBAL"); )"; auto res = SqlToYql(req); UNIT_ASSERT(!res.Root); - UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n"); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:80: Error: CONSISTENCY_LEVEL is not supported in ALTER\n"); } { auto req = R"( diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp index c561512136..c3c5f9c983 100644 --- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp +++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp @@ -3086,11 +3086,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { { auto req = R"( USE plato; - ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG"); + ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_LEVEL = "GLOBAL"); )"; auto res = SqlToYql(req); UNIT_ASSERT(!res.Root); - UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n"); + UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:80: Error: CONSISTENCY_LEVEL is not supported in ALTER\n"); } { auto req = R"( diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json index 47f93caece..e73aa91e9e 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/result.json +++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json @@ -11180,30 +11180,30 @@ ], "test_sql2yql.test[match_recognize-alerts-streaming]": [ { - "checksum": "951e1c6d04cbba2370c2a5e5542d6afd", - "size": 10537, - "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts-streaming_/sql.yql" + "checksum": "931c9266d12e54a59fb0cd3570c3ccc0", + "size": 9765, + "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts-streaming_/sql.yql" } ], "test_sql2yql.test[match_recognize-alerts]": [ { - "checksum": "67843aad123bb76d097416bf83bbe749", - "size": 10539, - "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql" + "checksum": "81ff4044da026f2de566bc73e499da3d", + "size": 9767, + "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql" } ], "test_sql2yql.test[match_recognize-alerts_without_order]": [ { - "checksum": "3260928c483690ee0d3f06917cd1d5cb", - "size": 10420, - "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql" + "checksum": "adf84392f4dd1db143484be8cbbda16c", + "size": 9648, + "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql" } ], "test_sql2yql.test[match_recognize-greedy_quantifiers]": [ { - "checksum": "45d528f98097150c41366da075dc08c8", - "size": 4257, - "uri": "https://{canondata_backend}/1130705/7ccb4b448efbf0410ae97c635cda187d108148f4/resource.tar.gz#test_sql2yql.test_match_recognize-greedy_quantifiers_/sql.yql" + "checksum": "c6b4102b3bf241e7b309e8cc93aaf76c", + "size": 4349, + "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-greedy_quantifiers_/sql.yql" } ], "test_sql2yql.test[match_recognize-permute]": [ @@ -12368,6 +12368,13 @@ "uri": "https://{canondata_backend}/1937429/434276f26b2857be3c5ad3fdbbf877d2bf775ac5/resource.tar.gz#test_sql2yql.test_pg-aggregate_scalar_minus_zero_/sql.yql" } ], + "test_sql2yql.test[pg-aliased_columns]": [ + { + "checksum": "7ca1f0f6ce1395c4d6f826660886df83", + "size": 953, + "uri": "https://{canondata_backend}/1777230/27c189b00ecc5d4153899123da0bf3c72d8cfd80/resource.tar.gz#test_sql2yql.test_pg-aliased_columns_/sql.yql" + } + ], "test_sql2yql.test[pg-all_data]": [ { "checksum": "e2e983a696817fe3cbebcbbf79264fd1", diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql index e572efb76d..28d63923cf 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql @@ -28,13 +28,12 @@ FROM LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW - PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) + PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) DEFINE LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login" AND LOGIN_SUCCESS_REMOTE.ev_status == "success" @@ -44,7 +43,6 @@ FROM SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host) AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all" AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login" AND LOGIN_FAILED_SAME_USER.ev_status != "success" AND ( diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql index 471b7f13ab..ea91539f37 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql @@ -28,13 +28,12 @@ FROM LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW - PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) + PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) DEFINE LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login" AND LOGIN_SUCCESS_REMOTE.ev_status == "success" @@ -44,7 +43,6 @@ FROM SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host) AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all" AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login" AND LOGIN_FAILED_SAME_USER.ev_status != "success" AND ( diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql index 70d4e9cc66..4d30431160 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql @@ -26,13 +26,12 @@ FROM LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW - PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) + PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER) DEFINE LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login" AND LOGIN_SUCCESS_REMOTE.ev_status == "success" @@ -42,7 +41,6 @@ FROM SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host) AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all" AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login" AND LOGIN_FAILED_SAME_USER.ev_status != "success" AND ( diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql index e9e70b561f..887a45e7e3 100644 --- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql +++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql @@ -6,12 +6,15 @@ $input = * FROM AS_TABLE([ - <|time: 0, id: 1, name: 'A'|>, - <|time: 200, id: 2, name: 'B'|>, - <|time: 400, id: 3, name: 'C'|>, - <|time: 600, id: 4, name: 'B'|>, - <|time: 800, id: 5, name: 'C'|>, - <|time: 1000, id: 6, name: 'W'|>, + <|time: 0, name: 'A'|>, + <|time: 100, name: 'B'|>, + <|time: 200, name: 'C'|>, + <|time: 300, name: 'B'|>, + <|time: 400, name: 'C'|>, + <|time: 500, name: 'A'|>, + <|time: 600, name: 'B'|>, + <|time: 700, name: 'C'|>, + <|time: 800, name: 'W'|>, ]) ; @@ -22,9 +25,9 @@ FROM ORDER BY CAST(time AS Timestamp) MEASURES - FIRST(A.id) AS a_id, - LAST(B_OR_C.id) AS bc_id, - LAST(C.id) AS c_id + FIRST(A.time) AS a_time, + LAST(B_OR_C.time) AS bc_time, + LAST(C.time) AS c_time PATTERN (A B_OR_C * C) DEFINE A AS A.name == 'A', diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql index 5a9d43a155..f8f13b5411 100644 --- a/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql +++ b/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql @@ -25,7 +25,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login @@ -33,7 +32,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN ( - LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | + LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER ) DEFINE @@ -48,8 +47,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT as - COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER as LOGIN_FAILED_SAME_USER.ev_type = "login" and LOGIN_FAILED_SAME_USER.ev_status <> "success" and diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts.sql index 9b408b63ed..da1aa8ddd4 100644 --- a/yql/essentials/tests/sql/suites/match_recognize/alerts.sql +++ b/yql/essentials/tests/sql/suites/match_recognize/alerts.sql @@ -25,7 +25,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login @@ -33,7 +32,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN ( - LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | + LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER ) DEFINE @@ -48,8 +47,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT as - COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER as LOGIN_FAILED_SAME_USER.ev_type = "login" and LOGIN_FAILED_SAME_USER.ev_status <> "success" and diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql index bab465988d..006ba67e2e 100644 --- a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql +++ b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql @@ -24,7 +24,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user, LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt, LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt, - LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt, FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin, FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end, LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login @@ -32,7 +31,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( ONE ROW PER MATCH AFTER MATCH SKIP TO NEXT ROW PATTERN ( - LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | + LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER ) DEFINE @@ -47,8 +46,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE( SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE), - SUSPICIOUS_ACTION_TIMEOUT as - COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE), LOGIN_FAILED_SAME_USER as LOGIN_FAILED_SAME_USER.ev_type = "login" and LOGIN_FAILED_SAME_USER.ev_status <> "success" and diff --git a/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql b/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql index 8645b1c5f2..96707478d6 100644 --- a/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql +++ b/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql @@ -1,26 +1,27 @@ -pragma FeatureR010="prototype"; -pragma config.flags("MatchRecognizeStream", "disable"); +PRAGMA FeatureR010="prototype"; +PRAGMA config.flags("MatchRecognizeStream", "disable"); $input = SELECT * FROM AS_TABLE([ - <|time: 0, id: 1, name: 'A'|>, - <|time: 200, id: 2, name: 'B'|>, - <|time: 400, id: 3, name: 'C'|>, - <|time: 600, id: 4, name: 'B'|>, - <|time: 800, id: 5, name: 'C'|>, - <|time: 1000, id: 6, name: 'W'|>, + <|time: 0, name: 'A'|>, + <|time: 100, name: 'B'|>, + <|time: 200, name: 'C'|>, + <|time: 300, name: 'B'|>, + <|time: 400, name: 'C'|>, + <|time: 500, name: 'A'|>, + <|time: 600, name: 'B'|>, + <|time: 700, name: 'C'|>, + <|time: 800, name: 'W'|>, ]); - SELECT * FROM $input MATCH_RECOGNIZE ( ORDER BY CAST(time AS Timestamp) MEASURES - FIRST(A.id) as a_id, - LAST(B_OR_C.id) as bc_id, - LAST(C.id) as c_id + FIRST(A.time) AS a_time, + LAST(B_OR_C.time) AS bc_time, + LAST(C.time) AS c_time PATTERN (A B_OR_C* C) DEFINE - A AS A.name ='A', - B_OR_C AS (B_OR_C.name ='B' or B_OR_C.name ='C'), - C AS C.name ='C' - ); - + A AS A.name = 'A', + B_OR_C AS (B_OR_C.name = 'B' OR B_OR_C.name = 'C'), + C AS C.name = 'C' +); diff --git a/yql/essentials/tests/sql/suites/pg/aliased_columns.sql b/yql/essentials/tests/sql/suites/pg/aliased_columns.sql new file mode 100644 index 0000000000..6ba1335386 --- /dev/null +++ b/yql/essentials/tests/sql/suites/pg/aliased_columns.sql @@ -0,0 +1,2 @@ +--!syntax_pg +select a, b, c from (select 1 as a, 2 as b, 3 as c) as "A.B" order by b diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp index 43e3864ae2..a2d302212f 100644 --- a/yt/cpp/mapreduce/client/client.cpp +++ b/yt/cpp/mapreduce/client/client.cpp @@ -42,6 +42,7 @@ #include <yt/cpp/mapreduce/library/table_schema/protobuf.h> +#include <yt/cpp/mapreduce/raw_client/raw_client.h> #include <yt/cpp/mapreduce/raw_client/raw_requests.h> #include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h> @@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url) //////////////////////////////////////////////////////////////////////////////// TClientBase::TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy) - : Context_(context) + : RawClient_(std::move(rawClient)) + , Context_(context) , TransactionId_(transactionId) , ClientRetryPolicy_(std::move(retryPolicy)) { } @@ -101,7 +104,7 @@ TClientBase::TClientBase( ITransactionPtr TClientBase::StartTransaction( const TStartTransactionOptions& options) { - return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options); + return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options); } TNodeId TClientBase::Create( @@ -138,7 +141,11 @@ void TClientBase::Set( const TNode& value, const TSetOptions& options) { - NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options); + RequestWithRetry<void>( + ClientRetryPolicy_->CreatePolicyForGenericRequest(), + [this, &path, &value, &options] (TMutationId& mutationId) { + RawClient_->Set(mutationId, TransactionId_, path, value, options); + }); } void TClientBase::MultisetAttributes( @@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient() return GetParentClientImpl(); } +IRawClientPtr TClientBase::GetRawClient() const +{ + return RawClient_; +} + const TClientContext& TClientBase::GetContext() const { return Context_; @@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const //////////////////////////////////////////////////////////////////////////////// TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, const TStartTransactionOptions& options) - : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( MakeHolder<TPingableTransaction>( @@ -858,11 +871,12 @@ TTransaction::TTransaction( } TTransaction::TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, const TAttachTransactionOptions& options) - : TClientBase(context, transactionId, parentClient->GetRetryPolicy()) + : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy()) , TransactionPinger_(parentClient->GetTransactionPinger()) , PingableTx_( new TPingableTransaction( @@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl() //////////////////////////////////////////////////////////////////////////////// TClient::TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy) - : TClientBase(context, globalId, retryPolicy) + : TClientBase(std::move(rawClient), context, globalId, retryPolicy) , TransactionPinger_(nullptr) { } @@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction( { CheckShutdown(); - return MakeIntrusive<TTransaction>(this, Context_, transactionId, options); + return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options); } void TClient::MountTable( @@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl( retryConfigProvider = CreateDefaultRetryConfigProvider(); } + auto rawClient = MakeIntrusive<THttpRawClient>(context); + EnsureInitialized(); - return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); + return new TClient( + std::move(rawClient), + context, + globalTxId, + CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config)); } //////////////////////////////////////////////////////////////////////////////// diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h index 32c458316d..2a0398d056 100644 --- a/yt/cpp/mapreduce/client/client.h +++ b/yt/cpp/mapreduce/client/client.h @@ -5,6 +5,7 @@ #include "transaction_pinger.h" #include <yt/cpp/mapreduce/interface/client.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/http/context.h> #include <yt/cpp/mapreduce/http/requests.h> @@ -29,6 +30,7 @@ class TClientBase { public: TClientBase( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& transactionId, IClientRetryPolicyPtr retryPolicy); @@ -222,6 +224,8 @@ public: IClientPtr GetParentClient() override; + IRawClientPtr GetRawClient() const; + const TClientContext& GetContext() const; const IClientRetryPolicyPtr& GetRetryPolicy() const; @@ -232,6 +236,7 @@ protected: virtual TClientPtr GetParentClientImpl() = 0; protected: + const IRawClientPtr RawClient_; const TClientContext Context_; TTransactionId TransactionId_; IClientRetryPolicyPtr ClientRetryPolicy_; @@ -287,6 +292,7 @@ public: // // Start a new transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& parentTransactionId, @@ -295,6 +301,7 @@ public: // // Attach an existing transaction. TTransaction( + IRawClientPtr rawClient, TClientPtr parentClient, const TClientContext& context, const TTransactionId& transactionId, @@ -325,6 +332,7 @@ protected: TClientPtr GetParentClientImpl() override; private: + const IRawClientPtr RawClient_; ITransactionPingerPtr TransactionPinger_; THolder<TPingableTransaction> PingableTx_; TClientPtr ParentClient_; @@ -338,6 +346,7 @@ class TClient { public: TClient( + IRawClientPtr rawClient, const TClientContext& context, const TTransactionId& globalId, IClientRetryPolicyPtr retryPolicy); diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp index 07d00e88d6..a6d424c5a1 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.cpp +++ b/yt/cpp/mapreduce/client/operation_preparer.cpp @@ -17,6 +17,7 @@ #include <yt/cpp/mapreduce/raw_client/raw_batch_request.h> #include <yt/cpp/mapreduce/interface/error_codes.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> #include <yt/cpp/mapreduce/interface/logging/yt_log.h> @@ -395,7 +396,8 @@ TJobPreparer::TJobPreparer( size_t outputTableCount, const TVector<TSmallJobFile>& smallFileList, const TOperationOptions& options) - : OperationPreparer_(operationPreparer) + : RawClient_(operationPreparer.GetClient()->GetRawClient()) + , OperationPreparer_(operationPreparer) , Spec_(spec) , Options_(options) , Layers_(spec.Layers_) @@ -631,6 +633,7 @@ TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& it CreateFileInCypress(cypressPath); auto uploadTx = MakeIntrusive<TTransaction>( + OperationPreparer_.GetClient()->GetRawClient(), OperationPreparer_.GetClient(), OperationPreparer_.GetContext(), TTransactionId(), diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h index 54c978c0fb..41594eb52e 100644 --- a/yt/cpp/mapreduce/client/operation_preparer.h +++ b/yt/cpp/mapreduce/client/operation_preparer.h @@ -83,6 +83,7 @@ public: bool ShouldRedirectStdoutToStderr() const; private: + const IRawClientPtr RawClient_; TOperationPreparer& OperationPreparer_; TUserJobSpec Spec_; TOperationOptions Options_; diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp index ca243a929a..41ee56f672 100644 --- a/yt/cpp/mapreduce/http/http.cpp +++ b/yt/cpp/mapreduce/http/http.cpp @@ -207,7 +207,7 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite AddParameter("operation_id", GetGuidAsString(operationId), overwrite); } -void THttpHeader::AddMutationId() +TMutationId THttpHeader::AddMutationId() { TGUID guid; @@ -220,6 +220,8 @@ void THttpHeader::AddMutationId() guid.dw[2] = GetPID() ^ MicroSeconds(); AddParameter("mutation_id", GetGuidAsString(guid), true); + + return guid; } bool THttpHeader::HasMutationId() const @@ -227,6 +229,10 @@ bool THttpHeader::HasMutationId() const return Parameters_.contains("mutation_id"); } +void THttpHeader::SetMutationId(TMutationId mutationId) { + AddParameter("mutation_id", GetGuidAsString(mutationId), /* overwrite */ true); +} + void THttpHeader::SetToken(const TString& token) { Token_ = token; diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h index 618b1e2c22..0f5e9034ee 100644 --- a/yt/cpp/mapreduce/http/http.h +++ b/yt/cpp/mapreduce/http/http.h @@ -59,8 +59,9 @@ public: void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false); void AddPath(const TString& path, bool overwrite = false); void AddOperationId(const TOperationId& operationId, bool overwrite = false); - void AddMutationId(); + TMutationId AddMutationId(); bool HasMutationId() const; + void SetMutationId(TMutationId mutationId); void SetToken(const TString& token); void SetProxyAddress(const TString& proxyAddress); diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp index 0c719eeda3..3330cf696c 100644 --- a/yt/cpp/mapreduce/http/retry_request.cpp +++ b/yt/cpp/mapreduce/http/retry_request.cpp @@ -49,6 +49,7 @@ static TResponseInfo Request( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body, const TRequestConfig& config) @@ -64,8 +65,13 @@ TResponseInfo RequestWithoutRetry( } if (header.HasMutationId()) { - header.RemoveParameter("retry"); - header.AddMutationId(); + if (mutationId.IsEmpty()) { + header.RemoveParameter("retry"); + mutationId = header.AddMutationId(); + } else { + header.AddParameter("retry", true, /* overwrite */ true); + header.SetMutationId(mutationId); + } } auto requestId = CreateGuidAsString(); return Request(context, header, body, requestId, config); diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h index a6007d6eae..19bf0a7c14 100644 --- a/yt/cpp/mapreduce/http/retry_request.h +++ b/yt/cpp/mapreduce/http/retry_request.h @@ -2,8 +2,13 @@ #include "fwd.h" +#include <yt/cpp/mapreduce/interface/errors.h> #include <yt/cpp/mapreduce/interface/fwd.h> +#include <yt/cpp/mapreduce/interface/logging/yt_log.h> + #include <yt/cpp/mapreduce/common/fwd.h> +#include <yt/cpp/mapreduce/common/retry_lib.h> +#include <yt/cpp/mapreduce/common/wait_proxy.h> #include <yt/cpp/mapreduce/http/http_client.h> @@ -32,6 +37,64 @@ struct TRequestConfig //////////////////////////////////////////////////////////////////////////////// +template <typename TResult> +TResult RequestWithRetry( + IRequestRetryPolicyPtr retryPolicy, + std::function<TResult(TMutationId&)> func) +{ + bool useSameMutationId = false; + TMutationId mutationId; + + while (true) { + try { + retryPolicy->NotifyNewAttempt(); + if constexpr (std::is_same_v<TResult, void>) { + func(mutationId); + return; + } else { + return func(mutationId); + } + } catch (const TErrorResponse& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.GetError().GetMessage(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = e.IsTransportError(); + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnRetriableError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } catch (const std::exception& e) { + YT_LOG_ERROR("Retry failed %v - %v", + e.what(), + retryPolicy->GetAttemptDescription()); + + useSameMutationId = true; + + if (!IsRetriable(e)) { + throw; + } + + auto maybeRetryTimeout = retryPolicy->OnGenericError(e); + if (maybeRetryTimeout) { + TWaitProxy::Get()->Sleep(*maybeRetryTimeout); + } else { + throw; + } + } + if (!useSameMutationId) { + mutationId = {}; + } + } +} + // Retry request with given `header' and `body' using `retryPolicy'. // If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`. TResponseInfo RetryRequestWithPolicy( @@ -43,6 +106,7 @@ TResponseInfo RetryRequestWithPolicy( TResponseInfo RequestWithoutRetry( const TClientContext& context, + TMutationId& mutationId, THttpHeader& header, TMaybe<TStringBuf> body = {}, const TRequestConfig& config = TRequestConfig()); diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h index 485b45129a..162a6ee3e9 100644 --- a/yt/cpp/mapreduce/interface/fwd.h +++ b/yt/cpp/mapreduce/interface/fwd.h @@ -150,6 +150,7 @@ namespace NYT { // common.h //////////////////////////////////////////////////////////////////////////////// + using TMutationId = TGUID; using TTransactionId = TGUID; using TNodeId = TGUID; using TLockId = TGUID; @@ -396,5 +397,12 @@ namespace NYT { struct TRetryConfig; class IRetryConfigProvider; using IRetryConfigProviderPtr = ::TIntrusivePtr<IRetryConfigProvider>; + + //////////////////////////////////////////////////////////////////////////////// + // raw_client.h + //////////////////////////////////////////////////////////////////////////////// + + class IRawClient; + using IRawClientPtr = ::TIntrusivePtr<IRawClient>; } /// @endcond diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h new file mode 100644 index 0000000000..b1d244ad78 --- /dev/null +++ b/yt/cpp/mapreduce/interface/raw_client.h @@ -0,0 +1,25 @@ +#pragma once + +#include "client_method_options.h" + +namespace NYT { + +//////////////////////////////////////////////////////////////////////////////// + +class IRawClient + : public virtual TThrRefBase +{ +public: + // Cypress + + virtual void Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) = 0; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp new file mode 100644 index 0000000000..9a8a9fca84 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp @@ -0,0 +1,33 @@ +#include "raw_client.h" + +#include "rpc_parameters_serialization.h" + +#include <yt/cpp/mapreduce/http/http.h> +#include <yt/cpp/mapreduce/http/retry_request.h> + +#include <library/cpp/yson/node/node_io.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +THttpRawClient::THttpRawClient(const TClientContext& context) + : Context_(context) +{ } + +void THttpRawClient::Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options) +{ + THttpHeader header("PUT", "set"); + header.MergeParameters(NRawClient::SerializeParamsForSet(transactionId, Context_.Config->Prefix, path, options)); + auto body = NodeToYsonString(value); + RequestWithoutRetry(Context_, mutationId, header, body); +} + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h new file mode 100644 index 0000000000..def521a918 --- /dev/null +++ b/yt/cpp/mapreduce/raw_client/raw_client.h @@ -0,0 +1,33 @@ +#pragma once + +#include <yt/cpp/mapreduce/http/context.h> + +#include <yt/cpp/mapreduce/interface/client_method_options.h> +#include <yt/cpp/mapreduce/interface/raw_client.h> + +namespace NYT::NDetail { + +//////////////////////////////////////////////////////////////////////////////// + +class THttpRawClient + : public IRawClient +{ +public: + THttpRawClient(const TClientContext& context); + + // Cypress + + void Set( + TMutationId& mutationId, + const TTransactionId& transactionId, + const TYPath& path, + const TNode& value, + const TSetOptions& options = {}) override; + +private: + const TClientContext Context_; +}; + +//////////////////////////////////////////////////////////////////////////////// + +} // namespace NYT::NDetail diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp index 0b7bcf2c66..d10c818489 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp +++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp @@ -173,9 +173,10 @@ TNodeId CopyWithoutRetries( const TCopyOptions& options) { THttpHeader header("POST", "copy"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response); } TNodeId CopyInsideMasterCell( @@ -204,9 +205,10 @@ TNodeId MoveWithoutRetries( const TMoveOptions& options) { THttpHeader header("POST", "move"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options)); - return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response); + return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response); } TNodeId MoveInsideMasterCell( @@ -309,9 +311,10 @@ void Concatenate( const TConcatenateOptions& options) { THttpHeader header("POST", "concatenate"); + TMutationId mutationId; header.AddMutationId(); header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options)); - RequestWithoutRetry(context, header); + RequestWithoutRetry(context, mutationId, header); } void PingTx( diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h index 0a183d617c..8cb226ab86 100644 --- a/yt/cpp/mapreduce/raw_client/raw_requests.h +++ b/yt/cpp/mapreduce/raw_client/raw_requests.h @@ -53,14 +53,6 @@ TNode TryGet( const TYPath& path, const TGetOptions& options); -void Set( - const IRequestRetryPolicyPtr& retryPolicy, - const TClientContext& context, - const TTransactionId& transactionId, - const TYPath& path, - const TNode& value, - const TSetOptions& options = TSetOptions()); - void MultisetAttributes( const IRequestRetryPolicyPtr& retryPolicy, const TClientContext& context, diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make index c201b86f05..a038b2931f 100644 --- a/yt/cpp/mapreduce/raw_client/ya.make +++ b/yt/cpp/mapreduce/raw_client/ya.make @@ -4,6 +4,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc) SRCS( raw_batch_request.cpp + raw_client.cpp raw_requests.cpp rpc_parameters_serialization.cpp ) |