aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
committerVitaly Isaev <vitalyisaev@ydb.tech>2024-12-12 15:39:00 +0000
commit827b115675004838023427572a7c69f40a86a80a (patch)
treee99c953fe494b9de8d8597a15859d77c81f118c7
parent42701242eaf5be980cb935631586d0e90b82641c (diff)
parentfab222fd8176d00eee5ddafc6bce8cb95a6e3ab0 (diff)
downloadydb-827b115675004838023427572a7c69f40a86a80a.tar.gz
Merge branch 'rightlib' into rightlib_20241212
-rw-r--r--build/sysincl/unsorted.yml1
-rw-r--r--library/cpp/testing/common/env.cpp26
-rw-r--r--library/cpp/testing/common/ut/env_ut.cpp18
-rw-r--r--library/python/testing/yatest_common/yatest/common/runtime.py6
-rw-r--r--yql/essentials/core/common_opt/yql_co_pgselect.cpp2
-rw-r--r--yql/essentials/core/type_ann/type_ann_pg.cpp57
-rw-r--r--yql/essentials/core/type_ann/type_ann_pg.h2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp22
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp69
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h2
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp10
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h133
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp6
-rw-r--r--yql/essentials/sql/v1/sql_query.cpp3
-rw-r--r--yql/essentials/sql/v1/sql_ut.cpp4
-rw-r--r--yql/essentials/sql/v1/sql_ut_antlr4.cpp4
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json31
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql4
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql4
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql4
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql21
-rw-r--r--yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql5
-rw-r--r--yql/essentials/tests/sql/suites/match_recognize/alerts.sql5
-rw-r--r--yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql5
-rw-r--r--yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql35
-rw-r--r--yql/essentials/tests/sql/suites/pg/aliased_columns.sql2
-rw-r--r--yt/cpp/mapreduce/client/client.cpp37
-rw-r--r--yt/cpp/mapreduce/client/client.h9
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.cpp5
-rw-r--r--yt/cpp/mapreduce/client/operation_preparer.h1
-rw-r--r--yt/cpp/mapreduce/http/http.cpp8
-rw-r--r--yt/cpp/mapreduce/http/http.h3
-rw-r--r--yt/cpp/mapreduce/http/retry_request.cpp10
-rw-r--r--yt/cpp/mapreduce/http/retry_request.h64
-rw-r--r--yt/cpp/mapreduce/interface/fwd.h8
-rw-r--r--yt/cpp/mapreduce/interface/raw_client.h25
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.cpp33
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_client.h33
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.cpp9
-rw-r--r--yt/cpp/mapreduce/raw_client/raw_requests.h8
-rw-r--r--yt/cpp/mapreduce/raw_client/ya.make1
41 files changed, 502 insertions, 233 deletions
diff --git a/build/sysincl/unsorted.yml b/build/sysincl/unsorted.yml
index d101a7cf2d..387eb86d72 100644
--- a/build/sysincl/unsorted.yml
+++ b/build/sysincl/unsorted.yml
@@ -259,7 +259,6 @@
#endif
#if defined(_TRASH_) && TODO
- gcrypt.h
- - winmmap.h
#endif
#if defined(__TURBOC__) || defined(__BORLANDC__)
- alloc.h
diff --git a/library/cpp/testing/common/env.cpp b/library/cpp/testing/common/env.cpp
index 67e081c371..1440186d78 100644
--- a/library/cpp/testing/common/env.cpp
+++ b/library/cpp/testing/common/env.cpp
@@ -41,31 +41,7 @@ TString BinaryPath(TStringBuf path) {
}
TString GetArcadiaTestsData() {
- if (GetEnv("USE_ATD_FROM_SNAPSHOT")) {
- return ArcadiaSourceRoot() + "/atd_ro_snapshot";
- }
-
- TString atdRoot = NPrivate::GetTestEnv().ArcadiaTestsDataDir;
- if (atdRoot) {
- return atdRoot;
- }
-
- TString path = NPrivate::GetCwd();
- const char pathsep = GetDirectorySeparator();
- while (!path.empty()) {
- TString dataDir = path + "/arcadia_tests_data";
- if (IsDir(dataDir)) {
- return dataDir;
- }
-
- size_t pos = path.find_last_of(pathsep);
- if (pos == TString::npos) {
- pos = 0;
- }
- path.erase(pos);
- }
-
- return {};
+ return ArcadiaSourceRoot() + "/atd_ro_snapshot";
}
TString GetWorkPath() {
diff --git a/library/cpp/testing/common/ut/env_ut.cpp b/library/cpp/testing/common/ut/env_ut.cpp
index fe4946a65f..4b67d05efb 100644
--- a/library/cpp/testing/common/ut/env_ut.cpp
+++ b/library/cpp/testing/common/ut/env_ut.cpp
@@ -45,24 +45,6 @@ TEST(Runtime, BinaryPath) {
EXPECT_TRUE(TFsPath(BinaryPath("library/cpp/testing/common/ut")).Exists());
}
-TEST(Runtime, GetArcadiaTestsData) {
- NTesting::TScopedEnvironment contextGuard("YA_TEST_CONTEXT_FILE", ""); // remove context filename
- {
- auto tmpDir = ::GetSystemTempDir();
- NTesting::TScopedEnvironment guard("ARCADIA_TESTS_DATA_DIR", tmpDir);
- Singleton<NPrivate::TTestEnv>()->ReInitialize();
- EXPECT_EQ(tmpDir, GetArcadiaTestsData());
- }
- {
- NTesting::TScopedEnvironment guard("ARCADIA_TESTS_DATA_DIR", "");
- Singleton<NPrivate::TTestEnv>()->ReInitialize();
- auto path = GetArcadiaTestsData();
- // it is not error if path is empty
- const bool ok = (path.empty() || GetBaseName(path) == "arcadia_tests_data");
- EXPECT_TRUE(ok);
- }
-}
-
TEST(Runtime, GetWorkPath) {
NTesting::TScopedEnvironment contextGuard("YA_TEST_CONTEXT_FILE", ""); // remove context filename
{
diff --git a/library/python/testing/yatest_common/yatest/common/runtime.py b/library/python/testing/yatest_common/yatest/common/runtime.py
index e4f246d5c8..99a6799109 100644
--- a/library/python/testing/yatest_common/yatest/common/runtime.py
+++ b/library/python/testing/yatest_common/yatest/common/runtime.py
@@ -190,9 +190,9 @@ def data_path(path=None):
:param path: path relative to the arcadia_tests_data directory, e.g. yatest.common.data_path("pers/rerank_service")
:return: absolute path inside arcadia_tests_data
"""
- if "USE_ATD_FROM_SNAPSHOT" in os.environ:
- return os.path.join(source_path(), "atd_ro_snapshot", path)
- return _join_path(_get_ya_plugin_instance().data_root, path)
+ if "USE_ATD_FROM_ATD" in os.environ:
+ return _join_path(_get_ya_plugin_instance().data_root, path)
+ return _join_path(source_path("atd_ro_snapshot"), path)
@default_arg0
diff --git a/yql/essentials/core/common_opt/yql_co_pgselect.cpp b/yql/essentials/core/common_opt/yql_co_pgselect.cpp
index 93b307aab7..e8d37492a8 100644
--- a/yql/essentials/core/common_opt/yql_co_pgselect.cpp
+++ b/yql/essentials/core/common_opt/yql_co_pgselect.cpp
@@ -1022,7 +1022,7 @@ TUsedColumns GatherUsedColumns(const TExprNode::TPtr& result, const TExprNode::T
void FillInputIndices(const TExprNode::TPtr& from, const TExprNode::TPtr& finalExtTypes,
TUsedColumns& usedColumns, TOptimizeContext& optCtx) {
for (auto& x : usedColumns) {
- TStringBuf alias;
+ TString alias;
TStringBuf column = NTypeAnnImpl::RemoveAlias(x.first, alias);
bool foundColumn = false;
diff --git a/yql/essentials/core/type_ann/type_ann_pg.cpp b/yql/essentials/core/type_ann/type_ann_pg.cpp
index d0d73f22bb..78a035b70e 100644
--- a/yql/essentials/core/type_ann/type_ann_pg.cpp
+++ b/yql/essentials/core/type_ann/type_ann_pg.cpp
@@ -346,7 +346,7 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode
TVector<const TItemExprType*> items;
for (size_t i = 0; i < proc.OutputArgTypes.size(); ++i) {
items.push_back(ctx.Expr.MakeType<TItemExprType>(
- resultColumnOrder->AddColumn(proc.OutputArgNames[i]),
+ resultColumnOrder->AddColumn(proc.OutputArgNames[i]),
ctx.Expr.MakeType<TPgExprType>(proc.OutputArgTypes[i])));
}
@@ -697,7 +697,7 @@ IGraphTransformer::TStatus PgArrayOpWrapper(const TExprNode::TPtr& input, TExprN
const auto& opResDesc = NPg::LookupType(oper.ResultType);
if (opResDesc.Name != "bool") {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() <<
+ ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), TStringBuilder() <<
"Expected boolean operator result, but got: " << opResDesc.Name));
return IGraphTransformer::TStatus::Error;
}
@@ -1021,7 +1021,7 @@ IGraphTransformer::TStatus PgNullIfWrapper(const TExprNode::TPtr& input, TExprNo
input->ChildRef(0) = WrapWithPgCast(std::move(input->ChildRef(0)), commonType->TypeId, ctx.Expr);
return IGraphTransformer::TStatus::Repeat;
}
-
+
input->SetTypeAnn(ctx.Expr.MakeType<TPgExprType>(commonType->TypeId));
return IGraphTransformer::TStatus::Ok;
}
@@ -1144,7 +1144,7 @@ IGraphTransformer::TStatus PgReplaceUnknownWrapper(const TExprNode::TPtr& input,
auto structType = listType->GetItemType()->Cast<TStructExprType>();
auto structItemTypes = structType->GetItems();
- const bool noUnknowns = std::none_of(structItemTypes.cbegin(), structItemTypes.cend(),
+ const bool noUnknowns = std::none_of(structItemTypes.cbegin(), structItemTypes.cend(),
[] (const TItemExprType* item) {
const auto* itemType = item->GetItemType();
return itemType->GetKind() == ETypeAnnotationKind::Pg && itemType->Cast<TPgExprType>()->GetId() == NPg::UnknownOid;
@@ -2010,12 +2010,23 @@ bool ValidateWindowRefs(const TExprNode::TPtr& root, const TExprNode* windows, T
return !isError;
}
+TString EscapeDotsInAlias(TStringBuf alias) {
+ TStringBuilder sb;
+ for (auto c: alias) {
+ if (c == '.' || c == '\\') {
+ sb << '\\';
+ }
+ sb << c;
+ }
+ return sb;
+}
+
TString MakeAliasedColumn(TStringBuf alias, TStringBuf column) {
if (!alias) {
return TString(column);
}
- return TStringBuilder() << "_alias_" << alias << "." << column;
+ return TStringBuilder() << "_alias_" << EscapeDotsInAlias(alias) << "." << column;
}
const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, TExprContext& ctx) {
@@ -2027,22 +2038,30 @@ const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, T
}
TStringBuf RemoveAlias(TStringBuf column) {
- TStringBuf tmp;
+ TString tmp;
return RemoveAlias(column, tmp);
}
-TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias) {
+TStringBuf RemoveAlias(TStringBuf column, TString& alias) {
if (!column.StartsWith("_alias_")) {
alias = "";
return column;
}
-
- auto columnPos = column.find('.', 7);
- YQL_ENSURE(columnPos != TString::npos);
- columnPos += 1;
- YQL_ENSURE(columnPos != column.size());
- alias = column.substr(7, columnPos - 7 - 1);
- return column.substr(columnPos);
+ column = column.substr(7);
+ TStringBuilder aliasBuilder;
+ for (size_t i = 0; i < column.size(); ++i) {
+ if (column[i] == '\\') {
+ YQL_ENSURE(i + 1 < column.size());
+ aliasBuilder << column[++i];
+ continue;
+ }
+ if (column[i] == '.') {
+ alias = aliasBuilder;
+ return column.substr(i + 1);
+ }
+ aliasBuilder << column[i];
+ }
+ YQL_ENSURE(false, "No dot ('.') found in alised column");
}
const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx) {
@@ -2877,7 +2896,7 @@ bool ReplaceProjectionRefs(TExprNode::TPtr& lambda, const TStringBuf& scope, con
for (ui32 i = 0; i < projectionOrders.size(); ++i) {
if (index >= current && index < current + projectionOrders[i]->first.Size()) {
TStringBuf column = projectionOrders[i]->first[index - current].PhysicalName;
- TStringBuf alias;
+ TString alias;
column = RemoveAlias(column, alias);
if (result && projectionOrders[i]->second) {
@@ -3346,7 +3365,7 @@ bool GatherExtraSortColumns(const TExprNode& data, const TInputs& inputs, TExprN
}
if (node->IsCallable("Member") && &node->Head() == arg) {
- TStringBuf alias;
+ TString alias;
TStringBuf column = NTypeAnnImpl::RemoveAlias(node->Tail().Content(), alias);
TMaybe<ui32> index;
@@ -3495,7 +3514,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
"Incorrect fill_target_columns option"));
return IGraphTransformer::TStatus::Error;
}
- }
+ }
else if (optionName == "unknowns_allowed") {
hasUnknownsAllowed = true;
}
@@ -4456,7 +4475,7 @@ IGraphTransformer::TStatus PgSetItemWrapper(const TExprNode::TPtr& input, TExprN
return IGraphTransformer::TStatus::Error;
}
}
-
+
if (rightSideUsing.contains(lcase)) {
lrNames[1] = ctx.Expr.NewList(inp->Pos(), {ctx.Expr.NewAtom(inp->Pos(), rightSideUsing[lcase])});
} else {
@@ -5442,7 +5461,7 @@ IGraphTransformer::TStatus PgArrayWrapper(const TExprNode::TPtr& input, TExprNod
bool castsNeeded = false;
const NPg::TTypeDesc* elemTypeDesc;
- if (const auto issue = NPg::LookupCommonType(argTypes,
+ if (const auto issue = NPg::LookupCommonType(argTypes,
[&input, &ctx](size_t i) {
return ctx.Expr.GetPosition(input->Child(i)->Pos());
}, elemTypeDesc, castsNeeded))
diff --git a/yql/essentials/core/type_ann/type_ann_pg.h b/yql/essentials/core/type_ann/type_ann_pg.h
index 85ca8285ef..042d2c0092 100644
--- a/yql/essentials/core/type_ann/type_ann_pg.h
+++ b/yql/essentials/core/type_ann/type_ann_pg.h
@@ -12,7 +12,7 @@ TExprNodePtr WrapWithPgCast(TExprNodePtr&& node, ui32 targetTypeId, TExprContext
TString MakeAliasedColumn(TStringBuf alias, TStringBuf column);
const TItemExprType* AddAlias(const TString& alias, const TItemExprType* item, TExprContext& ctx);
TStringBuf RemoveAlias(TStringBuf column);
-TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias);
+TStringBuf RemoveAlias(TStringBuf column, TString& alias);
const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx);
TMap<TString, ui32> ExtractExternalColumns(const TExprNode& select);
bool IsPlainMemberOverArg(const TExprNode& expr, TStringBuf& memberName);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
index 08986bc8fd..871321786b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join.cpp
@@ -643,12 +643,15 @@ private:
}
void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
+ LogMemoryUsage();
switch(mode) {
case EOperatingMode::InMemory: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to InMemory";
MKQL_ENSURE(false, "Internal logic error");
break;
}
case EOperatingMode::Spilling: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
auto spiller = ctx.SpillerFactory->CreateSpiller();
RightPacker->TablePtr->InitializeBucketSpillers(spiller);
@@ -656,6 +659,7 @@ private:
break;
}
case EOperatingMode::ProcessSpilled: {
+ YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to ProcessSpilled";
SpilledBucketsJoinOrder.reserve(GraceJoin::NumberOfBuckets);
for (ui32 i = 0; i < GraceJoin::NumberOfBuckets; ++i) SpilledBucketsJoinOrder.push_back(i);
@@ -843,9 +847,6 @@ private:
if (isYield == EFetchResult::One)
return isYield;
if (IsSpillingAllowed && ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
- LogMemoryUsage();
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching Memory mode to Spilling";
-
SwitchMode(EOperatingMode::Spilling, ctx);
return EFetchResult::Yield;
}
@@ -861,14 +862,18 @@ private:
<< " HaveLeft " << *HaveMoreLeftRows << " LeftPacked " << LeftPacker->TuplesBatchPacked << " LeftBatch " << LeftPacker->BatchSize
<< " HaveRight " << *HaveMoreRightRows << " RightPacked " << RightPacker->TuplesBatchPacked << " RightBatch " << RightPacker->BatchSize
;
+
+ auto& leftTable = *LeftPacker->TablePtr;
+ auto& rightTable = SelfJoinSameKeys_ ? *LeftPacker->TablePtr : *RightPacker->TablePtr;
+ if (IsSpillingAllowed && ctx.SpillerFactory && !JoinedTablePtr->TryToPreallocateMemoryForJoin(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows)) {
+ SwitchMode(EOperatingMode::Spilling, ctx);
+ return EFetchResult::Yield;
+ }
+
*PartialJoinCompleted = true;
LeftPacker->StartTime = std::chrono::system_clock::now();
RightPacker->StartTime = std::chrono::system_clock::now();
- if ( SelfJoinSameKeys_ ) {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
- } else {
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
- }
+ JoinedTablePtr->Join(leftTable, rightTable, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
LeftPacker->EndTime = std::chrono::system_clock::now();
RightPacker->EndTime = std::chrono::system_clock::now();
@@ -945,7 +950,6 @@ EFetchResult DoCalculateWithSpilling(TComputationContext& ctx, NUdf::TUnboxedVal
}
if (!IsReadyForSpilledDataProcessing()) return EFetchResult::Yield;
- YQL_LOG(INFO) << (const void *)&*JoinedTablePtr << "# switching to ProcessSpilled";
SwitchMode(EOperatingMode::ProcessSpilled, ctx);
return EFetchResult::Finish;
}
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
index f9b19fdfbc..ae7c89daef 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -320,6 +320,63 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
}
+bool IsTablesSwapRequired(ui64 tuplesNum1, ui64 tuplesNum2, bool table1Batch, bool table2Batch) {
+ return tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
+}
+
+ui64 ComputeJoinSlotsSizeForBucket(const TTableBucket& bucket, const TTableBucketStats& bucketStats, ui64 headerSize, bool tableHasKeyStringColumns, bool tableHasKeyIColumns) {
+ ui64 tuplesNum = bucketStats.TuplesNum;
+
+ ui64 avgStringsSize = (3 * (bucket.KeyIntVals.size() - tuplesNum * headerSize) ) / ( 2 * tuplesNum + 1) + 1;
+ ui64 slotSize = headerSize + 1; // Header [Short Strings] SlotIdx
+ if (tableHasKeyStringColumns || tableHasKeyIColumns) {
+ slotSize = slotSize + avgStringsSize;
+ }
+
+ return slotSize;
+}
+
+ui64 ComputeNumberOfSlots(ui64 tuplesNum) {
+ return (3 * tuplesNum + 1) | 1;
+}
+
+bool TTable::TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples) {
+ // If the batch is final or the only one, then the buckets are processed sequentially, the memory for the hash tables is freed immediately after processing.
+ // So, no preallocation is required.
+ if (!hasMoreLeftTuples && !hasMoreRightTuples) return true;
+
+ for (ui64 bucket = 0; bucket < GraceJoin::NumberOfBuckets; bucket++) {
+ ui64 tuplesNum1 = t1.TableBucketsStats[bucket].TuplesNum;
+ ui64 tuplesNum2 = t2.TableBucketsStats[bucket].TuplesNum;
+
+ TTable& tableForPreallocation = IsTablesSwapRequired(tuplesNum1, tuplesNum2, hasMoreLeftTuples || LeftTableBatch_, hasMoreRightTuples || RightTableBatch_) ? t1 : t2;
+ if (!tableForPreallocation.TableBucketsStats[bucket].TuplesNum || tableForPreallocation.TableBuckets[bucket].NSlots) continue;
+
+ TTableBucket& bucketForPreallocation = tableForPreallocation.TableBuckets[bucket];
+ const TTableBucketStats& bucketForPreallocationStats = tableForPreallocation.TableBucketsStats[bucket];
+
+ const auto nSlots = ComputeJoinSlotsSizeForBucket(bucketForPreallocation, bucketForPreallocationStats, tableForPreallocation.HeaderSize,
+ tableForPreallocation.NumberOfKeyStringColumns != 0, tableForPreallocation.NumberOfKeyIColumns != 0);
+ const auto slotSize = ComputeNumberOfSlots(tableForPreallocation.TableBucketsStats[bucket].TuplesNum);
+
+ try {
+ bucketForPreallocation.JoinSlots.reserve(nSlots*slotSize);
+ } catch (TMemoryLimitExceededException) {
+ for (ui64 i = 0; i < bucket; ++i) {
+ GraceJoin::TTableBucket * b1 = &JoinTable1->TableBuckets[i];
+ b1->JoinSlots.resize(0);
+ b1->JoinSlots.shrink_to_fit();
+ GraceJoin::TTableBucket * b2 = &JoinTable2->TableBuckets[i];
+ b2->JoinSlots.resize(0);
+ b2->JoinSlots.shrink_to_fit();
+ }
+ return false;
+ }
+ }
+
+ return true;
+}
+
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
@@ -368,7 +425,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
bool table2HasKeyStringColumns = (JoinTable2->NumberOfKeyStringColumns != 0);
bool table1HasKeyIColumns = (JoinTable1->NumberOfKeyIColumns != 0);
bool table2HasKeyIColumns = (JoinTable2->NumberOfKeyIColumns != 0);
- bool swapTables = tuplesNum2 > tuplesNum1 && !table1Batch || table2Batch;
+ bool swapTables = IsTablesSwapRequired(tuplesNum1, tuplesNum2, table1Batch, table2Batch);
if (swapTables) {
@@ -402,13 +459,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
if (tuplesNum1 == 0 && (hasMoreRightTuples || hasMoreLeftTuples || !bucketStats2->HashtableMatches))
continue;
- ui64 slotSize = headerSize2 + 1; // Header [Short Strings] SlotIdx
-
- ui64 avgStringsSize = ( 3 * (bucket2->KeyIntVals.size() - tuplesNum2 * headerSize2) ) / ( 2 * tuplesNum2 + 1) + 1;
-
- if (table2HasKeyStringColumns || table2HasKeyIColumns ) {
- slotSize = slotSize + avgStringsSize;
- }
+ ui64 slotSize = ComputeJoinSlotsSizeForBucket(*bucket2, *bucketStats2, headerSize2, table2HasKeyStringColumns, table2HasKeyIColumns);
ui64 &nSlots = bucket2->NSlots;
auto &joinSlots = bucket2->JoinSlots;
@@ -417,7 +468,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
Y_DEBUG_ABORT_UNLESS(bucketStats2->SlotSize == 0 || bucketStats2->SlotSize == slotSize);
if (!nSlots) {
- nSlots = (3 * tuplesNum2 + 1) | 1;
+ nSlots = ComputeNumberOfSlots(tuplesNum2);
joinSlots.resize(nSlots*slotSize, 0);
bloomFilter.Resize(tuplesNum2);
initHashTable = true;
diff --git a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
index a4846926d1..d6b9a54aca 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -346,6 +346,8 @@ public:
// Returns value of next tuple. Returs true if there are more tuples
bool NextTuple(TupleData& td);
+ bool TryToPreallocateMemoryForJoin(TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples);
+
// Joins two tables and stores join result in table data. Tuples of joined table could be received by
// joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
// hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
diff --git a/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp b/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp
index 0f758c7a8b..80caaad37e 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_match_recognize.cpp
@@ -54,7 +54,7 @@ public:
)
: PartitionKey(std::move(partitionKey))
, Parameters(parameters)
- , Nfa(nfaTransitions, parameters.MatchedVarsArg, parameters.Defines)
+ , Nfa(nfaTransitions, parameters.MatchedVarsArg, parameters.Defines, parameters.SkipTo)
, Cache(cache)
{
}
@@ -72,10 +72,10 @@ public:
NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) {
auto match = Nfa.GetMatched();
- if (!match.has_value()) {
+ if (!match) {
return NUdf::TUnboxedValue{};
}
- Parameters.MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, match.value()));
+ Parameters.MatchedVarsArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, match->Vars));
Parameters.MeasureInputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TMeasureInputDataValue>(
ctx.HolderFactory.Create<TListValue<TSparseList>>(Rows),
Parameters.MeasureInputColumnOrder,
@@ -95,9 +95,7 @@ public:
break;
}
}
- if (EAfterMatchSkipTo::PastLastRow == Parameters.SkipTo.To) {
- Nfa.Clear();
- }
+ Nfa.AfterMatchSkip(*match);
return result;
}
bool ProcessEndOfData(TComputationContext& ctx) {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h b/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h
index 944164e4bc..2b194212f4 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_match_recognize_nfa.h
@@ -350,19 +350,25 @@ class TNfa {
using TRange = TSparseList::TRange;
using TMatchedVars = TMatchedVars<TRange>;
+public:
+ struct TMatch {
+ size_t BeginIndex;
+ size_t EndIndex;
+ TMatchedVars Vars;
+ };
+
+private:
struct TState {
- size_t BeginMatchIndex;
- size_t EndMatchIndex;
size_t Index;
- TMatchedVars Vars;
+ TMatch Match;
std::deque<ui64, TMKQLAllocator<ui64>> Quantifiers;
void Save(TMrOutputSerializer& serializer) const {
- serializer.Write(BeginMatchIndex);
- serializer.Write(EndMatchIndex);
serializer.Write(Index);
- serializer.Write(Vars.size());
- for (const auto& vector : Vars) {
+ serializer.Write(Match.BeginIndex);
+ serializer.Write(Match.EndIndex);
+ serializer.Write(Match.Vars.size());
+ for (const auto& vector : Match.Vars) {
serializer.Write(vector.size());
for (const auto& range : vector) {
range.Save(serializer);
@@ -375,13 +381,13 @@ class TNfa {
}
void Load(TMrInputSerializer& serializer) {
- serializer.Read(BeginMatchIndex);
- serializer.Read(EndMatchIndex);
serializer.Read(Index);
+ serializer.Read(Match.BeginIndex);
+ serializer.Read(Match.EndIndex);
auto varsSize = serializer.Read<TMatchedVars::size_type>();
- Vars.clear();
- Vars.resize(varsSize);
- for (auto& subvec: Vars) {
+ Match.Vars.clear();
+ Match.Vars.resize(varsSize);
+ for (auto& subvec: Match.Vars) {
ui64 vectorSize = serializer.Read<ui64>();
subvec.resize(vectorSize);
for (auto& item : subvec) {
@@ -397,24 +403,29 @@ class TNfa {
}
friend inline bool operator<(const TState& lhs, const TState& rhs) {
- auto lhsEndMatchIndex = -static_cast<i64>(lhs.EndMatchIndex);
- auto rhsEndMatchIndex = -static_cast<i64>(rhs.EndMatchIndex);
- return std::tie(lhs.BeginMatchIndex, lhsEndMatchIndex, lhs.Index, lhs.Quantifiers, lhs.Vars) < std::tie(rhs.BeginMatchIndex, rhsEndMatchIndex, rhs.Index, rhs.Quantifiers, rhs.Vars);
+ auto lhsMatchEndIndex = -static_cast<i64>(lhs.Match.EndIndex);
+ auto rhsMatchEndIndex = -static_cast<i64>(rhs.Match.EndIndex);
+ return std::tie(lhs.Match.BeginIndex, lhsMatchEndIndex, lhs.Index, lhs.Match.Vars, lhs.Quantifiers) < std::tie(rhs.Match.BeginIndex, rhsMatchEndIndex, rhs.Index, rhs.Match.Vars, rhs.Quantifiers);
}
friend inline bool operator==(const TState& lhs, const TState& rhs) {
- return std::tie(lhs.BeginMatchIndex, lhs.EndMatchIndex, lhs.Index, lhs.Quantifiers, lhs.Vars) == std::tie(rhs.BeginMatchIndex, rhs.EndMatchIndex, rhs.Index, rhs.Quantifiers, rhs.Vars);
+ return std::tie(lhs.Match.BeginIndex, lhs.Match.EndIndex, lhs.Index, lhs.Match.Vars, lhs.Quantifiers) == std::tie(rhs.Match.BeginIndex, rhs.Match.EndIndex, rhs.Index, rhs.Match.Vars, rhs.Quantifiers);
}
};
public:
- TNfa(TNfaTransitionGraph::TPtr transitionGraph, IComputationExternalNode* matchedRangesArg, const TComputationNodePtrVector& defines)
- : TransitionGraph(transitionGraph)
- , MatchedRangesArg(matchedRangesArg)
- , Defines(defines) {
- }
+ TNfa(
+ TNfaTransitionGraph::TPtr transitionGraph,
+ IComputationExternalNode* matchedRangesArg,
+ const TComputationNodePtrVector& defines,
+ TAfterMatchSkipTo skipTo)
+ : TransitionGraph(transitionGraph)
+ , MatchedRangesArg(matchedRangesArg)
+ , Defines(defines)
+ , SkipTo_(skipTo)
+ {}
void ProcessRow(TSparseList::TRange&& currentRowLock, TComputationContext& ctx) {
- TState state(currentRowLock.From(), currentRowLock.To(), TransitionGraph->Input, TMatchedVars(Defines.size()), std::deque<ui64, TMKQLAllocator<ui64>>{});
+ TState state(TransitionGraph->Input, TMatch{currentRowLock.From(), currentRowLock.To(), TMatchedVars(Defines.size())}, std::deque<ui64, TMKQLAllocator<ui64>>{});
Insert(std::move(state));
MakeEpsilonTransitions();
TStateSet newStates;
@@ -423,17 +434,17 @@ public:
//Here we handle only transitions of TMatchedVarTransition type,
//all other transitions are handled in MakeEpsilonTransitions
if (const auto* matchedVarTransition = std::get_if<TMatchedVarTransition>(&TransitionGraph->Transitions[state.Index])) {
- MatchedRangesArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, state.Vars));
+ MatchedRangesArg->SetValue(ctx, ctx.HolderFactory.Create<TMatchedVarsValue<TRange>>(ctx.HolderFactory, state.Match.Vars));
const auto varIndex = matchedVarTransition->VarIndex;
const auto& v = Defines[varIndex]->GetValue(ctx);
if (v && v.Get<bool>()) {
if (matchedVarTransition->SaveState) {
- auto vars = state.Vars; //TODO get rid of this copy
+ auto vars = state.Match.Vars; //TODO get rid of this copy
auto& matchedVar = vars[varIndex];
Extend(matchedVar, currentRowLock);
- newStates.emplace(state.BeginMatchIndex, currentRowLock.To(), matchedVarTransition->To, std::move(vars), state.Quantifiers);
+ newStates.emplace(matchedVarTransition->To, TMatch{state.Match.BeginIndex, currentRowLock.To(), std::move(vars)}, state.Quantifiers);
} else {
- newStates.emplace(state.BeginMatchIndex, currentRowLock.To(), matchedVarTransition->To, state.Vars, state.Quantifiers);
+ newStates.emplace(matchedVarTransition->To, TMatch{state.Match.BeginIndex, currentRowLock.To(), state.Match.Vars}, state.Quantifiers);
}
}
deletedStates.insert(state);
@@ -450,8 +461,8 @@ public:
bool HasMatched() const {
for (auto& state: ActiveStates) {
- if (auto activeStateIter = ActiveStateCounters.find(state.BeginMatchIndex),
- finishedStateIter = FinishedStateCounters.find(state.BeginMatchIndex);
+ if (auto activeStateIter = ActiveStateCounters.find(state.Match.BeginIndex),
+ finishedStateIter = FinishedStateCounters.find(state.Match.BeginIndex);
((activeStateIter != ActiveStateCounters.end() &&
finishedStateIter != FinishedStateCounters.end() &&
activeStateIter->second == finishedStateIter->second) ||
@@ -463,16 +474,16 @@ public:
return false;
}
- std::optional<TMatchedVars> GetMatched() {
+ std::optional<TMatch> GetMatched() {
for (auto& state: ActiveStates) {
- if (auto activeStateIter = ActiveStateCounters.find(state.BeginMatchIndex),
- finishedStateIter = FinishedStateCounters.find(state.BeginMatchIndex);
+ if (auto activeStateIter = ActiveStateCounters.find(state.Match.BeginIndex),
+ finishedStateIter = FinishedStateCounters.find(state.Match.BeginIndex);
((activeStateIter != ActiveStateCounters.end() &&
finishedStateIter != FinishedStateCounters.end() &&
activeStateIter->second == finishedStateIter->second) ||
EndOfData) &&
state.Index == TransitionGraph->Output) {
- auto result = state.Vars;
+ auto result = state.Match;
Erase(std::move(state));
return result;
}
@@ -515,9 +526,9 @@ public:
auto activeStateCountersSize = serializer.Read<ui64>();
for (size_t i = 0; i < activeStateCountersSize; ++i) {
using map_type = decltype(ActiveStateCounters);
- auto beginMatchIndex = serializer.Read<map_type::key_type>();
+ auto matchBeginIndex = serializer.Read<map_type::key_type>();
auto counter = serializer.Read<map_type::mapped_type>();
- ActiveStateCounters.emplace(beginMatchIndex, counter);
+ ActiveStateCounters.emplace(matchBeginIndex, counter);
}
}
{
@@ -525,9 +536,9 @@ public:
auto finishedStateCountersSize = serializer.Read<ui64>();
for (size_t i = 0; i < finishedStateCountersSize; ++i) {
using map_type = decltype(FinishedStateCounters);
- auto beginMatchIndex = serializer.Read<map_type::key_type>();
+ auto matchBeginIndex = serializer.Read<map_type::key_type>();
auto counter = serializer.Read<map_type::mapped_type>();
- FinishedStateCounters.emplace(beginMatchIndex, counter);
+ FinishedStateCounters.emplace(matchBeginIndex, counter);
}
}
}
@@ -537,10 +548,31 @@ public:
return HasMatched();
}
- void Clear() {
- ActiveStates.clear();
- ActiveStateCounters.clear();
- FinishedStateCounters.clear();
+ void AfterMatchSkip(const TMatch& match) {
+ const auto skipToRowIndex = [&]() {
+ switch (SkipTo_.To) {
+ case EAfterMatchSkipTo::NextRow:
+ return match.BeginIndex + 1;
+ case EAfterMatchSkipTo::PastLastRow:
+ return match.EndIndex + 1;
+ case EAfterMatchSkipTo::ToFirst:
+ MKQL_ENSURE(false, "AFTER MATCH SKIP TO FIRST is not implemented yet");
+ case EAfterMatchSkipTo::ToLast:
+ [[fallthrough]];
+ case EAfterMatchSkipTo::To:
+ MKQL_ENSURE(false, "AFTER MATCH SKIP TO LAST is not implemented yet");
+ }
+ }();
+
+ TStateSet deletedStates;
+ for (const auto& state : ActiveStates) {
+ if (state.Match.BeginIndex < skipToRowIndex) {
+ deletedStates.insert(state);
+ }
+ }
+ for (auto& state : deletedStates) {
+ Erase(std::move(state));
+ }
}
private:
@@ -561,14 +593,14 @@ private:
[&](const TEpsilonTransitions& epsilonTransitions) {
deletedStates.insert(state);
for (const auto& i : epsilonTransitions.To) {
- newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, i, state.Vars, state.Quantifiers);
+ newStates.emplace(i, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, state.Quantifiers);
}
},
[&](const TQuantityEnterTransition& quantityEnterTransition) {
deletedStates.insert(state);
auto quantifiers = state.Quantifiers; //TODO get rid of this copy
quantifiers.push_back(0);
- newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, quantityEnterTransition.To, state.Vars, std::move(quantifiers));
+ newStates.emplace(quantityEnterTransition.To, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(quantifiers));
},
[&](const TQuantityExitTransition& quantityExitTransition) {
deletedStates.insert(state);
@@ -576,12 +608,12 @@ private:
if (state.Quantifiers.back() + 1 < quantityMax) {
auto q = state.Quantifiers;
q.back()++;
- newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, toFindMore, state.Vars, std::move(q));
+ newStates.emplace(toFindMore, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(q));
}
if (quantityMin <= state.Quantifiers.back() + 1 && state.Quantifiers.back() + 1 <= quantityMax) {
auto q = state.Quantifiers;
q.pop_back();
- newStates.emplace(state.BeginMatchIndex, state.EndMatchIndex, toMatched, state.Vars, std::move(q));
+ newStates.emplace(toMatched, TMatch{state.Match.BeginIndex, state.Match.EndIndex, state.Match.Vars}, std::move(q));
}
},
}, TransitionGraph->Transitions[state.Index]);
@@ -610,22 +642,22 @@ private:
}
void Insert(TState state) {
- auto beginMatchIndex = state.BeginMatchIndex;
+ auto matchBeginIndex = state.Match.BeginIndex;
const auto& transition = TransitionGraph->Transitions[state.Index];
auto diff = static_cast<i64>(ActiveStates.insert(std::move(state)).second);
- Add(ActiveStateCounters, beginMatchIndex, diff);
+ Add(ActiveStateCounters, matchBeginIndex, diff);
if (std::holds_alternative<TVoidTransition>(transition)) {
- Add(FinishedStateCounters, beginMatchIndex, diff);
+ Add(FinishedStateCounters, matchBeginIndex, diff);
}
}
void Erase(TState state) {
- auto beginMatchIndex = state.BeginMatchIndex;
+ auto matchBeginIndex = state.Match.BeginIndex;
const auto& transition = TransitionGraph->Transitions[state.Index];
auto diff = -static_cast<i64>(ActiveStates.erase(std::move(state)));
- Add(ActiveStateCounters, beginMatchIndex, diff);
+ Add(ActiveStateCounters, matchBeginIndex, diff);
if (std::holds_alternative<TVoidTransition>(transition)) {
- Add(FinishedStateCounters, beginMatchIndex, diff);
+ Add(FinishedStateCounters, matchBeginIndex, diff);
}
}
@@ -636,6 +668,7 @@ private:
THashMap<size_t, i64> ActiveStateCounters;
THashMap<size_t, i64> FinishedStateCounters;
bool EndOfData = false;
+ TAfterMatchSkipTo SkipTo_;
};
}//namespace NKikimr::NMiniKQL::NMatchRecognize
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp
index 745c88e084..afdbd6b8e8 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_match_recognize_nfa_ut.cpp
@@ -59,7 +59,7 @@ struct TNfaSetup {
for (auto& d: Defines) {
defines.push_back(d);
}
- return TNfa(transitionGraph, MatchedVars, defines);
+ return TNfa(transitionGraph, MatchedVars, defines, TAfterMatchSkipTo{EAfterMatchSkipTo::PastLastRow, ""});
}
TComputationNodeFactory GetAuxCallableFactory() {
@@ -261,8 +261,8 @@ Y_UNIT_TEST_SUITE(MatchRecognizeNfa) {
Iota(expectedTo.begin(), expectedTo.end(), i - seriesLength + 1);
for (size_t matchCount = 0; matchCount < seriesLength; ++matchCount) {
auto match = setup.Nfa.GetMatched();
- UNIT_ASSERT_C(match.has_value(), i);
- auto vars = match.value();
+ UNIT_ASSERT_C(match, i);
+ auto vars = match->Vars;
UNIT_ASSERT_VALUES_EQUAL_C(1, vars.size(), i);
auto var = vars[0];
UNIT_ASSERT_VALUES_EQUAL_C(1, var.size(), i);
diff --git a/yql/essentials/sql/v1/sql_query.cpp b/yql/essentials/sql/v1/sql_query.cpp
index 2e213d02ad..e17dcf09aa 100644
--- a/yql/essentials/sql/v1/sql_query.cpp
+++ b/yql/essentials/sql/v1/sql_query.cpp
@@ -67,7 +67,8 @@ static bool AsyncReplicationSettingsEntry(std::map<TString, TNodePtr>& out,
};
TSet<TString> modeSettings = {
- "consistency_mode",
+ "consistency_mode", // TODO(ilnaz): deprecated
+ "consistency_level",
"commit_interval",
};
diff --git a/yql/essentials/sql/v1/sql_ut.cpp b/yql/essentials/sql/v1/sql_ut.cpp
index e0d243929f..3ef964fa0d 100644
--- a/yql/essentials/sql/v1/sql_ut.cpp
+++ b/yql/essentials/sql/v1/sql_ut.cpp
@@ -3086,11 +3086,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
{
auto req = R"(
USE plato;
- ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG");
+ ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_LEVEL = "GLOBAL");
)";
auto res = SqlToYql(req);
UNIT_ASSERT(!res.Root);
- UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n");
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:80: Error: CONSISTENCY_LEVEL is not supported in ALTER\n");
}
{
auto req = R"(
diff --git a/yql/essentials/sql/v1/sql_ut_antlr4.cpp b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
index c561512136..c3c5f9c983 100644
--- a/yql/essentials/sql/v1/sql_ut_antlr4.cpp
+++ b/yql/essentials/sql/v1/sql_ut_antlr4.cpp
@@ -3086,11 +3086,11 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) {
{
auto req = R"(
USE plato;
- ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_MODE = "STRONG");
+ ALTER ASYNC REPLICATION MyReplication SET (CONSISTENCY_LEVEL = "GLOBAL");
)";
auto res = SqlToYql(req);
UNIT_ASSERT(!res.Root);
- UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:79: Error: CONSISTENCY_MODE is not supported in ALTER\n");
+ UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:3:80: Error: CONSISTENCY_LEVEL is not supported in ALTER\n");
}
{
auto req = R"(
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index 47f93caece..e73aa91e9e 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -11180,30 +11180,30 @@
],
"test_sql2yql.test[match_recognize-alerts-streaming]": [
{
- "checksum": "951e1c6d04cbba2370c2a5e5542d6afd",
- "size": 10537,
- "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts-streaming_/sql.yql"
+ "checksum": "931c9266d12e54a59fb0cd3570c3ccc0",
+ "size": 9765,
+ "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts-streaming_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-alerts]": [
{
- "checksum": "67843aad123bb76d097416bf83bbe749",
- "size": 10539,
- "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql"
+ "checksum": "81ff4044da026f2de566bc73e499da3d",
+ "size": 9767,
+ "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-alerts_without_order]": [
{
- "checksum": "3260928c483690ee0d3f06917cd1d5cb",
- "size": 10420,
- "uri": "https://{canondata_backend}/1942671/e054628d5e2733e5fbc993cb8c765074de561f06/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql"
+ "checksum": "adf84392f4dd1db143484be8cbbda16c",
+ "size": 9648,
+ "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-alerts_without_order_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-greedy_quantifiers]": [
{
- "checksum": "45d528f98097150c41366da075dc08c8",
- "size": 4257,
- "uri": "https://{canondata_backend}/1130705/7ccb4b448efbf0410ae97c635cda187d108148f4/resource.tar.gz#test_sql2yql.test_match_recognize-greedy_quantifiers_/sql.yql"
+ "checksum": "c6b4102b3bf241e7b309e8cc93aaf76c",
+ "size": 4349,
+ "uri": "https://{canondata_backend}/1903885/44ceadc17896d9c66a7580ce6886870c575e270b/resource.tar.gz#test_sql2yql.test_match_recognize-greedy_quantifiers_/sql.yql"
}
],
"test_sql2yql.test[match_recognize-permute]": [
@@ -12368,6 +12368,13 @@
"uri": "https://{canondata_backend}/1937429/434276f26b2857be3c5ad3fdbbf877d2bf775ac5/resource.tar.gz#test_sql2yql.test_pg-aggregate_scalar_minus_zero_/sql.yql"
}
],
+ "test_sql2yql.test[pg-aliased_columns]": [
+ {
+ "checksum": "7ca1f0f6ce1395c4d6f826660886df83",
+ "size": 953,
+ "uri": "https://{canondata_backend}/1777230/27c189b00ecc5d4153899123da0bf3c72d8cfd80/resource.tar.gz#test_sql2yql.test_pg-aliased_columns_/sql.yql"
+ }
+ ],
"test_sql2yql.test[pg-all_data]": [
{
"checksum": "e2e983a696817fe3cbebcbbf79264fd1",
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql
index e572efb76d..28d63923cf 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts-streaming_/formatted.sql
@@ -28,13 +28,12 @@ FROM
LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
- PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
+ PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
DEFINE
LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login"
AND LOGIN_SUCCESS_REMOTE.ev_status == "success"
@@ -44,7 +43,6 @@ FROM
SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host)
AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all"
AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login"
AND LOGIN_FAILED_SAME_USER.ev_status != "success"
AND (
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql
index 471b7f13ab..ea91539f37 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_/formatted.sql
@@ -28,13 +28,12 @@ FROM
LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
- PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
+ PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
DEFINE
LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login"
AND LOGIN_SUCCESS_REMOTE.ev_status == "success"
@@ -44,7 +43,6 @@ FROM
SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host)
AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all"
AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login"
AND LOGIN_FAILED_SAME_USER.ev_status != "success"
AND (
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql
index 70d4e9cc66..4d30431160 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-alerts_without_order_/formatted.sql
@@ -26,13 +26,12 @@ FROM
LAST(LOGIN_SUCCESS_REMOTE.user) AS remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) AS remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) AS suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) AS suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) AS brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) AS brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) AS brutforce_login
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
- PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
+ PATTERN (LOGIN_SUCCESS_REMOTE ANY_ROW1 * SUSPICIOUS_ACTION_SOON | (LOGIN_FAILED_SAME_USER ANY_ROW2 *) {2,} LOGIN_SUCCESS_SAME_USER)
DEFINE
LOGIN_SUCCESS_REMOTE AS LOGIN_SUCCESS_REMOTE.ev_type == "login"
AND LOGIN_SUCCESS_REMOTE.ev_status == "success"
@@ -42,7 +41,6 @@ FROM
SUSPICIOUS_ACTION_SOON AS SUSPICIOUS_ACTION_SOON.host == LAST(LOGIN_SUCCESS_REMOTE.host)
AND SUSPICIOUS_ACTION_SOON.ev_type == "delete_all"
AND COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT AS COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER AS LOGIN_FAILED_SAME_USER.ev_type == "login"
AND LOGIN_FAILED_SAME_USER.ev_status != "success"
AND (
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql
index e9e70b561f..887a45e7e3 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_match_recognize-greedy_quantifiers_/formatted.sql
@@ -6,12 +6,15 @@ $input =
*
FROM
AS_TABLE([
- <|time: 0, id: 1, name: 'A'|>,
- <|time: 200, id: 2, name: 'B'|>,
- <|time: 400, id: 3, name: 'C'|>,
- <|time: 600, id: 4, name: 'B'|>,
- <|time: 800, id: 5, name: 'C'|>,
- <|time: 1000, id: 6, name: 'W'|>,
+ <|time: 0, name: 'A'|>,
+ <|time: 100, name: 'B'|>,
+ <|time: 200, name: 'C'|>,
+ <|time: 300, name: 'B'|>,
+ <|time: 400, name: 'C'|>,
+ <|time: 500, name: 'A'|>,
+ <|time: 600, name: 'B'|>,
+ <|time: 700, name: 'C'|>,
+ <|time: 800, name: 'W'|>,
])
;
@@ -22,9 +25,9 @@ FROM
ORDER BY
CAST(time AS Timestamp)
MEASURES
- FIRST(A.id) AS a_id,
- LAST(B_OR_C.id) AS bc_id,
- LAST(C.id) AS c_id
+ FIRST(A.time) AS a_time,
+ LAST(B_OR_C.time) AS bc_time,
+ LAST(C.time) AS c_time
PATTERN (A B_OR_C * C)
DEFINE
A AS A.name == 'A',
diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql
index 5a9d43a155..f8f13b5411 100644
--- a/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql
+++ b/yql/essentials/tests/sql/suites/match_recognize/alerts-streaming.sql
@@ -25,7 +25,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login
@@ -33,7 +32,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (
- LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) |
+ LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON |
(LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER
)
DEFINE
@@ -48,8 +47,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and
SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and
COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT as
- COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER as
LOGIN_FAILED_SAME_USER.ev_type = "login" and
LOGIN_FAILED_SAME_USER.ev_status <> "success" and
diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts.sql
index 9b408b63ed..da1aa8ddd4 100644
--- a/yql/essentials/tests/sql/suites/match_recognize/alerts.sql
+++ b/yql/essentials/tests/sql/suites/match_recognize/alerts.sql
@@ -25,7 +25,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login
@@ -33,7 +32,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (
- LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) |
+ LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON |
(LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER
)
DEFINE
@@ -48,8 +47,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and
SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and
COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT as
- COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER as
LOGIN_FAILED_SAME_USER.ev_type = "login" and
LOGIN_FAILED_SAME_USER.ev_status <> "success" and
diff --git a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql
index bab465988d..006ba67e2e 100644
--- a/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql
+++ b/yql/essentials/tests/sql/suites/match_recognize/alerts_without_order.sql
@@ -24,7 +24,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
LAST(LOGIN_SUCCESS_REMOTE.user) as remote_login_user,
LAST(LOGIN_SUCCESS_REMOTE.dt) as remote_login_dt,
LAST(SUSPICIOUS_ACTION_SOON.dt) as suspicious_action_dt,
- LAST(SUSPICIOUS_ACTION_TIMEOUT.dt) as suspicious_action_timeout_dt,
FIRST(LOGIN_FAILED_SAME_USER.dt) as brutforce_begin,
FIRST(LOGIN_SUCCESS_SAME_USER.dt) as brutforce_end,
LAST(LOGIN_SUCCESS_SAME_USER.user) as brutforce_login
@@ -32,7 +31,7 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
ONE ROW PER MATCH
AFTER MATCH SKIP TO NEXT ROW
PATTERN (
- LOGIN_SUCCESS_REMOTE ANY_ROW1* (SUSPICIOUS_ACTION_SOON | SUSPICIOUS_ACTION_TIMEOUT) |
+ LOGIN_SUCCESS_REMOTE ANY_ROW1* SUSPICIOUS_ACTION_SOON |
(LOGIN_FAILED_SAME_USER ANY_ROW2*){2,} LOGIN_SUCCESS_SAME_USER
)
DEFINE
@@ -47,8 +46,6 @@ FROM AS_TABLE($osquery_data) MATCH_RECOGNIZE(
SUSPICIOUS_ACTION_SOON.host = LAST(LOGIN_SUCCESS_REMOTE.host) and
SUSPICIOUS_ACTION_SOON.ev_type = "delete_all" and
COALESCE(SUSPICIOUS_ACTION_SOON.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) <= 500, TRUE),
- SUSPICIOUS_ACTION_TIMEOUT as
- COALESCE(SUSPICIOUS_ACTION_TIMEOUT.dt - FIRST(LOGIN_SUCCESS_REMOTE.dt) > 500, TRUE),
LOGIN_FAILED_SAME_USER as
LOGIN_FAILED_SAME_USER.ev_type = "login" and
LOGIN_FAILED_SAME_USER.ev_status <> "success" and
diff --git a/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql b/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql
index 8645b1c5f2..96707478d6 100644
--- a/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql
+++ b/yql/essentials/tests/sql/suites/match_recognize/greedy_quantifiers.sql
@@ -1,26 +1,27 @@
-pragma FeatureR010="prototype";
-pragma config.flags("MatchRecognizeStream", "disable");
+PRAGMA FeatureR010="prototype";
+PRAGMA config.flags("MatchRecognizeStream", "disable");
$input = SELECT * FROM AS_TABLE([
- <|time: 0, id: 1, name: 'A'|>,
- <|time: 200, id: 2, name: 'B'|>,
- <|time: 400, id: 3, name: 'C'|>,
- <|time: 600, id: 4, name: 'B'|>,
- <|time: 800, id: 5, name: 'C'|>,
- <|time: 1000, id: 6, name: 'W'|>,
+ <|time: 0, name: 'A'|>,
+ <|time: 100, name: 'B'|>,
+ <|time: 200, name: 'C'|>,
+ <|time: 300, name: 'B'|>,
+ <|time: 400, name: 'C'|>,
+ <|time: 500, name: 'A'|>,
+ <|time: 600, name: 'B'|>,
+ <|time: 700, name: 'C'|>,
+ <|time: 800, name: 'W'|>,
]);
-
SELECT * FROM $input MATCH_RECOGNIZE (
ORDER BY CAST(time AS Timestamp)
MEASURES
- FIRST(A.id) as a_id,
- LAST(B_OR_C.id) as bc_id,
- LAST(C.id) as c_id
+ FIRST(A.time) AS a_time,
+ LAST(B_OR_C.time) AS bc_time,
+ LAST(C.time) AS c_time
PATTERN (A B_OR_C* C)
DEFINE
- A AS A.name ='A',
- B_OR_C AS (B_OR_C.name ='B' or B_OR_C.name ='C'),
- C AS C.name ='C'
- );
-
+ A AS A.name = 'A',
+ B_OR_C AS (B_OR_C.name = 'B' OR B_OR_C.name = 'C'),
+ C AS C.name = 'C'
+);
diff --git a/yql/essentials/tests/sql/suites/pg/aliased_columns.sql b/yql/essentials/tests/sql/suites/pg/aliased_columns.sql
new file mode 100644
index 0000000000..6ba1335386
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/pg/aliased_columns.sql
@@ -0,0 +1,2 @@
+--!syntax_pg
+select a, b, c from (select 1 as a, 2 as b, 3 as c) as "A.B" order by b
diff --git a/yt/cpp/mapreduce/client/client.cpp b/yt/cpp/mapreduce/client/client.cpp
index 43e3864ae2..a2d302212f 100644
--- a/yt/cpp/mapreduce/client/client.cpp
+++ b/yt/cpp/mapreduce/client/client.cpp
@@ -42,6 +42,7 @@
#include <yt/cpp/mapreduce/library/table_schema/protobuf.h>
+#include <yt/cpp/mapreduce/raw_client/raw_client.h>
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
#include <yt/cpp/mapreduce/raw_client/rpc_parameters_serialization.h>
@@ -90,10 +91,12 @@ void ApplyProxyUrlAliasingRules(TString& url)
////////////////////////////////////////////////////////////////////////////////
TClientBase::TClientBase(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& transactionId,
IClientRetryPolicyPtr retryPolicy)
- : Context_(context)
+ : RawClient_(std::move(rawClient))
+ , Context_(context)
, TransactionId_(transactionId)
, ClientRetryPolicy_(std::move(retryPolicy))
{ }
@@ -101,7 +104,7 @@ TClientBase::TClientBase(
ITransactionPtr TClientBase::StartTransaction(
const TStartTransactionOptions& options)
{
- return MakeIntrusive<TTransaction>(GetParentClientImpl(), Context_, TransactionId_, options);
+ return MakeIntrusive<TTransaction>(RawClient_, GetParentClientImpl(), Context_, TransactionId_, options);
}
TNodeId TClientBase::Create(
@@ -138,7 +141,11 @@ void TClientBase::Set(
const TNode& value,
const TSetOptions& options)
{
- NRawClient::Set(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
+ RequestWithRetry<void>(
+ ClientRetryPolicy_->CreatePolicyForGenericRequest(),
+ [this, &path, &value, &options] (TMutationId& mutationId) {
+ RawClient_->Set(mutationId, TransactionId_, path, value, options);
+ });
}
void TClientBase::MultisetAttributes(
@@ -826,6 +833,11 @@ IClientPtr TClientBase::GetParentClient()
return GetParentClientImpl();
}
+IRawClientPtr TClientBase::GetRawClient() const
+{
+ return RawClient_;
+}
+
const TClientContext& TClientBase::GetContext() const
{
return Context_;
@@ -839,11 +851,12 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
////////////////////////////////////////////////////////////////////////////////
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
const TStartTransactionOptions& options)
- : TClientBase(context, parentTransactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
MakeHolder<TPingableTransaction>(
@@ -858,11 +871,12 @@ TTransaction::TTransaction(
}
TTransaction::TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
const TAttachTransactionOptions& options)
- : TClientBase(context, transactionId, parentClient->GetRetryPolicy())
+ : TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
, TransactionPinger_(parentClient->GetTransactionPinger())
, PingableTx_(
new TPingableTransaction(
@@ -928,10 +942,11 @@ TClientPtr TTransaction::GetParentClientImpl()
////////////////////////////////////////////////////////////////////////////////
TClient::TClient(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& globalId,
IClientRetryPolicyPtr retryPolicy)
- : TClientBase(context, globalId, retryPolicy)
+ : TClientBase(std::move(rawClient), context, globalId, retryPolicy)
, TransactionPinger_(nullptr)
{ }
@@ -943,7 +958,7 @@ ITransactionPtr TClient::AttachTransaction(
{
CheckShutdown();
- return MakeIntrusive<TTransaction>(this, Context_, transactionId, options);
+ return MakeIntrusive<TTransaction>(RawClient_, this, Context_, transactionId, options);
}
void TClient::MountTable(
@@ -1435,9 +1450,15 @@ TClientPtr CreateClientImpl(
retryConfigProvider = CreateDefaultRetryConfigProvider();
}
+ auto rawClient = MakeIntrusive<THttpRawClient>(context);
+
EnsureInitialized();
- return new TClient(context, globalTxId, CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
+ return new TClient(
+ std::move(rawClient),
+ context,
+ globalTxId,
+ CreateDefaultClientRetryPolicy(retryConfigProvider, context.Config));
}
////////////////////////////////////////////////////////////////////////////////
diff --git a/yt/cpp/mapreduce/client/client.h b/yt/cpp/mapreduce/client/client.h
index 32c458316d..2a0398d056 100644
--- a/yt/cpp/mapreduce/client/client.h
+++ b/yt/cpp/mapreduce/client/client.h
@@ -5,6 +5,7 @@
#include "transaction_pinger.h"
#include <yt/cpp/mapreduce/interface/client.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/http/context.h>
#include <yt/cpp/mapreduce/http/requests.h>
@@ -29,6 +30,7 @@ class TClientBase
{
public:
TClientBase(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& transactionId,
IClientRetryPolicyPtr retryPolicy);
@@ -222,6 +224,8 @@ public:
IClientPtr GetParentClient() override;
+ IRawClientPtr GetRawClient() const;
+
const TClientContext& GetContext() const;
const IClientRetryPolicyPtr& GetRetryPolicy() const;
@@ -232,6 +236,7 @@ protected:
virtual TClientPtr GetParentClientImpl() = 0;
protected:
+ const IRawClientPtr RawClient_;
const TClientContext Context_;
TTransactionId TransactionId_;
IClientRetryPolicyPtr ClientRetryPolicy_;
@@ -287,6 +292,7 @@ public:
//
// Start a new transaction.
TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& parentTransactionId,
@@ -295,6 +301,7 @@ public:
//
// Attach an existing transaction.
TTransaction(
+ IRawClientPtr rawClient,
TClientPtr parentClient,
const TClientContext& context,
const TTransactionId& transactionId,
@@ -325,6 +332,7 @@ protected:
TClientPtr GetParentClientImpl() override;
private:
+ const IRawClientPtr RawClient_;
ITransactionPingerPtr TransactionPinger_;
THolder<TPingableTransaction> PingableTx_;
TClientPtr ParentClient_;
@@ -338,6 +346,7 @@ class TClient
{
public:
TClient(
+ IRawClientPtr rawClient,
const TClientContext& context,
const TTransactionId& globalId,
IClientRetryPolicyPtr retryPolicy);
diff --git a/yt/cpp/mapreduce/client/operation_preparer.cpp b/yt/cpp/mapreduce/client/operation_preparer.cpp
index 07d00e88d6..a6d424c5a1 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.cpp
+++ b/yt/cpp/mapreduce/client/operation_preparer.cpp
@@ -17,6 +17,7 @@
#include <yt/cpp/mapreduce/raw_client/raw_batch_request.h>
#include <yt/cpp/mapreduce/interface/error_codes.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
@@ -395,7 +396,8 @@ TJobPreparer::TJobPreparer(
size_t outputTableCount,
const TVector<TSmallJobFile>& smallFileList,
const TOperationOptions& options)
- : OperationPreparer_(operationPreparer)
+ : RawClient_(operationPreparer.GetClient()->GetRawClient())
+ , OperationPreparer_(operationPreparer)
, Spec_(spec)
, Options_(options)
, Layers_(spec.Layers_)
@@ -631,6 +633,7 @@ TMaybe<TString> TJobPreparer::TryUploadWithDeduplication(const IItemToUpload& it
CreateFileInCypress(cypressPath);
auto uploadTx = MakeIntrusive<TTransaction>(
+ OperationPreparer_.GetClient()->GetRawClient(),
OperationPreparer_.GetClient(),
OperationPreparer_.GetContext(),
TTransactionId(),
diff --git a/yt/cpp/mapreduce/client/operation_preparer.h b/yt/cpp/mapreduce/client/operation_preparer.h
index 54c978c0fb..41594eb52e 100644
--- a/yt/cpp/mapreduce/client/operation_preparer.h
+++ b/yt/cpp/mapreduce/client/operation_preparer.h
@@ -83,6 +83,7 @@ public:
bool ShouldRedirectStdoutToStderr() const;
private:
+ const IRawClientPtr RawClient_;
TOperationPreparer& OperationPreparer_;
TUserJobSpec Spec_;
TOperationOptions Options_;
diff --git a/yt/cpp/mapreduce/http/http.cpp b/yt/cpp/mapreduce/http/http.cpp
index ca243a929a..41ee56f672 100644
--- a/yt/cpp/mapreduce/http/http.cpp
+++ b/yt/cpp/mapreduce/http/http.cpp
@@ -207,7 +207,7 @@ void THttpHeader::AddOperationId(const TOperationId& operationId, bool overwrite
AddParameter("operation_id", GetGuidAsString(operationId), overwrite);
}
-void THttpHeader::AddMutationId()
+TMutationId THttpHeader::AddMutationId()
{
TGUID guid;
@@ -220,6 +220,8 @@ void THttpHeader::AddMutationId()
guid.dw[2] = GetPID() ^ MicroSeconds();
AddParameter("mutation_id", GetGuidAsString(guid), true);
+
+ return guid;
}
bool THttpHeader::HasMutationId() const
@@ -227,6 +229,10 @@ bool THttpHeader::HasMutationId() const
return Parameters_.contains("mutation_id");
}
+// Stores `mutationId' as the "mutation_id" request parameter. The parameter is
+// added with the overwrite flag set.
+void THttpHeader::SetMutationId(TMutationId mutationId)
+{
+    const TString guidText = GetGuidAsString(mutationId);
+    AddParameter("mutation_id", guidText, /* overwrite */ true);
+}
+
void THttpHeader::SetToken(const TString& token)
{
Token_ = token;
diff --git a/yt/cpp/mapreduce/http/http.h b/yt/cpp/mapreduce/http/http.h
index 618b1e2c22..0f5e9034ee 100644
--- a/yt/cpp/mapreduce/http/http.h
+++ b/yt/cpp/mapreduce/http/http.h
@@ -59,8 +59,9 @@ public:
void AddTransactionId(const TTransactionId& transactionId, bool overwrite = false);
void AddPath(const TString& path, bool overwrite = false);
void AddOperationId(const TOperationId& operationId, bool overwrite = false);
- void AddMutationId();
+ TMutationId AddMutationId();
bool HasMutationId() const;
+ void SetMutationId(TMutationId mutationId);
void SetToken(const TString& token);
void SetProxyAddress(const TString& proxyAddress);
diff --git a/yt/cpp/mapreduce/http/retry_request.cpp b/yt/cpp/mapreduce/http/retry_request.cpp
index 0c719eeda3..3330cf696c 100644
--- a/yt/cpp/mapreduce/http/retry_request.cpp
+++ b/yt/cpp/mapreduce/http/retry_request.cpp
@@ -49,6 +49,7 @@ static TResponseInfo Request(
TResponseInfo RequestWithoutRetry(
const TClientContext& context,
+ TMutationId& mutationId,
THttpHeader& header,
TMaybe<TStringBuf> body,
const TRequestConfig& config)
@@ -64,8 +65,13 @@ TResponseInfo RequestWithoutRetry(
}
if (header.HasMutationId()) {
- header.RemoveParameter("retry");
- header.AddMutationId();
+ if (mutationId.IsEmpty()) {
+ header.RemoveParameter("retry");
+ mutationId = header.AddMutationId();
+ } else {
+ header.AddParameter("retry", true, /* overwrite */ true);
+ header.SetMutationId(mutationId);
+ }
}
auto requestId = CreateGuidAsString();
return Request(context, header, body, requestId, config);
diff --git a/yt/cpp/mapreduce/http/retry_request.h b/yt/cpp/mapreduce/http/retry_request.h
index a6007d6eae..19bf0a7c14 100644
--- a/yt/cpp/mapreduce/http/retry_request.h
+++ b/yt/cpp/mapreduce/http/retry_request.h
@@ -2,8 +2,13 @@
#include "fwd.h"
+#include <yt/cpp/mapreduce/interface/errors.h>
#include <yt/cpp/mapreduce/interface/fwd.h>
+#include <yt/cpp/mapreduce/interface/logging/yt_log.h>
+
#include <yt/cpp/mapreduce/common/fwd.h>
+#include <yt/cpp/mapreduce/common/retry_lib.h>
+#include <yt/cpp/mapreduce/common/wait_proxy.h>
#include <yt/cpp/mapreduce/http/http_client.h>
@@ -32,6 +37,64 @@ struct TRequestConfig
////////////////////////////////////////////////////////////////////////////////
+// Calls `func' in a loop until it returns normally or an exception is rethrown.
+//
+// A single TMutationId variable is shared by all attempts and handed to `func'
+// by reference. After a failed attempt it is left untouched when the failure was
+// a TErrorResponse reporting a transport error, or any other std::exception;
+// otherwise it is reset to a default-constructed value before the next attempt.
+//
+// The caught exception is rethrown when IsRetriable() rejects it or when the
+// policy returns no backoff interval. Otherwise the loop sleeps for the returned
+// interval via TWaitProxy and tries again.
+template <typename TResult>
+TResult RequestWithRetry(
+    IRequestRetryPolicyPtr retryPolicy,
+    std::function<TResult(TMutationId&)> func)
+{
+    TMutationId mutationId;
+
+    for (;;) {
+        // Each handler below assigns this flag before control can fall out of it.
+        bool keepMutationId = false;
+
+        try {
+            retryPolicy->NotifyNewAttempt();
+            // Returning a void expression is permitted, so this single statement
+            // also covers TResult == void.
+            return func(mutationId);
+        } catch (const TErrorResponse& e) {
+            YT_LOG_ERROR("Retry failed %v - %v",
+                e.GetError().GetMessage(),
+                retryPolicy->GetAttemptDescription());
+
+            keepMutationId = e.IsTransportError();
+
+            if (!IsRetriable(e)) {
+                throw;
+            }
+
+            const auto backoff = retryPolicy->OnRetriableError(e);
+            if (!backoff) {
+                throw;
+            }
+            TWaitProxy::Get()->Sleep(*backoff);
+        } catch (const std::exception& e) {
+            YT_LOG_ERROR("Retry failed %v - %v",
+                e.what(),
+                retryPolicy->GetAttemptDescription());
+
+            keepMutationId = true;
+
+            if (!IsRetriable(e)) {
+                throw;
+            }
+
+            const auto backoff = retryPolicy->OnGenericError(e);
+            if (!backoff) {
+                throw;
+            }
+            TWaitProxy::Get()->Sleep(*backoff);
+        }
+
+        if (!keepMutationId) {
+            mutationId = {};
+        }
+    }
+}
+
// Retry request with given `header' and `body' using `retryPolicy'.
// If `retryPolicy == nullptr' use default, currently `TAttemptLimitedRetryPolicy(TConfig::Get()->RetryCount)`.
TResponseInfo RetryRequestWithPolicy(
@@ -43,6 +106,7 @@ TResponseInfo RetryRequestWithPolicy(
TResponseInfo RequestWithoutRetry(
const TClientContext& context,
+ TMutationId& mutationId,
THttpHeader& header,
TMaybe<TStringBuf> body = {},
const TRequestConfig& config = TRequestConfig());
diff --git a/yt/cpp/mapreduce/interface/fwd.h b/yt/cpp/mapreduce/interface/fwd.h
index 485b45129a..162a6ee3e9 100644
--- a/yt/cpp/mapreduce/interface/fwd.h
+++ b/yt/cpp/mapreduce/interface/fwd.h
@@ -150,6 +150,7 @@ namespace NYT {
// common.h
////////////////////////////////////////////////////////////////////////////////
+ using TMutationId = TGUID;
using TTransactionId = TGUID;
using TNodeId = TGUID;
using TLockId = TGUID;
@@ -396,5 +397,12 @@ namespace NYT {
struct TRetryConfig;
class IRetryConfigProvider;
using IRetryConfigProviderPtr = ::TIntrusivePtr<IRetryConfigProvider>;
+
+ ////////////////////////////////////////////////////////////////////////////////
+ // raw_client.h
+ ////////////////////////////////////////////////////////////////////////////////
+
+ class IRawClient;
+ using IRawClientPtr = ::TIntrusivePtr<IRawClient>;
}
/// @endcond
diff --git a/yt/cpp/mapreduce/interface/raw_client.h b/yt/cpp/mapreduce/interface/raw_client.h
new file mode 100644
index 0000000000..b1d244ad78
--- /dev/null
+++ b/yt/cpp/mapreduce/interface/raw_client.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "client_method_options.h"
+
+namespace NYT {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Abstract, reference-counted client interface (derives virtually from
+// TThrRefBase). Set() is the only operation declared here.
+class IRawClient
+ : public virtual TThrRefBase
+{
+public:
+ // Cypress
+
+ // Pure virtual "set" operation: implementations receive the transaction id,
+ // the target path, the node value and the call options.
+ // `mutationId' is taken by non-const reference, so an implementation may both
+ // read the caller's value and write a new one back into it.
+ virtual void Set(
+ TMutationId& mutationId,
+ const TTransactionId& transactionId,
+ const TYPath& path,
+ const TNode& value,
+ const TSetOptions& options = {}) = 0;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.cpp b/yt/cpp/mapreduce/raw_client/raw_client.cpp
new file mode 100644
index 0000000000..9a8a9fca84
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_client.cpp
@@ -0,0 +1,33 @@
+#include "raw_client.h"
+
+#include "rpc_parameters_serialization.h"
+
+#include <yt/cpp/mapreduce/http/http.h>
+#include <yt/cpp/mapreduce/http/retry_request.h>
+
+#include <library/cpp/yson/node/node_io.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// Copies `context' into Context_; Set() reads the prefix and passes the context
+// to the request call from there.
+THttpRawClient::THttpRawClient(const TClientContext& context)
+ : Context_(context)
+{ }
+
+// Issues a single "set" command as an HTTP PUT, without any retry loop.
+//
+// The request parameters are produced by SerializeParamsForSet from the
+// transaction id, the configured path prefix, `path' and `options'; `value' is
+// sent as the request body in YSON form. `mutationId' is forwarded by reference
+// to RequestWithoutRetry.
+void THttpRawClient::Set(
+    TMutationId& mutationId,
+    const TTransactionId& transactionId,
+    const TYPath& path,
+    const TNode& value,
+    const TSetOptions& options)
+{
+    THttpHeader header("PUT", "set");
+
+    const auto parameters = NRawClient::SerializeParamsForSet(
+        transactionId,
+        Context_.Config->Prefix,
+        path,
+        options);
+    header.MergeParameters(parameters);
+
+    const TString yson = NodeToYsonString(value);
+    RequestWithoutRetry(Context_, mutationId, header, yson);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_client.h b/yt/cpp/mapreduce/raw_client/raw_client.h
new file mode 100644
index 0000000000..def521a918
--- /dev/null
+++ b/yt/cpp/mapreduce/raw_client/raw_client.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include <yt/cpp/mapreduce/http/context.h>
+
+#include <yt/cpp/mapreduce/interface/client_method_options.h>
+#include <yt/cpp/mapreduce/interface/raw_client.h>
+
+namespace NYT::NDetail {
+
+////////////////////////////////////////////////////////////////////////////////
+
+// IRawClient implementation built on a stored TClientContext.
+class THttpRawClient
+    : public IRawClient
+{
+public:
+    // `explicit' keeps a TClientContext from converting into a client implicitly.
+    explicit THttpRawClient(const TClientContext& context);
+
+    // Cypress
+
+    // Overrides IRawClient::Set. `mutationId' is a mutable reference, so the
+    // caller's value may be read and updated by the call.
+    void Set(
+        TMutationId& mutationId,
+        const TTransactionId& transactionId,
+        const TYPath& path,
+        const TNode& value,
+        const TSetOptions& options = {}) override;
+
+private:
+    // Held by value and const: fixed for the lifetime of the client.
+    const TClientContext Context_;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+} // namespace NYT::NDetail
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.cpp b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
index 0b7bcf2c66..d10c818489 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.cpp
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.cpp
@@ -173,9 +173,10 @@ TNodeId CopyWithoutRetries(
const TCopyOptions& options)
{
THttpHeader header("POST", "copy");
+ TMutationId mutationId;
header.AddMutationId();
header.MergeParameters(SerializeParamsForCopy(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry(context, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response);
}
TNodeId CopyInsideMasterCell(
@@ -204,9 +205,10 @@ TNodeId MoveWithoutRetries(
const TMoveOptions& options)
{
THttpHeader header("POST", "move");
+ TMutationId mutationId;
header.AddMutationId();
header.MergeParameters(SerializeParamsForMove(transactionId, context.Config->Prefix, sourcePath, destinationPath, options));
- return ParseGuidFromResponse(RequestWithoutRetry( context, header).Response);
+ return ParseGuidFromResponse(RequestWithoutRetry(context, mutationId, header).Response);
}
TNodeId MoveInsideMasterCell(
@@ -309,9 +311,10 @@ void Concatenate(
const TConcatenateOptions& options)
{
THttpHeader header("POST", "concatenate");
+ TMutationId mutationId;
header.AddMutationId();
header.MergeParameters(SerializeParamsForConcatenate(transactionId, context.Config->Prefix, sourcePaths, destinationPath, options));
- RequestWithoutRetry(context, header);
+ RequestWithoutRetry(context, mutationId, header);
}
void PingTx(
diff --git a/yt/cpp/mapreduce/raw_client/raw_requests.h b/yt/cpp/mapreduce/raw_client/raw_requests.h
index 0a183d617c..8cb226ab86 100644
--- a/yt/cpp/mapreduce/raw_client/raw_requests.h
+++ b/yt/cpp/mapreduce/raw_client/raw_requests.h
@@ -53,14 +53,6 @@ TNode TryGet(
const TYPath& path,
const TGetOptions& options);
-void Set(
- const IRequestRetryPolicyPtr& retryPolicy,
- const TClientContext& context,
- const TTransactionId& transactionId,
- const TYPath& path,
- const TNode& value,
- const TSetOptions& options = TSetOptions());
-
void MultisetAttributes(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
diff --git a/yt/cpp/mapreduce/raw_client/ya.make b/yt/cpp/mapreduce/raw_client/ya.make
index c201b86f05..a038b2931f 100644
--- a/yt/cpp/mapreduce/raw_client/ya.make
+++ b/yt/cpp/mapreduce/raw_client/ya.make
@@ -4,6 +4,7 @@ INCLUDE(${ARCADIA_ROOT}/yt/ya_cpp.make.inc)
SRCS(
raw_batch_request.cpp
+ raw_client.cpp
raw_requests.cpp
rpc_parameters_serialization.cpp
)