aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikita Vasilev <ns-vasilev@ydb.tech>2025-04-21 16:09:23 +0300
committerGitHub <noreply@github.com>2025-04-21 13:09:23 +0000
commit7aa367add8eb8304aa00b2291090ed93587f818a (patch)
treeeed55cdc6fe26ddac6d73ba020a227bb12ecbf2e
parentef35ffef726ed786e6145e3172dae71aeaddceac (diff)
downloadydb-7aa367add8eb8304aa00b2291090ed93587f818a.tar.gz
Fix empty upsert with sink (#17482)
-rw-r--r--ydb/core/kqp/runtime/kqp_write_actor.cpp15
-rw-r--r--ydb/core/kqp/ut/query/kqp_query_ut.cpp54
2 files changed, 66 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index 9c29d190d2a..f28d0e3e6df 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -1747,7 +1747,6 @@ public:
void Handle(TEvBufferWrite::TPtr& ev) {
Counters->ForwardActorWritesLatencyHistogram->Collect((TInstant::Now() - ev->Get()->SendTime).MicroSeconds());
-
TWriteToken token;
if (!ev->Get()->Token) {
AFL_ENSURE(ev->Get()->Settings);
@@ -2967,6 +2966,12 @@ private:
void Handle(TEvBufferWriteResult::TPtr& result) {
CA_LOG_D("TKqpForwardWriteActor recieve EvBufferWriteResult from " << BufferActorId);
+
+ WriteToken = result->Get()->Token;
+ OnFlushed();
+ }
+
+ void OnFlushed() {
InFlight = false;
EgressStats.Bytes += DataSize;
@@ -2975,8 +2980,6 @@ private:
EgressStats.Resume();
Counters->ForwardActorWritesSizeHistogram->Collect(DataSize);
-
- WriteToken = result->Get()->Token;
DataSize = 0;
if (Closed) {
@@ -3032,6 +3035,12 @@ private:
ev->SendTime = TInstant::Now();
+ if (ev->Data->IsEmpty() && ev->Close && WriteToken.IsEmpty()) {
+ // Nothing was written
+ OnFlushed();
+ return;
+ }
+
CA_LOG_D("Send data=" << DataSize << ", closed=" << Closed << ", bufferActorId=" << BufferActorId);
AFL_ENSURE(Send(BufferActorId, ev.release()));
}
diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
index 1f2d3be087c..4b0212c9a79 100644
--- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp
+++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp
@@ -2580,6 +2580,60 @@ Y_UNIT_TEST_SUITE(KqpQuery) {
}
}
+ Y_UNIT_TEST_TWIN(UpdateThenDelete, UseSink) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig)
+ .SetWithSampleTables(true);
+
+ TKikimrRunner kikimr(settings);
+ auto client = kikimr.GetTableClient();
+
+ {
+ const TString query = R"(
+ DECLARE $data AS List<Struct<
+ Key: String,
+ Value: String
+ >>;
+
+ UPSERT INTO KeyValue2 SELECT * FROM AS_TABLE($data);
+
+ DELETE FROM KeyValue2 ON SELECT * FROM KeyValue2 AS a LEFT ONLY JOIN AS_TABLE($data) AS b USING (Key);
+ )";
+
+ TTypeBuilder builder;
+ builder
+ .BeginStruct()
+ .AddMember("Key", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build())
+ .AddMember("Value", TTypeBuilder().Primitive(NYdb::EPrimitiveType::String).Build())
+ .EndStruct();
+
+ auto params = client.GetParamsBuilder()
+ .AddParam("$data")
+ .EmptyList(builder.Build())
+ .Build()
+ .Build();
+
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx(), std::move(params)).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ }
+ {
+ const TString query = R"(
+ SELECT * FROM KeyValue2;
+ )";
+
+ auto session = client.CreateSession().GetValueSync().GetSession();
+ auto result = session.ExecuteDataQuery(query, NYdb::NTable::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ Cerr << FormatResultSetYson(result.GetResultSet(0)) << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(0, result.GetResultSet(0).RowsCount());
+ }
+ }
+
}
} // namespace NKqp