aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@ydb.tech>2023-10-12 11:25:55 +0000
committerDaniil Cherednik <dcherednik@ydb.tech>2023-10-12 11:25:55 +0000
commitf0615a5d3f8862c187baef6731ee8da3da0c7322 (patch)
tree10fb3fd81118f8b17b8ddf72e391eb1c2d0a5280
parent052aa976d2efa2bf85bccddadd3390e2892ea3a9 (diff)
downloadydb-23.3.13.tar.gz
Ydb stable 23-3-1323.3.13
x-stable-origin-commit: 7103e48a006e617d91bf3587e6daa7c3927b7b82
-rw-r--r--ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp104
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp4
-rw-r--r--ydb/core/protos/tx.proto7
-rw-r--r--ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp13
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.cpp6
-rw-r--r--ydb/core/tx/coordinator/coordinator_impl.h3
-rw-r--r--ydb/core/tx/datashard/datashard.cpp8
-rw-r--r--ydb/core/tx/datashard/datashard_ut_snapshot.cpp91
-rw-r--r--ydb/core/tx/time_cast/time_cast.cpp85
-rw-r--r--ydb/core/tx/tx.h20
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_flow1.cpp12
-rw-r--r--ydb/library/yql/core/common_opt/yql_co_simple1.cpp15
-rw-r--r--ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp85
13 files changed, 413 insertions, 40 deletions
diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
index 2633b343e1..92920ca80a 100644
--- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
+++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
@@ -157,6 +157,110 @@ Y_UNIT_TEST_SUITE(KqpIndexMetadata) {
}
}
+ void TestNoReadFromMainTableBeforeJoin(bool UseExtractPredicates) {
+ using namespace NYql;
+ using namespace NYql::NNodes;
+
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetPredicateExtract20(UseExtractPredicates);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto& server = kikimr.GetTestServer();
+ auto gateway = GetIcGateway(server);
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ {
+ const TString createTableSql(R"(
+ --!syntax_v1
+ CREATE TABLE `/Root/tg` (
+ id Utf8, b Utf8, am Decimal(22, 9), cur Utf8, pa_id Utf8, system_date Timestamp, status Utf8, type Utf8, product Utf8,
+ PRIMARY KEY (b, id),
+ INDEX tg_index GLOBAL SYNC ON (`b`, `pa_id`, `system_date`, `id`)
+ COVER(status, type, product, am)
+ );)");
+ auto result = session.ExecuteSchemeQuery(createTableSql).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ }
+
+ // core optimizer should inject CoExtractMembert over KqlReadTableIndex with columns set based on ORDER BY
+ // after that KqlReadTableIndex has all columns present in index and should be rewriten in to index read
+ // limit must be applied in to this (index read) stage.
+ // As result we have limited numbers of lookups from main table
+
+ {
+ const TString query(Q1_(R"(
+ --!syntax_v1
+
+ DECLARE $b_1 AS Text;
+ DECLARE $pa_id_1 AS Text;
+ DECLARE $b_2 AS Text;
+ DECLARE $constant_param_1 AS Timestamp;
+ DECLARE $constant_param_2 AS Text;
+ DECLARE $type_1 AS List<Text>;
+ DECLARE $status_1 AS Text;
+ DECLARE $am_1 AS Decimal(22, 9);
+
+ SELECT *
+ FROM tg
+ WHERE (`tg`.`b` = $b_1) AND (`tg`.`id` IN (
+ SELECT `id`
+ FROM (
+ SELECT *
+ FROM `/Root/tg` VIEW tg_index AS tg
+ WHERE (`tg`.`pa_id` = $pa_id_1)
+ AND (`tg`.`b` = $b_2)
+ AND ((`tg`.`system_date`, `tg`.`id`) <= ($constant_param_1, $constant_param_2))
+ AND (`tg`.`type` NOT IN $type_1)
+ AND (`tg`.`status` <> $status_1)
+ AND (`tg`.`am` <> $am_1)
+ ORDER BY
+ `tg`.`b` DESC,
+ `tg`.`pa_id` DESC,
+ `tg`.`system_date` DESC,
+ `tg`.`id` DESC
+ LIMIT 11)
+ ))
+ ORDER BY
+ `tg`.`system_date` DESC,
+ `tg`.`id` DESC
+
+ )"));
+
+ auto explainResult = session.ExplainDataQuery(query).GetValueSync();
+ UNIT_ASSERT_C(explainResult.IsSuccess(), explainResult.GetIssues().ToString());
+
+ Cerr << explainResult.GetAst() << Endl;
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Reverse\")"), explainResult.GetAst());
+ UNIT_ASSERT_C(explainResult.GetAst().Contains("'('\"Sorted\")"), explainResult.GetAst());
+
+ NJson::TJsonValue plan;
+ NJson::ReadJsonTree(explainResult.GetPlan(), &plan, true);
+ UNIT_ASSERT(ValidatePlanNodeIds(plan));
+ Cerr << plan << Endl;
+ auto mainTableAccess = CountPlanNodesByKv(plan, "Table", "tg");
+ UNIT_ASSERT_VALUES_EQUAL(mainTableAccess, 1);
+
+ auto indexTableAccess = CountPlanNodesByKv(plan, "Table", "tg/tg_index/indexImplTable");
+ UNIT_ASSERT_VALUES_EQUAL(indexTableAccess, 1);
+
+ auto filterOnIndex = CountPlanNodesByKv(plan, "Node Type", "Limit-Filter-TablePointLookup");
+ UNIT_ASSERT_VALUES_EQUAL(filterOnIndex, 1);
+
+ auto limitFilterNode = FindPlanNodeByKv(plan, "Node Type", "Limit-Filter-TablePointLookup");
+ auto val = FindPlanNodes(limitFilterNode, "Limit");
+ UNIT_ASSERT_VALUES_EQUAL(val.size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(val[0], "11");
+ }
+ }
+
+ Y_UNIT_TEST_TWIN(TestNoReadFromMainTableBeforeJoin, ExtractPredicate) {
+ TestNoReadFromMainTableBeforeJoin(ExtractPredicate);
+ }
+
Y_UNIT_TEST(HandleWriteOnlyIndex) {
using namespace NYql;
using namespace NYql::NNodes;
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index 325555f644..8d8bf4923d 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3871,10 +3871,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
}
Y_UNIT_TEST_TWIN(ComplexLookupLimit, NewPredicateExtract) {
- if (NewPredicateExtract) {
- return;
- }
-
TKikimrSettings settings;
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetPredicateExtract20(NewPredicateExtract);
diff --git a/ydb/core/protos/tx.proto b/ydb/core/protos/tx.proto
index eaafd0ef31..7956786702 100644
--- a/ydb/core/protos/tx.proto
+++ b/ydb/core/protos/tx.proto
@@ -131,6 +131,13 @@ message TEvUpdatedLastStep {
optional fixed64 LastStep = 3;
}
+// Time cast to coordinators
+// Notifies coordinator that clients are waiting for some specific steps
+message TEvRequirePlanSteps {
+ optional fixed64 CoordinatorID = 1;
+ repeated uint64 PlanSteps = 2 [packed = true];
+}
+
// coordinator to mediator
message TCoordinatorTransaction {
repeated uint64 AffectedSet = 1; // read and write set - joined and then filtered for concrete mediator
diff --git a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
index 73a6232523..52909ae7f9 100644
--- a/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
+++ b/ydb/core/tx/coordinator/coordinator__last_step_subscriptions.cpp
@@ -184,6 +184,19 @@ namespace NKikimr::NFlatTxCoordinator {
}
}
+ void TTxCoordinator::Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr& ev) {
+ auto* msg = ev->Get();
+ for (ui64 step : msg->Record.GetPlanSteps()) {
+ // Note: we could schedule an exact volatile step here in the future
+ step = AlignPlanStep(step);
+ // Note: this is not a sibling step, but it behaves similar enough
+ // so we reuse the same queue here.
+ if (step > VolatileState.LastPlanned && PendingSiblingSteps.insert(step).second) {
+ SchedulePlanTickExact(step);
+ }
+ }
+ }
+
void TTxCoordinator::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
auto* msg = ev->Get();
if (auto* state = Siblings.FindPtr(msg->TabletId); state && state->Subscribed) {
diff --git a/ydb/core/tx/coordinator/coordinator_impl.cpp b/ydb/core/tx/coordinator/coordinator_impl.cpp
index 495114a37f..d8118674f1 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.cpp
+++ b/ydb/core/tx/coordinator/coordinator_impl.cpp
@@ -272,8 +272,12 @@ void TTxCoordinator::SchedulePlanTickAligned(ui64 next) {
return;
}
+ SchedulePlanTickExact(AlignPlanStep(next));
+}
+
+ui64 TTxCoordinator::AlignPlanStep(ui64 step) {
const ui64 resolution = Config.Resolution;
- SchedulePlanTickExact((next + resolution - 1) / resolution * resolution);
+ return ((step + resolution - 1) / resolution * resolution);
}
void TTxCoordinator::Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx) {
diff --git a/ydb/core/tx/coordinator/coordinator_impl.h b/ydb/core/tx/coordinator/coordinator_impl.h
index 5c0dde89b5..247d918bcb 100644
--- a/ydb/core/tx/coordinator/coordinator_impl.h
+++ b/ydb/core/tx/coordinator/coordinator_impl.h
@@ -651,6 +651,7 @@ private:
void SubscribeToSibling(TSiblingState& state);
void UnsubscribeFromSiblings();
void Handle(TEvTxProxy::TEvUpdatedLastStep::TPtr &ev);
+ void Handle(TEvTxProxy::TEvRequirePlanSteps::TPtr &ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev);
void Handle(TEvPrivate::TEvPlanTick::TPtr &ev, const TActorContext &ctx);
@@ -674,6 +675,7 @@ private:
void SchedulePlanTick();
void SchedulePlanTickExact(ui64 next);
void SchedulePlanTickAligned(ui64 next);
+ ui64 AlignPlanStep(ui64 step);
bool RestoreMediatorInfo(TTabletId mediatorId, TVector<TAutoPtr<TMediatorStep>> &planned, TTransactionContext &txc, /*TKeyBuilder &kb, */THashMap<TTxId,TVector<TTabletId>> &pushToAffected) const;
void TryInitMonCounters(const TActorContext &ctx);
@@ -737,6 +739,7 @@ public:
HFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
HFunc(TEvPrivate::TEvRestoredProcessingParams, Handle);
hFunc(TEvTxProxy::TEvUpdatedLastStep, Handle);
+ hFunc(TEvTxProxy::TEvRequirePlanSteps, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
)
diff --git a/ydb/core/tx/datashard/datashard.cpp b/ydb/core/tx/datashard/datashard.cpp
index 4c680758ab..bcdf22a1d2 100644
--- a/ydb/core/tx/datashard/datashard.cpp
+++ b/ydb/core/tx/datashard/datashard.cpp
@@ -2285,9 +2285,17 @@ void TDataShard::CheckMediatorStateRestored() {
// writes before the restart, and conversely don't accidentally read any
// data that is definitely not replied yet.
if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
+ const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
SnapshotManager.PromoteImmediateWriteEdgeReplied(
Min(edge, SnapshotManager.GetImmediateWriteEdge()));
+ // Try to ensure writes become visible sooner rather than later
+ if (edge.Step < writeStep) {
+ if (MediatorTimeCastWaitingSteps.insert(writeStep).second) {
+ Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));
+ LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast");
+ }
+ }
}
MediatorStateWaiting = false;
diff --git a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
index cce17c176a..8bad05bcb7 100644
--- a/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_snapshot.cpp
@@ -3773,6 +3773,97 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
"{ items { uint32_value: 3 } items { uint32_value: 3 } }");
}
+ Y_UNIT_TEST(ReadIteratorLocalSnapshotThenWrite) {
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(true);
+
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetDomainPlanResolution(100)
+ .SetAppConfig(app);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto &runtime = *server->GetRuntime();
+ auto sender = runtime.AllocateEdgeActor();
+
+ runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+ runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
+
+ InitRoot(server, sender);
+
+ TDisableDataShardLogBatching disableDataShardLogBatching;
+ auto opts = TShardedTableOptions()
+ .Shards(1)
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", "Uint32", false, false}});
+ CreateShardedTable(server, sender, "/Root", "table-1", opts);
+ CreateShardedTable(server, sender, "/Root", "table-2", opts);
+
+ // Perform a snapshot read, this will persist "reads from snapshots" flag
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ UNION ALL
+ SELECT key, value
+ FROM `/Root/table-2`
+ )")),
+ "");
+
+ // Insert rows using a single-shard write
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 1), (2, 2), (3, 3)"));
+
+ // Wait until mediator goes idle
+ size_t timecastUpdates = 0;
+ auto observer = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case TEvMediatorTimecast::TEvUpdate::EventType: {
+ ++timecastUpdates;
+ break;
+ }
+ case TEvDataShard::TEvRead::EventType: {
+ auto* msg = ev->Get<TEvDataShard::TEvRead>();
+ msg->Record.SetMaxRowsInResult(1);
+ break;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+ auto prevObserverFunc = runtime.SetObserverFunc(observer);
+
+ auto waitFor = [&](const auto& condition, const TString& description) {
+ if (!condition()) {
+ Cerr << "... waiting for " << description << Endl;
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() {
+ return condition();
+ };
+ runtime.DispatchEvents(options);
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
+ }
+ };
+
+ waitFor([&]{ return timecastUpdates >= 3; }, "at least 3 timecast updates");
+
+ UNIT_ASSERT_VALUES_EQUAL(
+ KqpSimpleExec(runtime, Q_(R"(
+ SELECT key, value
+ FROM `/Root/table-1`
+ ORDER BY key
+ )")),
+ "{ items { uint32_value: 1 } items { uint32_value: 1 } }, "
+ "{ items { uint32_value: 2 } items { uint32_value: 2 } }, "
+ "{ items { uint32_value: 3 } items { uint32_value: 3 } }");
+
+ auto start = runtime.GetCurrentTime();
+ ExecSQL(server, sender, Q_("UPSERT INTO `/Root/table-1` (key, value) VALUES (1, 11), (2, 22), (3, 33)"));
+ auto duration = runtime.GetCurrentTime() - start;
+ UNIT_ASSERT_C(duration <= TDuration::MilliSeconds(200), "UPSERT takes too much time: " << duration);
+ }
+
}
} // namespace NKikimr
diff --git a/ydb/core/tx/time_cast/time_cast.cpp b/ydb/core/tx/time_cast/time_cast.cpp
index f6e3398546..455777f203 100644
--- a/ydb/core/tx/time_cast/time_cast.cpp
+++ b/ydb/core/tx/time_cast/time_cast.cpp
@@ -2,6 +2,7 @@
#include <library/cpp/actors/core/hfunc.h>
#include <library/cpp/actors/core/log.h>
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/base/tx_processing.h>
#include <ydb/core/protos/subdomains.pb.h>
#include <ydb/library/services/services.pb.h>
@@ -32,6 +33,21 @@ void TMediatorTimecastEntry::Update(ui64 step, ui64 *exemption, ui64 exsz) {
}
class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
+ struct TEvPrivate {
+ enum EEv {
+ EvRetryCoordinator = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+ EvEnd,
+ };
+
+ struct TEvRetryCoordinator : public TEventLocal<TEvRetryCoordinator, EvRetryCoordinator> {
+ ui64 Coordinator;
+
+ TEvRetryCoordinator(ui64 coordinator)
+ : Coordinator(coordinator)
+ {}
+ };
+ };
+
struct TWaiter {
TActorId Sender;
ui64 TabletId;
@@ -59,6 +75,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
struct TMediator {
const ui32 BucketsSz;
TArrayHolder<TMediatorBucket> Buckets;
+ std::vector<ui64> Coordinators;
TActorId PipeClient;
@@ -68,6 +85,12 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
{}
};
+ struct TMediatorCoordinator {
+ std::set<ui64> WaitingSteps;
+ bool RetryPending = false;
+ bool Subscribed = false;
+ };
+
struct TTabletInfo {
ui64 MediatorTabletId;
TMediator* Mediator;
@@ -92,6 +115,7 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
};
THashMap<ui64, TMediator> Mediators; // mediator tablet -> info
+ THashMap<ui64, TMediatorCoordinator> MediatorCoordinators; // coordinator tablet -> info
THashMap<ui64, TTabletInfo> Tablets;
ui64 LastSeqNo = 0;
@@ -103,6 +127,11 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
if (!pr.second) {
Y_VERIFY(pr.first->second.BucketsSz == processing.GetTimeCastBucketsPerMediator());
}
+ if (pr.first->second.Coordinators.empty()) {
+ pr.first->second.Coordinators.assign(
+ processing.GetCoordinators().begin(),
+ processing.GetCoordinators().end());
+ }
return pr.first->second;
}
@@ -165,6 +194,8 @@ class TMediatorTimecastProxy : public TActor<TMediatorTimecastProxy> {
void Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx);
void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx);
+ void Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx);
// Client requests for readstep subscriptions
void Handle(TEvMediatorTimecast::TEvSubscribeReadStep::TPtr &ev, const TActorContext &ctx);
@@ -200,6 +231,8 @@ public:
HFunc(TEvTabletPipe::TEvClientConnected, Handle);
HFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ HFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
+ HFunc(TEvPrivate::TEvRetryCoordinator, Handle);
}
}
};
@@ -278,6 +311,17 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvWaitPlanStep::TPtr &
const ui64 currentStep = bucket.Entry->Get(tabletId);
if (currentStep < planStep) {
bucket.Waiters.emplace(ev->Sender, tabletId, planStep);
+ for (ui64 coordinatorId : mediator.Coordinators) {
+ TMediatorCoordinator &coordinator = MediatorCoordinators[coordinatorId];
+ if (coordinator.WaitingSteps.insert(planStep).second && !coordinator.RetryPending) {
+ Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(
+ new TEvTxProxy::TEvRequirePlanSteps(coordinatorId, planStep),
+ coordinatorId,
+ !coordinator.Subscribed));
+ coordinator.Subscribed = true;
+ }
+ }
return;
}
@@ -301,6 +345,37 @@ void TMediatorTimecastProxy::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev,
TryResync(msg->ClientId, msg->TabletId, ctx);
}
+void TMediatorTimecastProxy::Handle(TEvPipeCache::TEvDeliveryProblem::TPtr &ev, const TActorContext &ctx) {
+ auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+ << " HANDLE EvDeliveryProblem " << msg->TabletId);
+ auto it = MediatorCoordinators.find(msg->TabletId);
+ if (it != MediatorCoordinators.end()) {
+ Y_VERIFY_DEBUG(!it->second.RetryPending);
+ Schedule(TDuration::MilliSeconds(5), new TEvPrivate::TEvRetryCoordinator(msg->TabletId));
+ it->second.RetryPending = true;
+ it->second.Subscribed = false;
+ }
+}
+
+void TMediatorTimecastProxy::Handle(TEvPrivate::TEvRetryCoordinator::TPtr &ev, const TActorContext &ctx) {
+ auto *msg = ev->Get();
+ LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
+ << " HANDLE EvRetryCoordinator " << msg->Coordinator);
+ auto it = MediatorCoordinators.find(msg->Coordinator);
+ if (it != MediatorCoordinators.end() && it->second.RetryPending) {
+ it->second.RetryPending = false;
+ if (!it->second.WaitingSteps.empty()) {
+ Send(MakePipePeNodeCacheID(false),
+ new TEvPipeCache::TEvForward(
+ new TEvTxProxy::TEvRequirePlanSteps(msg->Coordinator, it->second.WaitingSteps),
+ msg->Coordinator,
+ true));
+ it->second.Subscribed = true;
+ }
+ }
+}
+
void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, const TActorContext &ctx) {
LOG_DEBUG_S(ctx, NKikimrServices::TX_MEDIATOR_TIMECAST, "Actor# " << ctx.SelfID.ToString()
<< " HANDLE "<< ev->Get()->ToString());
@@ -314,6 +389,7 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
auto &mediator = it->second;
Y_VERIFY(record.GetBucket() < mediator.BucketsSz);
auto &bucket = mediator.Buckets[record.GetBucket()];
+ const ui64 step = record.GetTimeBarrier();
switch (bucket.Entry.RefCount()) {
case 0:
break;
@@ -322,7 +398,6 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
bucket.Waiters = { };
break;
default: {
- const ui64 step = record.GetTimeBarrier();
bucket.Entry->Update(step, nullptr, 0);
THashSet<std::pair<TActorId, ui64>> processed; // a set of processed tablets
while (!bucket.Waiters.empty()) {
@@ -338,6 +413,14 @@ void TMediatorTimecastProxy::Handle(TEvMediatorTimecast::TEvUpdate::TPtr &ev, co
break;
}
}
+ for (ui64 coordinatorId : mediator.Coordinators) {
+ auto it = MediatorCoordinators.find(coordinatorId);
+ if (it != MediatorCoordinators.end()) {
+ while (!it->second.WaitingSteps.empty() && *it->second.WaitingSteps.begin() <= step) {
+ it->second.WaitingSteps.erase(it->second.WaitingSteps.begin());
+ }
+ }
+ }
}
}
diff --git a/ydb/core/tx/tx.h b/ydb/core/tx/tx.h
index 116c79549d..c59b3339e7 100644
--- a/ydb/core/tx/tx.h
+++ b/ydb/core/tx/tx.h
@@ -18,6 +18,7 @@ struct TEvTxProxy {
EvUnsubscribeReadStep,
EvSubscribeLastStep,
EvUnsubscribeLastStep,
+ EvRequirePlanSteps,
EvProposeTransactionStatus = EvProposeTransaction + 1 * 512,
EvAcquireReadStepResult,
@@ -162,6 +163,25 @@ struct TEvTxProxy {
Record.SetLastStep(lastStep);
}
};
+
+ struct TEvRequirePlanSteps
+ : public TEventPB<TEvRequirePlanSteps, NKikimrTx::TEvRequirePlanSteps, EvRequirePlanSteps>
+ {
+ TEvRequirePlanSteps() = default;
+
+ TEvRequirePlanSteps(ui64 coordinator, ui64 planStep) {
+ Record.SetCoordinatorID(coordinator);
+ Record.AddPlanSteps(planStep);
+ }
+
+ TEvRequirePlanSteps(ui64 coordinator, const std::set<ui64>& planSteps) {
+ Record.SetCoordinatorID(coordinator);
+ Record.MutablePlanSteps()->Reserve(planSteps.size());
+ for (ui64 planStep : planSteps) {
+ Record.AddPlanSteps(planStep);
+ }
+ }
+ };
};
// basic
diff --git a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
index 4e27e05694..fa3e81da7c 100644
--- a/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_flow1.cpp
@@ -1508,7 +1508,7 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) {
if (!optCtx.IsSingleUsage(node->Head()) && !optCtx.IsPersistentNode(node->Head())) {
return node;
}
-/*TODO: Enable later. Providers is not ready right now.
+
if (node->Head().IsCallable("Sort")) {
YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content();
auto children = node->Head().ChildrenList();
@@ -1516,7 +1516,15 @@ void RegisterCoFlowCallables1(TCallableOptimizerMap& map) {
children.emplace(++it, node->TailPtr());
return ctx.NewCallable(node->Pos(), "TopSort", std::move(children));
}
-*/
+
+ if (node->Head().IsCallable("ExtractMembers") && node->Head().Head().IsCallable("Sort") && optCtx.IsSingleUsage(node->Head().Head())) {
+ YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content() << " over " << node->Head().Head().Content();
+ auto children = node->Head().Head().ChildrenList();
+ auto it = children.cbegin();
+ children.emplace(++it, node->TailPtr());
+ return ctx.ChangeChild(node->Head(), 0U, ctx.NewCallable(node->Pos(), "TopSort", std::move(children)));
+ }
+
if (node->Head().IsCallable({"Top", "TopSort"})) {
YQL_CLOG(DEBUG, Core) << "Fuse " << node->Content() << " over " << node->Head().Content();
return ctx.ChangeChild(node->Head(), 1U, ctx.NewCallable(node->Pos(), "Min", {node->TailPtr(), node->Head().ChildPtr(1)}));
diff --git a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
index 1b9af4fe1e..eb80faa1e8 100644
--- a/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
+++ b/ydb/library/yql/core/common_opt/yql_co_simple1.cpp
@@ -2554,8 +2554,19 @@ TExprNode::TPtr OptimizeReorder(const TExprNode::TPtr& node, TExprContext& ctx)
}
if (count <= 1) {
- YQL_CLOG(DEBUG, Core) << node->Content() << " over 0/1 literals";
- return ctx.RenameNode(*node, "AssumeSorted");
+ YQL_CLOG(DEBUG, Core) << node->Content() << " over " << count << " literals.";
+ if constexpr (IsTop) {
+ return ctx.Builder(node->Pos())
+ .Callable("AssumeSorted")
+ .Callable(0, "Take")
+ .Add(0, node->HeadPtr())
+ .Add(1, node->ChildPtr(1))
+ .Seal()
+ .Add(1, node->ChildPtr(2))
+ .Add(2, node->ChildPtr(3))
+ .Seal().Build();
+ } else
+ return ctx.RenameNode(*node, "AssumeSorted");
}
}
diff --git a/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp b/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp
index 1b09e09974..f38379da18 100644
--- a/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp
+++ b/ydb/library/yql/core/extract_predicate/extract_predicate_impl.cpp
@@ -1207,48 +1207,72 @@ TExprNode::TPtr DoRebuildRangeForIndexKeys(const TStructExprType& rowType, const
}
TVector<TNodeAndIndexRange> rests;
- TMap<TIndexRange, TVector<TNodeAndIndexRange>> children;
+ Sort(toRebuild.begin(), toRebuild.end(),
+ [&](const auto& fs, const auto& sc) {
+ return fs.IndexRange < sc.IndexRange;
+ });
+ TMap<size_t, TVector<TNodeAndIndexRange>> byBegin;
+ if (!toRebuild.empty()) {
+ byBegin[toRebuild[0].IndexRange.Begin] = {};
+ }
+
for (auto& current : toRebuild) {
if (current.IndexRange.IsEmpty()) {
YQL_ENSURE(current.Node->IsCallable("RangeRest"));
rests.emplace_back(std::move(current));
} else {
- children[current.IndexRange].push_back(std::move(current));
+ if (byBegin.contains(current.IndexRange.Begin)) {
+ byBegin[current.IndexRange.Begin].push_back(current);
+ byBegin[current.IndexRange.End] = {};
+ } else {
+ rests.push_back(std::move(current));
+ }
}
}
- TVector<TVector<TNodeAndIndexRange>> childrenChains;
- for (auto it = children.begin(); it != children.end(); ++it) {
- if (!commonIndexRange) {
- commonIndexRange = it->first;
- childrenChains.emplace_back(std::move(it->second));
- continue;
- }
- if (commonIndexRange->Begin == it->first.Begin) {
- YQL_ENSURE(it->first.End > commonIndexRange->End);
- for (auto& asRest : childrenChains) {
- rests.insert(rests.end(), asRest.begin(), asRest.end());
+ TMap<size_t, TNodeAndIndexRange> builtRange;
+ for (auto it = byBegin.rbegin(); it != byBegin.rend(); ++it) {
+ size_t end = 0;
+ size_t indexRangeEnd = 0;
+ TVector<TExprNode::TPtr> results;
+ TVector<TExprNode::TPtr> currents;
+ auto flush = [&] () {
+ if (!currents) {
+ return;
}
- childrenChains.clear();
- childrenChains.push_back(std::move(it->second));
- commonIndexRange = it->first;
- continue;
- }
- if (commonIndexRange->End == it->first.Begin) {
- commonIndexRange->End = it->first.End;
- childrenChains.push_back(std::move(it->second));
- } else {
- rests.insert(rests.end(), it->second.begin(), it->second.end());
+ TExprNode::TPtr toAdd = MakeRangeAnd(range->Pos(), std::move(currents), ctx);
+ if (auto ptr = builtRange.FindPtr(end)) {
+ toAdd = MakeRangeAnd(range->Pos(), { toAdd, ptr->Node }, ctx);
+ indexRangeEnd = std::max(indexRangeEnd, ptr->IndexRange.End);
+ }
+ results.push_back(toAdd);
+
+ indexRangeEnd = std::max(end, indexRangeEnd);
+ currents = {};
+ };
+ for (auto node : it->second) {
+ if (end != node.IndexRange.End) {
+ flush();
+ end = node.IndexRange.End;
+ }
+ currents.push_back(node.Node);
+ }
+ if (currents) {
+ flush();
+ }
+ if (results) {
+ auto& built = builtRange[it->first];
+ built.Node = MakeRangeAnd(range->Pos(), std::move(results), ctx);
+ built.IndexRange.Begin = it->first;
+ built.IndexRange.End = indexRangeEnd;
}
}
- for (auto& chain : childrenChains) {
- TExprNodeList chainNodes;
- for (auto& entry : chain) {
- chainNodes.push_back(entry.Node);
- }
- rebuilt.push_back(MakeRangeAnd(range->Pos(), std::move(chainNodes), ctx));
+ if (builtRange) {
+ auto& first = *builtRange.begin();
+ rebuilt.push_back(first.second.Node);
+ commonIndexRange = first.second.IndexRange;
}
if (!rests.empty()) {
@@ -1264,7 +1288,7 @@ TExprNode::TPtr DoRebuildRangeForIndexKeys(const TStructExprType& rowType, const
}
}
- TExprNode::TPtr result = ctx.ChangeChildren(*range, std::move(rebuilt));
+ TExprNode::TPtr result = rebuilt.size() == 1 ? rebuilt[0] : ctx.ChangeChildren(*range, std::move(rebuilt));
if (commonIndexRange) {
resultIndexRange = *commonIndexRange;
} else {
@@ -2018,6 +2042,7 @@ bool TPredicateRangeExtractor::Prepare(const TExprNode::TPtr& filterLambdaNode,
}
return node;
}, ctx, settings);
+
return status == IGraphTransformer::TStatus::Ok;
}